создать фрейм данных в цикле foreach apache spark [duplicate]

Вы также можете использовать метод compareTo() для сравнения двух строк. Если результат compareTo равен 0, то две строки равны, в противном случае сравниваемые строки не равны.

== сравнивает ссылки и не сравнивает фактические строки. Если вы создали каждую строку, используя new String(somestring).intern(), вы можете использовать оператор == для сравнения двух строк, в противном случае могут использоваться только методы equals () или compareTo.

1
задан Markus 17 November 2017 в 20:43
поделиться

2 ответа

Вы не можете получить доступ к любым абстракциям «Исправлена» (RDDs, DataFrames, Datasets, SparkSession ...) Spark из функции, переданной в одну из преобразований DataFrame / RDD Spark. Вы также не можете обновлять измененные объекты на стороне драйвера из этих функций.

В вашем случае - вы пытаетесь использовать prodRows и selection (оба являются DataFrames) в функции, переданной в DataFrame.foreach. Вы также пытаетесь обновить listOfProducts (локальную драйверную переменную) из этой же функции.

Почему?

  • DataFrames, RDD и SparkSession существуют только в приложении Driver. Они служат «ручкой» для доступа к данным, распределенным по кластеру рабочих машин.
  • Функции, переданные в преобразования RDD / DataFrame, получают serialized и отправляются в этот кластер, которые должны выполняться на разделах данных на каждой рабочей машине. Когда сериализованные DataFrames / RDD получаются десериализованными на этих машинах - они бесполезны, они не могут представлять данные в кластере, поскольку они представляют собой только полные копии тех, которые созданы в приложении драйвера, что фактически поддерживает соединение для кластерных машин
  • По той же причине попытка обновления драйверов переменных не будет выполнена: переменные (начинающиеся как пустые, в большинстве случаев) будут сериализованы, десериализованы на каждом из рабочие, обновляются локально на рабочих и остаются там ... исходная переменная со стороны водителя останется неизменной

Как вы можете это решить? При работе с Spark, особенно с DataFrames, вы должны стараться избегать «итерации» над данными и вместо этого использовать декларативные операции DataFrame. В большинстве случаев, когда вы хотите ссылаться на данные другого DataFrame для каждой записи в вашем DataFrame, вы хотите использовать join для создания нового DataFrame с записями, объединяющими данные из двух DataFrames.

В этом конкретном случае, это примерно эквивалентное решение, которое делает то, что вы пытаетесь сделать, если мне удастся это сделать правильно. Попытайтесь использовать это и прочитайте документацию DataFrame, чтобы выяснить детали:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val numRecProducts = 10

val result = prodRows.as("left")
  // self-join by gender:
  .join(prodRows.as("right"), $"left.gender_PK" === $"right.gender_PK" || $"right.gender_PK" === "UNISEX")
  // limit to 10 results per record:
  .withColumn("rn", row_number().over(Window.partitionBy($"left.product_PK").orderBy($"right.product_PK")))
  .filter($"rn" <= numRecProducts).drop($"rn")
  // group and collect_list to create products column:
  .groupBy($"left.product_PK" as "product_PK")
  .agg(collect_list(struct($"right.product_PK", lit(1))) as "products")
6
ответ дан Tzach Zohar 16 August 2018 в 04:43
поделиться
  • 1
    К сожалению, добавлен импорт. – Tzach Zohar 17 November 2017 в 21:57
  • 2
    Можно ли использовать этот подход с более сложным предложением where? например WHERE (gender_PK = 'WOMEN' OR gender_PK = 'UNISEX') AND family_PK = 'A' AND left.product_PK != right.product_PK – Markus 17 November 2017 в 22:21
  • 3
    Да, это - как и любое предложение ON в операторах SQL, это может быть любое условие, применяемое к двум сравниваемым записям (слева и справа). – Tzach Zohar 18 November 2017 в 15:40

Проблема в том, что вы пытаетесь получить доступ к prodRows изнутри prodRows.foreach. Вы не можете использовать фреймворк данных в рамках преобразования, в драйвере существуют только фреймы данных.

1
ответ дан Raphael Roth 16 August 2018 в 04:43
поделиться
  • 1
    Хорошо, но в этом случае я очень ценю, если вы можете предложить мне альтернативный подход. Для каждого product_pk (каждая строка DataFrame) мне нужно назначить подмножество соответствующих product_pks. Этот поднабор вычисляется с использованием where и передачи параметров фактической строки. – Markus 17 November 2017 в 21:51
Другие вопросы по тегам:

Похожие вопросы: