Как обрабатывать исключения в OnNext при использовании ObserveOn?

Мое приложение завершается, когда наблюдатель выдает ошибку OnNextпри использовании ObserveOn( Scheduler.ThreadPool). Единственный способ, который я нашел, чтобы справиться с этим, — использовать пользовательский метод расширения ниже (помимо того, чтобы убедиться, что OnNext никогда не выдает исключение). Затем убедитесь, что за каждым ObserveOnследует ExceptionToError.

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }

Однако это кажется неправильным. Есть ли лучший способ справиться с этим?

Пример

Эта программа аварийно завершает работу из-за неперехваченного исключения.

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached

            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            xs.OnNext(4);
            xs.OnNext(5);
        } catch (Exception e) {
            Console.WriteLine("Caught : " + e.Message); // <- also not reached
        } finally {

            Console.ReadKey();
        }
    }
}
10
задан Herman 25 June 2012 в 03:44
поделиться