Используя Reactive Extensions, я хочу игнорировать сообщения, поступающие из моего потока событий, которые происходят, когда мой метод Subscribe
это работает. т.е. иногда мне требуется больше времени, чтобы обработать сообщение, чем время между сообщениями, поэтому я хочу удалить сообщения, на обработку которых у меня нет времени.
Однако, когда мой метод Subscribe
завершится, если какие-либо сообщения действительно пришли, я хочу обработать последнее. Поэтому я всегда обрабатываю самое последнее сообщение.
Итак, если у меня есть какой-то код, который делает:
messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);
и если предположить, что '100' занимает много времени для обработки. Затем я хочу, чтобы «2» обрабатывалось, когда «100» завершается. «1» следует игнорировать, потому что он был заменен «2», пока «100» все еще обрабатывается.
Вот пример результата, который я хочу получить, используя фоновую задачу и Latest()
var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));
Task.Factory.StartNew(() =>
{
foreach(var n in messages.Latest())
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
}
});
Однако Latest() является блокирующим вызовом, и я бы предпочел, чтобы поток не сидел в ожидании следующего значения. вот так (иногда между сообщениями будут очень длинные промежутки).
Я также могу получить желаемый результат, используя BroadcastBlock
из TPL Dataflow,следующим образом:
var buffer = new BroadcastBlock(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));
buffer.AsObservable()
.Subscribe(n =>
{
Thread.Sleep(TimeSpan.FromMilliseconds(250));
Console.WriteLine(n);
});
но мне кажется, что это должно быть возможно непосредственно в Rx.Как лучше всего это сделать?