I have Observable.Interval(TimeSpan.FromSeconds(1)) and a subscriber that checks something in the DB every time the interval fires. But sometimes, after doing that check, I want to immediately perform another check (call that subscriber again, because I know there is something in the queue).
I have already managed to achieve something similar by combining Interval with a while loop inside the subscriber method:
Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Sample(TimeSpan.FromSeconds(1)) // to avoid multiple 'stacked' intervals
    .Subscribe(RepeatAction);
private void RepeatAction(long _)
{
    bool wasSuccess;
    do
    {
        wasSuccess = CheckingInDB(); // Long operation
    } while (wasSuccess);
}
But is it possible to achieve this kind of behavior using purely reactive operators?
Yes, it is possible.
First, though, there is a misunderstanding about how Rx behaves here.
If you run this code:
void Main()
{
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Sample(TimeSpan.FromSeconds(1.0))
        .Timestamp()
        .Subscribe(RepeatAction);
}
private void RepeatAction(Timestamped<long> _)
{
    Console.WriteLine(_.Timestamp);
    Thread.Sleep(10000);
}
You'll get this result:
2016/05/11 10:37:57 +00:00
2016/05/11 10:38:07 +00:00
2016/05/11 10:38:17 +00:00
2016/05/11 10:38:27 +00:00
You'll see that the gap between successive values is 10 seconds, not 1. The Interval operator simply ensures that the gap between values is at least the duration of the TimeSpan; if the observer takes longer than that, the gap becomes as long as the observer takes. It doesn't queue up the values.
Another way of looking at it: the .Sample(TimeSpan.FromSeconds(1)) does nothing, because .Interval(TimeSpan.FromSeconds(1.0)) already ensures that the minimum gap between values is 1 second.
Now, to solve the problem using purely Rx operators. Try this:
var query =
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(_ =>
            Observable
                .While(
                    () => CheckingInDB(),
                    Observable.Return(Unit.Default)))
        .Switch();
This checks the database every second, but once a check succeeds it immediately repeats the check until one fails. Then it waits 1 second and tries again.
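To try the query out end to end, here is a minimal sketch of a console program that subscribes to it. It assumes the System.Reactive package is referenced. The CheckingInDB body is a hypothetical stand-in (it pretends three items are queued and returns true until they are used up), and the Processed counter and the 3.5 second run time are only there for illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    // Hypothetical stand-in for the real queue: pretend three items are waiting.
    private static int pending = 3;

    // Number of times the subscriber has been called (illustration only).
    public static int Processed;

    // Hypothetical replacement for the real database check.
    // Returns true while there is still something in the simulated queue.
    private static bool CheckingInDB()
    {
        if (pending == 0) return false;
        pending--;
        return true;
    }

    public static void Main()
    {
        var query =
            Observable
                .Interval(TimeSpan.FromSeconds(1.0))
                .Select(_ =>
                    Observable
                        .While(
                            () => CheckingInDB(),
                            Observable.Return(Unit.Default)))
                .Switch();

        // Each Unit value corresponds to one successful check.
        var subscription = query.Subscribe(_ =>
        {
            Interlocked.Increment(ref Processed);
            Console.WriteLine("Check succeeded, checking again");
        });

        // Keep the process alive for a few ticks, then stop the polling.
        Thread.Sleep(TimeSpan.FromSeconds(3.5));
        subscription.Dispose();
    }
}
```

In real code you would keep the subscription for as long as the polling should run and dispose it on shutdown, rather than sleeping on the main thread.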