Я использую сельдерей для запуска наборов задач, которые выглядят следующим образом:
Я объединяю результаты этих задач в один ответ, затем что-то делаю с этим ответом — например, сохраняю в базу данных, сохраняю в специальный файл результатов и так далее. По сути, после выполнения задач мне нужно вызвать функцию, которая имеет следующую сигнатуру:
def callback(result_file_name, task_result_list):
#сохранить в файле
обратный вызов def (entity_key, task_result_list):
#хранить в БД
На данный момент шаг 1 выполняется в очереди Celery, а шаг 2 выполняется вне celery:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
Этот подход громоздок, так как мне приходится останавливать один поток, пока не будут выполнены все задачи (что может занять пару часов).
Я хотел бы каким-то образом перенести шаг 2 и на сельдерей --- по сути, мне нужно было бы добавить обратный вызов ко всему набору задач (насколько я знаю, он не поддерживается в Celery) или отправить задачу, которая выполняется после всех этих подзадачи.
Кто-нибудь знает, как это сделать? Я использую его в среде django, поэтому я могу хранить некоторое состояние в базе данных.
Я не могу использовать аккорды напрямую, потому что аккорды позволяют мне создавать обратные вызовы, которые выглядят следующим образом:
def callback(task_result_list):
#store in file
нет очевидного способа передать дополнительные параметры для обратного вызова (особенно потому, что эти обратные вызовы не могут быть локальными функциями).
я могу сохранять результаты, используя TaskSetMeta
, но у этого объекта нет поля статуса --- поэтому, даже если бы я добавил сигнал в TaskSetMeta, мне пришлось бы объединять результаты задачи которые могут иметь значительные накладные расходы.