Scala - функция вызова параллельно с различными параметрами [duplicate]

Поскольку вы знакомы с java, вы можете многому научиться в течение месяца. Несколько лет назад я также изучил php с точки зрения java. Самой трудной частью было думать гораздо более прямо. По сравнению с java-way php имеет тенденцию быть менее «академическим», который очень освежает, но определенно к чему-то я привык.

3
задан zero323 11 August 2015 в 16:52
поделиться

2 ответа

Это больше идея, чем полное решение, и я еще не тестировал ее.

Вы можете начать с извлечения конвейера обработки данных в функцию.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

Если ваши файлы малы, вы можете отрегулировать параметр n, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.

val n: Int = ??? 

Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:

val files: Array[String] = ???

Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline:

val rdds = files.map(f => pipeline(f, n))

Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

. Наконец, мы можем использовать вспомогательный помощник на rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

В зависимости от вашего требования вы можете добавить onComplete обратные вызовы или использовать реактивные потоки для сбора результатов.

7
ответ дан zero323 16 August 2018 в 05:50
поделиться
  • 1
    Ну, насколько мне известно, нет необходимости в обходном пути, потому что здесь совершенно неважно. sqlContext используется только для драйвера, поэтому нет причин для сериализации. – zero323 10 August 2015 в 12:55
  • 2
    @AlexNaspo Не полностью, но я использовал подобный подход один или два раза. Если у вас далеко не много памяти, имеет смысл выполнять фактическое действие, а не в зависимости от кеширования. Если вас интересует общий принцип, взгляните на org.apache.spark.rdd.AsyncRDDActions – zero323 9 August 2016 в 23:17

Если у вас много файлов, и каждый файл невелик (вы скажете, что 300 МБ выше, что я считаю малым для Spark), вы можете попробовать использовать SparkContext.wholeTextFiles, который создаст RDD, где каждая запись будет целым файлом.

0
ответ дан mattinbits 16 August 2018 в 05:50
поделиться
Другие вопросы по тегам:

Похожие вопросы: