Natural batching, also known as smart batching, is a technique in stream processing that optimizes throughput without affecting latency. Taking a concurrent queue as an example, the consumer can atomically drain all the items observed at some instant and then process them as a batch. Ideally, the queue should be bounded, which puts an upper limit on the batch size and provides backpressure to the sender at the same time.
It's called "natural" batching because there's no imposed batch size: when the traffic is low, it will process each item as soon as it arrives. In that case you don't need any throughput optimizations by batching items together. When the traffic gets higher, the consumer will automatically start processing larger batches, amortizing the fixed latency of a single operation like a database INSERT.
I wrote some code that achieves the basic goal:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
const val batchLimit = 20
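suspend inline fun <T : Any> ReceiveChannel<T>.consumeBatched(
        handleItems: (List<T>) -> Unit
) {
    val buf = mutableListOf<T>()
    while (true) {
        // Suspend until the first item arrives; null means the channel is closed.
        // receiveCatching() replaces receiveOrNull(), which later releases deprecated.
        // Note: unlike receiveOrNull(), this does not rethrow the channel's close cause.
        receiveCatching().getOrNull()?.also { buf.add(it) } ?: break
        // Without suspending, take whatever is already queued, up to batchLimit.
        // tryReceive() replaces the deprecated poll().
        for (x in 2..batchLimit) {
            tryReceive().getOrNull()?.also { buf.add(it) } ?: break
        }
        // The same buffer is reused, so the handler must not retain the list.
        handleItems(buf)
        buf.clear()
    }
}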
We can test it with this:
@ObsoleteCoroutinesApi
@ExperimentalCoroutinesApi
fun main() {
    val chan = generateMockTraffic()
    runBlocking {
        chan.consumeBatched { println("Received items: $it") }
    }
}
@ExperimentalCoroutinesApi
private fun generateMockTraffic(): ReceiveChannel<Int> {
    return GlobalScope.produce(capacity = batchLimit) {
        (1..100).forEach {
            send(it)
            if (it % 10 == 0) {
                delay(1)
            }
        }
    }
}
consumeBatched() polls the queue one item at a time and therefore must additionally impose a batch limit. It would be more efficient if written against a concurrent queue such as the Agrona project's OneToOneConcurrentArrayQueue, which supports a drain operation.
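To illustrate what a drain-based consumer looks like, here is a minimal sketch that swaps in the JDK's ArrayBlockingQueue and its drainTo method rather than Agrona or Kotlin channels. The names consumeDrained and runDrainDemo, and the use of a sentinel value to mark the end of the stream, are assumptions made for this example only.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.BlockingQueue
import kotlin.concurrent.thread

// Hypothetical helper: blocks for the first item, then drains whatever else
// is already queued in one call. Stops when the sentinel value is seen.
fun <T : Any> BlockingQueue<T>.consumeDrained(
    maxBatch: Int,
    sentinel: T,
    handleItems: (List<T>) -> Unit
) {
    val buf = ArrayList<T>(maxBatch)
    while (true) {
        buf.add(take())              // block until at least one item exists
        drainTo(buf, maxBatch - 1)   // take the rest without blocking
        val end = buf.indexOf(sentinel)
        val batch = if (end >= 0) buf.subList(0, end) else buf
        if (batch.isNotEmpty()) handleItems(batch)
        if (end >= 0) return
        buf.clear()
    }
}

// Produces 1..100 on a separate thread and collects the batches the consumer saw.
fun runDrainDemo(): List<List<Int>> {
    val queue = ArrayBlockingQueue<Int>(20) // bounded: provides backpressure
    val batches = mutableListOf<List<Int>>()
    val producer = thread {
        (1..100).forEach { queue.put(it) }
        queue.put(-1) // sentinel marking end of stream (an assumption of this sketch)
    }
    // Copy each batch because the buffer is reused between iterations.
    queue.consumeDrained(maxBatch = 20, sentinel = -1) { batches.add(it.toList()) }
    producer.join()
    return batches
}
```

Calling runDrainDemo() returns the batches in arrival order. Their sizes vary from run to run depending on thread scheduling, which is the point of natural batching: the batch size follows the traffic instead of being fixed up front.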
Is there a better approach with Kotlin channels, with more support from the library?
If not, would this be considered as a feature to add?
Is there a better approach with Kotlin channels, with more support from the library?
The library does not have support for this feature.
If not, would this be considered as a feature to add?
It depends on the desired API surface. A drain member is unlikely to fit channel semantics: it constrains the implementation, it would somehow have to expose a drain limit, and it gives the channel a more "collection-like" API. For example, how should drain behave with an unlimited channel? Is it possible to implement drain efficiently (with a pre-sized buffer, but avoiding OOMs and unbounded collections) once and use it with any channel implementation?
What could be improved is for the channel to provide additional hints, such as the expected capacity and the count of enqueued elements. They can have relaxed semantics with a default implementation and act as hints to a drain extension with some reasonable, configurable upper bounds. Such an API can be added in the future; feel free to create an issue.
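As a rough illustration of the hint idea, the sketch below again uses a JDK BlockingQueue as a stand-in, since channels do not expose such hints. The extension drainWithHint and the function hintDemo are hypothetical names. The queue's size serves only as a relaxed hint for pre-sizing the buffer, and a caller-supplied upper bound caps both the allocation and the number of items taken.

```kotlin
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical extension: pre-sizes the batch buffer from a relaxed size hint,
// capped by upperBound so an unbounded queue cannot cause an oversized allocation.
fun <T : Any> BlockingQueue<T>.drainWithHint(upperBound: Int): List<T> {
    require(upperBound > 0) { "upperBound must be positive" }
    val hint = size.coerceIn(1, upperBound) // only a hint: size may change concurrently
    val buf = ArrayList<T>(hint)
    drainTo(buf, upperBound)                // never takes more than upperBound items
    return buf
}

// Drains an effectively unbounded queue of 50 items in capped batches
// and reports the size of each batch.
fun hintDemo(): List<Int> {
    val queue = LinkedBlockingQueue<Int>()
    (1..50).forEach { queue.put(it) }
    return List(4) { queue.drainWithHint(upperBound = 20).size }
}
```

In this single-threaded demo the batch sizes are 20, 20, 10 and 0. With concurrent producers the hint may be stale by the time drainTo runs, which is acceptable because it only affects the initial buffer capacity, not correctness.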