
What is the IObservable.Subscribe(IObserver<T> observer) overload for?

When I write .Subscribe expressions, I often find that ReSharper has chosen the following overload for me, located in mscorlib, Version=4.0.0.0:

namespace System
{
  public interface IObservable<out T>
  {
    IDisposable Subscribe(IObserver<T> observer);
  }
}

This looks very different from most of the Subscribe overloads, which take an Action. It also comes from mscorlib rather than System.Reactive.*, which is where I expect most of the Reactive stuff to be.

What does this overload do? How should it be used? How does IObserver<T> relate to Action? And why is this single overload in mscorlib?

Jack Ukleja asked Oct 21 '25 12:10



2 Answers

To be clear, that isn't an overload; it's really the core of Rx. All the other Subscribe methods you're used to, and all the other operators for that matter, are extension methods that ultimately call it.

If you look at early documentation for Rx, you'll see that the creators viewed it as the push-based side of LINQ. So a lot of things are the mirror image of what you see in LINQ. IObservable is the mirror of IEnumerable, and IObserver is the mirror of IEnumerator.

Because push is the opposite of pull, the Rx versions are the opposite of their pull-based counterparts:

  • IEnumerable defines one method that produces an IEnumerator. IObservable defines one method that takes in an IObserver.
  • If you think of IEnumerator.MoveNext() + IEnumerator.Current as one operation, it can finish in one of three ways: next element returned, end of collection reached, or exception thrown. Similarly, the IObserver has to handle three cases: next element (OnNext), end of stream (OnCompleted), or exception (OnError).
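
To make the mirror image concrete, here is a minimal sketch that uses only the interfaces from the System namespace, with no System.Reactive reference. EnumerableSource, PrintObserver and NoopDisposable are hypothetical names invented for illustration. A real observable would also support unsubscription, which this sketch ignores.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer: one method for each of the three outcomes.
sealed class PrintObserver : IObserver<int>
{
    public void OnNext(int value) => Console.WriteLine($"next: {value}");
    public void OnError(Exception error) => Console.WriteLine($"error: {error.Message}");
    public void OnCompleted() => Console.WriteLine("completed");
}

// Hypothetical disposable that does nothing (this sketch cannot unsubscribe).
sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// Hypothetical observable that pulls from an IEnumerable<int> and pushes
// each outcome of MoveNext() + Current to the observer.
sealed class EnumerableSource : IObservable<int>
{
    private readonly IEnumerable<int> _items;
    public EnumerableSource(IEnumerable<int> items) => _items = items;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        using (IEnumerator<int> e = _items.GetEnumerator())
        {
            while (true)
            {
                bool hasNext;
                try { hasNext = e.MoveNext(); }
                catch (Exception ex) { observer.OnError(ex); break; }   // exception thrown

                if (!hasNext) { observer.OnCompleted(); break; }        // end of collection
                observer.OnNext(e.Current);                             // next element
            }
        }
        return new NoopDisposable();
    }
}

static class Program
{
    static void Main()
    {
        IObservable<int> source = new EnumerableSource(new[] { 1, 2, 3 });
        source.Subscribe(new PrintObserver());
    }
}
```

Here the call in Main binds to the interface method itself, IObservable<int>.Subscribe(IObserver<int>), because no extension methods are in scope.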

The more familiar 'overloads' of Subscribe are really just extension methods that look something like this:

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
    // Delegate to the three-action overload with placeholder handlers.
    return source.Subscribe(onNext, e => { /* onError */ }, () => { /* onCompleted */ });
}

public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    // Wrap the delegates in an IObserver<T> and call the interface method.
    return source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
}
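
AnonymousObserver<T> is the piece that turns the delegates into an IObserver<T>. The real class lives in System.Reactive and does more than this (for example, it stops forwarding after a terminal notification). The hypothetical DelegateObserver<T> below is only a rough sketch of the idea, not the library's implementation.

```csharp
using System;

// Hypothetical stand-in for System.Reactive's AnonymousObserver<T>:
// each IObserver<T> method simply forwards to the matching delegate.
sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        _onError = onError ?? throw new ArgumentNullException(nameof(onError));
        _onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));
    }

    public void OnNext(T value) => _onNext(value);
    public void OnError(Exception error) => _onError(error);
    public void OnCompleted() => _onCompleted();
}

static class Program
{
    static void Main()
    {
        IObserver<string> observer = new DelegateObserver<string>(
            s => Console.WriteLine(s),
            ex => Console.WriteLine("failed: " + ex.Message),
            () => Console.WriteLine("done"));

        // Drive the observer by hand, the way an observable would.
        observer.OnNext("hello");
        observer.OnCompleted();
    }
}
```
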
Shlomo answered Oct 23 '25 07:10



If you look at IObserver you might see the reason. The interface contains three methods (or Actions), the most "used" of them being OnNext.

Now look at the implementation of the Subscribe overload that takes an Action. The extension method creates an IObserver<T> for you, using the provided Action as its OnNext implementation.

You can provide Actions for all interface method implementations if you wish to handle OnError and OnCompleted notifications.
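
As a usage sketch, assuming the System.Reactive NuGet package is referenced, both call styles look like this. Observable.Range is used only to get a small sequence to subscribe to.

```csharp
using System;
using System.Reactive.Linq; // Observable.Range (System.Reactive package)

static class Program
{
    static void Main()
    {
        IObservable<int> numbers = Observable.Range(1, 3);

        // Only OnNext is supplied; the extension method builds the observer.
        numbers.Subscribe(n => Console.WriteLine(n));

        // All three notifications are handled explicitly.
        numbers.Subscribe(
            n => Console.WriteLine($"next: {n}"),
            ex => Console.WriteLine($"error: {ex.Message}"),
            () => Console.WriteLine("completed"));
    }
}
```

One thing to be aware of: in System.Reactive, the default error handler used by the OnNext-only overload rethrows the exception rather than swallowing it, so supplying onError is worthwhile whenever the source can fail.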

supertopi answered Oct 23 '25 07:10
