Spark: Параллельное создание файлов файловой системы [дубликат]

Вместо input.nextLine() используйте input.next(), что должно решить проблему.

Измененный код:

public static Scanner input = new Scanner(System.in);

public static void main(String[] args)
{
    System.out.print("Insert a number: ");
    int number = input.nextInt();
    System.out.print("Text1: ");
    String text1 = input.next();
    System.out.print("Text2: ");
    String text2 = input.next();
}
3
задан zero323 11 August 2015 в 16:52
поделиться

2 ответа

Это больше идея, чем полное решение, и я еще не тестировал ее.

Вы можете начать с извлечения конвейера обработки данных в функцию.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

Если ваши файлы малы, вы можете отрегулировать параметр n, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.

val n: Int = ??? 

Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:

val files: Array[String] = ???

Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline:

val rdds = files.map(f => pipeline(f, n))

Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

. Наконец, мы можем использовать вспомогательный помощник на rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

В зависимости от вашего требования вы можете добавить onComplete обратные вызовы или использовать реактивные потоки для сбора результатов.

7
ответ дан zero323 23 August 2018 в 19:41
поделиться

Если у вас много файлов, и каждый файл невелик (вы скажете, что 300 МБ выше, что я считаю малым для Spark), вы можете попробовать использовать SparkContext.wholeTextFiles, который создаст RDD, где каждая запись будет целым файлом.

0
ответ дан mattinbits 23 August 2018 в 19:41
поделиться
Другие вопросы по тегам:

Похожие вопросы: