Мультипроцессинг в конвейере, сделанный правильно

Я хотел бы узнать, как правильно выполняется мультипроцессирование. Предположим, у меня есть список [1,2,3,4,5], созданный функцией f1, который записывается в Queue (левый зеленый кружок). Теперь я запускаю два процесса, извлекающих данные из этой очереди (выполняя f2 в процессах). Они обрабатывают данные, скажем: удваивают значение, и записывают его во вторую очередь. Теперь функция f3 считывает эти данные и распечатывает их.

layout of the data flow

Внутри функций есть своего рода цикл, который пытается вечно читать из очереди. Как остановить этот процесс?

Идея 1

f1 отправляет не только список, но и объект None или объект custon, class PipelineTerminator: pass или что-то подобное, что просто распространяется по всему пути вниз. f3 теперь ожидает прихода None, когда он приходит, он выходит из цикла. Проблема: возможно, что один из двух f2считывает и распространяет None, в то время как другой все еще обрабатывает число. Тогда последнее значение теряется.

Идея 2

f3 является f1. Итак, функция f1 генерирует данные и трубы, порождает процессы с помощью f2 и подает все данные. После порождения и подачи данных он слушает вторую трубу, просто подсчитывая и обрабатывая полученные объекты. Поскольку он знает, сколько данных было скормлено, он может завершить процессы, выполняющие f2. Но если целью является создание конвейера обработки, то различные шаги должны быть разделимы. Так f1, f2 и f3 являются различными элементами конвейера, а дорогостоящие этапы выполняются параллельно.

Идея 3

pipeline idea 3

Каждый элемент конвейера - это функция, эта функция порождает процессы по своему усмотрению и отвечает за управление ими. Она знает, сколько данных поступило и сколько данных было возвращено (с помощью yield, возможно). Поэтому безопасно распространять объект None.

setup child processes 

execute thread one and two and wait until both finished

thread 1:
    while True:
        pull from input queue
        if None: break and set finished_flag
        else: push to queue1 and increment counter1

thread 2:
    while True:
        pull from queue2
        increment counter2
        yield result
        if counter1 == counter2 and finished_flag: break

when both threads finished: kill process pool and return.

(Вместо использования потоков, возможно, можно придумать более умное решение.)

Итак ...

Я реализовал решение, следуя идее 2, подавая данные и ожидая результатов, но это был не совсем конвейер с независимыми функциями, соединенными вместе. Это работало для задачи, с которой я должен был справиться, но было трудно поддерживать.

Я хотел бы услышать от вас, как вы реализуете конвейеры (легко в одном процессе с функциями генератора и так далее, но в нескольких процессах?) и обычно управляете ими.

14
задан wal-o-mat 26 November 2011 в 10:13
поделиться