There is a zip function to zip two Flows. Is there something to zip three (or more) Flows together?
If not, can you help me implement an extension function for it? Something like:
    flow.zip(flow2, flow3) { a, b, c ->
    }
The simplest way to combine flows is to merge the elements of two flows into one. The elements are not modified, regardless of which flow they come from. To do this, use the top-level merge function. Note that with merge, the elements of one flow do not wait for the other flow.
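As a rough illustration of the merge behaviour described above, here is a minimal sketch. The function name `mergedLetters`, the flow contents, and the delays are made up for the example; only `merge`, `flow`, `delay`, `toList`, and `runBlocking` come from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: merges two flows that emit at different times.
fun mergedLetters(): List<String> = runBlocking {
    val fast = flow {
        emit("a1")
        delay(100)
        emit("a2")
    }
    val slow = flow {
        delay(50)
        emit("b1")
    }
    // merge forwards each element as soon as it arrives;
    // neither flow waits for the other, and no pairing takes place.
    merge(fast, slow).toList()
}
```

All three elements end up in the result, in whatever order they were emitted. This differs from zip, which pairs elements and stops once the shortest flow completes.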
Lightweight: You can run many coroutines on a single thread due to support for suspension, which doesn't block the thread where the coroutine is running. Suspending saves memory over blocking while supporting many concurrent operations. Fewer memory leaks: Use structured concurrency to run operations within a scope.
Kotlin coroutines provide an API that enables you to write asynchronous code. With Kotlin coroutines, you can define a CoroutineScope, which helps you to manage when your coroutines should run. Each asynchronous operation runs within a particular scope.
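To make the point about scopes and lightweight coroutines concrete, here is a small sketch. The function name `runManyCoroutines` and the delay value are assumptions made for the example.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical demo function: starts `count` coroutines inside one scope
// and returns the sum of their results.
fun runManyCoroutines(count: Int): Int = runBlocking {
    // coroutineScope returns only after every child coroutine has finished,
    // which is the structured-concurrency guarantee.
    val results = coroutineScope {
        (1..count).map { i ->
            async {
                delay(10) // suspends without blocking the underlying thread
                i
            }
        }.awaitAll()
    }
    results.sum()
}
```

Because `delay` suspends rather than blocks, a call such as `runManyCoroutines(1000)` runs all of its coroutines on the single `runBlocking` thread.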
You can check the implementation of the zip operator and copy or emulate how it works, adapting it to your needs.
Test it and make any changes you need:
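    import kotlinx.coroutines.CancellationException
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.SendChannel
    import kotlinx.coroutines.channels.consumeEach
    import kotlinx.coroutines.channels.produce
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.channelFlow

    // Note: produce, invokeOnClose and isClosedForReceive are marked experimental or
    // delicate in kotlinx.coroutines, so an @OptIn may be needed depending on the version.
    fun <T1, T2, T3, R> Flow<T1>.zip(
        flow2: Flow<T2>,
        flow3: Flow<T3>,
        transform: suspend (T1, T2, T3) -> R
    ): Flow<R> = channelFlow {
        // Collect each flow in its own coroutine and expose it as a channel.
        val first: ReceiveChannel<T1> = produce {
            this@zip.collect {
                channel.send(it)
            }
        }
        val second: ReceiveChannel<T2> = produce {
            flow2.collect {
                channel.send(it)
            }
        }
        val third: ReceiveChannel<T3> = produce {
            flow3.collect {
                channel.send(it)
            }
        }

        // When the second or third flow finishes, cancel the remaining channels.
        (second as SendChannel<*>).invokeOnClose {
            if (!first.isClosedForReceive) first.cancel(MyFlowException())
            if (!third.isClosedForReceive) third.cancel(MyFlowException())
        }
        (third as SendChannel<*>).invokeOnClose {
            if (!first.isClosedForReceive) first.cancel(MyFlowException())
            if (!second.isClosedForReceive) second.cancel(MyFlowException())
        }

        val otherIterator = second.iterator()
        val anotherIterator = third.iterator()
        try {
            first.consumeEach { value ->
                // Emit only when all three flows have a value available.
                if (!otherIterator.hasNext() || !anotherIterator.hasNext()) {
                    return@consumeEach
                }
                send(transform(value, otherIterator.next(), anotherIterator.next()))
            }
        } catch (e: MyFlowException) {
            // complete
        } finally {
            // The first flow is done (or was cancelled): stop the other two.
            if (!second.isClosedForReceive) second.cancel(MyFlowException())
            if (!third.isClosedForReceive) third.cancel(MyFlowException())
        }
    }

    // Marker exception used to signal that zipping is complete.
    class MyFlowException : CancellationException()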
Usage:
    flow1.zip(flow2, flow3) { a, b, c ->
        // Do your work
    }...
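As an alternative that is not part of the answer above, the same result can usually be obtained by chaining the built-in two-flow zip. This is only a sketch: `zip3` and `demoZip3` are hypothetical names, not functions from kotlinx.coroutines, and the intermediate Pair is just one way to carry the first two values along.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips three flows by chaining the library's two-flow zip.
fun <T1, T2, T3, R> zip3(
    first: Flow<T1>,
    second: Flow<T2>,
    third: Flow<T3>,
    transform: suspend (T1, T2, T3) -> R
): Flow<R> =
    first
        .zip(second) { a, b -> a to b }                 // pair up the first two flows
        .zip(third) { (a, b), c -> transform(a, b, c) } // then add the third

// Hypothetical demo: the result has as many elements as the shortest flow.
fun demoZip3(): List<String> = runBlocking {
    zip3(flowOf(1, 2, 3), flowOf("a", "b"), flowOf(true, false, true)) { n, s, b ->
        "$n$s$b"
    }.toList()
}
```

Here `demoZip3()` returns `[1atrue, 2bfalse]`, because the second flow has only two elements. The chained version allocates an extra Pair per element, but it avoids the experimental channel APIs used in the hand-written implementation.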