Способ отправки буферизованных событий даже interval

Я пытаюсь буферизовать входящие события от некоторого IObservable (они приходят пакетами) и выпускайте их дальше, но по одному, с равными интервалами. Примерно так:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

Поскольку я новичок в Rx , я не уверен, существует ли уже субъект или оператор, который делает именно это. Может быть, это можно сделать композицией?

обновление:

Спасибо Ричард Салай за указание на оператора Drain , я нашел еще один пример Джеймса Майлза использования оператора Drain. Вот как мне удалось заставить его работать в приложении WPF:

    .Drain(x => {
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    }).Subscribe();

Я немного повеселился, потому что отсутствие параметра планировщика приводит к сбою приложения в режиме отладки без появления каких-либо исключений (мне нужно узнать, как работать с исключениями в Rx). Метод Process напрямую изменяет состояние пользовательского интерфейса, но я думаю, что довольно просто сделать из него IObservable (используя ISubject?).

update:

Тем временем я экспериментировал с ISubject, классом ниже выполняет то, что я хотел - своевременно выпускает буферизованные Ts:

public class StepSubject : ISubject
{
    IObserver subscriber;
    Queue queue = new Queue();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    {
        this.interval = interval;
        this.scheduler = scheduler;
    }

    void Step()
    {
        T next;
        lock (queue)
        {
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        }

        if (!idle)
        {
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        }
    }

    public void OnNext(T value)
    {
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    }

    public IDisposable Subscribe(IObserver observer)
    {
        subscriber = observer;
        return cancel;
    }
}

Эта наивная реализация для ясности исключена из OnCompleted и OnError, также разрешена только одна подписка.

12
задан Community 23 May 2017 в 12:34
поделиться