Каков наилучший способ «ограничить скорость» потребления Observable?

У меня есть куча событий, и я должен выполнить ВСЕ из них без потерь, но я хочу убедиться, что они буферизуются и используются в соответствующие временные интервалы. У кого-нибудь есть решение?

Я не могу найти никаких операторов в Rx, которые могли бы сделать это без потери событий (Дроссель -теряет события ). Я также рассматривал Buffered, Delay и т. д. Не могу найти хорошего решения.

Я пытался поставить таймер посередине, но он почему-то вообще не работает:

GetInitSequence()
           .IntervalThrottle(TimeSpan.FromSeconds(5))
           .Subscribe(
                item =>
                    {
                        Console.WriteLine(DateTime.Now);
                        // Process item
                    }
            );

public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
    {
        return Observable.Create<T>(o =>
            {
                return source.Subscribe(x =>
                    {
                        new Timer(state => 
                            o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
                    }, o.OnError, o.OnCompleted);
        });
    }
6
задан IgorM 1 July 2012 в 17:44
поделиться