Поточная обработка пула, подобного многопроцессорному Пулу?

Существует ли класс Пула для рабочих потоков, подобных классу Пула многопроцессорного модуля?

Мне нравится, например, простой способ параллелизировать функцию карты

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

однако я хотел бы сделать это без издержек создания новых процессов.

Я знаю о GIL. Однако в моем варианте использования, функция будет IO-bound C функция, для которой обертка Python выпустит GIL перед фактическим вызовом функции.

Я должен записать свое собственное объединение поточной обработки?

325
задан martineau 19 May 2017 в 04:29
поделиться

3 ответа

Я только что узнал, что на самом деле является поточным интерфейсом пула в модуле multiprocessing , однако он несколько скрыт и не задокументирован должным образом .

Его можно импортировать через

from multiprocessing.pool import ThreadPool

. Он реализован с использованием фиктивного класса Process, обертывающего поток Python. Этот основанный на потоках класс Process можно найти в multiprocessing.dummy , который кратко упоминается в docs . Этот фиктивный модуль предположительно предоставляет весь многопроцессорный интерфейс, основанный на потоках.

425
ответ дан 23 November 2019 в 00:51
поделиться

Накладные расходы на создание новых процессов минимальны, особенно когда их всего 4. Я сомневаюсь, что это проблема производительности вашего приложения. Сохраняйте простоту, оптимизируйте, где вам нужно и на что указывают результаты профилирования.

2
ответ дан 23 November 2019 в 00:51
поделиться

Нет встроенного пула на основе потоков. Однако можно очень быстро реализовать очередь производителя / потребителя с классом Queue .

От: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
1
ответ дан 23 November 2019 в 00:51
поделиться
Другие вопросы по тегам:

Похожие вопросы: