Я пытаюсь смоделировать сеть приложений, работающих с использованием Twisted. В рамках моего моделирования я хотел бы синхронизировать определенные события и иметь возможность передавать каждому процессу большие объемы данных. Я решил использовать многопроцессорные события и очереди. Однако мои процессы зависают.
Я написал пример кода ниже, чтобы проиллюстрировать проблему. В частности, (примерно в 95% случаев на моей машине с песчаным мостом ), функция «выполнить _в потоке _» завершается, однако обратный вызов «печать _сделана» не выполняется. вызывается до тех пор, пока я не нажму Ctrl -C.
Кроме того, я могу изменить несколько вещей в примере кода, чтобы сделать эту работу более надежной, например, :уменьшить количество порожденных процессов, вызвать self.ready.set из реактора _ready или изменить задержку deferLater.
Я предполагаю, что где-то между искривленным реактором и блокировкой многопроцессорных вызовов, таких как Queue.get ()или Event.wait (), существует состояние гонки?
В чем именно проблема, с которой я столкнулся? Есть ли ошибка в моем коде, которую мне не хватает? Могу ли я это исправить или это скрученная несовместимость с многопроцессорными событиями/очередями?
Во-вторых, будет ли что-то вроде spawnProcess или Ampoule рекомендуемой альтернативой? (как предложено в Mix Python Twisted с многопроцессорностью?)
Редактирует (по запросу):
Я столкнулся с проблемами со всеми реакторами, которые пробовал glib2reactor, selectreactor, pollreactor и epollreactor. Кажется, что epollreactor дает наилучшие результаты и отлично работает для примера, приведенного ниже, но по-прежнему дает мне ту же (или аналогичную )проблему в моем приложении. Я продолжу расследование.
Я использую Gentoo Linux с ядром 3.3 и 3.4, Python 2.7 и пробовал Twisted 10.2.0, 11.0.0, 11.1.0, 12.0.0 и 12.1.0.
В дополнение к моей машине Sandy Bridge я вижу ту же проблему на моей двухъядерной машине AMD.
#!/usr/bin/python
# -*- coding: utf-8 *-*
from twisted.internet import reactor
from twisted.internet import threads
from twisted.internet import task
from multiprocessing import Process
from multiprocessing import Event
class TestA(Process):
def __init__(self):
super(TestA, self).__init__()
self.ready = Event()
self.ready.clear()
self.start()
def run(self):
reactor.callWhenRunning(self.reactor_ready)
reactor.run()
def reactor_ready(self, *args):
task.deferLater(reactor, 1, self.node_ready)
return args
def node_ready(self, *args):
print 'node_ready'
self.ready.set()
return args
def reactor_running():
print 'reactor_running'
df = threads.deferToThread(run_in_thread)
df.addCallback(print_done)
def run_in_thread():
print 'run_in_thread'
for n in processes:
n.ready.wait()
def print_done(dfResult=None):
print 'print_done'
reactor.stop()
if __name__ == '__main__':
processes = [TestA() for i in range(8)]
reactor.callWhenRunning(reactor_running)
reactor.run()