Какой правильный способ обработки соединения Redis в Tornado? (Async - Pub/Sub)

Я использую Redis вместе с моим приложением Tornado с asyc клиентом Brukva, когда я посмотрел примеры приложений на сайте Brukva, они делают новое соединение на "init" метод в классе websocket

class MessagesCatcher(tornado.websocket.WebSocketHandler):
    def __init__(self, *args, **kwargs):
        super(MessagesCatcher, self).__init__(*args, **kwargs)
        self.client = brukva.Client()
        self.client.connect()
        self.client.subscribe('test_channel')

    def open(self):
        self.client.listen(self.on_message)

    def on_message(self, result):
        self.write_message(str(result.body))

    def close(self):
        self.client.unsubscribe('test_channel')
        self.client.disconnect()

это нормально в случае websocket, но как это обработать в обычном Tornado RequestHandler post метод скажем длинной операции опроса (модель publish-subscribe). Я создаю новое соединение с клиентом в каждом пост-методе обработчика обновлений, это правильный подход? Когда я проверяю в консоли redis, я вижу, что клиенты увеличиваются в каждой новой операции post.

enter image description here

Вот пример моего кода.

c = brukva.Client(host = '127.0.0.1')
c.connect()

class MessageNewHandler(BaseHandler):
    @tornado.web.authenticated
    def post(self):

        self.listing_id = self.get_argument("listing_id")
        message = {
            "id": str(uuid.uuid4()),
            "from": str(self.get_secure_cookie("username")),
            "body": str(self.get_argument("body")),
        }
        message["html"] = self.render_string("message.html", message=message)

        if self.get_argument("next", None):
            self.redirect(self.get_argument("next"))
        else:
            c.publish(self.listing_id, message)
            logging.info("Writing message : " + json.dumps(message))
            self.write(json.dumps(message))

    class MessageUpdatesHandler(BaseHandler):
        @tornado.web.authenticated
        @tornado.web.asynchronous
        def post(self):
            self.listing_id = self.get_argument("listing_id", None)
            self.client = brukva.Client()
            self.client.connect()
            self.client.subscribe(self.listing_id)
            self.client.listen(self.on_new_messages)

        def on_new_messages(self, messages):
            # Closed client connection
            if self.request.connection.stream.closed():
                return
            logging.info("Getting update : " + json.dumps(messages.body))
            self.finish(json.dumps(messages.body))
            self.client.unsubscribe(self.listing_id)


        def on_connection_close(self):
            # unsubscribe user from channel
            self.client.unsubscribe(self.listing_id)
            self.client.disconnect()

Я буду благодарен, если вы предоставите пример кода для аналогичного случая.

8
задан Burak Dede 11 December 2011 в 21:40
поделиться