Rx IObservable буферизация для сглаживания всплесков событий

У меня есть наблюдаемая последовательность, которая производит события в виде быстрых всплесков (то есть: пять событий одно за другим, затем длительная задержка, затем еще один быстрый всплеск событий и т. Д.). Я хочу сгладить эти всплески, вставив небольшую задержку между событиями. Представьте себе следующую диаграмму в качестве примера:

Raw:      --oooo--------------ooooo-----oo----------------ooo|
Buffered: --o--o--o--o--------o--o--o--o--o--o--o---------o--o--o|

Мой текущий подход заключается в создании таймера, подобного метроному, с помощью Observable.Interval () , который сигнализирует, когда можно получить другое событие из необработанного потока. Проблема в том, что я не могу понять, как затем объединить этот таймер с моей небуферизованной наблюдаемой последовательностью.

IObservable.Zip () близок к тому, чтобы делать то, что я хочу, но работает только до тех пор, пока необработанный поток генерирует события быстрее, чем таймер. Как только в необработанном потоке наступает значительная пауза, таймер создает серию нежелательных событий, которые затем немедленно объединяются со следующим пакетом событий из необработанного потока.

В идеале, Мне нужен метод расширения IObservable со следующей сигнатурой функции, которая производит поведение, описанное выше. Теперь мне на помощь приходит StackOverflow :)

public static IObservable<T> Buffered(this IObservable<T> src, TimeSpan minDelay)

PS. Я новичок в Rx, поэтому приношу свои извинения, если это тривиально простой вопрос ...


1. Простой, но ошибочный подход

Вот мое первоначальное наивное и упрощенное решение, которое имеет довольно много проблем:

public static IObservable<T> Buffered<T>(this IObservable<T> source, TimeSpan minDelay)
{
    Queue<T> q = new Queue<T>();
    source.Subscribe(x => q.Enqueue(x));
    return Observable.Interval(minDelay).Where(_ => q.Count > 0).Select(_ => q.Dequeue());
}

Первая очевидная проблема заключается в том, что IDisposable, возвращаемый внутренней подпиской на необработанный источник, теряется, и, следовательно, подписка может не прекращается. Вызов Dispose для IDisposable, возвращаемого этим методом, убивает таймер, но не базовый необработанный канал событий, который теперь напрасно заполняет очередь никем, чтобы вытащить события из очереди.

Вторая проблема заключается в том, что есть ' s нет способа распространять исключения или уведомления о конце потока из необработанного потока событий в буферизованный поток - они просто игнорируются при подписке на необработанный источник.

И последнее, но не менее важное: теперь у меня есть получил код, который периодически просыпается независимо от того, есть ли на самом деле какая-то работа, чего я бы предпочел избегать в этом чудесном новом реактивном мире.


2. Слишком сложный подход

Чтобы решить проблемы, возникшие в моем первоначальном упрощенном подходе, я написал гораздо более сложную функцию, которая ведет себя во многом как IObservable.Delay () (я использовал. NET Reflector, чтобы прочитать этот код и использовать его как основу моей функции). К сожалению, большая часть шаблонной логики, такой как AnonymousObservable , не является общедоступной вне кода system.reactive, поэтому мне пришлось скопировать и вставить лот кода. Это решение, похоже, работает, но, учитывая его сложность, я менее уверен, что в нем нет ошибок.

Я просто не могу поверить, что нет способа добиться этого, используя некоторую комбинацию стандартных расширений Reactive. Ненавижу ощущение, будто я напрасно изобретаю колесо, а шаблон, который я пытаюсь построить, кажется довольно стандартным.

18
задан Dan 22 December 2010 в 08:51
поделиться