Реализация IObservable <T> с нуля

Большинство IDE для Java автоматически генерирует метода get и код метода set для Вас, если Вы захотите их к. Существует много различных соглашений, и IDE как Eclipse позволит Вам выбирать, какой Вы хотите использовать, и даже позволить Вам определить свое собственное.

Eclipse даже включает автоматизированный рефакторинг, который позволит Вам оборачивать свойство в методе get и методе set, и это изменит весь код, который получает доступ к свойству непосредственно, чтобы заставить его использовать метода get и/или метод set.

, Конечно, Eclipse может только изменить код, который он знает о - любые внешние зависимости, которые Вы имеете, мог быть поврежден таким рефакторингом.

38
задан Jesper Larsen-Ledet 7 August 2013 в 14:02
поделиться

3 ответа

Честно говоря, я не уверен, насколько все это «правильно», но, судя по моему опыту, мне кажется, что это неплохо. Это код F #, но, надеюсь, вы уловили его вкус. Он позволяет вам «обновить» исходный объект, который вы затем можете вызвать Next / Completed / Error, и он управляет подписками и пытается утверждать, когда источник или клиенты делают что-то плохое.

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

Я буду интересоваться любым мысли о том, что здесь хорошо или плохо; У меня еще не было возможности взглянуть на все новые Rx-материалы от devlabs ...

Мой собственный опыт подсказывает, что:

  • Те, кто подписывается на наблюдаемые объекты, никогда не должны отказываться от подписок. Наблюдаемое не может сделать ничего разумного, когда подписчик бросает. (Это похоже на события.) Скорее всего, исключение просто переместится в обработчик общего доступа верхнего уровня или приведет к сбою приложения.
  • Источники, вероятно, должны быть «логически однопоточными». Я думаю, что будет сложнее написать клиентов, которые могут реагировать на одновременные вызовы OnNext; даже если каждый отдельный вызов поступает из другого потока, полезно избегать одновременных вызовов.
  • Определенно полезно иметь базовый / вспомогательный класс, который обеспечивает выполнение некоторых «контрактов».

Мне очень любопытно, могут ли люди дать более конкретные советы в этом направлении.

  • Исходники, вероятно, должны быть «логически однопоточными». Я думаю, что будет сложнее написать клиентов, которые могут реагировать на одновременные вызовы OnNext; даже если каждый отдельный вызов поступает из другого потока, полезно избегать одновременных вызовов.
  • Определенно полезно иметь базовый / вспомогательный класс, который обеспечивает выполнение некоторых «контрактов».
  • Мне очень любопытно, могут ли люди дать более конкретные советы в этом направлении.

  • Исходники, вероятно, должны быть «логически однопоточными». Я думаю, что будет сложнее написать клиентов, которые могут реагировать на одновременные вызовы OnNext; даже если каждый отдельный вызов поступает из другого потока, полезно избегать одновременных вызовов.
  • Определенно полезно иметь базовый / вспомогательный класс, который обеспечивает выполнение некоторых «контрактов».
  • Мне очень любопытно, могут ли люди дать более конкретные советы в этом направлении.

    10
    ответ дан 27 November 2019 в 03:55
    поделиться
    1. Взломайте отражатель и посмотрите.

    2. Посмотрите несколько видеороликов о C9 - этот показывает, как можно «получить» комбинатор Select

    3. Секрет заключается в создании классов AnonymousObservable, AnonymousObserver и AnonymousDisposable (которые просто обходятся вокруг того факта, что вы не можете создавать экземпляры интерфейсов). Они содержат нулевую реализацию, поскольку вы передаете это с помощью действий и функций.

    Например:

    public class AnonymousObservable<T> : IObservable<T>
    {
        private Func<IObserver<T>, IDisposable> _subscribe;
        public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
        {
            _subscribe = subscribe;
        }
    
        public IDisposable Subscribe(IObserver<T> observer)
        {
            return _subscribe(observer);
        }
    }
    

    Я позволю вам разобраться с остальным ... это очень хорошее упражнение в понимании.

    Есть хороший небольшая нить здесь с соответствующими вопросами.

    2
    ответ дан 27 November 2019 в 03:55
    поделиться

    только одно замечание относительно этой реализации:

    после того, как параллельные коллекции были введены в .net fw 4, вероятно, лучше использовать ConcurrentDictioary вместо простого словаря.

    это избавляет от блокировок обработки коллекции.

    ади.

    2
    ответ дан 27 November 2019 в 03:55
    поделиться
    Другие вопросы по тегам:

    Похожие вопросы: