Фрагментация должна быть прозрачной для приложения TCP. Имейте в виду, что TCP является потоковым протоколом: вы получаете поток данных, а не пакеты! Если вы создаете приложение на основе идеи полных пакетов данных, тогда у вас будут проблемы, если вы не добавите слой абстракции для сборки целых пакетов из потока, а затем передайте пакеты до приложения.
Если вы хотите иметь только одно значение для каждого ключа, вы должны использовать абстракцию KTable<K,V>
: StreamsBuilder::table(final String topic)
из Потоки Кафки . В используемой теме должна быть установлена политика очистки compact
.
Если вы используете KafkaConsumer, вы просто получаете данные от брокеров. Он не дает никакого механизма, который выполняет какую-то дедупликацию. В зависимости от того, было выполнено сжатие или нет, вы можете получить одно - n сообщений для одного и того же ключа.
Относительно сжатия
Сжатие не означает, что все старые значения для одного и того же ключа удаляются немедленно. Когда old
сообщение для одного и того же ключа будет удалено, зависит от нескольких свойств. Наиболее важными из них являются:
log.cleaner.min.cleanable.ratio
Минимальное отношение грязного журнала к общему журналу для журнала, пригодного для очистки
blockquote>
log.cleaner.min.compaction.lag.ms
Минимальное время, в течение которого сообщение остается некомпактированным в журнале. Применяется только для журналов, которые уплотняются.
blockquote>
log.cleaner.enable
Включите процесс очистки журнала для запуска на сервере. Должен быть включен, если используются какие-либо темы с cleanup.policy = compact, включая тему внутренних смещений. Если эти темы отключены, они не будут сжиматься и постоянно увеличиваться в размерах.
blockquote>Подробнее о сжатии вы можете найти https://kafka.apache.org/documentation/#compaction