Самое чистое решение - передать дополнительные аргументы с помощью закрытия:
def make_topic_word(topic_words):
return udf(lambda c: label_maker_topic(c, topic_words))
df = sc.parallelize([(["union"], )]).toDF(["tokens"])
(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
.show())
Это не требует никаких изменений в keyword_list
или функции, которую вы обертываете с помощью UDF. Вы также можете использовать этот метод для передачи произвольного объекта. Это может быть использовано для передачи, например, списка sets
для эффективных поисков.
Если вы хотите использовать свой текущий UDF и передать topic_words
прямо, вам придется преобразовать его в литерал столбца first:
from pyspark.sql.functions import array, lit
ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()
В зависимости от ваших данных и требований могут быть альтернативные, более эффективные решения, которые не требуют UDF (взорвать + агрегат + сбой) или поисковые запросы (хеширование + векторные операции).
Следующее работает отлично, когда любой внешний параметр может быть передан в UDF (измененный код, чтобы помочь кому-либо)
topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token))
topic_words
в момент определения udf. Поэтому изменение topic_words
и повторное использование udf позже не сработает - оно будет по-прежнему использовать значение topic_words
во время определения udf.
– CHP
24 February 2018 в 01:13
udf
вmake_topic_word
– user2739472 28 June 2018 в 05:24