Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to split a Kotlin flow into 2 flows?

I am trying to learn coroutines and i am still having some basic issues. I have a flow that emits a sequence of items and i want to split the stream into 2 flows. This is how i would write it in RxJava:

    val list = Flowable.just(1..6).share()
    val even = list.filter { it % 2 == 0 }.subscribe { println(it) } // 2, 4, 6
    val odd = list.filter { it % 2 == 1 }.subscribe { println(it) } // 1, 3, 5

How can i replicate this with Kotlin coroutine flows ? Thanks in advance.

like image 556
Zeyad Gasser Avatar asked Oct 23 '25 09:10

Zeyad Gasser


2 Answers

A family of sharing operators (as well as a hot SharedFlow) are on their way to simplify the kind of workflow you're looking for (using Kotlin Flows).

In the meantime, it's true that flows are cold in nature (and thus you cannot really share them as-is), but they can nevertheless share a hot source to achieve what you need. I provided details on how to do this in this answer.

In short, the end result looks like this:

val original: Flow<String> = flowOf("aap", "noot", "mies", "wim", "zus","jet","weide","does")

// create an implicit hot BroadcastChannel, shared between collectors
// so that they each get all elements (which are each produced only once)
val sharedFlow = original.broadcastIn(scope).asFlow()

// create derived cold flows, which will subscribe (on collect) to the
// same hot source (BroadcastChannel)
val flow1 = sharedFlow.filter { it.length == 4 }
val flow2 = sharedFlow.filter { it.length == 3 }.map { it.toUppercase() }

flow1.collect { it -> println("Four letter: ${it}") }
flow2.collect { it -> println("Three letter: ${it}") }

(This is soon to be replaced by a SharedFlow.)

like image 87
Joffrey Avatar answered Oct 24 '25 22:10

Joffrey


What you did with Rx is somewhat impossible with Kotlin flows, since in your example share() will create a hot observable and flows in Kotlin are by nature cold.

You could use a Channel instead since they represent hot streams in Kotlin.

I'd read this blog-post about Cold flows, hot channels from Roman Elizarov.

like image 31
Róbert Nagy Avatar answered Oct 24 '25 23:10

Róbert Nagy