I'd like to "share" a Mono as I do with Flux.
Flux share() example with Kotlin:
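import java.time.Duration
import java.time.Instant
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import reactor.core.publisher.Flux

fun `test flux share`() {
    // One count per subscriber; each counts down when its stream terminates.
    val countDownLatch = CountDownLatch(2)
    val originalFlux = Flux.interval(Duration.ofMillis(200))
        .map { "$it = ${Instant.now()}" }
        .take(7)
        .share()
        .doOnTerminate {
            countDownLatch.countDown()
        }
    println("Starting #1...")
    originalFlux.subscribe {
        println("#1: $it")
    }
    println("Waiting ##2...")
    // Wait about one second so the second subscriber joins mid-stream.
    CountDownLatch(1).await(1000, TimeUnit.MILLISECONDS)
    println("Starting ##2...")
    originalFlux.subscribe {
        println("##2: $it")
    }
    // Block until both subscribers have terminated (or 10 seconds pass).
    countDownLatch.await(10, TimeUnit.SECONDS)
    println("End!")
}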
I couldn't find a share() operator on Mono. Why doesn't it exist?
The specific behaviour of share() doesn't make much sense with a Mono, but we have cache() which may be what you're after.
share() is equivalent to calling publish().refCount() on your Flux. Specifically, publish() gives you a ConnectableFlux, or a "hot" flux. (refCount() just automatically connects to the source when the first subscriber arrives and disconnects when the last one leaves.)
The "raison d'être" for ConnectableFlux is allowing multiple subscribers to subscribe whenever they wish, missing the data that was emitted before they subscribed. In the case of Mono this doesn't make a great deal of sense, as by definition at most one value is emitted - so if you've missed it, then you've missed it.
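To make the "late subscribers miss earlier data" point concrete, here is a minimal sketch. It assumes Reactor 3.4 or later for the Sinks API, which is used only to drive the source by hand so the timing is deterministic; the helper name earlyAndLateSubscribers is made up for illustration.

```kotlin
import reactor.core.publisher.Sinks

// Hypothetical helper: returns what an early and a late subscriber each
// receive from a flux shared via publish().refCount().
fun earlyAndLateSubscribers(): Pair<List<Int>, List<Int>> {
    // Manually driven source (an assumption of this sketch, not part of the question).
    val sink = Sinks.many().unicast().onBackpressureBuffer<Int>()
    val shared = sink.asFlux().publish().refCount() // what share() is an alias for

    val early = mutableListOf<Int>()
    val late = mutableListOf<Int>()

    shared.subscribe { early.add(it) } // first subscriber connects to the source
    sink.tryEmitNext(1)
    sink.tryEmitNext(2)
    shared.subscribe { late.add(it) }  // joins after 1 and 2 were already emitted
    sink.tryEmitNext(3)
    sink.tryEmitComplete()

    return early to late
}

fun main() {
    val (early, late) = earlyAndLateSubscribers()
    println("early = $early, late = $late")
}
```

The early subscriber should collect 1, 2 and 3, while the late one only collects 3. With a Mono there would be nothing left for the late subscriber at all.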
However, we do have cache() on Mono, which also turns it into a "hot" source (where the original supplier isn't called for each subscription, just once on first subscribe.) The obvious difference from above is that the value is replayed for every subscriber, but that's almost certainly what you want.
(Sidenote: if you test the above, you'll need to use Mono.fromSupplier() rather than Mono.just(), as the latter captures its value once when the Mono is created, so cache() has no observable effect.)
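A small sketch of the difference, assuming reactor-core is on the classpath. The helper subscribeTwice and its counter are hypothetical names used only to make the number of supplier calls visible.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import reactor.core.publisher.Mono

// Hypothetical helper: subscribes twice and reports what each subscriber saw,
// plus how many times the supplier actually ran.
fun subscribeTwice(useCache: Boolean): Triple<String?, String?, Int> {
    val calls = AtomicInteger()
    // fromSupplier defers the work until subscription time, unlike Mono.just.
    val source = Mono.fromSupplier { "value from call ${calls.incrementAndGet()}" }
    val mono = if (useCache) source.cache() else source
    val first = mono.block()  // first subscription
    val second = mono.block() // second subscription
    return Triple(first, second, calls.get())
}

fun main() {
    println(subscribeTwice(useCache = false)) // supplier runs once per subscription
    println(subscribeTwice(useCache = true))  // supplier runs once, value is replayed
}
```

Without cache() the supplier runs twice and the two subscribers see different strings. With cache() it runs once and both subscribers get the same replayed value.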