As the title says, I want to receive the last windowedBy messages when the producer stops sending messages. At the moment I have to trigger it manually, but first, a short description.
I have a Kafka producer that reads lines from a file (every line is a different JSON) and sends each line to Kafka with a 500 ms pause between records. There are only 120 lines (JSONs).
I have a consumer that consumes all the JSONs sent by the producer. The code:
final KStream<String, Aggregate> transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));

// Topology
transactions
    .groupBy(this::groupedByTimeStampAndProtocolName)
    .windowedBy(TimeWindows
        .of(Duration.ofSeconds(10))
        .grace(Duration.ofMillis(0)))
    .aggregate(
        tool::emptyAggregate,
        this::processNewRecord, // new TransactionAggregator(),
        Materialized.<String, Aggregate, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
            .withKeySerde(Serdes.String())
            .withValueSerde(aggregateSerde)
    )
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .toStream()
    .foreach(sendAggregatesToCassandra);
It has the expected functionality, i.e. it receives all the records, but to receive the last windowed messages I have to send records manually.
Two questions about this:
I am using Kafka Streams (with Spring) on JDK 11 and I am working with dockerized Kafka:
<version.kafka>2.5.0</version.kafka>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
The properties used in the Kafka consumer are:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId()+Constants.APP_ID);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
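These props end up in a KafkaStreams instance roughly like this (a minimal sketch, assuming props holds the properties above and builder is the StreamsBuilder used for the topology):

// build the topology shown above on 'builder', then start the application
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // close cleanly on shutdown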
And on the producer side:
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "all");
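The producer loop itself looks roughly like this (a minimal sketch of what is described above; the file path and topic name are placeholders, and exception handling is omitted):

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    // one JSON per line, 120 lines in total
    for (String line : Files.readAllLines(Paths.get("transactions.json"))) {
        producer.send(new ProducerRecord<>("transactions-topic", null, line));
        Thread.sleep(500); // 500 ms between records
    }
}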
Please, could you help me?
Because you are using suppress() (with the untilWindowCloses config), the operator will only emit a final result if "stream-time" advances. "Stream-time" is computed as a function over the record timestamps, and thus, if no records are processed, "stream-time" does not advance and suppress() never emits anything. Hence, sending more records is the only way "stream-time" can be advanced.
Note: for a streaming use case, the assumption is that data never stops, so this is not an issue for an actual deployment. Reading from a file as you do is not a real stream-processing use case; I assume you read from a file for a test, and in that case your input file should simply contain a few more records to advance stream-time accordingly.
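For example, in such a test you could append one extra "flush" record after the real data so that stream-time moves past the last window end plus grace period and suppress() emits the final result. A minimal sketch (topic name, key, and payload are placeholders; the payload must be something your value serde and aggregation can tolerate):

// timestamp far enough ahead to close the last 10-second window (grace is 0)
long pastLastWindow = System.currentTimeMillis() + Duration.ofSeconds(10).toMillis();
producer.send(new ProducerRecord<>(
        "transactions-topic", // same topic the Streams app reads from
        null,                 // let the partitioner pick the partition
        pastLastWindow,       // record timestamp (WallclockTimestampExtractor ignores it and uses processing time anyway)
        "flush-key",
        "{}"));               // dummy payload
producer.flush();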
For more details, check out this blog post: https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
I also did a Kafka Summit talk about this topic: https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/