I have a method returning an IObservable<long> that I call from an async method. I want to convert this into a normal List<long>, but to have that operation be cancelled if my CancellationToken is signaled.
I would like to do something like this:
List<long> result = await Foo().ToList(myCancellationToken);
What is the correct (and simplest) way to accomplish this? The ToList() extension method of IObservable<T> returns an IObservable<List<T>> and does not take a CancellationToken parameter.
var list = await Foo().ToList().ToTask(cancellationToken);
This has the advantage of cancelling immediately if the token is cancelled (the other Answer will not cancel until the next time the Foo observable produces a value).
Use TakeWhile to terminate the list.
CancellationToken MyToken = ...
var list = await Foo().TakeWhile(v=>!MyToken.IsCancellationRequested).ToList();
If you are worried about the subscription only cancelling when the next item is provided you can have this extensions method.
public static IObservable<T>
TakeWhile<T>
( this IObservable<T> This
, CancellationToken t
)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(t);
return Observable.Create<T>(( IObserver<T> observer ) =>
{
This.Subscribe(observer, cts.Token);
return Disposable.Create(() => cts.Cancel());
});
}
and write
CancellationToken MyToken = ...
var list = await Foo().TakeWhile(MyToken.IsCancellationRequested).ToList();
Using TakeWhile with a cancellation token is more composable than ToTask which just returns the last element.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With