Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Kotlin Flow with initial value, replay = 1, no duplicates, no conflation

I need to transfer a stream of values from a publisher / producer to one or multiple consumers / subscribers / observers. The exact requirements are:

  1. the "stream" has a default value
  2. consumers receive the last published value when they subscribe (the last value is replayed)
  3. the last published value can also be retrieved via a value property
  4. two consecutive published values are distinct (no duplicates)
  5. all published values need to be received by the consumer (values cannot be conflated)
  6. consumption happens via a FlowCollector
  7. the implementation must be multiplatform (Android, iOS, JS)

MutableStateFlow is meeting almost all of these requirements except item #5:

Updates to the value are always conflated. So a slow collector skips fast updates, but always collects the most recently emitted value (https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-state-flow/).

So something like this:

val flow = MutableStateFlow(0)

// runs in co-routine 1
flow.collect {
    println("Collect: $it")
    delay(100)
}

// runs in co-routine 2
repeat(10) {
    flow.emit(it + 1)
}

will print 0 and 10 but not the numbers in between because the collector is slow and emitted values are conflated

I could use MutableSharedFlow:

val flow = MutableSharedFlow<Int>(replay = 1, extraBufferCapacity = 1)

MutableSharedFlow doesn't conflate values but 1) has no value property (there's last() but that's a suspend function), 2) allows duplicates and 3) has no initial values. While it's possible to add these three requirements, adding the value property and the duplicate check isn't trivial.

I could use a BehaviorSubject:

val subject = BehaviorSubject(0)
val flow = subject.asFlow().distinctUntilChanged()

That would work perfectly but I'd have to add https://github.com/badoo/Reaktive just for this.

So here's my question: is there a Kotlin Flow solution that meets all the requirements without having to add the missing pieces "manually" (like with MutableSharedFlow)?

like image 971
Emanuel Moecklin Avatar asked Dec 20 '25 19:12

Emanuel Moecklin


1 Answers

It's not so bad to add the missing pieces you want to MutableSharedFlow to have exactly the syntax you want:

class MutableBehaviorFlow<T : Any?>(
    private val initialValue: T,
    private val _backingSharedFlow: MutableSharedFlow<T> = MutableSharedFlow(
        replay = 1,
        extraBufferCapacity = Int.MAX_VALUE,
        onBufferOverflow = BufferOverflow.SUSPEND
    )
) : MutableSharedFlow<T> by _backingSharedFlow {
    init {
        tryEmit(initialValue)
    }

    val value: T
        get() = try {
            replayCache.last()
        } catch (_: NoSuchElementException) {
            initialValue
        }

    override suspend fun emit(value: T) {
        if (value != this.value) _backingSharedFlow.emit(value)
    }

    override fun tryEmit(value: T): Boolean =
        if (value != this.value) _backingSharedFlow.tryEmit(value)
        else true
}
like image 120
Can_of_awe Avatar answered Dec 23 '25 12:12

Can_of_awe



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!