Вопрос о присоединении датафреймов в Spark

Вы добавляете «файл заголовка», который описывает интерфейс для модуля source1.cpp:

source1.h

#ifndef SOURCE1_H_
#define SOURCE1_H_

extern int global;

#endif

source2.h

#ifndef SOURCE2_H_
#define SOURCE2_H_

int function();

#endif

и добавьте оператор #include в каждый файл, который использует эту переменную, и (важный), который определяет переменную.

source1.cpp

#include "source1.h"
#include "source2.h"

int global;     

int main()     
{     
    global=42;     
    function();     
    return 0;     
}

source2.cpp

#include "source1.h"
#include "source2.h"

int function()            
{            
    if(global==42)            
        return 42;            
    return 0;            
}

Хотя это необязательно, я предлагаю имя source1.h для файла показать, что он описывает публичный интерфейс к модулю source1.cpp. Точно так же source2.h описывает, что доступно в source2.cpp.

4
задан thebluephantom 18 March 2019 в 21:03
поделиться

2 ответа

Думаю, удалось выяснить причину разного результата в Python и Scala.

Причина в оптимизации вещания. Если spark-shell запускается с отключенной трансляцией, Python и Scala работают одинаково.

./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1

val df1 = Seq(
  (1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val df2 = Seq(
  (1, 1, 1),
  (2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))

val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))

x.rdd.getNumPartitions == 200

Похоже, искра 2.4.0 не способна оптимизировать описанный случай из коробки, и необходимо расширение оптимизатора катализатора, как предложено @ user10938362.

Кстати. Вот информация о написании расширений оптимизатора катализатора https://developer.ibm.com/code/2017/11/30/learn-extension-points-apache-spark-extend-spark-catalyst-optimizer/

0
ответ дан Artem Bergkamp 18 March 2019 в 21:03
поделиться

Поведение Catalyst Optimizer различается для pyspark и Scala (по крайней мере, для Spark 2.4).

Я управлял обоими и получил два разных плана.

Действительно, вы получаете 200 разделов в pyspark, если вы не укажете явно для pyspark:

 spark.conf.set("spark.sql.shuffle.partitions", 3)

Затем обрабатываются 3 раздела, и, таким образом, 3 остаются в pyspark.

Немного удивлен, поскольку я думал, что под капотом это будет обычным делом. Так что люди продолжают говорить мне. Это просто показывает.

Физический план для pyspark с параметром, заданным через conf:

== Physical Plan ==
*(5) Project [key1#344L, key2#345L, time#346L]
+- SortMergeJoin [key1#344L, key2#345L, time#346L], [key1#350L, key2#351L, time#352L], LeftOuter
   :- *(2) Sort [key1#344L ASC NULLS FIRST, key2#345L ASC NULLS FIRST, time#346L ASC NULLS FIRST], false, 0
    :  +- Exchange hashpartitioning(key1#344L, key2#345L, time#346L, 3)
    :     +- *(1) Scan ExistingRDD[key1#344L,key2#345L,time#346L]
    +- *(4) Sort [key1#350L ASC NULLS FIRST, key2#351L ASC NULLS FIRST, time#352L ASC NULLS FIRST], false, 0
       +- Exchange hashpartitioning(key1#350L, key2#351L, time#352L, 3)
         +- *(3) Filter ((isnotnull(key1#350L) && isnotnull(key2#351L)) && isnotnull(time#352L))
             +- *(3) Scan ExistingRDD[key1#350L,key2#351L,time#352L]
0
ответ дан thebluephantom 18 March 2019 в 21:03
поделиться
Другие вопросы по тегам:

Похожие вопросы: