Хорошо, поскольку в настоящее время нет ответов, я не чувствую себя слишком плохо, делая это. Хотя меня все еще интересует, что на самом деле происходит за кулисами, вызывая эту проблему, мои самые неотложные вопросы - это те, которые указаны в обновлении 2. Это:
Каковы различия между JoinableQueue
и Manager (). Queue ()
(и когда лучше использовать одно вместо другого?). И, что немаловажно, безопасно ли в этом примере заменить один на другой?
В следующем коде у меня есть простой пул процессов. Каждому процессу передается очередь процессов ( pq
) для извлечения данных для обработки и очередь возвращаемых значений ( rq
) для передачи возвращенных значений обработки обратно в основной поток. Если я не добавляю в очередь возвращаемых значений, это работает, но как только я это сделаю, по какой-то причине процессы заблокированы от остановки. В обоих случаях процессы запускают
методы return, поэтому он не помещает
в блокировку очереди возврата, но во втором случае сами процессы не завершаются, поэтому программа заходит в тупик, когда я присоединиться к
по процессам. Почему это должно быть?
Обновления:
Кажется, что-то связано с количеством элементов в очереди.
По крайней мере, на моей машине у меня может быть до 6570 элементов в очереди, и это действительно работает, но если больше, то возникает тупик.
Кажется, работает с Manager (). Queue ()
.
Будь то ограничение JoinableQueue
или просто мое непонимание различий между двумя объектами, я обнаружил, что если я заменю очередь возврата на Manager (). Queue ()
, все работает как положено. В чем разница между ними и когда следует использовать одно вместо другого?
Ошибка не возникает, если я использую rq
Ой. На мгновение здесь был ответ, и пока я его комментировал, он исчез. В любом случае, одна из вещей, которые он сказал, заключалась в том, чтобы спросить, будет ли эта ошибка возникать, если я добавлю потребителя. Я пробовал это, и ответ - нет.
Другая вещь, которую он упомянул, - это цитата из документации по многопроцессорной обработке как возможный ключ к проблеме. Ссылаясь на JoinableQueue
, он говорит:
... семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.
import multiprocessing
class _ProcSTOP:
pass
class Proc(multiprocessing.Process):
def __init__(self, pq, rq):
self._pq = pq
self._rq = rq
super().__init__()
print('++', self.name)
def run(self):
dat = self._pq.get()
while not dat is _ProcSTOP:
# self._rq.put(dat) # uncomment me for deadlock
self._pq.task_done()
dat = self._pq.get()
self._pq.task_done()
print('==', self.name)
def __del__(self):
print('--', self.name)
if __name__ == '__main__':
pq = multiprocessing.JoinableQueue()
rq = multiprocessing.JoinableQueue()
pool = []
for i in range(4):
p = Proc(pq, rq)
p.start()
pool.append(p)
for i in range(10000):
pq.put(i)
pq.join()
for i in range(4):
pq.put(_ProcSTOP)
pq.join()
while len(pool) > 0:
print('??', pool)
pool.pop().join() # hangs here (if using rq)
print('** complete')
Пример вывода без использования очереди возврата:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [, , , ]
== Proc-2
?? [, , ]
-- Proc-3
?? [, ]
-- Proc-2
?? []
-- Proc-1
** complete
-- Proc-4
Пример вывода с использованием очереди возврата:
++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [, , , ]
== Proc-3
# here it hangs