SPARK-5063 относится к лучшим сообщениям об ошибках при попытке вложить RDD-операции, которые не поддерживаются.
Это проблема юзабилити, а не функциональная. Основной причиной является вложенность операций RDD, и решение должно разбить это.
Здесь мы пытаемся присоединиться к dRDD
и mRDD
. Если размер mRDD
велико, rdd.join
был бы рекомендованным способом, если mRDD
невелик, то есть подходит в памяти каждого исполнителя, мы могли бы его собирать, транслировать и делать «карту» 'join.
Простое объединение будет выглядеть следующим образом:
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6))
val flat = rdd.flatMap(_.toSeq).keyBy(x=>x)
val res = flat.join(map).map{case (k,v) => v}
Если мы хотим использовать широковещательную рассылку, нам сначала нужно собрать значение таблицы разрешений локально для b / c, что для всех исполнителей. ПРИМЕЧАНИЕ: RDD, который будет транслироваться ДОЛЖЕН , вписывается в память драйвера, а также каждого исполнителя.
val rdd = sc.parallelize(Seq(Array("one","two","three"), Array("four", "five", "six")))
val map = sc.parallelize(Seq("one" -> 1, "two" -> 2, "three" -> 3, "four" -> 4, "five" -> 5, "six"->6)))
val bcTable = sc.broadcast(map.collectAsMap)
val res2 = rdd.flatMap{arr => arr.map(elem => (elem, bcTable.value(elem)))}