спарк ETL с объединениями из нескольких источников

Это пример:

for(var i=0; i<=3; i++) {
    window['p'+i] = "hello " + i;
}

alert(p0); // hello 0
alert(p1); // hello 1
alert(p2); // hello 2
alert(p3); // hello 3

Другой пример:

var myVariable = 'coco';
window[myVariable] = 'riko';

alert(coco); // display : riko

Итак, значение « coco » myVariable становится переменной coco.

Поскольку все переменные глобальной области действия являются свойствами объекта Window.

0
задан BeeBeeGee 16 January 2019 в 16:52
поделиться

3 ответа

Ваши таблицы 1-1 или 1-много? Если они один ко многим, то ваши объединения приведут к появлению гораздо большего количества строк, чем вы, вероятно, хотите. Если это так, один из вариантов - сначала выполнить groupBy для каждой таблицы, к которой вы собираетесь присоединиться. Рассмотрите этот пример:

val df1 = Seq(1, 2).toDF("id")
val df2 = Seq(
  (1, "a", true),
  (1, "b", false),
  (2, "c", true)
).toDF("id", "C2", "B2")

val df3 = Seq(
  (1, "x", false),
  (1, "y", true),
  (2, "z", false)
).toDF("id", "C3", "B3")

// Left outer join without accounting for 1-Many relationship.  Results in cartesian
// joining on each ID value!
df1.
  join(df2, Seq("id"), "left_outer").
  join(df3, Seq("id"), "left_outer").show()

+---+---+-----+---+-----+
| id| C2|   B2| C3|   B3|
+---+---+-----+---+-----+
|  1|  b|false|  y| true|
|  1|  b|false|  x|false|
|  1|  a| true|  y| true|
|  1|  a| true|  x|false|
|  2|  c| true|  z|false|
+---+---+-----+---+-----+

В качестве альтернативы, если вы группируете строки перед объединением, чтобы ваши отношения всегда 1-1, вы не получите добавляемых записей

val df2Grouped = df2.groupBy("id").agg(collect_list(struct($"C2", $"B2")) as "df2")
val df3Grouped = df3.groupBy("id").agg(collect_list(struct($"C3", $"B3")) as "df3")

val result = df1.
  join(df2Grouped, Seq("id"), "left_outer").
  join(df3Grouped, Seq("id"), "left_outer")
result.printSchema
result.show(10, false)

scala> result.printSchema
root
 |-- id: integer (nullable = false)
 |-- df2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C2: string (nullable = true)
 |    |    |-- B2: boolean (nullable = false)
 |-- df3: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- C3: string (nullable = true)
 |    |    |-- B3: boolean (nullable = false)


scala> result.show(10, false)
+---+-----------------------+-----------------------+
|id |df2                    |df3                    |
+---+-----------------------+-----------------------+
|1  |[[a, true], [b, false]]|[[x, false], [y, true]]|
|2  |[[c, true]]            |[[z, false]]           |
+---+-----------------------+-----------------------+
0
ответ дан Ryan Widmaier 16 January 2019 в 16:52
поделиться

так случилось, что базовые данные в корзине s3, которые я использую для создания фрейма данных, содержат несколько папок, и я фильтровал определенную папку как часть моего фильтра. пример: spark.read.parquet (s3 bucket) .filter ('folder_name = "val"). Похоже, что спарк загружает все данные из корзины s3 в память исполнителя, а затем запускает фильтр. Вот почему это был взрыв, когда та же логика, что и запрос улья, выполняемый для внешней таблицы улья, указывающего на местоположение s3 с папкой в ​​виде столбца раздела, работала просто отлично. Мне пришлось удалить фильтр и прочитать конкретную папку, чтобы решить проблему .. spark.read.parquet (s3 bucket / folder = value) ..

0
ответ дан BeeBeeGee 16 January 2019 в 16:52
поделиться

У меня было похожее состояние, когда у меня было несколько объединений, и в конце мне пришлось записать окончательный кадр данных в таблицу HDFS / Hive (формат Parquet).

Spark работает над механизмом Lazy Execution, который означает, что когда ваш 53-й кадр данных активирован (Сохранить / записать как паркет), Spark затем возвращается ко всем соединениям и выполняет их, что вызывает огромную перетасовку данных и, в конечном итоге, вашу работу. контейнеры дают сбой и выбрасывают ошибки памяти.

Предложение: Вы можете сначала записать каждый присоединенный кадр данных в HDFS. Я хочу сказать, что после того, как вы объединили 2 (может быть больше 2, но ограничьте их) данных, запишите объединенный кадр данных в HDFS / Hive и используйте select * 'hive parquet table

val refinedDF1 = df1.join(df2 ,condition,'join_type')
refinedDF1.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine1")
val refinedDF1 = hc.sql("select * from dbname.refine1")

val refinedDF2 = refinedDF1.join(df3)
refinedDF2.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine2")
val refinedDF2 = hc.sql("select * from dbname.refine2")

Теперь вы часто записываете свои соединения в hdfs, это означает, что искра не должна будет выполнять их при вызове окончательного соединения, она будет использовать только вывод 52-го соединения что вы сохранили в виде таблицы.

С использованием этой методики мой сценарий сократился с 22 часов (включая ошибки памяти контейнера) до 15–30 (без исключения памяти / ошибок).

Несколько советов:

1) Исключите записи, в которых ваше объединение key имеет значение null, spark не дает хорошей производительности по сравнению с объединениями, имеющими условие null = null, поэтому удалите их перед объединением с фреймами данных

[ 1111] 2) Используйте широковещательные объединения, когда у вас осталось много строк, а правый - искомый или несколько строк.

3) После выполнения скрипта вам придется очистить промежуточные кадры данных, которые вы сохраняете в Hive / Hdfs.

0
ответ дан Shahab Niaz 16 January 2019 в 16:52
поделиться
Другие вопросы по тегам:

Похожие вопросы: