Вы добавляете «файл заголовка», который описывает интерфейс для модуля source1.cpp:
source1.h
#ifndef SOURCE1_H_
#define SOURCE1_H_
extern int global;
#endif
source2.h
#ifndef SOURCE2_H_
#define SOURCE2_H_
int function();
#endif
и добавьте оператор #include в каждый файл, который использует эту переменную, и (важный), который определяет переменную.
source1.cpp
#include "source1.h"
#include "source2.h"
int global;
int main()
{
global=42;
function();
return 0;
}
source2.cpp
#include "source1.h"
#include "source2.h"
int function()
{
if(global==42)
return 42;
return 0;
}
Хотя это необязательно, я предлагаю имя source1.h для файла показать, что он описывает публичный интерфейс к модулю source1.cpp. Точно так же source2.h описывает, что доступно в source2.cpp.
Думаю, удалось выяснить причину разного результата в Python и Scala.
Причина в оптимизации вещания. Если spark-shell запускается с отключенной трансляцией, Python и Scala работают одинаково.
./spark-shell --conf spark.sql.autoBroadcastJoinThreshold=-1
val df1 = Seq(
(1, 1, 1)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))
val df2 = Seq(
(1, 1, 1),
(2, 2, 2)
).toDF("key1", "key2", "time").repartition(3, col("key1"), col("key2"))
val x = df1.join(df2, usingColumns = Seq("key1", "key2", "time"))
x.rdd.getNumPartitions == 200
Похоже, искра 2.4.0 не способна оптимизировать описанный случай из коробки, и необходимо расширение оптимизатора катализатора, как предложено @ user10938362.
Кстати. Вот информация о написании расширений оптимизатора катализатора https://developer.ibm.com/code/2017/11/30/learn-extension-points-apache-spark-extend-spark-catalyst-optimizer/
Поведение Catalyst Optimizer различается для pyspark и Scala (по крайней мере, для Spark 2.4).
Я управлял обоими и получил два разных плана.
Действительно, вы получаете 200 разделов в pyspark, если вы не укажете явно для pyspark:
spark.conf.set("spark.sql.shuffle.partitions", 3)
Затем обрабатываются 3 раздела, и, таким образом, 3 остаются в pyspark.
Немного удивлен, поскольку я думал, что под капотом это будет обычным делом. Так что люди продолжают говорить мне. Это просто показывает.
Физический план для pyspark с параметром, заданным через conf:
== Physical Plan ==
*(5) Project [key1#344L, key2#345L, time#346L]
+- SortMergeJoin [key1#344L, key2#345L, time#346L], [key1#350L, key2#351L, time#352L], LeftOuter
:- *(2) Sort [key1#344L ASC NULLS FIRST, key2#345L ASC NULLS FIRST, time#346L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(key1#344L, key2#345L, time#346L, 3)
: +- *(1) Scan ExistingRDD[key1#344L,key2#345L,time#346L]
+- *(4) Sort [key1#350L ASC NULLS FIRST, key2#351L ASC NULLS FIRST, time#352L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(key1#350L, key2#351L, time#352L, 3)
+- *(3) Filter ((isnotnull(key1#350L) && isnotnull(key2#351L)) && isnotnull(time#352L))
+- *(3) Scan ExistingRDD[key1#350L,key2#351L,time#352L]