2
ответа

Как процессор потокового процессора (API низкого уровня) получает данные из темы?

Я новичок в потоковом процессоре kafka и натолкнулся на ключевую концепцию «топологии». Я создал процессор исходного кода, который читает из «темы-источника» следующим образом: Topology topology = new Topology (); ...
вопрос задан: 19 March 2019 09:17
1
ответ

Используйте kafka для обнаружения изменений в значениях

У меня есть потоковое приложение, которое непрерывно принимает поток координат вместе с некоторыми пользовательскими метаданными, которые также включают цепочку битов. Этот поток создается на тему кафки с использованием ...
вопрос задан: 29 March 2019 08:43
1
ответ

Могу ли я установить группу Kafka Stream для потребителей?

Я использую библиотеку Kafka Stream для потокового приложения. Я хотел установить идентификатор группы потребителей kafka. Затем я поставил конфигурацию потока Kafka, как показано ниже. streamsCopnfiguration.put (StreamsConfig ....
вопрос задан: 19 March 2019 08:23
1
ответ

Spring Cloud Stream против Kafka Stream для функции "Точно однажды"

Я не смог найти здесь и на веб-сайте и в блогах Spring, если Spring Cloud Stream может предоставить семантику «Ровно когда-то», предоставляемую API-интерфейсами Kafka Stream. Может быть, нет ни одной конфигурации / ...
вопрос задан: 4 March 2019 07:54
1
ответ

Как проверить, была ли отправлена ​​новая запись в данный период времени, используя kafka и faust

Я использую тестовую настройку, включая платформу слияния (докер), и обрабатываю записи со следующей информацией: идентификатор датчика, метка времени, значение. Использование robinhood's faust (похоже на Kafka Streams ...
вопрос задан: 25 February 2019 23:50
1
ответ

Как вывести несколько записей из Transformer?

Дано: топология DSL с KStream :: transform. В рамках выполнения Transformer :: transform из входного сообщения генерируется множество сообщений (KeyValue < String, Message >). Я, наверное, могу вернуться ...
вопрос задан: 22 January 2019 13:21
1
ответ

Kafka Connect против потоков для раковин [закрыто]

Я пытаюсь понять, что Connect покупает у вас, а Streams - нет. У нас есть часть нашего приложения, где мы хотим использовать тему и писать в mariadb. Я мог бы сделать это с помощью простого ...
вопрос задан: 19 January 2019 23:58
1
ответ

Возможно ли InvalidStateStoreException при получении хранилища состояний из ProcessorContext?

При попытке получить локальное хранилище состояний из KafkaStreams возможно получить исключение InvalidStateStoreException, если локальный экземпляр KafkaStreams еще не готов или хранилище состояний было ...
вопрос задан: 18 January 2019 17:37
1
ответ

Как операции с состоянием работают в потоках Kafka, когда есть несколько экземпляров потокового приложения?

Как работают полные операции состояния в приложении Kafka Stream с несколькими экземплярами? Давайте просто скажем, что у нас есть 2 темы с 2 разделами A и B. У нас есть потоковое приложение, которое ...
вопрос задан: 18 January 2019 11:04
1
ответ

Kafka Streams: Каковы негативные последствия медленной пунктуальной работы?

В нашей топологии Kafka Streams есть некоторые знаки пунктуации, запуск которых может занять много времени (несколько минут). Каковы последствия таких медленных точек? Будет ли процесс, в котором они ...
вопрос задан: 18 January 2019 01:52
1
ответ

Kafka Stream подавляет агрегацию оконных сессий

Я написал этот код в потоковом приложении Kafka: KGroupedStream < String, foo > groupedStream = stream.groupByKey (); groupedStream.windowedBy (SessionWindows.with (Duration.ofSeconds (3)) ....
вопрос задан: 16 January 2019 17:44
1
ответ

Как закрыть KStream, если все брокеры не доступны

Я использую Streams с Spring Boot Application. Если Kafka закрыт или недоступен, блокируется только следующее: Не удалось установить соединение с узлом -1. Брокер может быть недоступен ....
вопрос задан: 16 January 2019 10:13
0
ответов

Хранилище состояний Kafka пусто, а метод metadataForKey возвращает текущий экземпляр для необходимого ключа

У меня есть несколько экземпляров потокового приложения Kafka. Чтобы запросить ключ из магазина, я вызвал streams.metadataForKey с необходимым ключом. Этот метод возвращает сведения об экземпляре для магазина ...
вопрос задан: 27 March 2019 02:00
0
ответов

Kafka Streams: как получить первую и последнюю запись SessionWindow?

По умолчанию .windowedBy (SessionWindows.with (Duration.ofSeconds (60)) возвращает запись для каждой входящей записи. В сочетании с .count () и .filter () легко получить первую запись. Используя ... ,
вопрос задан: 26 March 2019 16:17
0
ответов

Не удалось найти класс io.confluent.connect.avro.ConnectDefault

Я сталкиваюсь с подобной проблемой, упомянутой в https://github.com/confluentinc/kafka-streams-examples/issues/22. Я изменил свою исходную конфигурацию JDBC согласно рекомендации в комментарии, но все же ...
вопрос задан: 10 March 2019 09:10
0
ответов

Kafka Streams - пунктуатор расписания процессора по выражению cron

ProcessorContext.schedule (..) принимает параметр интервала продолжительности, и мы можем указать, что какое-то действие (пунктуатор) будет выполняться через каждый интервал времени X (например, каждый час). С таким планированием, ...
вопрос задан: 20 January 2019 00:08
0
ответов

Kafka Streams уравновешивает всплески задержки на высокопроизводительных сервисах kafka-streams

мы начинаем работать с потоками Kafka, наш сервис очень прост в использовании без сохранения состояния. У нас жесткие требования к задержке, и мы сталкиваемся со слишком большими задержками, когда группа потребителей ...
вопрос задан: 16 January 2019 14:05
0
ответов

KStreams + Spark Streaming + Машинное обучение

Я делаю POC для запуска алгоритма машинного обучения на потоке данных. Моя первоначальная идея состояла в том, чтобы взять данные, использовать Spark Streaming -> Aggregate Data из нескольких таблиц -> запустить MLLib on Stream of ...
вопрос задан: 29 November 2018 10:57
0
ответов

Тест топологии потоков Кафки

Я ищу способ протестировать приложение Kafka Streams. Чтобы я мог определить входные события, а набор тестов показывает мне вывод. Это возможно без реальной настройки Kafka?
вопрос задан: 24 January 2017 18:20