I'm trying to find a way to re-order messages within a topic partition and send ordered messages to a new topic.
I have Kafka publisher that sends String messages of the following format:
{system_timestamp}-{event_name}?{parameters}
for example:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
Also, we add some message key for each message, to send them to the corresponding partition.
What I want to do is reorder events based on {system-timestamp} part of the message and within a 1-minute window, cause our publishers doesn't guarantee that messages will be sent in accordance with {system-timestamp} value.
For example, we can deliver to the topic, a message with a bigger {system-timestamp} value first.
I've investigated Kafka Stream API and found some examples regarding messages windowing and aggregation:
Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 KStreamBuilder builder = new KStreamBuilder();
 KStream<String, String> stream = builder.stream("events");
 KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.
    /* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
                () -> "",  // initial value
                (aggKey, value, aggregate) -> aggregate + "",   // aggregating value
                TimeWindows.of(1000), // intervals in milliseconds
                Serdes.String(), // serde for aggregated value
                "test-store"
        );*/
But what should I do next with this grouped stream? I don't see any 'sort() (e1,e2) -> e1.compareTo(e2)' methods available, also windows could be applied to methods like aggregation(), reduce() ,count() , but I think that I don't need any messages data manipulations.
How can I re-order message in the 1-minute window and send them to another topic?
Here's an outline:
Create a Processor implementation that:
in process() method, for each message:
in the punctuate() method:
The problem with this approach is that punctuate() is not triggered if no new msgs arrive to advance the "stream time". If this is a risk in your case, you can create an external scheduler that sends periodic "tick" messages to each(!) partition of your topic, that your processor should just ignore, but they'll cause punctuate to trigger in the absence of "real" msgs. KIP-138 will address this limitation by adding explicit support for system-time punctuation: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With