Порядок сообщений с Kafka Connect Elasticsearch Connector

== сравнивает ссылки на объекты в Java и не является исключением для объектов String.

Для сравнения фактического содержимого объектов (в том числе String) необходимо использовать equals.

Если сравнение двух объектов String с использованием == оказывается true, это связано с тем, что объекты String были интернированы, а виртуальная машина Java имеет несколько ссылки указывают на тот же экземпляр String. Не следует ожидать сравнения одного объекта String, содержащего то же содержимое, что и другой объект String, используя == для оценки как true.

0
задан Yoni Gibbs 17 January 2019 в 10:14
поделиться

1 ответ

Проблема заключалась в том, что наш разъем Elasticsearch имел конфигурацию key.ignore, установленную на true.

Мы обнаружили эту строку в источнике Github для коннектора (в DataConverter.java ):

final Long version = ignoreKey ? null : record.kafkaOffset();

Это означало, что с key.ignore=true операции индексирования, которые были будучи сгенерированным и отправленным в Elasticsearch, фактически были «без версии» ... в основном, последний набор данных, полученный Elasticsearch для документа, заменит любые предыдущие данные, даже если это были «старые данные».

При просмотре файлов журнала у соединителя, похоже, есть несколько потоков потребителей, которые читают исходную тему, а затем передают преобразованные сообщения в Elasticsearch, но порядок их передачи в Elasticsearch не обязательно совпадает с порядком тем.

Используя key.ignore=false, каждое сообщение Elasticsearch теперь содержит значение версии, равное смещению записи Kafka, и Elasticsearch отказывается обновлять данные индекса для документа, если он уже получил данные для более поздней «версии».

Это было не только , только , что исправило это. Нам все еще пришлось применить преобразование к сообщению Debezium из темы Кафки, чтобы получить ключ в формате простого текста, которым Elasticsearch был доволен:

"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"
0
ответ дан Steven Frew 17 January 2019 в 10:14
поделиться
Другие вопросы по тегам:

Похожие вопросы: