Отличный вопрос! Все дело в использовании правильного инструмента для работы. Специальное назначение Kafka Connect - потоковая интеграция между исходными системами и Kafka, или из Kafka в другие системы (включая RDBMS).
Что дает вам Kafka Connect?
Это не значит, что вы не можете сделать это в Kafka Streams, но вам придется в конечном итоге самому написать много кода, когда он предоставляется из коробка для вас от Kafka Connect. Таким же образом вы могли бы использовать API-интерфейс Consumer API и кучу сделанного на заказ кода для выполнения потоковой обработки, которую дает вам Kafka Streams API, аналогично вы могли бы использовать Kafka Streams для получения данных из тема Кафки в базу данных - но зачем вам?
Если вам необходимо преобразовать данные перед их отправкой в приемник, то рекомендуется отсоединить преобразование от отправки. Преобразуйте данные в Kafka Streams (или KSQL) и запишите их в другую тему Kafka. Используйте Kafka Connect для прослушивания этой новой темы и записи преобразованных сообщений в целевой приемник.
Я не полагаю, что в платформе существует что-либо, но я предложил бы делать ее немного более гибкой - берут IEnumerable<Stream>
в Вашем конструкторе, и происходят из Stream
самостоятельно. Затем для получения потоков файла Вы можете (принятие C# 3.0) просто сделайте:
Stream combined = new CombinationStream(files.Select(file => File.Open(file));
Часть "владения" немного хитра здесь - вышеупомянутое позволило бы потоку комбинации брать владение любого потока, из которого это читает, но Вы не можете хотеть, чтобы это должно было выполнить итерации через всю остальную часть потоков и закрыть их всех, если это закрывается преждевременно.