Вы также можете использовать метод compareTo()
для сравнения двух строк. Если результат compareTo равен 0, то две строки равны, в противном случае сравниваемые строки не равны.
==
сравнивает ссылки и не сравнивает фактические строки. Если вы создали каждую строку, используя new String(somestring).intern()
, вы можете использовать оператор ==
для сравнения двух строк, в противном случае могут использоваться только методы equals () или compareTo.
Вы не можете получить доступ к любым абстракциям «Исправлена» (RDDs, DataFrames, Datasets, SparkSession ...) Spark из функции, переданной в одну из преобразований DataFrame / RDD Spark. Вы также не можете обновлять измененные объекты на стороне драйвера из этих функций.
В вашем случае - вы пытаетесь использовать prodRows
и selection
(оба являются DataFrames) в функции, переданной в DataFrame.foreach
. Вы также пытаетесь обновить listOfProducts
(локальную драйверную переменную) из этой же функции.
Почему?
Как вы можете это решить? При работе с Spark, особенно с DataFrames, вы должны стараться избегать «итерации» над данными и вместо этого использовать декларативные операции DataFrame. В большинстве случаев, когда вы хотите ссылаться на данные другого DataFrame для каждой записи в вашем DataFrame, вы хотите использовать join
для создания нового DataFrame с записями, объединяющими данные из двух DataFrames.
В этом конкретном случае, это примерно эквивалентное решение, которое делает то, что вы пытаетесь сделать, если мне удастся это сделать правильно. Попытайтесь использовать это и прочитайте документацию DataFrame, чтобы выяснить детали:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val numRecProducts = 10
val result = prodRows.as("left")
// self-join by gender:
.join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
// limit to 10 results per record:
.withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
.filter($"rn" <= numRecProducts).drop($"rn")
// group and collect_list to create products column:
.groupBy($"left.product_PK" as "product_PK")
.agg(collect_list(struct($"right.product_PK", lit(1))) as "products")
Проблема в том, что вы пытаетесь получить доступ к prodRows
изнутри prodRows.foreach
. Вы не можете использовать фреймворк данных в рамках преобразования, в драйвере существуют только фреймы данных.
product_pk
(каждая строка DataFrame) мне нужно назначить подмножество соответствующих product_pks
. Этот поднабор вычисляется с использованием where
и передачи параметров фактической строки.
– Markus
17 November 2017 в 21:51
WHERE (gender_PK = 'WOMEN' OR gender_PK = 'UNISEX') AND family_PK = 'A' AND left.product_PK != right.product_PK
– Markus 17 November 2017 в 22:21