I need to transfer a stream of values from a publisher / producer to one or multiple consumers / subscribers / observers. The exact requirements are:
MutableStateFlow meets almost all of these requirements except item #5:
Updates to the value are always conflated. So a slow collector skips fast updates, but always collects the most recently emitted value (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/).
So something like this:
val flow = MutableStateFlow(0)

// runs in coroutine 1
flow.collect {
    println("Collect: $it")
    delay(100)
}

// runs in coroutine 2
repeat(10) {
    flow.emit(it + 1)
}
will print 0 and 10 but not the numbers in between, because the collector is slow and the emitted values are conflated.
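To make the conflation easy to reproduce, here is a minimal runnable sketch of the same scenario. It assumes kotlinx.coroutines is on the classpath; the function name `collectConflated` and the 500 ms wait are illustrative choices, not part of the original code.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the values a slow collector actually sees.
fun collectConflated(): List<Int> = runBlocking {
    val flow = MutableStateFlow(0)
    val seen = mutableListOf<Int>()

    // Coroutine 1: slow collector, 100 ms per value.
    val job = launch {
        flow.collect {
            seen += it
            delay(100)
        }
    }
    yield() // let the collector start and pick up the initial value

    // Coroutine 2 (here: the runBlocking coroutine): fast producer.
    repeat(10) { flow.emit(it + 1) }

    delay(500) // give the collector time to catch up
    job.cancelAndJoin()
    seen
}

fun main() {
    println(collectConflated())
}
```

On a single-threaded `runBlocking` dispatcher this should yield `[0, 10]`: the collector is still inside `delay(100)` while all ten updates happen, so it only observes the latest one.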
I could use MutableSharedFlow:
val flow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)
MutableSharedFlow doesn't conflate values, but it 1) has no value property (there's last(), but that's a suspend function), 2) allows duplicates and 3) has no initial value.
While it's possible to add these three features, adding the value property and the duplicate check isn't trivial.
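For reference, the three missing pieces can be roughly approximated with existing API, as in the sketch below. This is only an illustration under assumptions: the buffer size of 64 and the function name `sharedFlowPieces` are arbitrary, the "current value" is read from `replayCache`, and duplicates are filtered on the collector side with `distinctUntilChanged()` rather than at emission time.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: returns the seeded "current value" and what a collector saw.
fun sharedFlowPieces(): Pair<Int?, List<Int>> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 64)

    // 3) Initial value: seed the replay cache before anyone subscribes.
    shared.tryEmit(0)

    // 1) Non-suspending read of the latest value (null if nothing was emitted).
    val current = shared.replayCache.lastOrNull()

    val seen = mutableListOf<Int>()
    val job = launch {
        // 2) Duplicate check, applied per collector instead of per emission.
        shared.distinctUntilChanged().collect { seen += it }
    }
    yield() // let the collector subscribe and receive the replayed value

    listOf(1, 1, 2, 2, 3).forEach { shared.emit(it) }

    delay(100)
    job.cancelAndJoin()
    current to seen
}

fun main() {
    println(sharedFlowPieces())
}
```

Every collector has to remember to apply `distinctUntilChanged()`, and `replayCache.lastOrNull()` cannot distinguish "no value yet" from an emitted `null`, which is part of why this does not feel like a finished solution.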
I could use a BehaviorSubject:
val subject = BehaviorSubject(0)
val flow = subject.asFlow().distinctUntilChanged()
That would work perfectly but I'd have to add https://github.com/badoo/Reaktive just for this.
So here's my question: is there a Kotlin Flow solution that meets all the requirements without having to add the missing pieces "manually" (like with MutableSharedFlow)?
Adding the missing pieces to MutableSharedFlow yourself isn't too bad, and it gives you exactly the syntax you want:
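import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

class MutableBehaviorFlow<T : Any?>(
    private val initialValue: T,
    private val _backingSharedFlow: MutableSharedFlow<T> = MutableSharedFlow(
        replay = 1,
        extraBufferCapacity = Int.MAX_VALUE,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
) : MutableSharedFlow<T> by _backingSharedFlow {
    init {
        // Seed the backing flow directly: the overridden tryEmit below would
        // treat initialValue as a duplicate of the fallback value and skip it.
        _backingSharedFlow.tryEmit(initialValue)
    }

    // Latest emitted value; falls back to initialValue if the replay cache is empty.
    val value: T
        get() = try {
            replayCache.last()
        } catch (_: NoSuchElementException) {
            initialValue
        }

    // Drop values equal to the current one. Note that the check and the
    // emit are not atomic, so concurrent emitters can still race.
    override suspend fun emit(value: T) {
        if (value != this.value) _backingSharedFlow.emit(value)
    }

    override fun tryEmit(value: T): Boolean =
        if (value != this.value) _backingSharedFlow.tryEmit(value)
        else true
}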
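A short usage sketch may help confirm the behavior: a slow collector should see the initial value and every distinct update, with no conflation. The class is repeated here only so that the snippet compiles on its own. This copy seeds the backing flow directly in `init`, because going through the overridden `tryEmit` would compare the initial value against itself and skip it. The helper name `collectAllValues` and the timings are assumptions for illustration. Recent kotlinx.coroutines versions may also emit an opt-in warning about inheriting from `MutableSharedFlow`.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

class MutableBehaviorFlow<T>(
    private val initialValue: T,
    private val _backingSharedFlow: MutableSharedFlow<T> = MutableSharedFlow(
        replay = 1,
        extraBufferCapacity = Int.MAX_VALUE,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
) : MutableSharedFlow<T> by _backingSharedFlow {
    init {
        // Seed the replay cache without going through the duplicate check.
        _backingSharedFlow.tryEmit(initialValue)
    }

    val value: T
        get() {
            val cache = replayCache // snapshot of the replay cache
            return if (cache.isEmpty()) initialValue else cache.last()
        }

    override suspend fun emit(value: T) {
        if (value != this.value) _backingSharedFlow.emit(value)
    }

    override fun tryEmit(value: T): Boolean =
        if (value != this.value) _backingSharedFlow.tryEmit(value) else true
}

// Hypothetical helper: returns everything a slow collector receives.
fun collectAllValues(): List<Int> = runBlocking {
    val flow = MutableBehaviorFlow(0)
    val seen = mutableListOf<Int>()

    val job = launch {
        flow.collect {
            seen += it
            delay(10) // slow collector
        }
    }
    yield() // let the collector subscribe and receive the initial value

    repeat(10) { flow.emit(it + 1) } // fast producer, values are buffered
    flow.emit(10)                    // duplicate of the current value, dropped

    delay(500) // give the collector time to drain the buffer
    job.cancelAndJoin()
    seen
}

fun main() {
    println(collectAllValues())
}
```

Run on a single-threaded `runBlocking` dispatcher, this should collect 0 through 10 exactly once each, and `flow.value` stays readable without suspending.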