сельдерей — вызов функции при выполнении задачи

Я использую celery с django и rabbitmq для создания очереди сообщений. У меня также есть рабочий, который происходит от другой машины. В представлении django я запускаю такой процесс:

def processtask(request, name):
  args = ["ls", "-l"]
  MyTask.delay(args)
  return HttpResponse("Task set to execute.")

Моя задача настроена следующим образом:

class MyTask(Task):
  def run(self, args):
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    (out, err) = p.communicate()
    return out

Теперь мой вопрос заключается в том, как брокер (мой проект django) теперь может получать вывод от «ls -l» команда, которую работник выполнил на своем компьютере. Я предполагаю, что лучше всего, если бы worker вызывал функцию в брокере всякий раз, когда он готов отправить вывод из выполненной команды.

Я хотел бы получить вывод от worker асинхронно, а затем обновить веб-страницу выводом, но об этом в другой раз. На данный момент я хотел бы получать только результат от worker.

Обновление

Сейчас я добавил HTTP-запрос GET, который запускается в конце задачи, уведомляя веб-приложение о том, что задача выполнена — я также отправляю task_id в http GET. Метод http GET вызывает представление django, которое создает AsyncResult и получает результат, но проблема в том, что при вызове result.get() я получаю следующую ошибку:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
  "Polling results with transaction isolation level"

Есть идеи почему? Я не использую базу данных, потому что использую rabbitmq с AMQP.

Обновить.

Я бы очень хотел использовать третий вариант, который кажется лучшим вариантом - для малых и больших возвращаемых значений.Вся моя задача выглядит следующим образом:

class MyTask(Task):
  def __call__(self, *args, **kwargs):
    return self.run(*args, **kwargs)

  def after_return(self, status, retval, task_id, args, kwargs, einfo):
    if self.webhost is not None:
      conn = httplib.HTTPConnection(self.webhost, self.webport)
      conn.request("HEAD", "/vuln/task/output/"+task_id)

  def run(self, args, webhost=None, webport=None):
    self.webhost = webhost
    self.webport = webport
    r = "This is a basic result string used for code clarity"
    return r

Итак, я переопределил функцию after_return, которая также должна снять блокировку с моей задачи, поскольку функция run() задачи уже вернула значение. В запросе HEAD я в основном вызываю функцию django, которая вызывает AsyncResult для task_id, что должно предоставить результат задачи. В моем случае я использовал произвольный результат для целей тестирования, поскольку он предназначен только для тестирования.

Я хотел бы знать, почему приведенный выше код не работает. Я могу использовать on_success, но не думаю, что это будет иметь значение — или будет?

11
задан eleanor 11 March 2012 в 18:01
поделиться