Установка периодических задач в Celery (celerybeat) динамически с помощью add_periodic_task

Я использую Celery 4.0.1 с Django 1.10, и у меня возникают проблемы с планированием задач (выполнение задачи работает нормально). Вот конфигурация сельдерея:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)

Затем в tasks.py у меня есть:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)

В views.py я хочу запланировать эту задачу:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))

Затем я выполняю команды:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app

Но задание никогда не запланировано. Я ничего не вижу в журналах. Задача работает, потому что, на мой взгляд, я делаю:

def my_view(request, id):
    my_task.delay(id)

Задача выполнена.

Если в моем файле конфигурации, если я планирую задачу вручную, это работает следующим образом:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

Я просто не могу планировать задачу динамически. Есть идеи?

9
задан DhiaTN 22 December 2016 в 09:33
поделиться