Приостановить и возобновить подписку на холодном IObservable

Используя Rx , я хочу приостановить и возобновить работу ty в следующем коде:

Как реализовать Pause () и Resume ()?

    static IDisposable _subscription;

    static void Main(string[] args)
    {
        Subscribe();
        Thread.Sleep(500);
        // Second value should not be shown after two seconds:
        Pause();
        Thread.Sleep(5000);
        // Continue and show second value and beyond now:
        Resume();
    }

    static void Subscribe()
    {
        var list = new List { 1, 2, 3, 4, 5 };
        var obs = list.ToObservable();
        _subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Thread.Sleep(2000);
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
    }

    static void Pause()
    {
        // Pseudocode:
        //_subscription.Pause();
    }

    static void Resume()
    {
        // Pseudocode:
        //_subscription.Resume();
    }

Решение Rx?

  • Я считаю, что могу заставить его работать с каким-то логическим стробированием поля в сочетании с блокировкой потока ( Monitor.Wait и Monitor.Pulse )

  • Но существует ли Rx-оператор или какое-либо другое сокращенное обозначение для достижения той же цели?

10
задан Cel 1 October 2011 в 12:22
поделиться