Ожидайте единственного сообщения RabbitMQ с тайм-аутом

Я хотел бы отправить сообщение на сервер RabbitMQ и затем ожидать ответного сообщения (на "ответе -" очереди). Конечно, я не хочу ожидать навсегда в случае, если приложение, обрабатывающее эти сообщения, снижается - должен быть тайм-аут. Это походит на очень простую задачу, все же я не могу найти способ сделать это. Я теперь столкнулся с этой проблемой и с py-amqplib и с клиентом.NET RabbitMQ.

Лучшее решение, которое я имею до сих пор, состоит в том, чтобы опросить использование basic_get с sleep промежуток, но это довольно ужасно:

def _wait_for_message_with_timeout(channel, queue_name, timeout):
    slept = 0
    sleep_interval = 0.1

    while slept < timeout:
        reply = channel.basic_get(queue_name)
        if reply is not None:
            return reply

        time.sleep(sleep_interval)
        slept += sleep_interval

    raise Exception('Timeout (%g seconds) expired while waiting for an MQ response.' % timeout)

Конечно, существует некоторый лучший путь?

7
задан EMP 10 May 2010 в 00:26
поделиться

4 ответа

Я только что добавил поддержку тайм-аута для amqplib в carrot .

Это подкласс amqplib.client0_8.Connection :

http://github.com/ask/carrot/blob/master/carrot/backends/pyamqplib.py#L19-97

wait_multi - это версия channel.wait , способная принимать на произвольном количестве каналов.

Я предполагаю, что в какой-то момент это может быть объединено в восходящем направлении.

8
ответ дан 6 December 2019 в 09:18
поделиться

Вот пример здесь с использованием qpid с msg = q.get (timeout = 1 ) , который должен делать то, что вы хотите. Извините, я не знаю, какие другие клиентские библиотеки AMQP реализуют тайм-ауты (и, в частности, я не знаю двух упомянутых вами конкретных).

2
ответ дан 6 December 2019 в 09:18
поделиться

Это, кажется, нарушает всю идею асинхронной обработки, но если вам нужно, я думаю, что правильный способ сделать это - использовать RpcClient.

1
ответ дан 6 December 2019 в 09:18
поделиться

Вот что я в итоге сделал в клиенте .NET:

protected byte[] WaitForMessageWithTimeout(string queueName, int timeoutMs)
{
    var consumer = new QueueingBasicConsumer(Channel);
    var tag = Channel.BasicConsume(queueName, true, null, consumer);
    try
    {
        object result;
        if (!consumer.Queue.Dequeue(timeoutMs, out result))
            throw new ApplicationException(string.Format("Timeout ({0} seconds) expired while waiting for an MQ response.", timeoutMs / 1000.0));

        return ((BasicDeliverEventArgs)result).Body;
    }
    finally
    {
        Channel.BasicCancel(tag);
    }
}

К сожалению, я не могу сделать то же самое с py-amqplib, потому что его метод basic_consume не вызывает обратный вызов, если вы не вызываете channel.wait () и channel.wait () не поддерживает тайм-ауты! Это глупое ограничение (с которым я постоянно сталкиваюсь) означает, что если вы никогда не получите другого сообщения, ваша цепочка будет заморожена навсегда.

8
ответ дан 6 December 2019 в 09:18
поделиться
Другие вопросы по тегам:

Похожие вопросы: