Подпроцесс завершается, но все еще не завершается, что приводит к тупиковой ситуации.

Хорошо, поскольку в настоящее время нет ответов, я не чувствую себя слишком плохо, делая это. Хотя меня все еще интересует, что на самом деле происходит за кулисами, вызывая эту проблему, мои самые неотложные вопросы - это те, которые указаны в обновлении 2. Это:

Каковы различия между JoinableQueue и Manager (). Queue () (и когда лучше использовать одно вместо другого?). И, что немаловажно, безопасно ли в этом примере заменить один на другой?


В следующем коде у меня есть простой пул процессов. Каждому процессу передается очередь процессов ( pq ) для извлечения данных для обработки и очередь возвращаемых значений ( rq ) для передачи возвращенных значений обработки обратно в основной поток. Если я не добавляю в очередь возвращаемых значений, это работает, но как только я это сделаю, по какой-то причине процессы заблокированы от остановки. В обоих случаях процессы запускают методы return, поэтому он не помещает в блокировку очереди возврата, но во втором случае сами процессы не завершаются, поэтому программа заходит в тупик, когда я присоединиться к по процессам. Почему это должно быть?

Обновления:

  1. Кажется, что-то связано с количеством элементов в очереди.

    По крайней мере, на моей машине у меня может быть до 6570 элементов в очереди, и это действительно работает, но если больше, то возникает тупик.

  2. Кажется, работает с Manager (). Queue () .

    Будь то ограничение JoinableQueue или просто мое непонимание различий между двумя объектами, я обнаружил, что если я заменю очередь возврата на Manager (). Queue () , все работает как положено. В чем разница между ними и когда следует использовать одно вместо другого?

  3. Ошибка не возникает, если я использую rq

    Ой. На мгновение здесь был ответ, и пока я его комментировал, он исчез. В любом случае, одна из вещей, которые он сказал, заключалась в том, чтобы спросить, будет ли эта ошибка возникать, если я добавлю потребителя. Я пробовал это, и ответ - нет.

    Другая вещь, которую он упомянул, - это цитата из документации по многопроцессорной обработке как возможный ключ к проблеме. Ссылаясь на JoinableQueue , он говорит:

    ... семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.


import multiprocessing

class _ProcSTOP:
    pass

class Proc(multiprocessing.Process):

    def __init__(self, pq, rq):
        self._pq = pq
        self._rq = rq
        super().__init__()
        print('++', self.name)

    def run(self):
        dat = self._pq.get()

        while not dat is _ProcSTOP:
#            self._rq.put(dat)        # uncomment me for deadlock
            self._pq.task_done()
            dat = self._pq.get()

        self._pq.task_done() 
        print('==', self.name)

    def __del__(self):
        print('--', self.name)

if __name__ == '__main__':

    pq = multiprocessing.JoinableQueue()
    rq = multiprocessing.JoinableQueue()
    pool = []

    for i in range(4):
        p = Proc(pq, rq) 
        p.start()
        pool.append(p)

    for i in range(10000):
        pq.put(i)

    pq.join()

    for i in range(4):
       pq.put(_ProcSTOP)

    pq.join()

    while len(pool) > 0:
        print('??', pool)
        pool.pop().join()    # hangs here (if using rq)

    print('** complete')

Пример вывода без использования очереди возврата:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-4
== Proc-3
== Proc-1
?? [, , , ]
== Proc-2
?? [, , ]
-- Proc-3
?? [, ]
-- Proc-2
?? []
-- Proc-1
** complete
-- Proc-4

Пример вывода с использованием очереди возврата:

++ Proc-1
++ Proc-2
++ Proc-3
++ Proc-4
== Proc-2
== Proc-4
== Proc-1
?? [, , , ]
== Proc-3
# here it hangs

6
задан tjm 6 November 2011 в 15:22
поделиться