Вот решение
MAIL_DRIVER = 'smtp
MAIL_HOST =' smtp.gmail.com '
MAIL_PORT = 587
MAIL_USERNAME='sample@gmail.com '
MAIL_FROM_ADDRESS='sample@gmail.com'
MAIL_FROM_NAME = 'Некоторое имя'
MAIL_PASSWORD = ' XXXXX '
MAIL_ENCRYPTION =' tls '
Каждый объект Task
имеет свойство .request
, которое содержит объект AsyncRequest
. Соответственно, следующая строка дает состояние задачи task
:
task.AsyncResult(task.request.id).state
Старый вопрос, но я недавно столкнулся с этой проблемой.
Если вы пытаетесь получить task_id, вы можете сделать это следующим образом:
import celery
from celery_app import add
from celery import uuid
task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
Теперь вы точно знаете, что такое task_id, и теперь можете использовать его для получения AsyncResult :
# grab the AsyncResult
result = celery.result.AsyncResult(task_id)
# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf
# print the AsyncResult's status
print result.status
SUCCESS
# print the result returned
print result.result
4
Верните task_id (который указан из .delay ()) и спросите экземпляр celery после этого о состоянии:
x = method.delay(1,2)
print x.task_id
При запросе получите новый AsyncResult, используя этот task_id:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
Создание объекта AsyncResult
из идентификатора задачи является способом, рекомендованным в FAQ , чтобы получить статус задачи, когда единственное, что у вас есть, - это идентификатор задачи.
Однако, с Celery 3.x, есть существенные оговорки, которые могут укусить людей, если они не обратят на них внимания. Это действительно зависит от конкретного сценария использования.
Чтобы сельдерей мог записывать, что задача запущена, вы должны установить task_track_started
на True
. Вот простая задача, которая проверяет это:
@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state
Когда task_track_started
- False
, по умолчанию это показ состояния PENDING
, даже если задача запущена. Если вы установите task_track_started
на True
, тогда состояние будет STARTED
.
PENDING
означает «Я не знаю». AsyncResult
с состоянием PENDING
не означает ничего, кроме того, что сельдерей не знает статуса задачи. Это может быть связано с любым числом причин.
С одной стороны, AsyncResult
может быть сконструирован с недопустимыми идентификаторами задач. Такие «задачи» будут считаться ожидаемыми сельдерей:
>>> task.AsyncResult("invalid").status
'PENDING'
Хорошо, поэтому никто не собирается кормить , очевидно , недопустимыми идентификаторами AsyncResult
. Достаточно справедливо, но также имеет эффект, что AsyncResult
также рассмотрит задачу, которая успешно выполняется, но что Сельдерей забыл как PENDING
. Опять же, в некоторых сценариях использования это может быть проблемой. Часть проблемы зависит от того, как Celery настроен на сохранение результатов задач, потому что это зависит от наличия «надгробных камней» в базе данных результатов. («Надгробные плиты» - это термин, используемый в документации по сельдеру для блоков данных, которые записывают, как закончилась задача.) Использование AsyncResult
не будет работать вообще, если task_ignore_result
- True
. Более неприятная проблема заключается в том, что сельдерей по умолчанию разрушает надгробные плиты. По умолчанию установка result_expires
установлена на 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долгосрочном хранилище, а более 24 часов спустя, вы создаете с ним AsyncResult
, статус будет PENDING
.
Все «реальные задачи» начинаются в состоянии PENDING
. Поэтому получение PENDING
в задаче может означать, что задача была запрошена, но никогда не продвигалась дальше этого (по какой-либо причине). Или это могло означать, что задача была выполнена, но Сельдерей забыл о своем состоянии.
AsyncResult
не будет работать для меня. Что еще я могу сделать? Я предпочитаю отслеживать цели , чем отслеживать сами задачи . Я сохраняю некоторую информацию о задаче, но она действительно является вторичной для отслеживания целей. Цели хранятся в хранилище независимо от Сельдерея. Когда запрос должен выполнять вычисления, зависит от достижения определенной цели, он проверяет, была ли достигнута цель, если да, то она использует эту кешированную цель, в противном случае она запускает задачу, которая будет влиять на цель, и отправляет клиент, который сделал HTTP-запрос ответом, который указывает, что он должен ждать результата.
Названия переменных и гиперссылок выше для Celery 4.x. В 3.x соответствующие переменные и гиперссылки: CELERY_TRACK_STARTED
, CELERY_IGNORE_RESULT
, CELERY_TASK_RESULT_EXPIRES
.
Вы также можете создавать пользовательские состояния и обновлять выполнение задания на выполнение задания значения. Этот пример из docs:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
Помимо выше Программный подход Использование функции «Цветочная задача» можно легко увидеть.
Мониторинг в режиме реального времени с использованием событий Celery. Цветок - это веб-инструмент для мониторинга и администрирования кластеров сельдерея.
Официальный документ: Цветок - инструмент для мониторинга сельдерея
Установка:
$ pip install flower
Использование:
http://localhost:5555
Попробуйте:
task.AsyncResult(task.request.id).state
это обеспечит статус задачи Сельдерея. Если Задача Сельдерея уже находится в состоянии FAILURE, она выкинет исключение:
raised unexpected: KeyError('exc_type',)
для простых задач, мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для мониторинга.
и для сложных задач, скажем, задача, которая имеет дело с большим количеством других модулей. Мы рекомендуем вручную записывать прогресс и сообщение на конкретном блоке задач.
Я нашел полезную информацию в
Руководстве по работе с сельдерейщиками, проверяющих работников
В моем случае я проверяю, работает ли сельдерей ,
inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = 'FAILURE'
else:
state = str(task.state)
Вы можете играть с проверкой, чтобы получить ваши потребности.