Ну, он действительно добавляет к list1
, проблема не в том, о котором вы думаете. Каждая переменная, указанная в закрытии, сериализуется и отправляется работникам. Это также относится к list1
.
Каждый раздел получает свою собственную копию list1
, когда file_read
называется данными, добавляется к этой копии, и когда эта фаза карты завершается, выходит за рамки и отбрасывается.
Не особенно элегантный фрагмент кода, но вы должны увидеть, что это действительно то, что здесь происходит:
rdd = sc.parallelize(range(100), 5)
line1 = []
def file_read(line):
list1.append(line)
print len(list1)
return line
xs = rdd.map(file_read).collect()
Изменить
Spark предоставляет два типа общей переменной. Широковещательные переменные , которые читаются только с точки зрения рабочего, и аккумуляторы , которые записываются только с точки зрения драйвера.
По умолчанию аккумуляторы поддерживают только числовые переменные и предназначены для использования в основном как счетчики. Однако можно определить пользовательские аккумуляторы. Для этого вам необходимо расширить класс AccumulatorParam
и предоставить пользовательские zero
и addInPlace
реализации:
class ListParam(AccumulatorParam):
def zero(self, v):
return []
def addInPlace(self, acc1, acc2):
acc1.extend(acc2)
return acc1
Далее вы можете переопределить file_read
следующим образом:
def file_read1(line):
global list1 # Required otherwise the next line will fail
list1 += [line]
return line
Пример использования:
list1 = sc.accumulator([], ListParam())
rdd = sc.parallelize(range(10)).map(file_read1).collect()
list1.value
Даже если можно использовать такой аккумулятор, возможно, это к дорогостоящим, которые будут использоваться на практике, а в худшем случае это может привести к поломке водителя. Вместо этого вы можете просто использовать другое преобразование:
tmp = (inputData
.map(lambda line: line.split(","))
.filter(lambda line: len(line) >1 ))
def line_read2(line): return ... # Just a core logic
line1 = tmp.map(lambda line: line[10])
column_val = tmp.map(line_read2)
Боковое примечание:
Код, который вы предоставили, ничего не делает. Преобразования в Spark - это просто описания того, что нужно сделать, но пока вы не вызовете данные действия, ничего не выполняется.