Я бы рекомендовал использовать функцию бокового вывода Флинка для сбора исключений, а затем выводить их в тему Кафки.
final OutputTag outputTag = new OutputTag("side-output"){};
SingleOutputStreamOperator task1 = ...;
SingleOutputStreamOperator task2 = ...;
SingleOutputStreamOperator task3 = ...;
DataStream exceptions1 = task1.getSideOutput(outputTag);
DataStream exceptions2 = task2.getSideOutput(outputTag);
DataStream exceptions3 = task3.getSideOutput(outputTag);
DataStream exceptions = exceptions1.union(exceptions2, exceptions3);
exceptions.addSink(new FlinkKafkaProducer(...));
Вы также можете заключить свой результат в Left
и исключения в Right
типа Either
. В конце вашего конвейера вам нужно разделить поток на полезную нагрузку и исключения с помощью функции split/select
.
DataStream> stage2 = stage1.flatMap(...);
DataStream> stage3 = stage2.flatMap((Either payload, Collector out) -> {
if (payload.isLeft()) {
out.collect(Left.of(map(payload.left)));
} else {
out.collect(Right.of(payload.right()));
}
});
SplitStream> split = stage3.split((Either value) -> {
if (value.isLeft()) {
return Colletions.singleton("left");
} else {
return Collections.singleton("right");
}
});
DataStream> payloads = split.select("left");
DataStream> exceptions = split.select("right");
Я услышал о людях, имеющих проблемы с app_offline.htm, если это не имело достаточного количества содержания в файле.
Заполните его парой сотни Кбит текста Lorem Ipsum и посмотрите, помогает ли это.
Хорошо это не могло бы быть Вашей проблемой, но если это - первый раз, когда это произошло, Вы могли бы хотеть перезапустить свою машину. Иногда процесс умирает с блокировкой на .dll, таким образом Вы больше не можете использовать его, пока не разблокировано (таким образом нуждаются в перезапуске ОС).
Другая возможная проблема состоит в том, что .dll или его зависимый .dll's, не находятся в Вашем пути или корректной папке.
Посмотрите на оба из них сначала.