крутится несовместимость с многопроцессорностью событий и очередей?

Я пытаюсь смоделировать сеть приложений, работающих с использованием Twisted. В рамках моего моделирования я хотел бы синхронизировать определенные события и иметь возможность передавать каждому процессу большие объемы данных. Я решил использовать многопроцессорные события и очереди. Однако мои процессы зависают.

Я написал пример кода ниже, чтобы проиллюстрировать проблему. В частности, (примерно в 95% случаев на моей машине с песчаным мостом ), функция «выполнить _в потоке _» завершается, однако обратный вызов «печать _сделана» не выполняется. вызывается до тех пор, пока я не нажму Ctrl -C.

Кроме того, я могу изменить несколько вещей в примере кода, чтобы сделать эту работу более надежной, например, :уменьшить количество порожденных процессов, вызвать self.ready.set из реактора _ready или изменить задержку deferLater.

Я предполагаю, что где-то между искривленным реактором и блокировкой многопроцессорных вызовов, таких как Queue.get ()или Event.wait (), существует состояние гонки?

В чем именно проблема, с которой я столкнулся? Есть ли ошибка в моем коде, которую мне не хватает? Могу ли я это исправить или это скрученная несовместимость с многопроцессорными событиями/очередями?

Во-вторых, будет ли что-то вроде spawnProcess или Ampoule рекомендуемой альтернативой? (как предложено в Mix Python Twisted с многопроцессорностью?)

Редактирует (по запросу):

Я столкнулся с проблемами со всеми реакторами, которые пробовал glib2reactor, selectreactor, pollreactor и epollreactor. Кажется, что epollreactor дает наилучшие результаты и отлично работает для примера, приведенного ниже, но по-прежнему дает мне ту же (или аналогичную )проблему в моем приложении. Я продолжу расследование.

Я использую Gentoo Linux с ядром 3.3 и 3.4, Python 2.7 и пробовал Twisted 10.2.0, 11.0.0, 11.1.0, 12.0.0 и 12.1.0.

В дополнение к моей машине Sandy Bridge я вижу ту же проблему на моей двухъядерной машине AMD.

#!/usr/bin/python
# -*- coding: utf-8 *-*

from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task

from multiprocessing import Process
from multiprocessing import Event

class TestA(Process):
    def __init__(self):
        super(TestA, self).__init__()
        self.ready = Event()
        self.ready.clear()
        self.start()

    def run(self):
        reactor.callWhenRunning(self.reactor_ready)
        reactor.run()

    def reactor_ready(self, *args):
        task.deferLater(reactor, 1, self.node_ready)
        return args

    def node_ready(self, *args):
        print 'node_ready'
        self.ready.set()
        return args

def reactor_running():
    print 'reactor_running'
    df = threads.deferToThread(run_in_thread)
    df.addCallback(print_done)

def run_in_thread():
    print 'run_in_thread'
    for n in processes:
        n.ready.wait()

def print_done(dfResult=None):
    print 'print_done'
    reactor.stop()

if __name__ == '__main__':
    processes = [TestA() for i in range(8)]
    reactor.callWhenRunning(reactor_running)
    reactor.run()

6
задан Community 23 May 2017 в 12:00
поделиться