Как с помощью Rx игнорировать все значения, кроме самого последнего, когда работает мой метод Subscribe?

Используя Reactive Extensions, я хочу игнорировать сообщения, поступающие из моего потока событий, которые происходят, когда мой метод Subscribeэто работает. т.е. иногда мне требуется больше времени, чтобы обработать сообщение, чем время между сообщениями, поэтому я хочу удалить сообщения, на обработку которых у меня нет времени.

Однако, когда мой метод Subscribeзавершится, если какие-либо сообщения действительно пришли, я хочу обработать последнее. Поэтому я всегда обрабатываю самое последнее сообщение.

Итак, если у меня есть какой-то код, который делает:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

и если предположить, что '100' занимает много времени для обработки. Затем я хочу, чтобы «2» обрабатывалось, когда «100» завершается. «1» следует игнорировать, потому что он был заменен «2», пока «100» все еще обрабатывается.

Вот пример результата, который я хочу получить, используя фоновую задачу и Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

Однако Latest() является блокирующим вызовом, и я бы предпочел, чтобы поток не сидел в ожидании следующего значения. вот так (иногда между сообщениями будут очень длинные промежутки).

Я также могу получить желаемый результат, используя BroadcastBlockиз TPL Dataflow,следующим образом:

var buffer = new BroadcastBlock(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

но мне кажется, что это должно быть возможно непосредственно в Rx.Как лучше всего это сделать?

35
задан Wilka 17 April 2013 в 12:13
поделиться