Использование Tornado с Pika для асинхронного мониторинга очередей

У меня есть сервер AMQP ( RabbitMQ ), который я хотел бы публиковать и читать на веб-сервере Tornado . Для этого , Я решил использовать асинхронную библиотеку python amqp, в частности Pika (ее вариант, который предположительно поддерживает Tornado).

Я написал код, который, похоже, успешно читает из очереди, за исключением того, что в конце запроса я получаю исключение (браузер возвращает нормально):

[E 101219 01:07:35 web:868] Uncaught exception GET / (127.0.0.1)
    HTTPRequest(protocol='http', host='localhost:5000', method='GET', uri='/', version='HTTP/1.1', remote_ip='127.0.0.1', remote_ip='127.0.0.1', body='', headers={'Host': 'localhost:5000', 'Accept-Language': 'en-us,en;q=0.5', 'Accept-Encoding': 'gzip,deflate', 'Keep-Alive': '115', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'User-Agent': 'Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.10 (maverick) Firefox/3.6.13', 'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=0', 'If-None-Match': '"58f554b64ed24495235171596351069588d0260e"'})
    Traceback (most recent call last):
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/web.py", line 810, in _stack_context
        yield
      File "/home/dave/devel/lib/python2.6/site-packages/tornado/stack_context.py", line 77, in StackContext
        yield
      File "/usr/lib/python2.6/contextlib.py", line 113, in nested
        yield vars
      File "/home/dave/lib/python2.6/site-packages/tornado/stack_context.py", line 126, in wrapped
        callback(*args, **kwargs)
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 42, in _handle_events
        self._handle_read()
      File "/home/dave/devel/src/pika/pika/tornado_adapter.py", line 66, in _handle_read
        self.on_data_available(chunk)
      File "/home/dave/devel/src/pika/pika/connection.py", line 521, in on_data_available
        self.channels[frame.channel_number].frame_handler(frame)
    KeyError: 1

Я не совсем уверен, что использую эту библиотеку правильно, поэтому могу делать что-то явно не так. Основной поток моего кода:

  1. Запрос приходит
  2. Создать соединение с RabbitMQ с помощью TornadoConnection; указать обратный вызов
  3. В обратном вызове соединения создать канал, объявить / привязать мою очередь и вызвать basic_consume; укажите обратный вызов
  4. В обратном вызове потребления закройте канал и вызовите функцию завершения Tornado.
  5. См. исключение.

У меня несколько вопросов:

  1. Этот поток вообще правильный? Я не уверен, какова цель обратного вызова соединения, за исключением того, что он не работает, если я его не использую.
  2. Должен ли я создавать одно соединение AMQP для каждого веб-запроса? Документация RabbitMQ предполагает, что нет, Я не должен, а должен придерживаться только создания каналов. Но где мне создать соединение и как мне попытаться восстановить соединение, если оно ненадолго отключится?
  3. Если я создаю одно соединение AMQP для каждого веб-запроса, где я должен его закрыть? Вызов amqp.close () в моем обратном вызове, кажется, еще больше портит ситуацию.

Я попытаюсь привести пример кода немного позже, но шаги, описанные выше, довольно полно раскрывают потребляющую сторону вещей. У меня тоже есть проблемы с издательской стороной, но слишком много очередей.

8
задан dave mankoff 19 December 2010 в 14:45
поделиться