Объединение Rx-потоков

Я пытаюсь смоделировать нетривиальный (для меня) Rx-запрос:

  • В комнате находятся Мужчины и Женщины.
  • Они входят и выходят из комнаты, а находясь в комнате, иногда меняют свое местонахождение.
  • Каждый мужчина может смотреть на одну (или ноль) женщину в данный момент времени.
  • Каждый мужчина обладает следующими свойствами:

    класс Man
    {
    public const int LookingAtNobody = 0;
    публичный идентификатор {получить; набор; }
    общественное двойное местоположение { получить; набор; }
    общественность Int LookingAt { получить; набор; }
    }
    
  • Каждая женщина имеет следующие свойства:

    класс Женщина
    {
    публичный идентификатор {получить; набор; }
    общественное двойное местоположение { получить; набор; }
    }
    
  • Для представления Мужчин у нас есть IObservable>, а для представления Женщин у нас есть IObservable>.

Как бы вы использовали Rx для генерации векторов от мужчин к женщинам, на которых они смотрят: IObservable>>?

Чтобы помочь, вот несколько модульных тестов для некоторых простых случаев:

public class Tests : ReactiveTest
{
    [Test]
    public void Puzzle1()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted(300));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle2()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted>(350),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle3()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
            OnCompleted(400));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted(350));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle4()
    {
        var scheduler = new TestScheduler();

        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
            OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
            OnCompleted(500));

        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted(350));
        var w2 = scheduler.CreateHotObservable(
            OnNext(155, new Woman { Id = 20, Location = 100.0 }),
            OnNext(255, new Woman { Id = 20, Location = 200.0 }),
            OnNext(355, new Woman { Id = 20, Location = 300.0 }),
            OnCompleted(455));

        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));

        var results = runQuery(scheduler, women, men);

        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted>(300),
                       };
        var expectedVector2 = new[]
                       {
                           OnNext(300, Tuple.Create(3.0, 200.0)),
                           OnNext(355, Tuple.Create(3.0, 300.0)),
                           OnNext(400, Tuple.Create(4.0, 300.0)),
                           OnCompleted>(455),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
        ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
    }

    private static IEnumerable>>>>>> runQuery(TestScheduler scheduler, IObservable> women, IObservable> men)
    {
        // assuming nested sequences are hot
        var vectors =
            from manDuration in men
            join womanDuration in women on manDuration equals womanDuration
            select from man in manDuration
                   join woman in womanDuration on manDuration equals womanDuration
                   where man.LookingAt == woman.Id
                   select Tuple.Create(man.Location, woman.Location);

        var query = vectors.Select(vectorDuration =>
        {
            var vectorResults = scheduler.CreateObserver>();
            vectorDuration.Subscribe(vectorResults);
            return vectorResults.Messages;
        });

        var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
        return results;
    }
}

(примечание: этот вопрос был опубликован на форумах Rx: http://social.msdn.microsoft.com/Forums /en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

25
задан Omer Mor 18 March 2012 в 12:45
поделиться