Я пытаюсь использовать Celery в качестве канала управления для приложения Twisted. My Twisted приложение - это уровень абстракции, который предоставляет стандартный интерфейс для различных локально запущенных процессов (через ProcessProtocol). Я хотел бы использовать Celery для управления этим удаленно - AMQP кажется идеальным методом управления многими приложениями Twisted из центра, и я хотел бы воспользоваться преимуществами функций Celery, основанных на задачах , например, повторные попытки задачи, подзадачи и т. д.
Это не работает, как я планировал, и я надеюсь, что кто-то может помочь мне указать в правильном направлении, чтобы это сработало.
При запуске сценария я пытаюсь добиться следующего:
' Слегка измененный celeryd '- это celeryd с небольшая модификация, которая позволяет задачам получать доступ к Twisted реактору через self.app.twisted и порожденному процессу через self.app.process. Чтобы не усложнять задачу, я использую реализацию «соло» пула процессов Celery, которая не разветвляет новый процесс для рабочих задач.
Моя проблема возникает, когда я пытаюсь использовать задачу Celery для инициализации ProcessProtocol (т. Е. Запуска внешнего процесса). Процесс запускается правильно, но childDataReceived ProcessProtocol никогда не вызывается. Я думаю, это связано с тем, что дескрипторы файлов не наследуются / не устанавливаются правильно.
Ниже приведен пример кода, основанный на примере wc в документации ProcessProtocol. Он включает в себя две задачи Celery - одну для запуска процесса wc, а другую для подсчета слов в некотором тексте (с использованием ранее запущенного процесса wc).
Этот пример довольно надуманный, но если я смогу заставить его работать, он послужит хорошей отправной точкой для реализации моих протоколов ProcessProtocols, которые представляют собой длительные процессы, которые будут отвечать на команды, записанные в stdin.
Я тестирую это, сначала запустив демон Celery:
python2.6 mycelery.py -l info -P solo
Затем в другом окне выполняется сценарий, который отправляет две задачи:
python2.6 command_test.py
Ожидаемое поведение command_test.py - выполнение двух команд: один запускает процесс wc, а другой отправляет текст в CountWordsTask. На самом деле происходит следующее:
Может ли кто-нибудь пролить свет на это или предложить несколько советов о том, как лучше всего использовать Celery в качестве канала управления для Twisted ProcessProtocols?
Было бы лучше написать реализацию ProcessPool с поддержкой Twisted-backed для Celery? Является ли мой метод вызова WorkerCommand.execute_from_commandline через «response.call» правильным подходом, чтобы гарантировать, что все происходит в потоке Twisted?
Я читал об AMPoule, который, как мне кажется, может предоставить некоторые из этих функций, но хотел бы придерживаться Celery если возможно, так как я использую его в других частях своего приложения.
Любая помощь будет принята с благодарностью!
from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor
class MyCeleryApp(App):
def __init__(self, twisted, *args, **kwargs):
self.twisted = twisted
super(MyCeleryApp, self).__init__(*args, **kwargs)
def main():
get_my_app = partial(MyCeleryApp, reactor)
worker = WorkerCommand(get_app=get_my_app)
reactor.callLater(1, worker.execute_from_commandline)
reactor.run()
if __name__ == '__main__':
main()
from twisted.internet import protocol
from twisted.internet.defer import Deferred
class WCProcessProtocol(protocol.ProcessProtocol):
def __init__(self, text):
self.text = text
self._waiting = {} # Dict to contain deferreds, keyed by command name
def connectionMade(self):
if 'startup' in self._waiting:
self._waiting['startup'].callback('process started')
def outReceived(self, data):
fieldLength = len(data) / 3
lines = int(data[:fieldLength])
words = int(data[fieldLength:fieldLength*2])
chars = int(data[fieldLength*2:])
self.transport.loseConnection()
self.receiveCounts(lines, words, chars)
if 'countWords' in self._waiting:
self._waiting['countWords'].callback(words)
def processExited(self, status):
print 'exiting'
def receiveCounts(self, lines, words, chars):
print >> sys.stderr, 'Received counts from wc.'
print >> sys.stderr, 'Lines:', lines
print >> sys.stderr, 'Words:', words
print >> sys.stderr, 'Characters:', chars
def countWords(self, text):
self._waiting['countWords'] = Deferred()
self.transport.write(text)
return self._waiting['countWords']
from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor
class StartProcTask(Task):
def run(self):
self.app.proc = WCProcessProtocol('testing')
self.app.proc._waiting['startup'] = Deferred()
self.app.twisted.spawnProcess(self.app.proc,
'wc',
['wc'],
usePTY=True)
return self.app.proc._waiting['startup']
class CountWordsTask(Task):
def run(self):
return self.app.proc.countWords('test test')