Эффективная многопроцессорная обработка массивной, грубой силы максимизации в Python 3

Это расширение моего недавнего вопроса Избегание условий гонки в многопроцессорных очередях Python 3. Надеюсь, эта версия вопроса более конкретна.

TL;DR: В многопроцессорной модели, где рабочие процессы загружаются из очереди с помощью multiprocessing.Queue, почему мои рабочие процессы так бездействуют?У каждого процесса есть собственная входная очередь, поэтому они Мы не боремся друг с другом за блокировку общей очереди, но очереди проводят много времени на самом деле просто пустыми. Основной процесс выполняет поток, связанный с вводом-выводом, — замедляет ли это заполнение входных очередей, связанное с процессором?

Я пытаюсь найти максимальный элемент декартова произведения N множеств, каждое из которых содержит M_i элементов (при 0 is_feasibleвозвращает True. В моей задаче я пытаюсь найти комбинацию, элементы которой имеют наибольший вес: sum(element.weight для элемента в комбинации).

Размер моей задачи велик, как и сервер моей компании. Я пытаюсь переписать следующий последовательный алгоритм как параллельный.

from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
    """Return the largest (total-weight, combination) tuple from all
    possible combinations of the elements in the several sets, subject
    to the constraint that is_feasible(combo) returns True."""
    return max(
                map(
                    lambda combination: (
                        sum(element.weight for element in combination),
                        combination
                    ),
                    filter(
                        is_feasible, # Returns True if combo meets constraint
                        product(*sets)
                    )
                ),
                key=itemgetter(0) # Only maximize based on sum of weight
            )

Мой текущий многопроцессорный подход заключается в создании рабочих процессов и подаче им комбинаций с входной очередью. Когда рабочие получают ядовитую таблетку, они помещают наилучшую комбинацию, которую они видели, в выходную очередь и выходят. Я заполняю входную очередь из основного потока основного процесса. Одним из преимуществ этого метода является то, что я могу создать вторичный поток из основного процесса для запуска инструмента мониторинга (просто REPL, который я могу использовать, чтобы увидеть, сколько комбинаций уже обработано и насколько заполнены очереди).

                    +-----------+
            in_q0   |   worker0 |----\
            /-------+-----------+     \
+-----------+   in_q1   +-----------+  \ out_q  +-----------+
|   main    |-----------|   worker1 |-----------|   main    |
+-----------+           +-----------+  /        +-----------+
            \-------+-----------+     /
            in_q2   |   worker2 |----/
                    +-----------+

Первоначально все рабочие процессы считывались из одной входной очереди, но я обнаружил, что ни один из них не нагружает ЦП. Решив, что они тратят все свое время на ожидание разблокировки queue.get(), я дал им свои собственные очереди. Это увеличило нагрузку на ЦП, поэтому я решил, что рабочие активны чаще. Однако очереди большую часть времени пустуют! (Я знаю это из мониторинга REPL, о котором я упоминал).Это говорит мне о том, что основной цикл, заполняющий очереди, работает медленно. Вот этот цикл:

from itertools import cycle
main():
    # (Create workers, each with its own input queue)
    # Cycle through each worker's queue and add a combination to that queue
    for combo, worker in zip(product(*sets), cycle(workers)):
        worker.in_q.put(combo)
    # (Collect results and return)

Я предполагаю, что узким местом является worker.in_q.put(). Как мне сделать это быстрее? Моим первым побуждением было сделать рабочие процессы медленнее, но это просто не имеет смысла... Проблема в том, что поток монитора слишком часто останавливает цикл? Как бы я мог сказать?

В качестве альтернативы, есть ли другой способ реализовать это, который не требует так много ожидания на блокировках?

8
задан Community 23 May 2017 в 12:27
поделиться