Я не уверен, что это лучший подход, но в аналогичной задаче я использовал промежуточную сущность, чтобы зафиксировать изменение состояния. В вашем случае это будет что-то вроде
streamsBuilder.stream("my-topic").groupByKey()
.aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
public DeviceState apply(String key, Device newValue, DeviceState state) {
if(!newValue.getStatusBit().equals(state.getStatusBit())){
state.setChanged(true);
}
state.setStatusBit(newValue.getStatusBit());
state.setDeviceId(newValue.getDeviceId());
state.setKey(key);
return state;
}
}, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();
В полученной теме у вас будут изменения. Вы также можете добавить некоторые атрибуты в DeviceState, чтобы сначала инициализировать его, в зависимости от того, хотите ли вы отправить событие, когда поступит первая запись устройства и т. Д.