Can someone explain why the following AsObservable method creates an infinite loop even though the end of stream is reached?
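using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StreamExt {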
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize) {
        return Observable
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel))
            .Repeat()
            .TakeWhile(bytes => bytes != null) // EndOfStream
            .SelectMany(bytes => bytes);
    }
    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel) {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);
        if (bytesRead < 1) return null; // EndOfStream
        // ReadAsync never returns more than bufferSize, so bytesRead is already the final length.
        Array.Resize(ref buf, bytesRead);
        return buf;
    }
}
A quick test shows that it produces an infinite loop:
class Program {
    static void Main(string[] args) {
        using (var stream = new MemoryStream(new byte[] { 1, 2, 3 })) {
            var testResult = stream
                .AsObservable(1024)
                .ToEnumerable()
                .ToArray();
            Console.WriteLine(testResult.Length);
        }
    }
}
Of course I could add a .SubscribeOn(TaskPoolScheduler.Default), but the infinite loop stays alive anyway (it keeps a task pool scheduler busy and reads from the Stream indefinitely).
[UPDATE 2017-05-09]
Shlomo posted a better example to reproduce this issue:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3);
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
For anyone who ends up here and needs an answer, not just an explanation: the issue appears to be the default scheduler of FromAsync, as indicated by this self-answered question. If you switch to the "current thread" scheduler, Repeat().TakeWhile(...) behaves more predictably. E.g. (extract from the question):
.FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), 
    System.Reactive.Concurrency.Scheduler.CurrentThread)
.Repeat()
.TakeWhile(bytes => bytes != null) // EndOfStream
You can confirm the OnCompleted is being produced correctly with this:
using (var stream = new MemoryStream(new byte[] { 1, 2, 3 }))
{
    var testResult = stream
        .AsObservable(1024)
        ;
    testResult.Subscribe(b => Console.WriteLine(b), e => {}, () => Console.WriteLine("OnCompleted"));
}
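For reference, here is roughly what the question's StreamExt class looks like with that one change applied. This is only a sketch: it assumes a System.Reactive version that has the FromAsync overload taking an IScheduler (the one used in the extract above), and it is meant to replace the original class, not sit next to it. Everything else is copied from the question.

```csharp
using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class StreamExt
{
    public static IObservable<byte> AsObservable(this Stream stream, int bufferSize)
    {
        return Observable
            // Only change from the question: pass the current-thread scheduler explicitly.
            // Assumes the FromAsync(Func<CancellationToken, Task<T>>, IScheduler) overload is available.
            .FromAsync(cancel => stream.ReadBytes(bufferSize, cancel), Scheduler.CurrentThread)
            .Repeat()
            .TakeWhile(bytes => bytes != null) // null marks end of stream
            .SelectMany(bytes => bytes);
    }

    private static async Task<byte[]> ReadBytes(this Stream stream, int bufferSize, CancellationToken cancel)
    {
        var buf = new byte[bufferSize];
        var bytesRead = await stream
            .ReadAsync(buf, 0, bufferSize, cancel)
            .ConfigureAwait(false);
        if (bytesRead < 1) return null; // end of stream
        Array.Resize(ref buf, bytesRead); // trim the buffer to the bytes actually read
        return buf;
    }
}
```

The subscription snippet above can be used unchanged against this version to check whether OnCompleted arrives.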
It looks like there's a problem with the .FromAsync + .Repeat combination. The following code acts similarly:
int i = 0;
var testResult = Observable.FromAsync(() => Task.FromResult(i++))
    .Repeat()
    .TakeWhile(l => l < 3)
    ;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is never printed.");
...whereas this code terminates correctly:
var testResult = Observable.Generate(0, i => true, i => i + 1, i => i)
    .Repeat()
    .TakeWhile(l => l < 3)
    ;
testResult.Subscribe(b => Console.WriteLine(b), e => { }, () => Console.WriteLine("OnCompleted"));
Console.WriteLine("This is printed.");
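If the FromAsync + Repeat pipeline is not a hard requirement, one way to sidestep the combination altogether is an explicit read loop inside Observable.Create. This is not from the thread, just a sketch under assumptions: the names StreamLoopExt and AsObservableLoop are made up for illustration, and it relies on the async overload of Observable.Create, which signals completion to the observer once the returned task finishes.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the question or of System.Reactive.
public static class StreamLoopExt
{
    public static IObservable<byte> AsObservableLoop(this Stream stream, int bufferSize)
    {
        return Observable.Create<byte>(async (observer, cancel) =>
        {
            var buf = new byte[bufferSize];
            int bytesRead;
            // ReadAsync returns 0 at end of stream, which ends the loop.
            while ((bytesRead = await stream
                       .ReadAsync(buf, 0, bufferSize, cancel)
                       .ConfigureAwait(false)) > 0)
            {
                for (var i = 0; i < bytesRead; i++)
                    observer.OnNext(buf[i]);
            }
            // No explicit OnCompleted here: the sequence completes when this task does.
        });
    }
}
```

Disposing the subscription cancels the token handed to the loop, so a pending ReadAsync can be abandoned instead of being retried forever.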