Это больше идея, чем полное решение, и я еще не тестировал ее.
Вы можете начать с извлечения конвейера обработки данных в функцию.
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
Если ваши файлы малы, вы можете отрегулировать параметр n
, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.
val n: Int = ???
Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:
val files: Array[String] = ???
Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline
:
val rdds = files.map(f => pipeline(f, n))
Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
. Наконец, мы можем использовать вспомогательный помощник на rdds
:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
В зависимости от вашего требования вы можете добавить onComplete
обратные вызовы или использовать реактивные потоки для сбора результатов.
Это - сокращение от объявления лямбда-выражения, которое не берет аргументов.
() => 42; // Takes no arguments returns 42
x => 42; // Takes 1 argument and returns 42
(x) => 42; // Identical to above
От MSDN. Лямбда Выражения принимает форму (исходные данные) => выражение. Так лямбда как () =>, который обозначает выражение, нет никаких входных параметров. Который подпись для Действия не берет параметров
I think of lambas like this:
(x) => { return x * 2; }
But only this is important:
(x) => { return x * 2; }
We need the => to know that it's a lambda instead of casting, and thus we get this:
x => x * 2
(sorry for not formatting code as code, that's because you can't make things bold in code..)
Это обозначает анонимную функцию без параметра.
Что делает эта строка должно добавить анонимное Действие к списку с помощью лямбда-выражений, который не берет параметра (это - причина, почему () там), и ничего не возвращает, вследствие того, что это печатает только фактическое значение счетчика.