Пользовательские функции pyspark для группировки объектов [дубликат]

Другой способ - использовать протокол, который также позволяет вам возвращать Self.

protocol StoryboardGeneratable {

}

extension UIViewController: StoryboardGeneratable {

}

extension StoryboardGeneratable where Self: UIViewController
{
    static func initialize(storyboardName: String, storyboardId: String) -> Self
    {
        let storyboad = UIStoryboard(name: storyboardName, bundle: nil)
        let controller = storyboad.instantiateViewController(withIdentifier: storyboardId) as! Self
        return controller
    }
}
12
задан arosner09 12 October 2016 в 19:01
поделиться

3 ответа

Что вы пытаетесь написать UDAF (User Defined Aggregate Function), а не UDF (функция, определяемая пользователем). UDAF - это функции, которые работают с данными, сгруппированными по ключу. В частности, им нужно определить, как объединить несколько значений в группе в одном разделе, а затем как объединить результаты по разделам для ключа. В настоящее время в python нет возможности реализовать UDAF, они могут быть реализованы только в Scala.

Но вы можете обойти это в Python. Вы можете использовать набор сбора, чтобы собрать свои сгруппированные значения, а затем использовать обычный UDF для выполнения того, что вы хотите с ними. Единственное предостережение - collect_set работает только с примитивными значениями, поэтому вам нужно будет закодировать их до строки.

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = data.split(',')
        # do something

    return <whatever>

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \
  .groupBy('A').agg(collect_list('data').alias('data'))
  .withColumn('data', myUdf('data'))

Используйте команду collect_set, если вы хотите дедуплировать. Кроме того, если у вас много значений для некоторых ваших ключей, это будет медленным, потому что все значения для ключа нужно будет собирать в одном разделе где-то в вашем кластере. Если ваш конечный результат - это значение, которое вы создаете, комбинируя значения для ключа каким-либо образом (например, суммируя их), это может быть быстрее реализовать с помощью метода RDD aggregateByKey , который позволяет построить промежуточное значение для каждого ключа в разделе перед перемещением данных вокруг.

23
ответ дан Fokko Driesprong 19 August 2018 в 05:03
поделиться

Я собираюсь расширить ответ.

Таким образом, вы можете реализовать такую ​​же логику, как pandas.groupby (). применить в pyspark с помощью @pandas_udf и который является методом векторизации и быстрее, чем простой udf.

from pyspark.sql.functions import pandas_udf,PandasUDFType

df3 = spark.createDataFrame(
[("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
("key", "value1", "value2")
)

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr]+[x]+[y]+[w]+[z]])

df3.groupby("key").apply(g).show()

Вы получите результат ниже:

+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+

Итак, вы можете делать больше вычислений между другими полями в сгруппированных данных и добавлять их в dataframe в формате списка.

1
ответ дан Mayur Dangar 19 August 2018 в 05:03
поделиться

Начиная с Spark 2.3 (теперь в разработке) вы можете использовать pandas_udf. Групповые агрегатные варианты берут функцию, которая отображает из Pandas DataFrame той же формы, что и вход, на выход DataFrame. Например, если данные выглядят следующим образом:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

, и вы хотите вычислить среднее значение попарного минимума между value1 value2, вам необходимо определить схему вывода:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

pandas_udf:

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result

и примените его:

df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

Исключая определение схемы и декоратор, ваш текущий код Pandas можно применять как есть .

10
ответ дан user8371915 19 August 2018 в 05:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: