==
сравнивает ссылки на объекты в Java и не является исключением для объектов String
.
Для сравнения фактического содержимого объектов (в том числе String
) необходимо использовать equals
.
Если сравнение двух объектов String
с использованием ==
оказывается true
, это связано с тем, что объекты String
были интернированы, а виртуальная машина Java имеет несколько ссылки указывают на тот же экземпляр String
. Не следует ожидать сравнения одного объекта String
, содержащего то же содержимое, что и другой объект String
, используя ==
для оценки как true
.
Проблема заключалась в том, что наш разъем Elasticsearch имел конфигурацию key.ignore
, установленную на true
.
Мы обнаружили эту строку в источнике Github для коннектора (в DataConverter.java ):
final Long version = ignoreKey ? null : record.kafkaOffset();
Это означало, что с key.ignore=true
операции индексирования, которые были будучи сгенерированным и отправленным в Elasticsearch, фактически были «без версии» ... в основном, последний набор данных, полученный Elasticsearch для документа, заменит любые предыдущие данные, даже если это были «старые данные».
При просмотре файлов журнала у соединителя, похоже, есть несколько потоков потребителей, которые читают исходную тему, а затем передают преобразованные сообщения в Elasticsearch, но порядок их передачи в Elasticsearch не обязательно совпадает с порядком тем.
Используя key.ignore=false
, каждое сообщение Elasticsearch теперь содержит значение версии, равное смещению записи Kafka, и Elasticsearch отказывается обновлять данные индекса для документа, если он уже получил данные для более поздней «версии».
Это было не только , только , что исправило это. Нам все еще пришлось применить преобразование к сообщению Debezium из темы Кафки, чтобы получить ключ в формате простого текста, которым Elasticsearch был доволен:
"transforms": "ExtractKey",
"transforms.ExtractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractKey.field": "id"