После запуска базового примера для flask-celery (насколько я могу судить, работает нормально) я пытаюсь интегрировать его в свой собственный проект. В основном, я использую это ниже:
from flask import Blueprint, jsonify, request, session
from flask.views import MethodView
from celery.decorators import task
blueprint = Blueprint('myapi', __name__)
class MyAPI(MethodView):
def get(self, tag):
return get_resource.apply_async(tag)
@task(name="get_task")
def get_resource(tag):
pass
с той же настройкой, что и в примере, я получаю эту ошибку:
Traceback (most recent call last):
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1518, in __call__
return self.wsgi_app(environ, start_response)
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1506, in wsgi_app
response = self.make_response(self.handle_exception(e))
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1504, in wsgi_app
response = self.full_dispatch_request()
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1264, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1262, in full_dispatch_request
rv = self.dispatch_request()
File "/x/venv/lib/python2.7/site-packages/flask/app.py", line 1248, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 84, in view
return self.dispatch_request(*args, **kwargs)
File "/x/venv/lib/python2.7/site-packages/flask/views.py", line 151, in dispatch_request
return meth(*args, **kwargs)
File "/x/api/modules/document/document.py", line 14, in get
return get_resource.apply_async(tag)
File "/x/venv/lib/python2.7/site-packages/celery/app/task/__init__.py", line 449, in apply_async
publish = publisher or self.app.amqp.publisher_pool.acquire(block=True)
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 657, in acquire
R = self.prepare(R)
File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 54, in prepare
p = p()
File "/x/venv/lib/python2.7/site-packages/kombu/pools.py", line 45, in
return lambda: self.create_producer()
File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 265, in create_producer
pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 328, in TaskPublisher
return TaskPublisher(*args, **self.app.merge(defaults, kwargs))
File "/x/venv/lib/python2.7/site-packages/celery/app/amqp.py", line 158, in __init__
super(TaskPublisher, self).__init__(*args, **kwargs)
File "/x/venv/lib/python2.7/site-packages/kombu/compat.py", line 61, in __init__
super(Publisher, self).__init__(connection, self.exchange, **kwargs)
File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 79, in __init__
self.revive(self.channel)
File "/x/venv/lib/python2.7/site-packages/kombu/messaging.py", line 168, in revive
channel = channel.default_channel
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 581, in default_channel
self.connection
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 574, in connection
self._connection = self._establish_connection()
File "/x/venv/lib/python2.7/site-packages/kombu/connection.py", line 533, in _establish_connection
conn = self.transport.establish_connection()
File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 279, in establish_connection
connect_timeout=conninfo.connect_timeout)
File "/x/venv/lib/python2.7/site-packages/kombu/transport/amqplib.py", line 89, in __init__
super(Connection, self).__init__(*args, **kwargs)
File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/connection.py", line 129, in __init__
self.transport = create_transport(host, connect_timeout, ssl)
File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 281, in create_transport
return TCPTransport(host, connect_timeout)
File "/x/venv/lib/python2.7/site-packages/amqplib/client_0_8/transport.py", line 85, in __init__
raise socket.error, msg
error: [Errno 111] Connection refused
-->
Я использую Redis, и если я устанавливаю rabbitmq, я получаю другую ошибку, но я делаю не понимаю этого прямо сейчас - брокер должен быть redis, но он его не находит или что? Может ли кто-нибудь дать мне больше информации о том, что здесь происходит? Нужно ли мне импортировать что-то еще и т. д. Дело в том, что кроме примера с голыми костями очень мало, и это не имеет для меня смысла.
Максимум, что я смог определить, так это то, что в модуле API нет доступа к «сельдерее», и когда он пытается поместить туда данные, когда на уровне приложения, сельдерей там попадает в некоторые значения по умолчанию. , которые не установлены, потому что я указываю на Redis. Просто предположение. Я не смог импортировать информацию в модуль, только определил, что вызов чего-либо «сельдерей» (например, вывод celery.conf) из приложения вызывает ошибку, хотя я мог импортировать celery.task.
Это конфигурация брокера, используемая приложением, прямо из примера:
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = "redis"
CELERY_REDIS_HOST = "localhost"
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0
РЕДАКТИРОВАТЬ:
Если вы хотите увидеть демонстрацию: https://github.com/thrisp/flask -celery-example
КАК оказывается, наличие BROKER_TRANSPORT = 'redis' в ваших настройках важно для того, что я передаю (для настройки, которую я указал здесь и в примере с git), я' Я не совсем уверен, почему его нет в битах примера, но он есть в тех, которые я добавил, но он есть - без этого он хочет сбросить все в очередь ampq по умолчанию.
РЕДАКТИРОВАТЬ 2:
Кроме того, это довольно большое дело, поскольку использование следующей версии Celery упрощает 10 000 проблем при использовании его с Flask, что делает все это ненужным.