С DataFrames
и UDF:
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf
zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)
(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))
С RDDs
:
(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))
Оба решения неэффективны из-за накладных расходов на Python. Если размер данных исправлен, вы можете сделать что-то вроде этого:
from functools import reduce
from pyspark.sql import DataFrame
# Length of array
n = 3
# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")
или даже:
from pyspark.sql.functions import array, struct
# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))
(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))
Это должно быть значительно быстрее по сравнению с UDF или RDD. Обобщены для поддержки произвольного количества столбцов:
# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))
df.withColumn("tmp", zip_and_explode("b", "c", n=3))
В принципе, поскольку [10..1]
переведен на enumFromTo 10 1
, который сам имеет семантику для создания списка, беря все элементы меньше 1
, которые являются результатом подсчета вверх (с шагом +1
) из ( в том числе) 10
.
В то время как [10, 9..1]
переведен на enumFromToThen 10 9 1
, который явно устанавливает размер шага счета как 9-10
, т.е. -1
(который жестко закодирован до +1
для enumFromTo
)
Более точная спецификация может быть найдена в отчете Haskell (6.3.4 Класс Enum):
enumFrom :: a -> [a] -- [n..]
enumFromThen :: a -> a -> [a] -- [n,n'..]
enumFromTo :: a -> a -> [a] -- [n..m]
enumFromThenTo :: a -> a -> a -> [a] -- [n,n'..m]
Для типов
Int
] иInteger
, функции перечисления имеют следующее значение:blockquote>
- Последовательность
enumFrom e1
представляет собой список[e1,e1+1,e1+2,...]
.- Последовательность
enumFromThen e1 e2
является list[e1,e1+i,e1+2i,...]
, где приращение i является e2-e1. Инкремент может быть нулем или отрицательным. Если приращение равно нулю, все элементы списка одинаковы.- Последовательность
enumFromTo e1 e3
- это список[e1,e1+1,e1+2,...e3]
. Список пуст, еслиe1 > e3
.- Последовательность
enumFromThenTo e1 e2 e3
- это список[e1,e1+i,e1+2i,...e3]
, где приращениеi
равноe2-e1
. Если приращение положительное или нулевое, список заканчивается, когда следующий элемент будет больше, чемe3
; список пуст, еслиe1 > e3
. Если приращение отрицательное, список заканчивается, когда следующий элемент будет меньшеe3
; список пуст, еслиe1 < e3
.
Если вы хотите сгенерировать список из a
в b
независимо от того, a < b
, вы можете использовать следующее:
[a, a + (signum $ b - a)..b]
Нотация арифметической последовательности - это просто синтаксический сахар для функций из класса Enum
.
[a..] = enumFrom a
[a..b] = enumFromTo a b
[a, b..] = enumFromThen a b
[a, b..c] = enumFromThenTo a b c
Что касается того, почему они не были определены для автоматического обращения, я могу только предположим, но вот несколько возможных причин:
a
и b
определены в другом месте, было бы сложнее сказать с первого взгляда, в каком направлении [a..b]
будет идти.