колба сельдерея получить задание статус задачи в реальном времени [дубликат]

Вот решение

MAIL_DRIVER = 'smtp

MAIL_HOST =' smtp.gmail.com '

MAIL_PORT = 587

MAIL_USERNAME='sample@gmail.com '

MAIL_FROM_ADDRESS='sample@gmail.com'

MAIL_FROM_NAME = 'Некоторое имя'

MAIL_PASSWORD = ' XXXXX '

MAIL_ENCRYPTION =' tls '

68
задан Marcin 27 January 2012 в 15:49
поделиться

9 ответов

Каждый объект Task имеет свойство .request, которое содержит объект AsyncRequest. Соответственно, следующая строка дает состояние задачи task:

task.AsyncResult(task.request.id).state
51
ответ дан Marcin 26 August 2018 в 10:54
поделиться

Старый вопрос, но я недавно столкнулся с этой проблемой.

Если вы пытаетесь получить task_id, вы можете сделать это следующим образом:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

Теперь вы точно знаете, что такое task_id, и теперь можете использовать его для получения AsyncResult :

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4
7
ответ дан Cesar Rios 26 August 2018 в 10:54
поделиться

Верните task_id (который указан из .delay ()) и спросите экземпляр celery после этого о состоянии:

x = method.delay(1,2)
print x.task_id

При запросе получите новый AsyncResult, используя этот task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
72
ответ дан Gregor 26 August 2018 в 10:54
поделиться

Создание объекта AsyncResult из идентификатора задачи является способом, рекомендованным в FAQ , чтобы получить статус задачи, когда единственное, что у вас есть, - это идентификатор задачи.

Однако, с Celery 3.x, есть существенные оговорки, которые могут укусить людей, если они не обратят на них внимания. Это действительно зависит от конкретного сценария использования.

По умолчанию сельдерей не записывает «запущенное» состояние.

Чтобы сельдерей мог записывать, что задача запущена, вы должны установить task_track_started на True. Вот простая задача, которая проверяет это:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

Когда task_track_started - False, по умолчанию это показ состояния PENDING, даже если задача запущена. Если вы установите task_track_started на True, тогда состояние будет STARTED.

Состояние PENDING означает «Я не знаю».

AsyncResult с состоянием PENDING не означает ничего, кроме того, что сельдерей не знает статуса задачи. Это может быть связано с любым числом причин.

С одной стороны, AsyncResult может быть сконструирован с недопустимыми идентификаторами задач. Такие «задачи» будут считаться ожидаемыми сельдерей:

>>> task.AsyncResult("invalid").status
'PENDING'

Хорошо, поэтому никто не собирается кормить , очевидно , недопустимыми идентификаторами AsyncResult. Достаточно справедливо, но также имеет эффект, что AsyncResult также рассмотрит задачу, которая успешно выполняется, но что Сельдерей забыл как PENDING. Опять же, в некоторых сценариях использования это может быть проблемой. Часть проблемы зависит от того, как Celery настроен на сохранение результатов задач, потому что это зависит от наличия «надгробных камней» в базе данных результатов. («Надгробные плиты» - это термин, используемый в документации по сельдеру для блоков данных, которые записывают, как закончилась задача.) Использование AsyncResult не будет работать вообще, если task_ignore_result - True. Более неприятная проблема заключается в том, что сельдерей по умолчанию разрушает надгробные плиты. По умолчанию установка result_expires установлена ​​на 24 часа. Поэтому, если вы запускаете задачу и записываете идентификатор в долгосрочном хранилище, а более 24 часов спустя, вы создаете с ним AsyncResult, статус будет PENDING.

Все «реальные задачи» начинаются в состоянии PENDING. Поэтому получение PENDING в задаче может означать, что задача была запрошена, но никогда не продвигалась дальше этого (по какой-либо причине). Или это могло означать, что задача была выполнена, но Сельдерей забыл о своем состоянии.

Ой! AsyncResult не будет работать для меня. Что еще я могу сделать?

Я предпочитаю отслеживать цели , чем отслеживать сами задачи . Я сохраняю некоторую информацию о задаче, но она действительно является вторичной для отслеживания целей. Цели хранятся в хранилище независимо от Сельдерея. Когда запрос должен выполнять вычисления, зависит от достижения определенной цели, он проверяет, была ли достигнута цель, если да, то она использует эту кешированную цель, в противном случае она запускает задачу, которая будет влиять на цель, и отправляет клиент, который сделал HTTP-запрос ответом, который указывает, что он должен ждать результата.


Названия переменных и гиперссылок выше для Celery 4.x. В 3.x соответствующие переменные и гиперссылки: CELERY_TRACK_STARTED , CELERY_IGNORE_RESULT , CELERY_TASK_RESULT_EXPIRES .

35
ответ дан Louis 26 August 2018 в 10:54
поделиться

Вы также можете создавать пользовательские состояния и обновлять выполнение задания на выполнение задания значения. Этот пример из docs:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

14
ответ дан msangel 26 August 2018 в 10:54
поделиться

Помимо выше Программный подход Использование функции «Цветочная задача» можно легко увидеть.

Мониторинг в режиме реального времени с использованием событий Celery. Цветок - это веб-инструмент для мониторинга и администрирования кластеров сельдерея.

  1. Выполнение задачи и история
  2. Возможность отображения сведений о задачах (аргументы, время начала, время выполнения и т. Д.)
  3. Графы и статистика

Официальный документ: Цветок - инструмент для мониторинга сельдерея

Установка:

$ pip install flower

Использование:

http://localhost:5555
1
ответ дан Roshan Bagdiya 26 August 2018 в 10:54
поделиться

Попробуйте:

task.AsyncResult(task.request.id).state

это обеспечит статус задачи Сельдерея. Если Задача Сельдерея уже находится в состоянии FAILURE, она выкинет исключение:

raised unexpected: KeyError('exc_type',)

0
ответ дан spicyramen 26 August 2018 в 10:54
поделиться

для простых задач, мы можем использовать http://flower.readthedocs.io/en/latest/screenshots.html и http://policystat.github.io/jobtastic/ для мониторинга.

и для сложных задач, скажем, задача, которая имеет дело с большим количеством других модулей. Мы рекомендуем вручную записывать прогресс и сообщение на конкретном блоке задач.

0
ответ дан taotao.li 26 August 2018 в 10:54
поделиться

Я нашел полезную информацию в

Руководстве по работе с сельдерейщиками, проверяющих работников

В моем случае я проверяю, работает ли сельдерей ,

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

Вы можете играть с проверкой, чтобы получить ваши потребности.

0
ответ дан zerocog 26 August 2018 в 10:54
поделиться
Другие вопросы по тегам:

Похожие вопросы: