[ этот вопрос относится к сфере Reactive Extensions (Rx)]
int nValuesBeforeOutput = 123;
myStream.Buffer(nValuesBeforeOutput).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
Теперь мне нужно сериализовать и десериализовать состояние этой подписки, чтобы в следующий раз при запуске приложения счетчик буфера начинался НЕ с нуля, а с того значения, до которого счетчик буфера дошел до этого выход из приложения .
Основанная на подходе Пола Беттса, вот полу-обобщаемая реализация, которая работала в моем начальном тестировании
int nValuesBeforeOutput = 123;
var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
i => Debug.WriteLine("Something Critical on Every 123rd Value"));
private static bool _alreadyRecording;
public static IObservable Record(this IObservable input,
IRepositor repositor)
{
IObservable output = input;
List records = null;
if (repositor.Deserialize(ref records))
{
ISubject history = new ReplaySubject();
records.ForEach(history.OnNext);
output = input.Merge(history);
}
if (!_alreadyRecording)
{
_alreadyRecording = true;
input.Subscribe(i => repositor.SerializeAppend(new List {i}));
}
return output;
}
public static IObservable ClearRecords(this IObservable input,
IRepositor repositor)
{
input.Subscribe(i => repositor.Clear());
return input;
}
Примечания
_alreadyRecording
требуется, если вы подписываетесь на myRecordableStream
более одного раза_alreadyRecording
является статическим логическим значением, очень некрасиво и не позволяет использовать методы расширения более чем в одном месте, если требуются параллельные подписки -необходимо повторно-реализовать для будущего использования