Как то, как вы передаете DataFrame
и как вы обращаетесь к нему, неверны.
DataFrame
, вы должны использовать функции broadcast
, которые отмечают DataFrame
для трансляции: import org.apache.spark.sql.functions.broadcast
val countriesDf: DataFrame = ???
val tmp: DataFrame = broadcast(
countriesDf.withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries")
)
txnTable.as("df1").join(
broadcast(tmp), $"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
Внутренне это будет collect
tmp
без преобразования из внутреннего и трансляции впоследствии. SparkContext.broadcast
с распределенной структурой, значение широковещательной структуры оценивается локально до вызова join
. Вот почему ваша функция работает вообще, но не выполняет широковещательное соединение.