JUST FOUND A SOLUTION!
Я только что заменил localhost 127.0.0.1.
Как ни странно, я думаю, что раньше я это пробовал. Возможно, возиться с конфигурацией и повторить попытку, она теперь работает.
Проблема оказалась в использовании timestamp
в качестве имени поля в моем классе Event. Изменение его на eventTime
было достаточно, чтобы все заработало:
public class Sort {
public static final int OUT_OF_ORDERNESS = 1000;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
public static class Event {
public Long eventTime;
Event() {
this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
}
}
private static class OutOfOrderEventSource implements SourceFunction<Event> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while(running) {
ctx.collect(new Event());
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(Time.milliseconds(OUT_OF_ORDERNESS));
}
@Override
public long extractTimestamp(Event event) {
return event.eventTime;
}
}
}