Другой способ - использовать протокол, который также позволяет вам возвращать Self
.
protocol StoryboardGeneratable {
}
extension UIViewController: StoryboardGeneratable {
}
extension StoryboardGeneratable where Self: UIViewController
{
static func initialize(storyboardName: String, storyboardId: String) -> Self
{
let storyboad = UIStoryboard(name: storyboardName, bundle: nil)
let controller = storyboad.instantiateViewController(withIdentifier: storyboardId) as! Self
return controller
}
}
Что вы пытаетесь написать UDAF (User Defined Aggregate Function), а не UDF (функция, определяемая пользователем). UDAF - это функции, которые работают с данными, сгруппированными по ключу. В частности, им нужно определить, как объединить несколько значений в группе в одном разделе, а затем как объединить результаты по разделам для ключа. В настоящее время в python нет возможности реализовать UDAF, они могут быть реализованы только в Scala.
Но вы можете обойти это в Python. Вы можете использовать набор сбора, чтобы собрать свои сгруппированные значения, а затем использовать обычный UDF для выполнения того, что вы хотите с ними. Единственное предостережение - collect_set работает только с примитивными значениями, поэтому вам нужно будет закодировать их до строки.
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf
def myFunc(data_list):
for val in data_list:
b, c = data.split(',')
# do something
return <whatever>
myUdf = udf(myFunc, StringType())
df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
.groupBy('A').agg(collect_list('data').alias('data'))
.withColumn('data', myUdf('data'))
Используйте команду collect_set, если вы хотите дедуплировать. Кроме того, если у вас много значений для некоторых ваших ключей, это будет медленным, потому что все значения для ключа нужно будет собирать в одном разделе где-то в вашем кластере. Если ваш конечный результат - это значение, которое вы создаете, комбинируя значения для ключа каким-либо образом (например, суммируя их), это может быть быстрее реализовать с помощью метода RDD aggregateByKey , который позволяет построить промежуточное значение для каждого ключа в разделе перед перемещением данных вокруг.
Я собираюсь расширить ответ.
Таким образом, вы можете реализовать такую же логику, как pandas.groupby (). применить в pyspark с помощью @pandas_udf и который является методом векторизации и быстрее, чем простой udf.
from pyspark.sql.functions import pandas_udf,PandasUDFType
df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_value1", DoubleType()),
StructField("avg_value2", DoubleType()),
StructField("sum_avg", DoubleType()),
StructField("sub_avg", DoubleType())
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
gr = df['key'].iloc[0]
x = df.value1.mean()
y = df.value2.mean()
w = df.value1.mean() + df.value2.mean()
z = df.value1.mean() - df.value2.mean()
return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])
df3.groupby("key").apply(g).show()
Вы получите результат ниже:
+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
| b| 6.5| -1.5| 5.0| 8.0|
| a| 0.0| 21.0| 21.0| -21.0|
+---+----------+----------+-------+-------+
Итак, вы можете делать больше вычислений между другими полями в сгруппированных данных и добавлять их в dataframe в формате списка.
Начиная с Spark 2.3 (теперь в разработке) вы можете использовать pandas_udf
. Групповые агрегатные варианты берут функцию, которая отображает из Pandas DataFrame
той же формы, что и вход, на выход DataFrame
. Например, если данные выглядят следующим образом:
df = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)
, и вы хотите вычислить среднее значение попарного минимума между value1
value2
, вам необходимо определить схему вывода:
from pyspark.sql.types import *
schema = StructType([
StructField("key", StringType()),
StructField("avg_min", DoubleType())
])
pandas_udf
:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
result = pd.DataFrame(df.groupby(df.key).apply(
lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
))
result.reset_index(inplace=True, drop=False)
return result
и примените его:
df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
| b| -1.5|
| a| -0.5|
+---+-------+
Исключая определение схемы и декоратор, ваш текущий код Pandas можно применять как есть .