Кафка подписаться только на последнее сообщение?

Фрагментация должна быть прозрачной для приложения TCP. Имейте в виду, что TCP является потоковым протоколом: вы получаете поток данных, а не пакеты! Если вы создаете приложение на основе идеи полных пакетов данных, тогда у вас будут проблемы, если вы не добавите слой абстракции для сборки целых пакетов из потока, а затем передайте пакеты до приложения.

0
задан Todd 18 January 2019 в 18:23
поделиться

1 ответ

Если вы хотите иметь только одно значение для каждого ключа, вы должны использовать абстракцию KTable<K,V>: StreamsBuilder::table(final String topic) из Потоки Кафки . В используемой теме должна быть установлена ​​политика очистки compact.

Если вы используете KafkaConsumer, вы просто получаете данные от брокеров. Он не дает никакого механизма, который выполняет какую-то дедупликацию. В зависимости от того, было выполнено сжатие или нет, вы можете получить одно - n сообщений для одного и того же ключа.

Относительно сжатия

Сжатие не означает, что все старые значения для одного и того же ключа удаляются немедленно. Когда old сообщение для одного и того же ключа будет удалено, зависит от нескольких свойств. Наиболее важными из них являются:

  • log.cleaner.min.cleanable.ratio

Минимальное отношение грязного журнала к общему журналу для журнала, пригодного для очистки

  • log.cleaner.min.compaction.lag.ms

Минимальное время, в течение которого сообщение остается некомпактированным в журнале. Применяется только для журналов, которые уплотняются.

  • log.cleaner.enable

Включите процесс очистки журнала для запуска на сервере. Должен быть включен, если используются какие-либо темы с cleanup.policy = compact, включая тему внутренних смещений. Если эти темы отключены, они не будут сжиматься и постоянно увеличиваться в размерах.

Подробнее о сжатии вы можете найти https://kafka.apache.org/documentation/#compaction

0
ответ дан wardziniak 18 January 2019 в 18:23
поделиться
Другие вопросы по тегам:

Похожие вопросы: