Я пытаюсь найти максимальный вес около 6,1 миллиарда (пользовательских) элементов, и я хотел бы сделать это с помощью параллельной обработки. Для моего конкретного приложения есть лучшие алгоритмы, которые не требуют повторения более 6,1 миллиарда элементов, но учебник, объясняющий их, мне не по плечу, а мой босс хочет, чтобы это было сделано за 4 дня. Я решил, что у меня есть лучшие шансы с модным сервером моей компании и параллельной обработкой. Однако все, что я знаю о параллельной обработке, я получил из документации по Python . Иными словами, я совсем запутался...
Моя текущая теория состоит в том, чтобы настроить питающий процесс, входную очередь, целую группу (скажем, 30) рабочих процессов и выходную очередь (нахождение максимального элемент в выходной очереди будет тривиальным).Чего я не понимаю, так это того, как процесс подачи может сообщить рабочим процессам, когда прекратить ожидание прохождения элементов через входную очередь.
Я думал об использовании multiprocessing.Pool.map_async
для моей итерации элементов 6.1E9, но только на итерацию элементов без каких-либо действий с ними уходит почти 10 минут. Если я чего-то не понимаю... , map_async
перебирает их, чтобы назначить их процессам, когда процессы начинают свою работу. (Pool
также предоставляет imap
, но в документации говорится, что он похож на map
, который не работает асинхронно. Я хочу асинхронный режим, верно?)
Связанные вопросы: Хочу ли я использовать concurrent.futures
вместо multiprocessing
? Я не мог быть первым, кто внедрил систему с двумя очередями (именно так работают очереди в каждом гастрономе в Америке...), так что есть ли более Pythonic/встроенный способ сделать это?
Вот схема того, что я пытаюсь сделать. См. блок комментариев посередине.
import multiprocessing as mp
import queue
def faucet(items, bathtub):
"""Fill bathtub, a process-safe queue, with 6.1e9 items"""
for item in items:
bathtub.put(item)
bathtub.close()
def drain_filter(bathtub, drain):
"""Put maximal item from bathtub into drain.
Bathtub and drain are process-safe queues.
"""
max_weight = 0
max_item = None
while True:
try:
current_item = bathtub.get()
# The following line three lines are the ones that I can't
# quite figure out how to trigger without a race condition.
# What I would love is to trigger them AFTER faucet calls
# bathtub.close and the bathtub queue is empty.
except queue.Empty:
drain.put((max_weight, max_item))
return
else:
bathtub.task_done()
if not item.is_relevant():
continue
current_weight = item.weight
if current_weight > max_weight:
max_weight = current_weight
max_item = current_item
def parallel_max(items, nprocs=30):
"""The elements of items should have a method `is_relevant`
and an attribute `weight`. `items` itself is an immutable
iterator object.
"""
bathtub_q = mp.JoinableQueue()
drain_q = mp.Queue()
faucet_proc = mp.Process(target=faucet, args=(items, bathtub_q))
worker_procs = mp.Pool(processes=nprocs)
faucet_proc.start()
worker_procs.apply_async(drain_filter, bathtub_q, drain_q)
finalists = []
for i in range(nprocs):
finalists.append(drain_q.get())
return max(finalists)
Я нашел очень подробный ответ на свой вопрос, а также краткое введение в многозадачность от директора по коммуникациям Python Foundation Дуга Хеллмана. То, что я хотел, было шаблоном «ядовитой таблетки». Проверьте это здесь: http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html
Спасибо @MRAB за публикацию ядра этой концепции.