spark flatMapValues, которые не могут выделяться в разных линиях [дублировать]

В Java все переменные, которые вы объявляете, на самом деле являются «ссылками» на объекты (или примитивы), а не самими объектами.

При попытке выполнить один метод объекта , ссылка просит живой объект выполнить этот метод. Но если ссылка ссылается на NULL (ничего, нуль, void, nada), то нет способа, которым метод будет выполнен. Тогда runtime сообщит вам об этом, выбросив исключение NullPointerException.

Ваша ссылка «указывает» на нуль, таким образом, «Null -> Pointer».

Объект живет в памяти виртуальной машины пространство и единственный способ доступа к нему - использовать ссылки this. Возьмем этот пример:

public class Some {
    private int id;
    public int getId(){
        return this.id;
    }
    public setId( int newId ) {
        this.id = newId;
    }
}

И в другом месте вашего кода:

Some reference = new Some();    // Point to a new object of type Some()
Some otherReference = null;     // Initiallly this points to NULL

reference.setId( 1 );           // Execute setId method, now private var id is 1

System.out.println( reference.getId() ); // Prints 1 to the console

otherReference = reference      // Now they both point to the only object.

reference = null;               // "reference" now point to null.

// But "otherReference" still point to the "real" object so this print 1 too...
System.out.println( otherReference.getId() );

// Guess what will happen
System.out.println( reference.getId() ); // :S Throws NullPointerException because "reference" is pointing to NULL remember...

Это важно знать - когда больше нет ссылок на объект (в пример выше, когда reference и otherReference оба указывают на null), тогда объект «недоступен». Мы не можем работать с ним, поэтому этот объект готов к сбору мусора, и в какой-то момент VM освободит память, используемую этим объектом, и выделит другую.

10
задан Knows Not Much 2 March 2015 в 08:32
поделиться

2 ответа

Метод RDD.foreach в Spark работает на кластере, поэтому каждый рабочий, который содержит эти записи, выполняет операции в foreach. То есть ваш код запущен, но они печатаются на рабочем столе Spark, а не в сеансе драйвера / вашей оболочки. Если вы посмотрите на вывод (stdout) для ваших работников Spark, вы увидите, что они напечатаны на консоли.

Вы можете просмотреть stdout для рабочих, перейдя в веб-gui для каждого запущенного исполнителя. Пример URL-адреса: http: // workerIp: workerPort / logPage /? AppId = app-20150303023103-0043 & amp; executorId = 1 & amp; logType = stdout

Spark Executor Stdout [/g1]

В этом примере Spark решит поместить все записи RDD в один раздел.

Это имеет смысл, если вы думаете об этом - посмотрите на подпись функции для foreach - она ​​ничего не возвращает.

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit

Это действительно цель foreach в scala - ее используется для побочного эффекта.

Когда вы собираете записи, вы возвращаете их обратно в драйвер, так что логически собирать / выполнять операции просто выполняются в коллекции Scala в драйвере Spark - вы можете увидеть выход журнала в качестве искрового драйвера / искровой оболочки это то, что печатает на stdout в вашей сессии.

Пример использования foreach может показаться неразумным, например: если для каждой записи в RDD вы хотели бы сделать какое-то внешнее поведение, например, вызвать REST api, вы можете сделать это в foreach, то каждый работник Spark отправил бы вызов серверу API со значением. Если foreach действительно возвращал записи, вы могли бы легко сдуть память в процессе драйвера / оболочки. Таким образом, вы избегаете этих проблем и можете делать побочные эффекты для всех элементов в RDD над кластером.

Если вы хотите увидеть, что в RDD я использую,

array.collect.foreach(println) 
//Instead of collect, use take(...) or takeSample(...) if the RDD is large
26
ответ дан NightWolf 19 August 2018 в 01:50
поделиться
  • 1
    Foreach отлично работает, когда вам нужно обновить аккумулятор внутри функции и хотите, чтобы действие гарантировало, что оно будет обновляться только один раз. Насколько я могу судить, это единственное действие в Spark, которое позволяет мне чисто запустить функцию на RDD. ( Per the Spark Docs Для обновлений аккумуляторов, выполненных только внутри действий, Spark гарантирует, что обновление каждой задачи к аккумулятору будет применяться только один раз, то есть перезапущенные задачи не будут обновлять значение) – JimLohse 8 March 2016 в 23:56
  • 2
    Итак, будет ли foreach фактически выполняться в рабочих узлах? Запуск в рабочих узлах - это намерение в искрах. Я видел места, где люди упоминали, & quot; Не использовать RDD.foreach () в кластерной среде & quot ;. Это правда? – Hari Ram 13 May 2018 в 12:44

Вы можете использовать RDD.toLocalIterator () для переноса данных в драйвер (один раздел RDD за раз):

val array = sc.parallelize(List(1, 2, 3, 4))
for(rec <- array.toLocalIterator) { println(rec) }

См. также

3
ответ дан Community 19 August 2018 в 01:50
поделиться
Другие вопросы по тегам:

Похожие вопросы: