Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I send items to a Kotlin.Flow (like a Behaviorsubject)

I wanted to know how can I send/emit items to a Kotlin.Flow, so my use case is:

In the consumer/ViewModel/Presenter I can subscribe with the collect function:

fun observe() {  coroutineScope.launch {     // 1. Send event     reopsitory.observe().collect {       println(it)     }   } } 

But the issue is in the Repository side, with RxJava we could use a Behaviorsubject expose it as an Observable/Flowable and emit new items like this:

behaviourSubject.onNext(true) 

But whenever I build a new flow:

flow {  } 

I can only collect. How can I send values to a flow?

like image 634
Joaquim Ley Avatar asked Aug 04 '19 09:08

Joaquim Ley


People also ask

Is kotlin flow experimental?

Note: Flow is experimental in Kotlin 1.3 but will likely be stable in Kotlin 1.4.

What is shared flow in Kotlin?

SharedFlow is a Flow that allows for sharing itself between multiple collectors, so that only one flow is effectively run (materialized) for all of the simultaneous collectors.

Is kotlin flow collect blocking?

Flow is an idiomatic way in kotlin to publish sequence of values. While the flow itself suspendable, the collector will block the coroutine from proceeding further.


1 Answers

If you want to get the latest value on subscription/collection you should use a ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>() 

This will replicate BehaviourSubject, to expose the channel as a Flow:

// Repository fun observe() {   return channel.asFlow() }  

Now to send an event/value to that exposed Flow simple send to this channel.

// Repository fun someLogicalOp() {   channel.send(false) // This gets sent to the ViewModel/Presenter and printed. } 

Console:

false

If you wish to only receive values after you start collecting you should use a BroadcastChannel instead.

To make it clear:

Behaves as an Rx's PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)  fun broadcastChannelTest() {   // 1. Send event   channel.send(true)    // 2. Start collecting   channel     .asFlow()     .collect {       println(it)     }    // 3. Send another event   channel.send(false) }  

false

Only false gets printed as the first event was sent before collect { }.


Behaves as an Rx's BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()  fun conflatedBroadcastChannelTest() {   // 1. Send event   confChannel.send(true)    // 2. Start collecting   confChannel     .asFlow()     .collect {       println(it)     }    // 3. Send another event   confChannel.send(false) } 

true

false

Both events are printed, you always get the latest value (if present).

Also, want to mention Kotlin's team development on DataFlow (name pending):

  • https://github.com/Kotlin/kotlinx.coroutines/pull/1354

Which seems better suited to this use case (as it will be a cold stream).

like image 200
Joaquim Ley Avatar answered Oct 17 '22 04:10

Joaquim Ley