Я новичок в python. Я использую многопроцессорный модуль для чтения строк текста на стандартном вводе, каким-то образом преобразовываю их и записываю в базу данных. Вот фрагмент моего кода:
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
Теперь все работает нормально, пока я не обработаю огромные входные файлы (сотни миллионов строк), которые я передаю в свою программу на Python. В какой-то момент, когда моя база данных становится медленнее, я вижу, что память заполняется.
Поиграв немного, оказалось, что pool.apply_async, как и pool.map_async никогда не блокируются, так что очередь вызовов для обработки становится все больше и больше.
Каков правильный подход к моей проблеме? Я ожидаю, что параметр, который я могу установить, будет блокировать вызов pool.apply_async, как только будет достигнута определенная длина очереди. AFAIR в Java для этой цели можно предоставить ThreadPoolExecutor BlockingQueue с фиксированной длиной.
Спасибо!