Может ли eventlet управлять соединением AMQP с сообщениями, передаваемыми асинхронно как на входе, так и на выходе?

Фактический дизайн:

Для тех, кто возвращается к этому вопросу, полезный ответ ниже подтолкнул меня к работоспособному дизайну, который работает нормально. Ключевыми моментами были три вывода:

  1. Eventlet - очень безопасная среда - если два гринлета одновременно попытаются recv () или оба попытаются send () из одного и того же сокета одновременно, то Eventlet элегантно убивает второй гринлет за исключением. Это великолепно и означает, что простые исключения, а также возможные ошибки перемежения данных, которые невозможно воспроизвести, будут результатом, если amqplib плохо «срастется».
  2. Методы amqplib делятся примерно на две группы: wait () циклы внутри recv () до тех пор, пока не будет собрано сообщение AMQP, а другие методы send () возвращает сообщения и не будет пытаться самостоятельно recv () .Это потрясающая удача, учитывая, что авторы amqplib понятия не имели, что кто-то попытается «озеленить» их библиотеку! Это означает, что отправка сообщений не только защищена от обратного вызова, вызываемого wait () , но также что сообщения могут быть безопасно отправлены из других гринлетов, которые полностью находятся вне контроля wait () петля. Вот эти безопасные методы, которые можно вызывать из любого гринлета, а не только из обратного вызова wait () :
    1. basic_ack
    2. basic_consume с nowait = True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declare с nowait = True
    7. exchange_delete с nowait = True
    8. queue_bind с nowait = True
    9. queue_unbind с nowait = True
    10. queue_declare с nowait = True
    11. queue_delete с с nowait = True
    12. queue_purge с nowait = True
  3. Семафоры могут использоваться в качестве блокировок: инициализировать семафор счетчиком 1 , а затем получить () и отпустите () , чтобы заблокировать и разблокировать. Все мои асинхронные гринлеты, которые хотят писать сообщения, могут использовать такую ​​блокировку, чтобы их отдельные вызовы send () не чередовались и не разрушали протокол AMQP.

Итак, мой код выглядит примерно так:

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(channel):
        # start this using eventlet.spawn_n()
        # create Connection and self.channel
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(channel, *args, **kw):
        with write_lock:  # yes, Eventlet supports this!
            channel.basic_publish(*args, **kw)     

    def consume(message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)

Наслаждайтесь!

Исходный вопрос:

Представьте себе сотни сообщений AMQP, поступающих каждую минуту в небольшое приложение Python Eventlet , каждое из которых требует обработки и ответа - где накладные расходы ЦП на обработку будут минимальными, но может включать ожидание ответов от других служб и сокетов.

Чтобы позволить, скажем, обрабатывать 100 сообщений одновременно, я, конечно, мог бы развернуть 100 отдельных TCP-соединений с RabbitMQ и иметь рабочего для каждого соединения, которое принимает, обрабатывает и отвечает на отдельные сообщения в режиме блокировки. Но чтобы сохранить TCP-соединения, я бы предпочел создать только одно соединение AMQP, разрешить RabbitMQ передавать сообщения по каналу ко мне на полной скорости, передавать эти задачи рабочим и отправлять ответы обратно, когда каждый рабочий завершает работу:

                                       +--------+
                                +------| worker | <-+
                                |      +--------+   |
                                |      +--------+   |
                                | +----| worker | <-+
                                | |    +--------+   |
                                | |    +--------+   |
                                | | +--| worker | <-+
                                | | |  +--------+   |
                                v v v               |
                           +------------+           |
 RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
                           +------------+

Обратите внимание, что :

  • Очередь Eventlet может элегантно распределять входящую работу между рабочими по мере того, как они становятся доступными для дополнительной работы.
  • Возможно даже управление потоком из RabbitMQ: я могу ACK-сообщения только до тех пор, пока все мои рабочие не будут заняты, а затем ждать перед отправкой следующих ACK, пока очередь не начнет опустошаться.
  • Работа почти наверняка будет завершена не по порядку: один запрос может завершиться быстро, в то время как другое событие, пришедшее раньше, займет гораздо больше времени; а некоторые запросы могут вообще никогда не завершиться; поэтому рабочие будут возвращать ответы в непредсказуемом и асинхронном порядке.

Я планировал написать это с помощью Eventlet и py-amqplib после того, как увидел это привлекательное сообщение в блоге о том, как легко эту библиотеку AMQP можно включить в модель обработки Eventlet:

http: // blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

Моя проблема в том, что, прочитав документацию для обеих библиотек, исходный код amqplib и большая часть исходного кода Eventlet,Я не могу понять, как я могу научить eventlet, которому принадлежит соединение AMQP - eventlet с именем connect_to_host () в сообщении в блоге - также просыпаться, когда рабочий завершает свою работу и генерирует ответ. Метод wait () в amqplib может быть разбужен только активностью в сокете AMQP. Хотя мне кажется, что я должен иметь возможность, чтобы рабочие записывали свои ответы в очередь, и чтобы событие connect_to_host () просыпалось либо , когда приходит новое входящее сообщение ] или , когда рабочий готов с ответом для отправки, я не могу найти способ, чтобы эвентлет сказал «разбуди меня, когда либо из этих событий произойдет».

Это действительно пришло в голову. мне, что рабочие могут попытаться присвоить объект соединения AMQP - или даже необработанный сокет - и записать свои собственные сообщения обратно через TCP; но кажется, что блокировки были бы необходимы, чтобы предотвратить чередование исходящих рабочих сообщений друг с другом или с сообщениями ACK, написанными основным событием прослушивателя, и я также не могу найти, где блокировки доступны в Eventlet.

Все это дает мне почти уверенность в том, что я пытаюсь решить эту проблему каким-то образом в обратном направлении.Неужели такая проблема - разрешение безопасного совместного использования одного соединения между слушателем-диспетчером и множеством рабочих процессов - просто не соответствует модели сопрограмм и требует полноценной асинхронной библиотеки? (В каком случае: есть ли тот, который вы бы порекомендовали для этой проблемы, и как будет происходить мультиплексирование между входящими сообщениями и исходящими ответами рабочих? Сегодня я не нашел чистого решения, попробовав такие комбинации, как Pika + ioloop - хотя я только что видел другой библиотека, stformed_amqp , которая могла бы работать лучше, чем это сделала Pika.) Или мне действительно нужно прибегать к реальным живым потокам Python, если мне нужен чистый и поддерживаемый код, который может реализовать эту модель? Я открыт для всех вариантов.

Спасибо за любую помощь или идеи! Я продолжаю думать, что у меня в значительной степени не работает весь параллелизм в Python, а затем я снова узнаю, что это не так. :) И я надеюсь, что в любом случае вам понравился рисунок ASCII выше.

8
задан Brandon Rhodes 3 November 2011 в 17:04
поделиться