Избегание условий гонки в многопроцессорных очередях Python 3

Я пытаюсь найти максимальный вес около 6,1 миллиарда (пользовательских) элементов, и я хотел бы сделать это с помощью параллельной обработки. Для моего конкретного приложения есть лучшие алгоритмы, которые не требуют повторения более 6,1 миллиарда элементов, но учебник, объясняющий их, мне не по плечу, а мой босс хочет, чтобы это было сделано за 4 дня. Я решил, что у меня есть лучшие шансы с модным сервером моей компании и параллельной обработкой. Однако все, что я знаю о параллельной обработке, я получил из документации по Python . Иными словами, я совсем запутался...

Моя текущая теория состоит в том, чтобы настроить питающий процесс, входную очередь, целую группу (скажем, 30) рабочих процессов и выходную очередь (нахождение максимального элемент в выходной очереди будет тривиальным).Чего я не понимаю, так это того, как процесс подачи может сообщить рабочим процессам, когда прекратить ожидание прохождения элементов через входную очередь.

Я думал об использовании multiprocessing.Pool.map_asyncдля моей итерации элементов 6.1E9, но только на итерацию элементов без каких-либо действий с ними уходит почти 10 минут. Если я чего-то не понимаю... , map_asyncперебирает их, чтобы назначить их процессам, когда процессы начинают свою работу. (Poolтакже предоставляет imap, но в документации говорится, что он похож на map, который не работает асинхронно. Я хочу асинхронный режим, верно?)

Связанные вопросы: Хочу ли я использовать concurrent.futuresвместо multiprocessing? Я не мог быть первым, кто внедрил систему с двумя очередями (именно так работают очереди в каждом гастрономе в Америке...), так что есть ли более Pythonic/встроенный способ сделать это?

Вот схема того, что я пытаюсь сделать. См. блок комментариев посередине.

import multiprocessing as mp
import queue

def faucet(items, bathtub):
    """Fill bathtub, a process-safe queue, with 6.1e9 items"""
    for item in items:
        bathtub.put(item)
    bathtub.close()

def drain_filter(bathtub, drain):
    """Put maximal item from bathtub into drain.
    Bathtub and drain are process-safe queues.
    """
    max_weight = 0
    max_item = None
    while True:
        try:
            current_item = bathtub.get()
        # The following line three lines are the ones that I can't
        # quite figure out how to trigger without a race condition.
        # What I would love is to trigger them AFTER faucet calls
        # bathtub.close and the bathtub queue is empty.
        except queue.Empty:
            drain.put((max_weight, max_item))
            return
        else:
            bathtub.task_done()
        if not item.is_relevant():
            continue
        current_weight = item.weight
        if current_weight > max_weight:
            max_weight = current_weight
            max_item = current_item

def parallel_max(items, nprocs=30):
    """The elements of items should have a method `is_relevant`
    and an attribute `weight`. `items` itself is an immutable
    iterator object.
    """
    bathtub_q = mp.JoinableQueue()
    drain_q = mp.Queue()

    faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
    worker_procs = mp.Pool(processes=nprocs)

    faucet_proc.start()
    worker_procs.apply_async(drain_filter, bathtub_q, drain_q)

    finalists = []
    for i in range(nprocs):
        finalists.append(drain_q.get())

    return max(finalists)


ВОТ ОТВЕТ

Я нашел очень подробный ответ на свой вопрос, а также краткое введение в многозадачность от директора по коммуникациям Python Foundation Дуга Хеллмана. То, что я хотел, было шаблоном «ядовитой таблетки». Проверьте это здесь: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html

Спасибо @MRAB за публикацию ядра этой концепции.

12
задан wkschwartz 17 May 2012 в 02:50
поделиться