Модуль продюсера моего приложения запускается пользователями, которые хотят представить работу, которые необходимо выполнить в небольшом кластере. Он отправляет подписки в форме JSON через Broker Mankitmq Message.
Я попробовал несколько стратегий, и лучшее до сих пор является следующим, что еще не полностью работает:
Каждый кластерный аппарат запускает потребительский модуль, который подписывается в очередь AMQP и выдает Prefetch_count Чтобы рассказать брокеру, сколько задач может работать одновременно.
Я смог сделать это работать с помощью SelectConnection из библиотеки Pika AMQP. Как потребитель, так и производитель начинают два канала, которые подключены к каждой очереди. Производитель отправляет запросы на канал [A] и ждет ответов в канале [b], а потребитель ждет запрос на канал [a] и отправлять ответы на канал [b]. Похоже, однако, что, когда потребитель запускает обратный вызов, который рассчитывает ответ, он блокирует, поэтому у меня есть только одна задача, выполняемая у каждого потребителя.
Что мне нужно в конце:
Ограничения:
Обновление
Я изучал немного дальше, и моя фактическая проблема, кажется, я использую простую функцию в качестве обратного вызова в Pika Функция SelectConnection.Channel.basic_consume (). Моя последняя (нерепленная) идея состоит в том, чтобы пройти функцию резьбы, а не регулярно, поэтому обратный вызов не будет блокировать, а потребитель может продолжать слушать.