Update Coroutines 1.3.0-RC
Working version:
    @FlowPreview
    suspend fun streamTest(): Flow<String> = channelFlow {
        listener.onSomeResult { result ->
            if (!isClosedForSend) {
                offer(result)
            }
        }
        // Runs when the collector stops or is canceled
        awaitClose { listener.unsubscribe() }
    }

Also check out this Medium article by Roman Elizarov: Callbacks and Kotlin Flows
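To see the unsubscribe step actually run, here is a minimal self-contained sketch of the same pattern. It makes several assumptions that are not part of the original post: `FakeListener` is a hypothetical stand-in for `listener` that fires three results as soon as a callback is registered, the code targets kotlinx.coroutines 1.5 or newer (where `trySend` replaces the now-deprecated `offer`), and `take(2)` plays the role of a collector that stops early.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the `listener` used in the question.
class FakeListener {
    private var callback: ((String) -> Unit)? = null

    var unsubscribed = false
        private set

    // Registers the callback and immediately fires three results.
    fun onSomeResult(cb: (String) -> Unit) {
        callback = cb
        listOf("first", "someString", "ignored").forEach { callback?.invoke(it) }
    }

    fun unsubscribe() {
        callback = null
        unsubscribed = true
    }
}

fun streamTest(listener: FakeListener): Flow<String> = channelFlow {
    // trySend never suspends; it fails quietly if the channel is closed or full.
    listener.onSomeResult { result -> trySend(result) }
    // Suspends until the flow is closed or canceled, then unsubscribes.
    awaitClose { listener.unsubscribe() }
}

fun runStreamDemo(): Pair<List<String>, Boolean> = runBlocking {
    val listener = FakeListener()
    // take(2) stops collecting after two values, which cancels the producer.
    val values = streamTest(listener).take(2).toList()
    values to listener.unsubscribed
}

fun main() {
    val (values, unsubscribed) = runStreamDemo()
    println("values=$values unsubscribed=$unsubscribed")
}
```

Once the collector stops, the producer coroutine is canceled and the `awaitClose` block runs, so the listener is released without any extra bookkeeping in the collector.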
Original Question
I have a Flow emitting multiple Strings:
    @FlowPreview
    suspend fun streamTest(): Flow<String> = flowViaChannel { channel ->
        listener.onSomeResult { result ->
            if (!channel.isClosedForSend) {
                channel.sendBlocking(result)
            }
        }
    }

After some time I want to unsubscribe from the stream. Currently I do the following:
    viewModelScope.launch {
        beaconService.streamTest().collect {
            Timber.i("stream value $it")
            if (it == "someString") {
                // Here the coroutine gets canceled, but streamTest is still executed
                this.cancel()
            }
        }
    }

When the coroutine is canceled, the stream keeps running; there is just no subscriber listening to new values. How can I unsubscribe and stop the stream function?
One solution is to cancel not the flow itself, but the job (or scope) in which it is collected.
    val job = scope.launch {
        flow.cancellable().collect { }
    }
    job.cancel()

NOTE: Call cancellable() before collect if you want the collector to stop when the Job is canceled.
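The effect of `cancellable()` can be illustrated with a small sketch. It assumes a flow created with `asFlow()`, which does not check for cancellation between emissions on its own; the function name `collectUntilCancelled` and the range of values are made up for illustration. Flows built with the `flow { }` builder already check for cancellation on every emission, so for those the extra operator should not be needed.

```kotlin
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun collectUntilCancelled(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val job = launch {
        (1..1_000).asFlow()
            .cancellable() // checks that the collecting job is still active before each emission
            .collect { value ->
                seen += value
                // Cancels the launched coroutine, as in the question.
                if (value == 3) cancel()
            }
    }
    job.join()
    seen
}

fun main() {
    println(collectUntilCancelled())
}
```

Without the `cancellable()` call, this particular flow would keep emitting all 1,000 values, because nothing in the loop suspends or checks the job's state.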
You could use the takeWhile operator on Flow.
    flow.takeWhile { it != "someString" }.collect { emittedValue ->
        // Do stuff until the predicate is false
    }
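As a quick runnable illustration of this approach, the sketch below uses a fixed `flowOf(...)` source in place of the real stream; the values and the function name `takeUntilSentinel` are assumptions made for the example.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun takeUntilSentinel(): List<String> = runBlocking {
    flowOf("a", "b", "someString", "c")
        // Collection ends at the first value for which the predicate is false.
        .takeWhile { it != "someString" }
        .toList()
}

fun main() {
    println(takeUntilSentinel())
}
```

Note that the value that fails the predicate is not delivered to the collector. When `takeWhile` stops, the upstream flow is canceled, so a `channelFlow` source with an `awaitClose` block gets the chance to unsubscribe.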