
Correct way of making long-running API calls in Rx.NET and WPF

I've been happily making some API calls in a WPF app using Rx in the following manner:

    IDisposable disposable = _textFromEventPatternStream
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Subscribe(async input =>
            {
                try
                {
                    IsLoading = true;
                    int x = int.Parse(input);
                    var y = await _mathApi.CalcAsync(x);
                    IsLoading = false;
                    Model.Update("", y);
                }
                catch (Exception ex)
                {
                    Model.Update(ex.Message, "Error caught in subscribe, stream continues...");
                }
                finally
                {
                    IsLoading = false;
                }
            },
            ex => Model.Update(ex.Message, "Error, stream will end..."));

However, for various reasons, I think I may need to make the calls using the SelectMany operator and do some processing on the stream.

I expect that some of the API calls will fail. For example, the API endpoint may not be available, or some of the parsing before the API call may fail. I want the hot observable to continue regardless. I also need to display a standard IsLoading spinner.

I also understand that once an OnError is received the sequence should not continue. I understand this... I just don't like it.

With that, the question is: is using Retry() the correct way to get a hot observable that continues to operate regardless of errors?

The below rewritten code works, but feels yucky:

    IDisposable disposable = _textFromEventPatternStream
        .Select(input => int.Parse(input)) // simulating much heavier pre processing, leading to a possible error
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(_ => IsLoading = true)
        .ObserveOn(_rxConcurrencyService.TaskPool)
        .SelectMany(inputInt => _mathApi.CalcAsync(inputInt))
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(s => { },
            ex =>
            {
                // this feels like a hack.
                Model.Update(ex.Message, "Error, stream will retry...");
                IsLoading = false;
            })
        .Retry()
        .Subscribe(x => Model.Update("", x),
            ex => Model.Update(ex.Message, "Error, stream will end..."));

I have seen some code examples where people use nested streams to resubscribe to a faulted stream. From what I've read this seems like a common approach, but to me it seems to turn what should be a simple scenario into a hard-to-follow situation.

asked Oct 29 '25 00:10 by just part of the crowd


1 Answer

If it's the CalcAsync that could throw an error, I'd try this instead:

    .SelectMany(inputInt => Observable.FromAsync(() => _mathApi.CalcAsync(inputInt)).Retry())

Put the retry as close to the faulting observable as possible.

I'd also suggest some sort of retry count so that a perpetual error doesn't just hang the observable.
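As a rough sketch of what a bounded retry could look like, the inner sequence below is retried a limited number of times and then swallowed with Catch so the outer stream keeps going. The class name, the local CalcAsync stand-in (which always fails for input 2), and the choice of three attempts are all assumptions made for illustration; they are not from the question. It assumes the System.Reactive NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedRetryDemo
{
    private static int _failedAttempts;

    // Hypothetical stand-in for _mathApi.CalcAsync: input 2 never succeeds.
    private static Task<int> CalcAsync(int x)
    {
        if (x != 2) return Task.FromResult(-x);
        Interlocked.Increment(ref _failedAttempts);
        return Task.FromException<int>(new InvalidOperationException("endpoint unavailable"));
    }

    public static async Task<(IList<int> Results, int FailedAttempts)> RunAsync()
    {
        _failedAttempts = 0;
        var subject = new Subject<string>();

        Task<IList<int>> pending = subject
            .Select(input => int.Parse(input))
            .SelectMany(n =>
                Observable.FromAsync(() => CalcAsync(n))
                    .Retry(3)                                             // at most three attempts in total
                    .Catch<int, Exception>(_ => Observable.Empty<int>())) // then skip this input
            .ToList()
            .ToTask(); // subscribes immediately, before the OnNext calls below

        subject.OnNext("1");
        subject.OnNext("2");
        subject.OnNext("3");
        subject.OnCompleted();

        return (await pending, _failedAttempts);
    }

    public static async Task Main()
    {
        var (results, failedAttempts) = await RunAsync();
        Console.WriteLine($"results: {string.Join(", ", results.OrderByDescending(r => r))}");
        Console.WriteLine($"failed attempts for input 2: {failedAttempts}");
    }
}
```

Note that Retry(3) means three subscriptions in total, not three retries after the first failure. In a real app the Catch handler is probably where you would report the error to the UI before returning the empty sequence.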

Here's a sample that shows that this works.

This fails:

    void Main()
    {
        var subject = new Subject<string>();

        // No Retry here: the first fault from CalcAsync terminates the whole pipeline.
        IDisposable disposable =
            subject
                .Select(input => int.Parse(input))
                .SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)))
                .Subscribe(x => Console.WriteLine(x));

        subject.OnNext("1");
        subject.OnNext("2");
        subject.OnNext("3");
        subject.OnNext("4");
        subject.OnNext("5");
        subject.OnNext("6");
        subject.OnCompleted();
    }

    private int _counter = 0;

    public async Task<int> CalcAsync(int x)
    {
        // Fail exactly once, on the fourth call.
        if (_counter++ == 3)
        {
            throw new Exception();
        }
        return await Task.Factory.StartNew(() => -x);
    }

It typically outputs:

    -1
    -2
    -3
    Exception of type 'System.Exception' was thrown.

Change the SelectMany to:

    .SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)).Retry())

Now I get:

    -1
    -3
    -2
    -4
    -5
    -6
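The sample above only covers faults raised by CalcAsync. If the pre-processing step (int.Parse here) can fail too, one option, sketched below under my own assumptions, is to move it inside the inner sequence and use Catch rather than Retry, since retrying a deterministic parse failure would just fail again forever. The class name, the always-succeeding CalcAsync stand-in, and the error list are hypothetical and only there for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ContainedParseDemo
{
    // Hypothetical stand-in for the API call in the question.
    private static Task<int> CalcAsync(int x) => Task.FromResult(-x);

    public static async Task<(IList<int> Results, List<string> Errors)> RunAsync()
    {
        var errors = new List<string>();
        var subject = new Subject<string>();

        Task<IList<int>> pending = subject
            .SelectMany(input =>
                Observable.Return(input)
                    .Select(s => int.Parse(s)) // a throw here becomes OnError of the inner sequence only
                    .SelectMany(n => Observable.FromAsync(() => CalcAsync(n)))
                    .Catch<int, Exception>(ex =>
                    {
                        // Record the failure, then end this inner sequence quietly.
                        lock (errors) errors.Add($"'{input}': {ex.GetType().Name}");
                        return Observable.Empty<int>();
                    }))
            .ToList()
            .ToTask(); // subscribes immediately, before the OnNext calls below

        subject.OnNext("1");
        subject.OnNext("abc"); // not a number: contained by the inner Catch
        subject.OnNext("3");
        subject.OnCompleted();

        return (await pending, errors);
    }

    public static async Task Main()
    {
        var (results, errors) = await RunAsync();
        Console.WriteLine($"results: {string.Join(", ", results.OrderByDescending(r => r))}");
        Console.WriteLine($"errors: {string.Join("; ", errors)}");
    }
}
```

Because the failure never reaches the outer sequence, the subject's subscription stays alive and later inputs are still processed.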
answered Oct 30 '25 15:10 by Enigmativity