Это расширение моего недавнего вопроса Избегание условий гонки в многопроцессорных очередях Python 3. Надеюсь, эта версия вопроса более конкретна.
TL;DR: В многопроцессорной модели, где рабочие процессы загружаются из очереди с помощью multiprocessing.Queue
, почему мои рабочие процессы так бездействуют?У каждого процесса есть собственная входная очередь, поэтому они Мы не боремся друг с другом за блокировку общей очереди, но очереди проводят много времени на самом деле просто пустыми. Основной процесс выполняет поток, связанный с вводом-выводом, — замедляет ли это заполнение входных очередей, связанное с процессором?
Я пытаюсь найти максимальный элемент декартова произведения N множеств, каждое из которых содержит M_i элементов (при 0 is_feasibleвозвращает True
. В моей задаче я пытаюсь найти комбинацию, элементы которой имеют наибольший вес: sum(element.weight для элемента в комбинации)
.
Размер моей задачи велик, как и сервер моей компании. Я пытаюсь переписать следующий последовательный алгоритм как параллельный.
from operator import itemgetter
from itertools import product # Cartesian product function from the std lib
def optimize(sets):
"""Return the largest (total-weight, combination) tuple from all
possible combinations of the elements in the several sets, subject
to the constraint that is_feasible(combo) returns True."""
return max(
map(
lambda combination: (
sum(element.weight for element in combination),
combination
),
filter(
is_feasible, # Returns True if combo meets constraint
product(*sets)
)
),
key=itemgetter(0) # Only maximize based on sum of weight
)
Мой текущий многопроцессорный подход заключается в создании рабочих процессов и подаче им комбинаций с входной очередью. Когда рабочие получают ядовитую таблетку, они помещают наилучшую комбинацию, которую они видели, в выходную очередь и выходят. Я заполняю входную очередь из основного потока основного процесса. Одним из преимуществ этого метода является то, что я могу создать вторичный поток из основного процесса для запуска инструмента мониторинга (просто REPL, который я могу использовать, чтобы увидеть, сколько комбинаций уже обработано и насколько заполнены очереди).
+-----------+
in_q0 | worker0 |----\
/-------+-----------+ \
+-----------+ in_q1 +-----------+ \ out_q +-----------+
| main |-----------| worker1 |-----------| main |
+-----------+ +-----------+ / +-----------+
\-------+-----------+ /
in_q2 | worker2 |----/
+-----------+
Первоначально все рабочие процессы считывались из одной входной очереди, но я обнаружил, что ни один из них не нагружает ЦП. Решив, что они тратят все свое время на ожидание разблокировки queue.get(), я дал им свои собственные очереди. Это увеличило нагрузку на ЦП, поэтому я решил, что рабочие активны чаще. Однако очереди большую часть времени пустуют! (Я знаю это из мониторинга REPL, о котором я упоминал).Это говорит мне о том, что основной цикл, заполняющий очереди, работает медленно. Вот этот цикл:
from itertools import cycle
main():
# (Create workers, each with its own input queue)
# Cycle through each worker's queue and add a combination to that queue
for combo, worker in zip(product(*sets), cycle(workers)):
worker.in_q.put(combo)
# (Collect results and return)
Я предполагаю, что узким местом является worker.in_q.put()
. Как мне сделать это быстрее? Моим первым побуждением было сделать рабочие процессы медленнее, но это просто не имеет смысла... Проблема в том, что поток монитора слишком часто останавливает цикл? Как бы я мог сказать?
В качестве альтернативы, есть ли другой способ реализовать это, который не требует так много ожидания на блокировках?