Мне очень жаль, что я не могу воспроизвести ошибку на более простом примере, а мой код слишком сложен для публикации.Если я запускаю программу в оболочке IPython вместо обычного Python, все работает хорошо.
Я просмотрел некоторые предыдущие заметки по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в функции класса. Но это не ко мне.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Буду признателен за любую помощь.
Обновление : функция I pickle определена на верхнем уровне модуля. Хотя он вызывает функцию, содержащую вложенную функцию. т.е. f ()
вызывает g ()
вызывает h ()
, который имеет вложенную функцию i ()
, и я вызываю pool.apply_async (f)
. f ()
, g ()
, h ()
все определены на верхнем уровне. Я пробовал более простой пример с этим шаблоном, но он работает.
Опираясь на решение @rocksportrocker, было бы целесообразно укропить при отправке и получении результатов.
import dill
import itertools
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
res = fun(*args)
res = dill.dumps(res)
return res
def dill_map_async(pool, fun, args_list,
as_tuple=True,
**kw):
if as_tuple:
args_list = ((x,) for x in args_list)
it = itertools.izip(
itertools.cycle([fun]),
args_list)
it = itertools.imap(dill.dumps, it)
return pool.map_async(run_dill_encoded, it, **kw)
if __name__ == '__main__':
import multiprocessing as mp
import sys,os
p = mp.Pool(4)
res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
[lambda x:x+1]*10,)
res = res.get(timeout=100)
res = map(dill.loads,res)
print(res)
Когда эта проблема придумывает multiprocessing
, простое решение состоит в том, чтобы переключиться от Pool
до ThreadPool
. Это может быть сделано без изменения кода кроме импорта -
from multiprocessing.pool import ThreadPool as Pool
Это работает, потому что ThreadPool совместно использует память с основным потоком, вместо того, чтобы создать новый процесс - это означает, что соление не требуется.