Существует ли встроенный способ обработать несколько файлов как один поток?

Отличный вопрос! Все дело в использовании правильного инструмента для работы. Специальное назначение Kafka Connect - потоковая интеграция между исходными системами и Kafka, или из Kafka в другие системы (включая RDBMS).

Что дает вам Kafka Connect?

  • Масштабируемость; вы можете развернуть несколько рабочих, и Kafka Connect будет распределять задачи между ними
  • Устойчивость; если узел выходит из строя, Kafka Connect перезапустит работу на другом работнике
  • Простота использования; коннекторы существуют для множества технологий, поэтому для реализации коннектора обычно требуется всего несколько строк управления схемами JSON
  • ; поддержка схем в JSON, полная интеграция с реестром схем для Avro, подключаемые преобразователи из сообщества для Protobuf
  • Встроенные преобразования с помощью преобразования отдельных сообщений
  • Унифицированное и централизованное управление и настройка для всей вашей интеграции tasks

Это не значит, что вы не можете сделать это в Kafka Streams, но вам придется в конечном итоге самому написать много кода, когда он предоставляется из коробка для вас от Kafka Connect. Таким же образом вы могли бы использовать API-интерфейс Consumer API и кучу сделанного на заказ кода для выполнения потоковой обработки, которую дает вам Kafka Streams API, аналогично вы могли бы использовать Kafka Streams для получения данных из тема Кафки в базу данных - но зачем вам?

Если вам необходимо преобразовать данные перед их отправкой в ​​приемник, то рекомендуется отсоединить преобразование от отправки. Преобразуйте данные в Kafka Streams (или KSQL) и запишите их в другую тему Kafka. Используйте Kafka Connect для прослушивания этой новой темы и записи преобразованных сообщений в целевой приемник.

5
задан Michael Stum 12 February 2012 в 07:15
поделиться

1 ответ

Я не полагаю, что в платформе существует что-либо, но я предложил бы делать ее немного более гибкой - берут IEnumerable<Stream> в Вашем конструкторе, и происходят из Stream самостоятельно. Затем для получения потоков файла Вы можете (принятие C# 3.0) просто сделайте:

Stream combined = new CombinationStream(files.Select(file => File.Open(file));

Часть "владения" немного хитра здесь - вышеупомянутое позволило бы потоку комбинации брать владение любого потока, из которого это читает, но Вы не можете хотеть, чтобы это должно было выполнить итерации через всю остальную часть потоков и закрыть их всех, если это закрывается преждевременно.

7
ответ дан 13 December 2019 в 19:37
поделиться