Используйте kafka для обнаружения изменений в значениях

4
задан Nikhil Sahu 29 March 2019 в 08:43
поделиться

1 ответ

Я не уверен, что это лучший подход, но в аналогичной задаче я использовал промежуточную сущность, чтобы зафиксировать изменение состояния. В вашем случае это будет что-то вроде

streamsBuilder.stream("my-topic").groupByKey()
          .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
        public DeviceState apply(String key, Device newValue, DeviceState state) {
            if(!newValue.getStatusBit().equals(state.getStatusBit())){
                 state.setChanged(true);    
            }
            state.setStatusBit(newValue.getStatusBit());
            state.setDeviceId(newValue.getDeviceId());
            state.setKey(key);
            return state;
        }
    }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

В полученной теме у вас будут изменения. Вы также можете добавить некоторые атрибуты в DeviceState, чтобы сначала инициализировать его, в зависимости от того, хотите ли вы отправить событие, когда поступит первая запись устройства и т. Д.

0
ответ дан Katya Gorshkova 29 March 2019 в 08:43
поделиться
Другие вопросы по тегам:

Похожие вопросы: