Проблема SSIS CozyRoc Excel Source Plus с десятичной усечкой

Это больше идея, чем полное решение, и я еще не тестировал ее.

Вы можете начать с извлечения конвейера обработки данных в функцию.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

Если ваши файлы малы, вы можете отрегулировать параметр n, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.

val n: Int = ??? 

Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:

val files: Array[String] = ???

Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline:

val rdds = files.map(f => pipeline(f, n))

Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

. Наконец, мы можем использовать вспомогательный помощник на rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

В зависимости от вашего требования вы можете добавить onComplete обратные вызовы или использовать реактивные потоки для сбора результатов.

0
задан Pablo Boswell 13 July 2018 в 22:49
поделиться