У меня есть приложение, которое в некоторых моментах вызывает 1000 событий почти одновременно. Что я хотел бы сделать, так это разбить события на куски по 50 элементов и начать их обработку каждые 10 секунд. Нет необходимости ждать завершения обработки пакета перед началом обработки нового пакета.
Например:
10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))
Есть идеи, как этого добиться? Я полагаю, что Reactive Extensions — это то, что нужно, но приемлемы и другие решения.
Я попытался начать отсюда:
var bufferedItems = eventAsObservable
.Buffer(15)
.Delay(TimeSpan.FromSeconds(5)
Но заметил, что задержка не работает, как я надеялся, и вместо этого все партии запускались одновременно, хотя и с задержкой в 5 секунд.
Я также тестировал оконный метод, но не заметил никакой разницы в поведении. Я предполагаю, что TimeSpan в Window на самом деле означает, что «брать каждое событие, которое происходит в течение следующих 10 секунд:
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
Я использую бета-версию Rx-Main 2.0.20304.