У меня есть проект Джанго, и я пытаюсь использовать Сельдерей, чтобы представить задачи для фоновой обработки (http://ask.github.com/celery/introduction.html). Сельдерей объединяется хорошо с Джанго, и я был в состоянии представить свои таможенные задачи и возвратить результаты.
Единственная проблема состоит в том, что я не могу найти нормальный способ выполнить таможенную инициализацию в процессе демона. Я должен вызвать дорогую функцию, которая загружает большую память, прежде чем я начну обрабатывать задачи, и я не могу позволить себе вызвать ту функцию каждый раз.
У кого-либо была эта проблема прежде? Какие-либо идеи, как работать вокруг этого, не изменяя исходный код Сельдерея?
Спасибо
Вы можете либо написать пользовательский загрузчик или использовать сигналы.
Погрузчики имеют метод on_Task_init
, который называется при выполнении задачи,
и on_worker_init
, который называется основным процессом Celery + Celerybeat.
Использование сигналов, вероятно, самые простые, доступные сигналы:
0.8.x:
task_prerun (task_id, task, args, kwargs)
Отправляется, когда задача собирается выполнить работника (или локально
При использовании применяется
/ или если CELERY_ALWAY_AGEGER
был установлен).
task_postrun (task_id, task, args, kwargs, retval)
Отправляется после того, как задача была выполнена в тех же условиях, что и выше.
Task_sent (task_id, task, args, kwargs, eta, tasketset)
Вызывается, когда задача применяется (не хорошо для долгосрочных операций)
Дополнительные сигналы, доступные в 0.9.x (текущая ветвь Master GitHub):
Working_init ()
Вызывается, когда Celeryd начался (до того, как задача инициализируется, так что если на
Поддержка системы Вилка
, любые изменения памяти будут скопированы на ребенка
рабочие процессы).
Worker_ready ()
Вызывается, когда Celeryd может получать задачи.
Worker_Shutdown ()
Вызывается, когда Celeryd выключается.
Вот пример докактурирования чего-то в первую очередь, выполняется задача в процессе:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Если вы хотите, чтобы функция была запускаться для всех задач, просто пропустите аргумент Sender
.