RabbitMQ, Pika и стратегия повторного подключения

Я использую Pika для обработки данных из RabbitMQ.Поскольку я столкнулся с различными проблемами, я решил написать небольшое тестовое приложение, чтобы посмотреть, как я могу обрабатывать разъединения.

Я написал это тестовое приложение, которое выполняет следующие действия:

  1. Подключиться к брокеру, повторить попытку до успешного завершения
  2. При подключении создать очередь.
  3. Использовать эту очередь и поместить результат в python Queue.Queue(0)
  4. Получить элемент из Queue.Queue(0) и вернуть его в очередь брокера.

Я заметил 2 проблемы:

  1. Когда я запускаю свой скрипт с одного хоста, подключающегося к rabbitmq на другом хосте (внутри виртуальной машины), этот скрипт завершается в случайные моменты без возникновения ошибки.
  2. Когда я запускаю свой сценарий на том же хосте, на котором установлен RabbitMQ, он работает нормально и продолжает работать.

Это может быть объяснено проблемами с сетью, пакеты теряются, хотя я считаю, что соединение не очень надежное.

Когда скрипт запускается локально на сервере RabbitMQ и я убиваю RabbitMQ, скрипт завершается с ошибкой: «ОШИБКА pika SelectConnection: Ошибка сокета на 3: 104»

Похоже, я не могу получить переподключение стратегия работает как надо. Может ли кто-нибудь взглянуть на код, чтобы увидеть, что я делаю неправильно?

Спасибо,

Джей

#!/bin/python
import logging
import threading
import Queue
import pika
from pika.reconnection_strategies import SimpleReconnectionStrategy
from pika.adapters import SelectConnection
import time
from threading import Lock

class Broker(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.logging = logging.getLogger(__name__)
        self.to_broker = Queue.Queue(0)
        self.from_broker = Queue.Queue(0)
        self.parameters = pika.ConnectionParameters(host='sandbox',heartbeat=True)
        self.srs = SimpleReconnectionStrategy()
        self.properties = pika.BasicProperties(delivery_mode=2)

        self.connection = None
        while True:
            try:
                self.connection = SelectConnection(self.parameters, self.on_connected,  reconnection_strategy=self.srs)
                break
            except Exception as err:
                self.logging.warning('Cant connect. Reason: %s' % err)
                time.sleep(1)

        self.daemon=True
    def run(self):
        while True:
            self.submitData(self.from_broker.get(block=True))
        pass
    def on_connected(self,connection):
        connection.channel(self.on_channel_open)
    def on_channel_open(self,new_channel):
        self.channel = new_channel
        self.channel.queue_declare(queue='sandbox', durable=True)
        self.channel.basic_consume(self.processData, queue='sandbox')    
    def processData(self, ch, method, properties, body):
        self.logging.info('Received data from broker')
        self.channel.basic_ack(delivery_tag=method.delivery_tag)
        self.from_broker.put(body)
    def submitData(self,data):
        self.logging.info('Submitting data to broker.')
        self.channel.basic_publish(exchange='',
                    routing_key='sandbox',
                    body=data,
                    properties=self.properties)
if __name__ == '__main__':
    format=('%(asctime)s %(levelname)s %(name)s %(message)s')
    logging.basicConfig(level=logging.DEBUG, format=format)
    broker=Broker()
    broker.start()
    try:
        broker.connection.ioloop.start()
    except Exception as err:
        print err
15
задан jay_t 29 February 2012 в 23:21
поделиться