Проблема оказалась в использовании timestamp
в качестве имени поля в моем классе Event. Изменение его на eventTime
было достаточно, чтобы все заработало:
public class Sort {
public static final int OUT_OF_ORDERNESS = 1000;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
public static class Event {
public Long eventTime;
Event() {
this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
}
}
private static class OutOfOrderEventSource implements SourceFunction<Event> {
private volatile boolean running = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
while(running) {
ctx.collect(new Event());
Thread.sleep(1);
}
}
@Override
public void cancel() {
running = false;
}
}
private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(Time.milliseconds(OUT_OF_ORDERNESS));
}
@Override
public long extractTimestamp(Event event) {
return event.eventTime;
}
}
}
NHibernate, по умолчанию, поддерживает оптимистичный параллелизм. Пессимистический параллелизм, с другой стороны, может быть выполнен через ISession.Lock()
метод.
Эти вопросы обсуждаются подробно в этот документ .
Можно также 'просто' вручную сравнить номера версий (предполагающий добавление свойства Version к объекту).
Явно Оптимистичный единственная нормальная опция. Иногда, конечно, мы должны иметь дело с сумасшедшими сценариями однако...
NHibernate поддерживает 2 типа оптимистичного параллелизма.
у Вас может или быть он, проверяют грязные поля при помощи "optimistic-lock=dirty" атрибута на элементе "класса" в Ваших файлах отображения, или можно использовать "optimistic-lock=version" (который является также значением по умолчанию). При использовании версии, необходимо обеспечить элемент "версии" в файле отображения, который отображается на поле в базе данных.
Версия может иметь тип Int64, Int32, Int16, Галочки, Метку времени или TimeSpan и автоматически увеличена на сохранении. См. Глава 5 в документации NHibernate для большего количества информации