пул python apply_async и map_async не блокирует полную очередь

Я новичок в python. Я использую многопроцессорный модуль для чтения строк текста на стандартном вводе, каким-то образом преобразовываю их и записываю в базу данных. Вот фрагмент моего кода:

batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
    batch.append(content)
    if len(batch) >= 10000:
        pool.apply_async(insert, args=(batch,i+1))
        batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()

Теперь все работает нормально, пока я не обработаю огромные входные файлы (сотни миллионов строк), которые я передаю в свою программу на Python. В какой-то момент, когда моя база данных становится медленнее, я вижу, что память заполняется.

Поиграв немного, оказалось, что pool.apply_async, как и pool.map_async никогда не блокируются, так что очередь вызовов для обработки становится все больше и больше.

Каков правильный подход к моей проблеме? Я ожидаю, что параметр, который я могу установить, будет блокировать вызов pool.apply_async, как только будет достигнута определенная длина очереди. AFAIR в Java для этой цели можно предоставить ThreadPoolExecutor BlockingQueue с фиксированной длиной.

Спасибо!

10
задан noxdafox 16 March 2018 в 08:15
поделиться