Suppose that I want to convert some legacy asynchronous API into FS2 Streams. The API provides an interface with 3 callbacks: next element, success, error. I'd like the Stream to emit all the elements and then complete upon receiving success or error callback.
FS2 guide (https://functional-streams-for-scala.github.io/fs2/guide.html) suggests using fs2.Queue for such situations,
and it works great for enqueueing, but all the examples I've seen so far expect that the stream that queue.dequeue returns will never complete - 
there's no obvious way to handle success/error callback in my situation.
I've tried to use queue.dequeue.interruptWhen(...here goes the signal...), but if success/error callback arrives before the client has read the data from the stream,
stream gets terminated prematurely - there are still unread elements. I'd like the consumer to finish reading them before completing the stream.
Is it possible to do that with FS2? With Akka Streams it's trivial - SourceQueueWithComplete has complete and fail methods. 
UPDATE: I was able to get good enough result by wrapping elements in Option and considering None as a signal to stop reading the stream, and additionally by using a Promise to propagate errors:
queue.dequeue
.interruptWhen(interruptingPromise.get)
.takeWhile(_.isDefined).map(_.get)
However, did I overlook more natural way of doing such things?
One idiomatic way to do this is to create a Queue[Option[A]] instead of Queue[A]. When enqueueing, wrap in Some, and you can explicitly enqueue None to signal completion. On the dequeueing side, do q.dequeue.unNoneTerminate, which gives you a Stream[F, A] that terminates once the Queue emits None
Answer to your update: Combine unNoneTerminate with rethrow, which takes a Stream[F, Either[Throwable, A]] and returns a Stream[F, A] that errors out with Stream.raiseError when it encouters a throwable.
Your complete stack would then be a Stream[F, Either[Throwable, Option[A]]] and you unwrap into Stream[F,A] by calling .rethrow.unNoneTerminate
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With