[ this question is in the realm of Reactive Extensions (Rx) ]
int nValuesBeforeOutput = 123;
myStream.Buffer(nValuesBeforeOutput).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));
Now I need to serialize and deserialize the state of this subscription so that next time the application is started the buffer count does NOT start from zero, but from whatever the buffer count got to before application exit.
Based on Paul Betts approach, here's a semi-generalizable implementation that worked in my initial testing
int nValuesBeforeOutput = 123;
var myRecordableStream = myStream.Record(serializer);
myRecordableStream.Buffer(nValuesBeforeOutput).ClearRecords(serializer).Subscribe(
    i => Debug.WriteLine("Something Critical on Every 123rd Value"));
    private static bool _alreadyRecording;
    public static IObservable<T> Record<T>(this IObservable<T> input,
                                           IRepositor repositor) 
    {
        IObservable<T> output = input;
        List<T> records = null;
        if (repositor.Deserialize(ref records))
        {
            ISubject<T> history = new ReplaySubject<T>();
            records.ForEach(history.OnNext);
            output = input.Merge(history);
        }
        if (!_alreadyRecording)
        {
            _alreadyRecording = true;
            input.Subscribe(i => repositor.SerializeAppend(new List<T> {i}));
        }
        return output;
    }
    public static IObservable<T> ClearRecords<T>(this IObservable<T> input,
                                                 IRepositor repositor)
    {
        input.Subscribe(i => repositor.Clear());
        return input;
    }
Notes
_alreadyRecording is needed if you subscribe to myRecordableStream more than once_alreadyRecording is a static boolean, very ugly, and prevents the extension methods from being used in more than one place if needing parallel subscriptions - needs to be re-implemented for future useThere is no general solution for this, and making one would be NonTrivial™. The closest thing you can do is make myStream some sort of replay Observable (i.e. instead of serializing the state, serialize the state of myStream and redo the work to get you back to where you were).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With