Как выборочно удалить сообщения из AMQP (RabbitMQ) очередь?

Я хотел бы выборочно удалить сообщения из очереди AMQP, даже не читая их.

Сценарий следующие:

Отправка стороны хочет истечь сообщения типа X на основе того, что новая информация типа X прибыла. Поскольку очень вероятно, что подписчик еще не использовал последнее сообщение типа X, издатель должен просто удалить предыдущие сообщения X-типа и поместить новейшее в очередь. Целая операция должна быть очевидна для подписчика - на самом деле он должен использовать что-то в качестве простого, как ТОПАЮТ для получения сообщений.

Как сделать это с помощью AMQP? Или возможно это более удобно в другом протоколе обмена сообщениями?

Я хотел бы избежать сложной инфраструктуры. Целый необходимый обмен сообщениями так же прост как выше: у одной очереди, одного подписчика, одного издателя, но издателя должна быть способность к специальному удалению сообщений для данного критерии.

Клиент издателя будет использовать Ruby, но на самом деле я имел бы дело с любым языком, как только я обнаруживаю, как сделать это в протоколе.

9
задан Wojciech Kaczmarek 8 August 2010 в 15:03
поделиться

1 ответ

В настоящее время вы не можете сделать это в RabbitMQ (или, в более общем случае, в AMQP) автоматически. Но вот простой обходной путь.

Допустим, вы хотите отправить три типа сообщений: Xs, Ys и Zs. Если я правильно понял ваш вопрос, когда приходит сообщение X, вы хотите, чтобы брокер забыл все другие сообщения X, которые не были доставлены.

Это довольно легко сделать в RabbitMQ:

  • производитель объявляет три очереди: X, Y и Z (они автоматически привязываются к обмену по умолчанию с их именами в качестве ключей маршрутизации, что именно то, что нам нужно),
  • при публикации сообщения производитель сначала очищает соответствующую очередь (так, если он публикует сообщение X, он сначала очищает очередь X); это эффективно удаляет устаревшие сообщения,
  • потребитель просто потребляет из той очереди, которая ему нужна (X для сообщений X, Y для сообщений Y и т.д.). ); с его точки зрения, он просто должен сделать basic.get, чтобы получить следующее релевантное сообщение.

Это подразумевает условие гонки, когда два производителя посылают сообщения одного типа примерно в одно и то же время. В результате в очереди могут одновременно находиться два (или более) сообщений, но поскольку количество сообщений ограничено числом производителей, а лишние сообщения удаляются при следующей публикации, это не должно представлять особой проблемы.

Подводя итог, можно сказать, что это решение имеет всего один дополнительный шаг по сравнению с оптимальным решением, а именно очистка очереди X перед публикацией сообщения типа X.

Если вам нужна помощь в настройке этой конфигурации, идеальным местом для получения совета является список рассылки rabbitmq-discuss.

7
ответ дан 4 December 2019 в 10:02
поделиться
Другие вопросы по тегам:

Похожие вопросы: