Самый простой ответ - простой вопрос - нужны ли вам результаты, которые можно было бы повторить? Если да, то NOLOCKS не подходит ни при каких обстоятельствах.
Если вам не нужна повторяемость, то nolocks могут быть полезны, особенно если вы не контролируете все процессы, подключающиеся к целевой базе данных.
Эта ошибка связана с тем, что print
не является функцией в Python 2.6.
Вы можете либо определить вспомогательный UDF, который выполняет печать, либо использовать библиотеку __ future __ для обработки print
как функции:
>>> from operator import add
>>> f = sc.textFile("README.md")
>>> def g(x):
... print x
...
>>> wc.foreach(g)
или
>>> from __future__ import print_function
>>> wc.foreach(print)
Однако, я думаю, было бы лучше использовать collect()
, чтобы вернуть содержимое RDD драйвер, потому что foreach
выполняется на рабочих узлах, и выходы могут не обязательно отображаться в вашем драйвере / оболочке (вероятно, он будет работать в режиме local
, но не при работе в кластере).
>>> for x in wc.collect():
... print x
В Spark 2.0 (я не тестировал более ранние версии). Просто:
print myRDD.take(n)
Где n - количество строк, а myRDD - wc в вашем случае.
Если вы хотите увидеть содержимое RDD, тогда yes collect - это один параметр, но он извлекает все данные в драйвер, поэтому может возникнуть проблема
<rdd.name>.take(<num of elements you want to fetch>)
Лучше, если вы хотите просто увидеть образец
Запуск foreach и попытка печати, я не рекомендую это, потому что если вы используете это в кластере, тогда журналы печати будут локальными для исполнителя и будут печатать для данных, доступных этому исполнителю. print заявление не меняет состояние, следовательно, это не логически неправильно. Чтобы получить все журналы, вам нужно будет сделать что-то вроде
**Pseudocode**
collect
foreach print
. Но это может привести к поломке задания, так как сбор всех данных на драйвере может привести к его краху. Я бы предложил использовать команду take или если вы хотите ее проанализировать, тогда используйте образец для сбора в драйвере или напишите в файл, а затем проанализируйте его.
Попробуйте это:
data = f.flatMap(lambda x: x.split(' '))
map = data.map(lambda x: (x, 1))
mapreduce = map.reduceByKey(lambda x,y: x+y)
result = mapreduce.collect()
Обратите внимание, что при запуске collect () RDD - это распределенный набор данных, агрегируется в узле драйвера и по существу преобразуется в список. Таким образом, очевидно, что собирать () набор данных 2T не рекомендуется. Если вам нужно всего несколько образцов из вашего RDD, используйте take (10).
В последнем документе вы можете использовать rdd.collect (). foreach (println) в драйвере для отображения всех, но это может вызвать проблемы с памятью в драйвере, лучше всего использовать rdd.take (wish_number)
https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html
Чтобы распечатать все элементы в драйвере, один может использовать метод collect (), чтобы сначала привести RDD к узлу драйвера таким образом: rdd.collect (). foreach (println). Это может привести к тому, что драйвер исчерпает память, потому что collect () извлекает весь RDD на одну машину; если вам нужно всего лишь напечатать несколько элементов RDD, более безопасный подход - использовать take (): rdd.take (100) .foreach (println).