Многопроцессорная обработка Python PicklingError: Can't pickle

Мне очень жаль, что я не могу воспроизвести ошибку на более простом примере, а мой код слишком сложен для публикации.Если я запускаю программу в оболочке IPython вместо обычного Python, все работает хорошо.

Я просмотрел некоторые предыдущие заметки по этой проблеме. Все они были вызваны использованием пула для вызова функции, определенной в функции класса. Но это не ко мне.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Буду признателен за любую помощь.

Обновление : функция I pickle определена на верхнем уровне модуля. Хотя он вызывает функцию, содержащую вложенную функцию. т.е. f () вызывает g () вызывает h () , который имеет вложенную функцию i () , и я вызываю pool.apply_async (f) . f () , g () , h () все определены на верхнем уровне. Я пробовал более простой пример с этим шаблоном, но он работает.

215
задан kevins_1 9 October 2019 в 16:19
поделиться

2 ответа

Опираясь на решение @rocksportrocker, было бы целесообразно укропить при отправке и получении результатов.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)
0
ответ дан 23 November 2019 в 04:21
поделиться

Когда эта проблема придумывает multiprocessing, простое решение состоит в том, чтобы переключиться от Pool до ThreadPool. Это может быть сделано без изменения кода кроме импорта -

from multiprocessing.pool import ThreadPool as Pool

Это работает, потому что ThreadPool совместно использует память с основным потоком, вместо того, чтобы создать новый процесс - это означает, что соление не требуется.

0
ответ дан 23 November 2019 в 04:21
поделиться
Другие вопросы по тегам:

Похожие вопросы: