Другое событие NullPointerException
возникает, когда объявляется массив объектов, а затем сразу же пытается разыменовать его внутри.
String[] phrases = new String[10];
String keyPhrase = "Bird";
for(String phrase : phrases) {
System.out.println(phrase.equals(keyPhrase));
}
Этот конкретный NPE можно избежать, если порядок сравнения отменяется ; а именно, использовать .equals
для гарантированного непустого объекта.
Все элементы внутри массива инициализируются их общим начальным значением ; для любого типа массива объектов, это означает, что все элементы null
.
Вы должны инициализировать элементы в массиве перед доступом или разыменованием их.
String[] phrases = new String[] {"The bird", "A bird", "My bird", "Bird"};
String keyPhrase = "Bird";
for(String phrase : phrases) {
System.out.println(phrase.equals(keyPhrase));
}
Это больше идея, чем полное решение, и я еще не тестировал ее.
Вы можете начать с извлечения конвейера обработки данных в функцию.
def pipeline(f: String, n: Int) = {
sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.load(f)
.repartition(n)
.groupBy(...)
.agg(...)
.cache // Cache so we can force computation later
}
Если ваши файлы малы, вы можете отрегулировать параметр n
, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.
val n: Int = ???
Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:
val files: Array[String] = ???
Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline
:
val rdds = files.map(f => pipeline(f, n))
Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future
import scala.concurrent._
import ExecutionContext.Implicits.global
def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
df.rdd.foreach(_ => ()) // Force computation
df
}
. Наконец, мы можем использовать вспомогательный помощник на rdds
:
val result = Future.sequence(
rdds.map(rdd => pipelineToFuture(rdd)).toList
)
В зависимости от вашего требования вы можете добавить onComplete
обратные вызовы или использовать реактивные потоки для сбора результатов.
Если у вас много файлов, и каждый файл невелик (вы скажете, что 300 МБ выше, что я считаю малым для Spark), вы можете попробовать использовать SparkContext.wholeTextFiles
, который создаст RDD, где каждая запись будет целым файлом.
sqlContext
используется только для драйвера, поэтому нет причин для сериализации. – zero323 10 August 2015 в 12:55org.apache.spark.rdd.AsyncRDDActions
– zero323 9 August 2016 в 23:17