Как объединить несколько последовательностей IObservable?

        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        var zip = a.Zip(b, (x, y) => x + "-" + y);
        zip.Subscribe(Console.WriteLine);

Печать
0–5
1–6
2–7
...

Вместо этого я хотел бы объединить идентичные значения
5–5
6–6
7–7
8–8
...

Это упрощенный пример проблемы слияния сотен упорядоченных асинхронных последовательностей. Соединить два IEnumerable очень просто, но я не смог найти способ сделать что-то подобное в Rx. Есть идеи?

Подробнее о вкладах и о том, чего я пытаюсь достичь. По сути, вся система представляет собой конвейер реального времени с несколькими конечными автоматами (агрегаторы, буферы, сглаживающие фильтры и т. Д.), Соединенными шаблоном fork-join. Подходит ли RX для реализации таких вещей? Каждый вход может быть представлен как

public struct DataPoint
{
    public double Value;
    public DateTimeOffset Timestamp;
}

Каждый входной бит данных имеет метку времени по прибытии, таким образом, все события естественным образом упорядочиваются по их ключу соединения (метка времени). По мере того, как события проходят по конвейеру, они разделяются и объединяются. Соединения должны коррелироваться по метке времени и применяться в заранее определенном порядке. Например, join (a, b, c, d) => join (join (join (a, b), c), d).

Изменить Вот то, что я мог придумать в спешке. Надеюсь, есть более простое решение, основанное на существующих операторах Rx.

static void Test()
    {
        var a = Observable.Range(0, 10);
        var b = Observable.Range(5, 10);
        //var zip = a.Zip(b, (x, y) => x + "-" + y);
        //zip.Subscribe(Console.WriteLine);

        var joined = MergeJoin(a,b, (x,y) => x + "-" + y);
        joined.Subscribe(Console.WriteLine);
    }

static IObservable<string> MergeJoin(IObservable<int> left, IObservable<int> right, Func<int, int, string> selector)
    {
        return Observable.CreateWithDisposable<string>(o =>
            {
                Queue<int> a = new Queue<int>();
                Queue<int> b = new Queue<int>();
                object gate = new object();

                left.Subscribe(x =>
                    {
                        lock (gate)
                        {
                            if (a.Count == 0 || a.Peek() < x)
                                a.Enqueue(x);

                            while (a.Count != 0 && b.Count != 0)
                            {
                                if (a.Peek() == b.Peek())
                                {
                                    o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                                }
                                else if (a.Peek() < b.Peek())
                                {
                                    a.Dequeue();
                                }
                                else
                                {
                                    b.Dequeue();
                                }
                            }
                        }
                    });

                right.Subscribe(x =>
                {
                    lock (gate)
                    {
                        if (b.Count == 0 || b.Peek() < x)
                            b.Enqueue(x);

                        while (a.Count != 0 && b.Count != 0)
                        {
                            if (a.Peek() == b.Peek())
                            {
                                o.OnNext(selector(a.Dequeue(), b.Dequeue()));
                            }
                            else if (a.Peek() < b.Peek())
                            {
                                a.Dequeue();
                            }
                            else
                            {
                                b.Dequeue();
                            }
                        }
                    }
                });

                return Disposable.Empty;
            });
8
задан 6 February 2011 в 18:42
поделиться