Одновременно оптимизируйте обработку Spark [duplicate]

Другое событие NullPointerException возникает, когда объявляется массив объектов, а затем сразу же пытается разыменовать его внутри.

String[] phrases = new String[10];
String keyPhrase = "Bird";
for(String phrase : phrases) {
    System.out.println(phrase.equals(keyPhrase));
}

Этот конкретный NPE можно избежать, если порядок сравнения отменяется ; а именно, использовать .equals для гарантированного непустого объекта.

Все элементы внутри массива инициализируются их общим начальным значением ; для любого типа массива объектов, это означает, что все элементы null.

Вы должны инициализировать элементы в массиве перед доступом или разыменованием их.

String[] phrases = new String[] {"The bird", "A bird", "My bird", "Bird"};
String keyPhrase = "Bird";
for(String phrase : phrases) {
    System.out.println(phrase.equals(keyPhrase));
}

3
задан zero323 11 August 2015 в 16:52
поделиться

2 ответа

Это больше идея, чем полное решение, и я еще не тестировал ее.

Вы можете начать с извлечения конвейера обработки данных в функцию.

def pipeline(f: String, n: Int) = {
    sqlContext
        .read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .load(f)
        .repartition(n)
        .groupBy(...)
        .agg(...)
        .cache // Cache so we can force computation later
}

Если ваши файлы малы, вы можете отрегулировать параметр n, чтобы использовать как можно меньшее количество разделов для соответствия данным из одного файла и избегать перетасовки. Это означает, что вы ограничиваете параллелизм, но мы вернемся к этой проблеме позже.

val n: Int = ??? 

Затем вам нужно получить список входных файлов. Этот шаг зависит от источника данных, но большую часть времени он более или менее прост:

val files: Array[String] = ???

Затем вы можете сопоставить вышеуказанный список с помощью функции pipeline:

val rdds = files.map(f => pipeline(f, n))

Поскольку мы ограничиваем параллелизм на уровне одного файла, который мы хотим компенсировать, отправив несколько заданий. Давайте добавим простой помощник, который заставляет оценивать и обертывает его с помощью Future

import scala.concurrent._
import ExecutionContext.Implicits.global

def pipelineToFuture(df: org.apache.spark.sql.DataFrame) = future {
    df.rdd.foreach(_ => ()) // Force computation
    df
}

. Наконец, мы можем использовать вспомогательный помощник на rdds:

val result = Future.sequence(
   rdds.map(rdd => pipelineToFuture(rdd)).toList
)

В зависимости от вашего требования вы можете добавить onComplete обратные вызовы или использовать реактивные потоки для сбора результатов.

7
ответ дан zero323 19 August 2018 в 15:59
поделиться
  • 1
    Ну, насколько мне известно, нет необходимости в обходном пути, потому что здесь совершенно неважно. sqlContext используется только для драйвера, поэтому нет причин для сериализации. – zero323 10 August 2015 в 12:55
  • 2
    @AlexNaspo Не полностью, но я использовал подобный подход один или два раза. Если у вас далеко не много памяти, имеет смысл выполнять фактическое действие, а не в зависимости от кеширования. Если вас интересует общий принцип, взгляните на org.apache.spark.rdd.AsyncRDDActions – zero323 9 August 2016 в 23:17

Если у вас много файлов, и каждый файл невелик (вы скажете, что 300 МБ выше, что я считаю малым для Spark), вы можете попробовать использовать SparkContext.wholeTextFiles, который создаст RDD, где каждая запись будет целым файлом.

0
ответ дан mattinbits 19 August 2018 в 15:59
поделиться
Другие вопросы по тегам:

Похожие вопросы: