I want to know how I can send/emit items to a Kotlin Flow. My use case is:
In the consumer/ViewModel/Presenter I can subscribe with the collect function:
fun observe() {
    coroutineScope.launch {
        // 1. Collect events
        repository.observe().collect {
            println(it)
        }
    }
}
But the issue is on the Repository side. With RxJava we could use a BehaviorSubject, expose it as an Observable/Flowable, and emit new items like this:
behaviourSubject.onNext(true)
But whenever I build a new flow:
flow { }
I can only collect. How can I send values to a flow?
Note: Flow is experimental in Kotlin 1.3 but will likely be stable in Kotlin 1.4.
SharedFlow is a Flow that allows for sharing itself between multiple collectors, so that only one flow is effectively run (materialized) for all of the simultaneous collectors.
Flow is an idiomatic way in Kotlin to publish a sequence of values. While the flow itself is suspendable, collect suspends the calling coroutine until the flow completes, so code placed after collect does not run until then.
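To make the SharedFlow description above concrete, here is a minimal sketch, assuming a kotlinx.coroutines version that ships MutableSharedFlow. The names EventRepository, events and sharedFlowDemo are hypothetical and used only for illustration. The mutable flow stays private to the repository, and two simultaneous collectors receive the same emitted value:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

// Hypothetical repository: only the repository can emit, callers get a read-only view.
class EventRepository {
    // Default replay = 0, so late collectors do not receive earlier values.
    private val _events = MutableSharedFlow<Boolean>()
    val events: SharedFlow<Boolean> = _events.asSharedFlow()

    suspend fun someLogicalOp(value: Boolean) = _events.emit(value)
}

fun sharedFlowDemo(): List<List<Boolean>> = runBlocking {
    val repository = EventRepository()

    // No collectors yet, so this value is dropped.
    repository.someLogicalOp(true)

    // Two simultaneous collectors. UNDISPATCHED runs each one up to its first
    // suspension point, so both are subscribed before the next emit.
    // take(1) ends each collection; a SharedFlow never completes by itself.
    val first = async(start = CoroutineStart.UNDISPATCHED) { repository.events.take(1).toList() }
    val second = async(start = CoroutineStart.UNDISPATCHED) { repository.events.take(1).toList() }

    repository.someLogicalOp(false)
    listOf(first.await(), second.await())
}

fun main() {
    println(sharedFlowDemo()) // prints [[false], [false]]
}
```

With replay = 0 this behaves like the PublishSubject case; passing replay = 1 to MutableSharedFlow would instead hand the latest value to new collectors.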
If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
This replicates RxJava's BehaviorSubject. To expose the channel as a Flow:
// Repository
fun observe(): Flow<Boolean> = channel.asFlow()
Now, to send an event/value to that exposed Flow, simply send to this channel.
// Repository
// send is a suspend function, so the caller must be suspend as well.
suspend fun someLogicalOp() {
    channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
Console:
false
If you wish to only receive values after you start collecting, you should use a BroadcastChannel instead.
Behaves like Rx's PublishSubject:
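import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

private val channel = BroadcastChannel<Boolean>(1)

suspend fun broadcastChannelTest() = coroutineScope {
    // 1. Send event (nobody is subscribed yet)
    channel.send(true)

    // 2. Start collecting in a separate coroutine, because collect { } suspends
    //    until the flow ends. UNDISPATCHED subscribes before step 3 runs;
    //    take(1) ends the collection so this function can return.
    launch(start = CoroutineStart.UNDISPATCHED) {
        channel
            .asFlow()
            .take(1)
            .collect { println(it) }
    }

    // 3. Send another event
    channel.send(false)
}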
false
Only false gets printed, as the first event was sent before collect { }.
Behaves like Rx's BehaviorSubject:
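import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

private val confChannel = ConflatedBroadcastChannel<Boolean>()

suspend fun conflatedBroadcastChannelTest() = coroutineScope {
    // 1. Send event (kept as the latest value)
    confChannel.send(true)

    // 2. Start collecting in a separate coroutine, because collect { } suspends
    //    until the flow ends. UNDISPATCHED subscribes before step 3 runs;
    //    take(2) ends the collection so this function can return.
    launch(start = CoroutineStart.UNDISPATCHED) {
        confChannel
            .asFlow()
            .take(2)
            .collect { println(it) }
    }

    // 3. Send another event
    confChannel.send(false)
}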
true
false
Both events are printed; you always get the latest value (if present).
Also worth mentioning is the Kotlin team's work on DataFlow (name pending), which seems better suited to this use case (as it will be a cold stream).
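As a hedged follow-up: the DataFlow proposal appears to be what later shipped in kotlinx.coroutines as StateFlow. Assuming that is the case, the BehaviorSubject-style repository could be sketched as below. The names FlagRepository, flag and stateFlowDemo are hypothetical, and the initial value true is an assumption made for the example:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList

// Hypothetical repository: a StateFlow always holds a current value.
class FlagRepository {
    private val _flag = MutableStateFlow(true) // initial value chosen for the example
    val flag: StateFlow<Boolean> = _flag.asStateFlow()

    // Setting value is a plain property write, no suspend call needed.
    fun someLogicalOp() {
        _flag.value = false
    }
}

fun stateFlowDemo(): List<Boolean> = runBlocking {
    val repository = FlagRepository()

    // The collector receives the current value (true) immediately on subscription,
    // then suspends waiting for the next one. take(2) ends the collection.
    val seen = async(start = CoroutineStart.UNDISPATCHED) {
        repository.flag.take(2).toList()
    }

    repository.someLogicalOp()
    seen.await()
}

fun main() {
    println(stateFlowDemo()) // prints [true, false]
}
```

Note that a StateFlow conflates values and skips updates equal to the current value, so it suits state rather than one-off events.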