сохранить состояние подписки IObservable в Rx

[ этот вопрос относится к сфере Reactive Extensions (Rx)]

Подписка, которая необходимо продолжить при перезапуске приложения

int nValuesBeforeOutput = 123;

myStream.Buffer(nValuesBeforeOutput).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

Теперь мне нужно сериализовать и десериализовать состояние этой подписки, чтобы в следующий раз при запуске приложения счетчик буфера начинался НЕ с нуля, а с того значения, до которого счетчик буфера дошел до этого выход из приложения .

  • Как в этом случае сохранить состояние IObservable.Subscribe()и позже загрузить его?
  • Есть ли общее решение для сохранения состояния наблюдателя в Rx?



От ответа к решению

Основанная на подходе Пола Беттса, вот полу-обобщаемая реализация, которая работала в моем начальном тестировании

Используйте

int nValuesBeforeOutput = 123;

var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));

Методы расширения

    private static bool _alreadyRecording;

    public static IObservable Record(this IObservable input,
                                           IRepositor repositor) 
    {
        IObservable output = input;
        List records = null;
        if (repositor.Deserialize(ref records))
        {
            ISubject history = new ReplaySubject();
            records.ForEach(history.OnNext);
            output = input.Merge(history);
        }
        if (!_alreadyRecording)
        {
            _alreadyRecording = true;
            input.Subscribe(i => repositor.SerializeAppend(new List {i}));
        }
        return output;
    }

    public static IObservable ClearRecords(this IObservable input,
                                                 IRepositor repositor)
    {
        input.Subscribe(i => repositor.Clear());
        return input;
    }

Примечания

  • Это не сработает для хранения состояний, зависящих от времени-интервалы между полученными значениями
  • Вам нужна реализация сериализатора, которая поддерживает сериализацию T
  • _alreadyRecordingтребуется, если вы подписываетесь на myRecordableStreamболее одного раза
  • _alreadyRecordingявляется статическим логическим значением, очень некрасиво и не позволяет использовать методы расширения более чем в одном месте, если требуются параллельные подписки -необходимо повторно-реализовать для будущего использования

6
задан Cel 11 April 2012 в 14:44
поделиться