Реактивные расширения: обработка событий пакетами + добавление задержки между каждым пакетом

У меня есть приложение, которое в некоторых моментах вызывает 1000 событий почти одновременно. Что я хотел бы сделать, так это разбить события на куски по 50 элементов и начать их обработку каждые 10 секунд. Нет необходимости ждать завершения обработки пакета перед началом обработки нового пакета.

Например:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

Есть идеи, как этого добиться? Я полагаю, что Reactive Extensions — это то, что нужно, но приемлемы и другие решения.

Я попытался начать отсюда:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

Но заметил, что задержка не работает, как я надеялся, и вместо этого все партии запускались одновременно, хотя и с задержкой в ​​5 секунд.

Я также тестировал оконный метод, но не заметил никакой разницы в поведении. Я предполагаю, что TimeSpan в Window на самом деле означает, что «брать каждое событие, которое происходит в течение следующих 10 секунд:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

Я использую бета-версию Rx-Main 2.0.20304.

10
задан Mikael Koskinen 7 June 2012 в 07:37
поделиться