
How to DROP_LATEST with coroutine Flow<T>?

Please help. I was trying to do:

    2sec     2sec     2sec
------[A]------[B]------[C]------...----------------> InitialFlow
       \        |        | 
        \      drop      drop
         \
     5sec \    5sec        5sec
----------[1]---------[2]---------[3]-----|> AnotherFlow
result: [A1, A2, A3]

So I have InitialFlow, which emits at short intervals (every 2 seconds). Each item is then transformed into AnotherFlow, which takes longer to finish (15 seconds in total). I would like to drop the items that InitialFlow emits while AnotherFlow hasn't finished.

I've tried:

flow {
    delay(2000)
    emit("A")
    delay(2000)
    emit("B")
    delay(2000)
    emit("C")
}.buffer(0, BufferOverflow.DROP_LATEST)
    .onEach {
        println("Event for $it")
    }
    .flatMapConcat {
        flow {
            delay(5000)
            emit("${it}1")
            delay(5000)
            emit("${it}2")
            delay(5000)
            emit("${it}3")
        }
    }
    .onEach {
        println(it)
    }
    .launchIn(scope)

But for some reason this is the result:

Event for A
A1
A2
A3
Event for B
B1
B2
B3

It still processes Event B for some reason, even though I have a .buffer(0, BufferOverflow.DROP_LATEST).

Why does it still process Event B?

Is there a way to do this? I expect the output to be only:

Event for A
A1
A2
A3

Thanks in advance.

Archie G. Quiñones asked Oct 22 '25 09:10


2 Answers

This should work for you:

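import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun <T> Flow<T>.dropIfBusy(): Flow<T> = flow {
    coroutineScope {
        val channel = produce(capacity = Channel.RENDEZVOUS) {
            // trySend replaces the deprecated offer; like offer, it never suspends.
            collect { trySend(it) }
        }
        channel.consumeEach { emit(it) }
    }
}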

This is basically the "naive" buffer implementation from the Kotlin docs. The only difference is that we use channel.offer (deprecated in favor of trySend since kotlinx.coroutines 1.5) instead of channel.send. With a RENDEZVOUS channel, a value offered while the consumer is still busy, i.e. not suspended waiting to receive, is dropped instead of suspending the producer, which gives you the desired behavior.
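
To check this against the flows from the question, here is a minimal sketch, assuming a recent kotlinx.coroutines version (so trySend instead of offer). The delays are scaled down from 2 s / 5 s to 200 ms / 500 ms so that it finishes quickly, and runDemo is a hypothetical helper name, not something from the question.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.dropIfBusy(): Flow<T> = flow {
    coroutineScope {
        val channel = produce(capacity = Channel.RENDEZVOUS) {
            // trySend never suspends. On a rendezvous channel it only succeeds
            // while the consumer below is waiting to receive; otherwise the
            // value is dropped.
            collect { trySend(it) }
        }
        channel.consumeEach { emit(it) }
    }
}

// Hypothetical helper: runs the question's pipeline with scaled-down delays.
@OptIn(ExperimentalCoroutinesApi::class, FlowPreview::class)
fun runDemo(): List<String> = runBlocking {
    flow {
        for (name in listOf("A", "B", "C")) {
            delay(200) // 2 seconds in the question
            emit(name)
        }
    }
        .dropIfBusy() // replaces buffer(0, BufferOverflow.DROP_LATEST)
        .flatMapConcat { name ->
            flow {
                for (i in 1..3) {
                    delay(500) // 5 seconds in the question
                    emit("$name$i")
                }
            }
        }
        .toList()
}

fun main() {
    println(runDemo())
}
```

With these timings, B and C both arrive while A is still being processed, so only A1, A2 and A3 should come out the other end.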

Adrian K answered Oct 24 '25 09:10


After playing around with @AdrianK's solution for a bit, I found a simpler solution using channelFlow. Since channelFlow is currently an experimental API, you have to opt in to use it, though.

Like this:

fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    // trySend replaces the deprecated offer; neither one suspends.
    collect { trySend(it) }
}.buffer(0)
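
As for why the original attempt still processed B: as far as I can tell from the buffer documentation, any overflow strategy other than SUSPEND implicitly creates a channel with at least one buffered element, so buffer(0, BufferOverflow.DROP_LATEST) effectively keeps one pending item. The sketch below compares the two side by side. It assumes a recent kotlinx.coroutines version; source and collectSlowly are hypothetical helpers made up for the demo, and the delays are scaled down.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// channelFlow variant; trySend is used in place of the deprecated offer.
fun <T> Flow<T>.dropIfBusy(): Flow<T> = channelFlow {
    collect { trySend(it) }      // never suspends; fails if nobody is receiving
}.buffer(Channel.RENDEZVOUS)     // RENDEZVOUS == 0, same as buffer(0)

// Hypothetical source: emits A, B, C at 200 ms intervals.
fun source(): Flow<String> = flow {
    for (name in listOf("A", "B", "C")) {
        delay(200)
        emit(name)
    }
}

// Hypothetical slow consumer: spends 1 second on every item it accepts.
fun collectSlowly(upstream: Flow<String>): List<String> = runBlocking {
    upstream.onEach { delay(1000) }.toList()
}

fun main() {
    // One item (B) is parked in the implicit buffer while A is processed.
    println(collectSlowly(source().buffer(0, BufferOverflow.DROP_LATEST)))
    // Nothing is parked: B and C are dropped while A is processed.
    println(collectSlowly(source().dropIfBusy()))
}
```

With these timings I would expect the first line to be [A, B] and the second to be [A], which matches the output reported in the question.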

marstran answered Oct 24 '25 09:10