Вместо input.nextLine()
используйте input.next()
, что должно решить проблему.
Измененный код:
public static Scanner input = new Scanner(System.in);
public static void main(String[] args)
{
System.out.print("Insert a number: ");
int number = input.nextInt();
System.out.print("Text1: ");
String text1 = input.next();
System.out.print("Text2: ");
String text2 = input.next();
}
Это больше идея, чем полное решение, и я еще не тестировал ее.
Вы можете начать с извлечения конвейера обработки данных в функцию.
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
Если ваши файлы малы, вы можете отрегулировать параметр n
, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.
val n: Int = ???
Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:
val files: Array[String] = ???
Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline
:
val rdds = files.map(f => pipeline(f, n))
Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
. Наконец, мы можем использовать вспомогательный помощник на rdds
:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
В зависимости от вашего требования вы можете добавить onComplete
обратные вызовы или использовать реактивные потоки для сбора результатов.
Если у вас много файлов, и каждый файл невелик (вы скажете, что 300 МБ выше, что я считаю малым для Spark), вы можете попробовать использовать SparkContext.wholeTextFiles
, который создаст RDD, где каждая запись будет целым файлом.