Вывод функции foreach на Spark DataFrame [дубликат]

Самый простой ответ - простой вопрос - нужны ли вам результаты, которые можно было бы повторить? Если да, то NOLOCKS не подходит ни при каких обстоятельствах.

Если вам не нужна повторяемость, то nolocks могут быть полезны, особенно если вы не контролируете все процессы, подключающиеся к целевой базе данных.

32
задан lmart999 13 August 2014 в 21:13
поделиться

5 ответов

Эта ошибка связана с тем, что print не является функцией в Python 2.6.

Вы можете либо определить вспомогательный UDF, который выполняет печать, либо использовать библиотеку __ future __ для обработки print как функции:

>>> from operator import add
>>> f = sc.textFile("README.md")
>>> def g(x):
...     print x
...
>>> wc.foreach(g)

или

>>> from __future__ import print_function
>>> wc.foreach(print)

Однако, я думаю, было бы лучше использовать collect(), чтобы вернуть содержимое RDD драйвер, потому что foreach выполняется на рабочих узлах, и выходы могут не обязательно отображаться в вашем драйвере / оболочке (вероятно, он будет работать в режиме local, но не при работе в кластере).

>>> for x in wc.collect():
...     print x
37
ответ дан Josh Rosen 24 August 2018 в 18:12
поделиться

В Spark 2.0 (я не тестировал более ранние версии). Просто:

print myRDD.take(n)

Где n - количество строк, а myRDD - wc в вашем случае.

14
ответ дан Frederico Oliveira 24 August 2018 в 18:12
поделиться

Если вы хотите увидеть содержимое RDD, тогда yes collect - это один параметр, но он извлекает все данные в драйвер, поэтому может возникнуть проблема

<rdd.name>.take(<num of elements you want to fetch>)

Лучше, если вы хотите просто увидеть образец

Запуск foreach и попытка печати, я не рекомендую это, потому что если вы используете это в кластере, тогда журналы печати будут локальными для исполнителя и будут печатать для данных, доступных этому исполнителю. print заявление не меняет состояние, следовательно, это не логически неправильно. Чтобы получить все журналы, вам нужно будет сделать что-то вроде

**Pseudocode**
collect
foreach print

. Но это может привести к поломке задания, так как сбор всех данных на драйвере может привести к его краху. Я бы предложил использовать команду take или если вы хотите ее проанализировать, тогда используйте образец для сбора в драйвере или напишите в файл, а затем проанализируйте его.

5
ответ дан iec2011007 24 August 2018 в 18:12
поделиться

Попробуйте это:

data = f.flatMap(lambda x: x.split(' '))
map = data.map(lambda x: (x, 1))
mapreduce = map.reduceByKey(lambda x,y: x+y)
result = mapreduce.collect()

Обратите внимание, что при запуске collect () RDD - это распределенный набор данных, агрегируется в узле драйвера и по существу преобразуется в список. Таким образом, очевидно, что собирать () набор данных 2T не рекомендуется. Если вам нужно всего несколько образцов из вашего RDD, используйте take (10).

4
ответ дан Jeevs 24 August 2018 в 18:12
поделиться

В последнем документе вы можете использовать rdd.collect (). foreach (println) в драйвере для отображения всех, но это может вызвать проблемы с памятью в драйвере, лучше всего использовать rdd.take (wish_number)

https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html

Чтобы распечатать все элементы в драйвере, один может использовать метод collect (), чтобы сначала привести RDD к узлу драйвера таким образом: rdd.collect (). foreach (println). Это может привести к тому, что драйвер исчерпает память, потому что collect () извлекает весь RDD на одну машину; если вам нужно всего лишь напечатать несколько элементов RDD, более безопасный подход - использовать take (): rdd.take (100) .foreach (println).

0
ответ дан YDD9 24 August 2018 в 18:12
поделиться
Другие вопросы по тегам:

Похожие вопросы: