Фактический дизайн:
Для тех, кто возвращается к этому вопросу, полезный ответ ниже подтолкнул меня к работоспособному дизайну, который работает нормально. Ключевыми моментами были три вывода:
recv ()
или оба попытаются send ()
из одного и того же сокета одновременно, то Eventlet элегантно убивает второй гринлет за исключением. Это великолепно и означает, что простые исключения, а также возможные ошибки перемежения данных, которые невозможно воспроизвести, будут результатом, если amqplib
плохо «срастется». amqplib
делятся примерно на две группы: wait ()
циклы внутри recv ()
до тех пор, пока не будет собрано сообщение AMQP, а другие методы send ()
возвращает сообщения и не будет пытаться самостоятельно recv ()
.Это потрясающая удача, учитывая, что авторы amqplib
понятия не имели, что кто-то попытается «озеленить» их библиотеку! Это означает, что отправка сообщений не только защищена от обратного вызова, вызываемого wait ()
, но также что сообщения могут быть безопасно отправлены из других гринлетов, которые полностью находятся вне контроля wait ()
петля. Вот эти безопасные методы, которые можно вызывать из любого гринлета, а не только из обратного вызова wait ()
:
basic_ack
basic_consume
с nowait = True
basic_publish
basic_recover
basic_reject
exchange_declare
с nowait = True
exchange_delete
с nowait = True
queue_bind
с nowait = True
queue_unbind
с nowait = True
queue_declare
с nowait = True
queue_delete с
с nowait = True
queue_purge
с nowait = True
1
, а затем получить ()
и отпустите ()
, чтобы заблокировать и разблокировать. Все мои асинхронные гринлеты, которые хотят писать сообщения, могут использовать такую блокировку, чтобы их отдельные вызовы send ()
не чередовались и не разрушали протокол AMQP.Итак, мой код выглядит примерно так:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock = eventlet.semaphore.Semaphore(1)
def listening_greenlet(channel):
# start this using eventlet.spawn_n()
# create Connection and self.channel
self.channel.basic_consume(queue, callback=self.consume)
while True:
self.channel.wait()
def safe_publish(channel, *args, **kw):
with write_lock: # yes, Eventlet supports this!
channel.basic_publish(*args, **kw)
def consume(message):
# Returning immediately frees the wait() loop
eventlet.spawn_n(self.process, message)
def process(message):
# do whatever I want
# whenever I am done, I can async reply:
self.safe_publish(...)
Наслаждайтесь!
Исходный вопрос:
Представьте себе сотни сообщений AMQP, поступающих каждую минуту в небольшое приложение Python Eventlet , каждое из которых требует обработки и ответа - где накладные расходы ЦП на обработку будут минимальными, но может включать ожидание ответов от других служб и сокетов.
Чтобы позволить, скажем, обрабатывать 100 сообщений одновременно, я, конечно, мог бы развернуть 100 отдельных TCP-соединений с RabbitMQ и иметь рабочего для каждого соединения, которое принимает, обрабатывает и отвечает на отдельные сообщения в режиме блокировки. Но чтобы сохранить TCP-соединения, я бы предпочел создать только одно соединение AMQP, разрешить RabbitMQ передавать сообщения по каналу ко мне на полной скорости, передавать эти задачи рабочим и отправлять ответы обратно, когда каждый рабочий завершает работу:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
Обратите внимание, что :
Я планировал написать это с помощью Eventlet и py-amqplib после того, как увидел это привлекательное сообщение в блоге о том, как легко эту библиотеку AMQP можно включить в модель обработки Eventlet:
http: // blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
Моя проблема в том, что, прочитав документацию для обеих библиотек, исходный код amqplib и большая часть исходного кода Eventlet,Я не могу понять, как я могу научить eventlet, которому принадлежит соединение AMQP - eventlet с именем connect_to_host ()
в сообщении в блоге - также просыпаться, когда рабочий завершает свою работу и генерирует ответ. Метод wait ()
в amqplib может быть разбужен только активностью в сокете AMQP. Хотя мне кажется, что я должен иметь возможность, чтобы рабочие записывали свои ответы в очередь, и чтобы событие connect_to_host ()
просыпалось либо , когда приходит новое входящее сообщение ] или , когда рабочий готов с ответом для отправки, я не могу найти способ, чтобы эвентлет сказал «разбуди меня, когда либо из этих событий произойдет».
Это действительно пришло в голову. мне, что рабочие могут попытаться присвоить объект соединения AMQP - или даже необработанный сокет - и записать свои собственные сообщения обратно через TCP; но кажется, что блокировки были бы необходимы, чтобы предотвратить чередование исходящих рабочих сообщений друг с другом или с сообщениями ACK, написанными основным событием прослушивателя, и я также не могу найти, где блокировки доступны в Eventlet.
Все это дает мне почти уверенность в том, что я пытаюсь решить эту проблему каким-то образом в обратном направлении.Неужели такая проблема - разрешение безопасного совместного использования одного соединения между слушателем-диспетчером и множеством рабочих процессов - просто не соответствует модели сопрограмм и требует полноценной асинхронной библиотеки? (В каком случае: есть ли тот, который вы бы порекомендовали для этой проблемы, и как будет происходить мультиплексирование между входящими сообщениями и исходящими ответами рабочих? Сегодня я не нашел чистого решения, попробовав такие комбинации, как Pika + ioloop - хотя я только что видел другой библиотека, stformed_amqp , которая могла бы работать лучше, чем это сделала Pika.) Или мне действительно нужно прибегать к реальным живым потокам Python, если мне нужен чистый и поддерживаемый код, который может реализовать эту модель? Я открыт для всех вариантов.
Спасибо за любую помощь или идеи! Я продолжаю думать, что у меня в значительной степени не работает весь параллелизм в Python, а затем я снова узнаю, что это не так. :) И я надеюсь, что в любом случае вам понравился рисунок ASCII выше.