Использование Celery в качестве канала управления для приложений Twisted

Я пытаюсь использовать Celery в качестве канала управления для приложения Twisted. My Twisted приложение - это уровень абстракции, который предоставляет стандартный интерфейс для различных локально запущенных процессов (через ProcessProtocol). Я хотел бы использовать Celery для управления этим удаленно - AMQP кажется идеальным методом управления многими приложениями Twisted из центра, и я хотел бы воспользоваться преимуществами функций Celery, основанных на задачах , например, повторные попытки задачи, подзадачи и т. д.

Это не работает, как я планировал, и я надеюсь, что кто-то может помочь мне указать в правильном направлении, чтобы это сработало.

При запуске сценария я пытаюсь добиться следующего:

  • Запустить слегка измененный celeryd (см. ниже)
  • Ожидать задачи Celery
  • Когда запускается процесс 'задача получена, порождает ProcessProtocol
  • При получении других задач запускает функцию по протоколу Twisted и возвращает результат с помощью Deferreds

' Слегка измененный celeryd '- это celeryd с небольшая модификация, которая позволяет задачам получать доступ к Twisted реактору через self.app.twisted и порожденному процессу через self.app.process. Чтобы не усложнять задачу, я использую реализацию «соло» пула процессов Celery, которая не разветвляет новый процесс для рабочих задач.

Моя проблема возникает, когда я пытаюсь использовать задачу Celery для инициализации ProcessProtocol (т. Е. Запуска внешнего процесса). Процесс запускается правильно, но childDataReceived ProcessProtocol никогда не вызывается. Я думаю, это связано с тем, что дескрипторы файлов не наследуются / не устанавливаются правильно.

Ниже приведен пример кода, основанный на примере wc в документации ProcessProtocol. Он включает в себя две задачи Celery - одну для запуска процесса wc, а другую для подсчета слов в некотором тексте (с использованием ранее запущенного процесса wc).

Этот пример довольно надуманный, но если я смогу заставить его работать, он послужит хорошей отправной точкой для реализации моих протоколов ProcessProtocols, которые представляют собой длительные процессы, которые будут отвечать на команды, записанные в stdin.

Я тестирую это, сначала запустив демон Celery:

python2.6 mycelery.py -l info -P solo

Затем в другом окне выполняется сценарий, который отправляет две задачи:

python2.6 command_test.py

Ожидаемое поведение command_test.py - выполнение двух команд: один запускает процесс wc, а другой отправляет текст в CountWordsTask. На самом деле происходит следующее:

  • StartProcTask порождает процесс и получает «процесс запущен» в качестве ответа через Deffered
  • CountWordsTask никогда не получает результат, потому что childDataReceived никогда не вызывается

Может ли кто-нибудь пролить свет на это или предложить несколько советов о том, как лучше всего использовать Celery в качестве канала управления для Twisted ProcessProtocols?

Было бы лучше написать реализацию ProcessPool с поддержкой Twisted-backed для Celery? Является ли мой метод вызова WorkerCommand.execute_from_commandline через «response.call» правильным подходом, чтобы гарантировать, что все происходит в потоке Twisted?

Я читал об AMPoule, который, как мне кажется, может предоставить некоторые из этих функций, но хотел бы придерживаться Celery если возможно, так как я использую его в других частях своего приложения.

Любая помощь будет принята с благодарностью!

myceleryd.py

from functools import partial
from celery.app import App
from celery.bin.celeryd import WorkerCommand
from twisted.internet import reactor


class MyCeleryApp(App):
    def __init__(self, twisted, *args, **kwargs):
        self.twisted = twisted
        super(MyCeleryApp, self).__init__(*args, **kwargs)

def main():
    get_my_app = partial(MyCeleryApp, reactor)
    worker = WorkerCommand(get_app=get_my_app)
    reactor.callLater(1, worker.execute_from_commandline)
    reactor.run()

if __name__ == '__main__':
    main()

protocol.py

from twisted.internet import protocol
from twisted.internet.defer import Deferred

class WCProcessProtocol(protocol.ProcessProtocol):

    def __init__(self, text):
        self.text = text
        self._waiting = {} # Dict to contain deferreds, keyed by command name

    def connectionMade(self):
        if 'startup' in self._waiting:
            self._waiting['startup'].callback('process started')

    def outReceived(self, data):
        fieldLength = len(data) / 3
        lines = int(data[:fieldLength])
        words = int(data[fieldLength:fieldLength*2])
        chars = int(data[fieldLength*2:])
        self.transport.loseConnection()
        self.receiveCounts(lines, words, chars)

        if 'countWords' in self._waiting:
            self._waiting['countWords'].callback(words)

    def processExited(self, status):
        print 'exiting'


    def receiveCounts(self, lines, words, chars):
        print >> sys.stderr, 'Received counts from wc.'
        print >> sys.stderr, 'Lines:', lines
        print >> sys.stderr, 'Words:', words
        print >> sys.stderr, 'Characters:', chars

    def countWords(self, text):
        self._waiting['countWords'] = Deferred()
        self.transport.write(text)
        return self._waiting['countWords']

tasks.py

from celery.task import Task
from protocol import WCProcessProtocol
from twisted.internet.defer import Deferred
from twisted.internet import reactor

class StartProcTask(Task):
    def run(self):
        self.app.proc = WCProcessProtocol('testing')
        self.app.proc._waiting['startup'] = Deferred()
        self.app.twisted.spawnProcess(self.app.proc,
                                      'wc',
                                      ['wc'],
                                      usePTY=True)
        return self.app.proc._waiting['startup']

class CountWordsTask(Task):
    def run(self):
        return self.app.proc.countWords('test test')

11
задан Mike Ryan 15 November 2011 в 13:45
поделиться