Как мне реализовать обратный вызов для набора задач в celery

Вопрос

Я использую сельдерей для запуска наборов задач, которые выглядят следующим образом:

  1. Я выполняю пакет задач, которые можно запускать параллельно, количество задач в этом пакете варьируется от десятков до пары тысячи.
  2. Я объединяю результаты этих задач в один ответ, затем что-то делаю с этим ответом — например, сохраняю в базу данных, сохраняю в специальный файл результатов и так далее. По сути, после выполнения задач мне нужно вызвать функцию, которая имеет следующую сигнатуру:

    def callback(result_file_name, task_result_list):
     #сохранить в файле
    
    обратный вызов def (entity_key, task_result_list):
     #хранить в БД
    

На данный момент шаг 1 выполняется в очереди Celery, а шаг 2 выполняется вне celery:

    tasks = []

    # add taksks to tasks list 

    task_group = group()
    task_group.tasks = tasks

    result = task_group.apply_async()

    res = result.join()

    # Aggregate results 

    # Save results to file, database whatever

Этот подход громоздок, так как мне приходится останавливать один поток, пока не будут выполнены все задачи (что может занять пару часов).

Я хотел бы каким-то образом перенести шаг 2 и на сельдерей --- по сути, мне нужно было бы добавить обратный вызов ко всему набору задач (насколько я знаю, он не поддерживается в Celery) или отправить задачу, которая выполняется после всех этих подзадачи.

Кто-нибудь знает, как это сделать? Я использую его в среде django, поэтому я могу хранить некоторое состояние в базе данных.

Подводя итог моим недавним выводам

Аккорды не годятся

Я не могу использовать аккорды напрямую, потому что аккорды позволяют мне создавать обратные вызовы, которые выглядят следующим образом:

    def callback(task_result_list): 
        #store in file

нет очевидного способа передать дополнительные параметры для обратного вызова (особенно потому, что эти обратные вызовы не могут быть локальными функциями).

Используя базу данных,

я могу сохранять результаты, используя TaskSetMeta, но у этого объекта нет поля статуса --- поэтому, даже если бы я добавил сигнал в TaskSetMeta, мне пришлось бы объединять результаты задачи которые могут иметь значительные накладные расходы.

6
задан jb. 30 May 2012 в 18:38
поделиться