Это пример:
for(var i=0; i<=3; i++) {
window['p'+i] = "hello " + i;
}
alert(p0); // hello 0
alert(p1); // hello 1
alert(p2); // hello 2
alert(p3); // hello 3
Другой пример:
var myVariable = 'coco';
window[myVariable] = 'riko';
alert(coco); // display : riko
Итак, значение « coco » myVariable становится переменной coco.
Поскольку все переменные глобальной области действия являются свойствами объекта Window.
Ваши таблицы 1-1 или 1-много? Если они один ко многим, то ваши объединения приведут к появлению гораздо большего количества строк, чем вы, вероятно, хотите. Если это так, один из вариантов - сначала выполнить groupBy для каждой таблицы, к которой вы собираетесь присоединиться. Рассмотрите этот пример:
val df1 = Seq(1, 2).toDF("id")
val df2 = Seq(
(1, "a", true),
(1, "b", false),
(2, "c", true)
).toDF("id", "C2", "B2")
val df3 = Seq(
(1, "x", false),
(1, "y", true),
(2, "z", false)
).toDF("id", "C3", "B3")
// Left outer join without accounting for 1-Many relationship. Results in cartesian
// joining on each ID value!
df1.
join(df2, Seq("id"), "left_outer").
join(df3, Seq("id"), "left_outer").show()
+---+---+-----+---+-----+
| id| C2| B2| C3| B3|
+---+---+-----+---+-----+
| 1| b|false| y| true|
| 1| b|false| x|false|
| 1| a| true| y| true|
| 1| a| true| x|false|
| 2| c| true| z|false|
+---+---+-----+---+-----+
В качестве альтернативы, если вы группируете строки перед объединением, чтобы ваши отношения всегда 1-1, вы не получите добавляемых записей
val df2Grouped = df2.groupBy("id").agg(collect_list(struct($"C2", $"B2")) as "df2")
val df3Grouped = df3.groupBy("id").agg(collect_list(struct($"C3", $"B3")) as "df3")
val result = df1.
join(df2Grouped, Seq("id"), "left_outer").
join(df3Grouped, Seq("id"), "left_outer")
result.printSchema
result.show(10, false)
scala> result.printSchema
root
|-- id: integer (nullable = false)
|-- df2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- C2: string (nullable = true)
| | |-- B2: boolean (nullable = false)
|-- df3: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- C3: string (nullable = true)
| | |-- B3: boolean (nullable = false)
scala> result.show(10, false)
+---+-----------------------+-----------------------+
|id |df2 |df3 |
+---+-----------------------+-----------------------+
|1 |[[a, true], [b, false]]|[[x, false], [y, true]]|
|2 |[[c, true]] |[[z, false]] |
+---+-----------------------+-----------------------+
так случилось, что базовые данные в корзине s3, которые я использую для создания фрейма данных, содержат несколько папок, и я фильтровал определенную папку как часть моего фильтра. пример: spark.read.parquet (s3 bucket) .filter ('folder_name = "val"). Похоже, что спарк загружает все данные из корзины s3 в память исполнителя, а затем запускает фильтр. Вот почему это был взрыв, когда та же логика, что и запрос улья, выполняемый для внешней таблицы улья, указывающего на местоположение s3 с папкой в виде столбца раздела, работала просто отлично. Мне пришлось удалить фильтр и прочитать конкретную папку, чтобы решить проблему .. spark.read.parquet (s3 bucket / folder = value) ..
У меня было похожее состояние, когда у меня было несколько объединений, и в конце мне пришлось записать окончательный кадр данных в таблицу HDFS / Hive (формат Parquet).
Spark работает над механизмом Lazy Execution, который означает, что когда ваш 53-й кадр данных активирован (Сохранить / записать как паркет), Spark затем возвращается ко всем соединениям и выполняет их, что вызывает огромную перетасовку данных и, в конечном итоге, вашу работу. контейнеры дают сбой и выбрасывают ошибки памяти.
Предложение: Вы можете сначала записать каждый присоединенный кадр данных в HDFS. Я хочу сказать, что после того, как вы объединили 2 (может быть больше 2, но ограничьте их) данных, запишите объединенный кадр данных в HDFS / Hive и используйте select * 'hive parquet table
val refinedDF1 = df1.join(df2 ,condition,'join_type')
refinedDF1.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine1")
val refinedDF1 = hc.sql("select * from dbname.refine1")
val refinedDF2 = refinedDF1.join(df3)
refinedDF2.write.parquet("location") or refinedDF1.write.mode("overwrite").saveAsTable("dbname.refine2")
val refinedDF2 = hc.sql("select * from dbname.refine2")
Теперь вы часто записываете свои соединения в hdfs, это означает, что искра не должна будет выполнять их при вызове окончательного соединения, она будет использовать только вывод 52-го соединения что вы сохранили в виде таблицы.
С использованием этой методики мой сценарий сократился с 22 часов (включая ошибки памяти контейнера) до 15–30 (без исключения памяти / ошибок).
Несколько советов:
1) Исключите записи, в которых ваше объединение key
имеет значение null, spark не дает хорошей производительности по сравнению с объединениями, имеющими условие null = null
, поэтому удалите их перед объединением с фреймами данных
3) После выполнения скрипта вам придется очистить промежуточные кадры данных, которые вы сохраняете в Hive / Hdfs.