Я использую celery с django и rabbitmq для создания очереди сообщений. У меня также есть рабочий, который происходит от другой машины. В представлении django я запускаю такой процесс:
def processtask(request, name):
args = ["ls", "-l"]
MyTask.delay(args)
return HttpResponse("Task set to execute.")
Моя задача настроена следующим образом:
class MyTask(Task):
def run(self, args):
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(out, err) = p.communicate()
return out
Теперь мой вопрос заключается в том, как брокер (мой проект django) теперь может получать вывод от «ls -l» команда, которую работник выполнил на своем компьютере. Я предполагаю, что лучше всего, если бы worker вызывал функцию в брокере всякий раз, когда он готов отправить вывод из выполненной команды.
Я хотел бы получить вывод от worker асинхронно, а затем обновить веб-страницу выводом, но об этом в другой раз. На данный момент я хотел бы получать только результат от worker.
Обновление
Сейчас я добавил HTTP-запрос GET, который запускается в конце задачи, уведомляя веб-приложение о том, что задача выполнена — я также отправляю task_id в http GET. Метод http GET вызывает представление django, которое создает AsyncResult и получает результат, но проблема в том, что при вызове result.get() я получаю следующую ошибку:
/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration.
"Polling results with transaction isolation level"
Есть идеи почему? Я не использую базу данных, потому что использую rabbitmq с AMQP.
Обновить.
Я бы очень хотел использовать третий вариант, который кажется лучшим вариантом - для малых и больших возвращаемых значений.Вся моя задача выглядит следующим образом:
class MyTask(Task):
def __call__(self, *args, **kwargs):
return self.run(*args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self.webhost is not None:
conn = httplib.HTTPConnection(self.webhost, self.webport)
conn.request("HEAD", "/vuln/task/output/"+task_id)
def run(self, args, webhost=None, webport=None):
self.webhost = webhost
self.webport = webport
r = "This is a basic result string used for code clarity"
return r
Итак, я переопределил функцию after_return, которая также должна снять блокировку с моей задачи, поскольку функция run() задачи уже вернула значение. В запросе HEAD я в основном вызываю функцию django, которая вызывает AsyncResult для task_id, что должно предоставить результат задачи. В моем случае я использовал произвольный результат для целей тестирования, поскольку он предназначен только для тестирования.
Я хотел бы знать, почему приведенный выше код не работает. Я могу использовать on_success, но не думаю, что это будет иметь значение — или будет?