Я хотел бы узнать, как правильно выполняется мультипроцессирование. Предположим, у меня есть список [1,2,3,4,5]
, созданный функцией f1
, который записывается в Queue
(левый зеленый кружок). Теперь я запускаю два процесса, извлекающих данные из этой очереди (выполняя f2
в процессах). Они обрабатывают данные, скажем: удваивают значение, и записывают его во вторую очередь. Теперь функция f3
считывает эти данные и распечатывает их.
Внутри функций есть своего рода цикл, который пытается вечно читать из очереди. Как остановить этот процесс?
Идея 1
f1
отправляет не только список, но и объект None
или объект custon, class PipelineTerminator: pass
или что-то подобное, что просто распространяется по всему пути вниз. f3
теперь ожидает прихода None
, когда он приходит, он выходит из цикла. Проблема: возможно, что один из двух f2
считывает и распространяет None
, в то время как другой все еще обрабатывает число. Тогда последнее значение теряется.
Идея 2
f3
является f1
. Итак, функция f1
генерирует данные и трубы, порождает процессы с помощью f2
и подает все данные. После порождения и подачи данных он слушает вторую трубу, просто подсчитывая и обрабатывая полученные объекты. Поскольку он знает, сколько данных было скормлено, он может завершить процессы, выполняющие f2
. Но если целью является создание конвейера обработки, то различные шаги должны быть разделимы. Так f1
, f2
и f3
являются различными элементами конвейера, а дорогостоящие этапы выполняются параллельно.
Идея 3
Каждый элемент конвейера - это функция, эта функция порождает процессы по своему усмотрению и отвечает за управление ими. Она знает, сколько данных поступило и сколько данных было возвращено (с помощью yield
, возможно). Поэтому безопасно распространять объект None
.
setup child processes
execute thread one and two and wait until both finished
thread 1:
while True:
pull from input queue
if None: break and set finished_flag
else: push to queue1 and increment counter1
thread 2:
while True:
pull from queue2
increment counter2
yield result
if counter1 == counter2 and finished_flag: break
when both threads finished: kill process pool and return.
(Вместо использования потоков, возможно, можно придумать более умное решение.)
Итак ...
Я реализовал решение, следуя идее 2, подавая данные и ожидая результатов, но это был не совсем конвейер с независимыми функциями, соединенными вместе. Это работало для задачи, с которой я должен был справиться, но было трудно поддерживать.
Я хотел бы услышать от вас, как вы реализуете конвейеры (легко в одном процессе с функциями генератора и так далее, но в нескольких процессах?) и обычно управляете ими.