Какой лучший шаблон для разработки асинхронного приложения RPC с использованием Python, Pika и AMQP?

Модуль продюсера моего приложения запускается пользователями, которые хотят представить работу, которые необходимо выполнить в небольшом кластере. Он отправляет подписки в форме JSON через Broker Mankitmq Message.

Я попробовал несколько стратегий, и лучшее до сих пор является следующим, что еще не полностью работает:

Каждый кластерный аппарат запускает потребительский модуль, который подписывается в очередь AMQP и выдает Prefetch_count Чтобы рассказать брокеру, сколько задач может работать одновременно.

Я смог сделать это работать с помощью SelectConnection из библиотеки Pika AMQP. Как потребитель, так и производитель начинают два канала, которые подключены к каждой очереди. Производитель отправляет запросы на канал [A] и ждет ответов в канале [b], а потребитель ждет запрос на канал [a] и отправлять ответы на канал [b]. Похоже, однако, что, когда потребитель запускает обратный вызов, который рассчитывает ответ, он блокирует, поэтому у меня есть только одна задача, выполняемая у каждого потребителя.

Что мне нужно в конце:

  1. Потребитель [A] подписывает его задачи (примерно на 5К каждый раз) к кластеру
  2. диспетчеризации брокеров n сообщений / запросов для каждого потребителя, где n - количество Одновременные задачи могут обрабатывать
  3. , когда завершено одно задание, потребитель отвечает на брокер / производитель с результатом
  4. , производитель получает ответы, обновляет статус вычисления и, в конце концов, печатает некоторые отчеты

Ограничения:

  • Если другой пользователь представляет работу, все его задачи будут в очереди после предыдущего пользователя (я думаю, это автоматически верно из системы очередей, но я не думал о последствиях на резьбовой среде)
  • Задачи имеют приказ, который будет отправлен, но заказ, который они отвечали, не важно

Обновление

Я изучал немного дальше, и моя фактическая проблема, кажется, я использую простую функцию в качестве обратного вызова в Pika Функция SelectConnection.Channel.basic_consume (). Моя последняя (нерепленная) идея состоит в том, чтобы пройти функцию резьбы, а не регулярно, поэтому обратный вызов не будет блокировать, а потребитель может продолжать слушать.

9
задан guhcampos 13 September 2011 в 15:26
поделиться