I'm trying to use Kafka Streams (i.e. not a simple Kafka Consumer) to read from a retry topic with events that have previously failed to process. I wish to consume from the retry topic, and if processing still fails (for example, if an external system is down), I wish to put the event back on the retry topic. Thus I don't want to keep consuming immediately, but instead wait a while before consuming, in order to not flood the systems with messages that are temporarily unprocessable.
Simplified, the code currently does this, and I wish to add a delay to it.
fun createTopology(topic: String): Topology {
    val streamsBuilder = StreamsBuilder()
    streamsBuilder.stream<String, ArchivalData>(topic, Consumed.with(Serdes.String(), ArchivalDataSerde()))
        .peek { key, msg -> logger.info("Received event for key $key : $msg") }
        .map { key, msg -> enrich(msg) }
        .foreach { key, enrichedMsg -> archive(enrichedMsg) }
    return streamsBuilder.build()
}
I have tried to use Window Delay to set this up, but have not managed to get it to work. I could of course do a sleep inside a peek, but that would leave a thread hanging and does not sound like a very clean solution.
The exact details of how the delay would work is not terribly important to my use case. For example, all of these would work fine:
x seconds are all consumed at once. After it begins / finishes to consume, the stream waits x seconds before consuming againx seconds after being put on the topicx seconds between every eventI would be very grateful if someone could provide a few lines of Kotlin or Java code that would accomplish any of the above.
You cannot really pause reading from the input topic using Kafka Streams—the only way to "delay" would be to call a "sleep", but as you mentioned, that blocks the whole thread and is not a good solution.
However, what you can do is to use a stateful processor, e.g., process() (with attached state store) instead of foreach(). If the retry fails, you don't put the record back into the input topic, but you put it into the store and also register a punctuation with desired retry delay. If the punctuation fires, you retry and if the retry succeeds, you delete the entry from the store and cancel the punctuation; otherwise, you wait until the punctuation fires again.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With