I am writing a streaming application with Kafka Streams, Spring-Kafka and Spring Boot. I cannot find any information on how to properly test stream processing done with the Kafka Streams DSL while using Spring-Kafka. The documentation mentions EmbeddedKafkaBroker, but there seems to be no information on how to test, for example, state stores.
To give a simple example of what I would like to test, I have the following bean registered (where Item is Avro-generated):
@Bean
public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
    return streamsBuilder
            .stream(ITEM_TOPIC, Consumed.with(Serdes.String(), itemAvroSerde))
            .mapValues((id, item) -> item.getNumber())
            .groupByKey()
            .aggregate(
                    () -> 0L,
                    (id, number, agg) -> agg + number,
                    Materialized.with(Serdes.String(), Serdes.Long()));
}
What is the proper way to test that all item numbers are aggregated?
Spring Kafka's support for Kafka Streams doesn't bring any extra API, especially for building streams and processing them.
We recently discovered the kafka-streams-test-utils library, which can be used in unit tests without starting any Kafka broker (not even an embedded one).
In several of our tests we have something like this (enricher is the transformer under test):
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream(INPUT);
stream
        .transform(() -> enricher)
        .to(OUTPUT);

Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

ConsumerRecordFactory<String, String> recordFactory =
        new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
driver.pipeInput(recordFactory.create(INPUT, "key", "value"));

ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
assertThat(result.headers().lastHeader("foo")).isNotNull();
There is indeed such an API in TopologyTestDriver for the mentioned state store: getKeyValueStore() (with similar methods for window and session stores) lets you read the store's contents directly in a test.
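As a sketch of how that could look for your aggregation: the topic and store names below are assumptions, and the Avro Item is replaced by plain Long values (standing in for item.getNumber()) to keep the example self-contained. Note that you have to name the store via Materialized.as() instead of Materialized.with() so the test can look it up; also, newer kafka-streams versions replace ConsumerRecordFactory with TestInputTopic, but the idea is the same.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

public class ItemTotalTest {

    // Hypothetical names -- adjust to your topology.
    static final String ITEM_TOPIC = "item-topic";
    static final String STORE_NAME = "item-total-store";

    // Pipes two numbers for the same key and returns the aggregated total
    // read straight out of the state store.
    static Long aggregatedTotal() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
                .stream(ITEM_TOPIC, Consumed.with(Serdes.String(), Serdes.Long()))
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        // Materialized.as() names the store, making it queryable in tests.
                        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");

        TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);
        try {
            ConsumerRecordFactory<String, Long> factory =
                    new ConsumerRecordFactory<>(new StringSerializer(), new LongSerializer());
            driver.pipeInput(factory.create(ITEM_TOPIC, "item-1", 3L));
            driver.pipeInput(factory.create(ITEM_TOPIC, "item-1", 4L));

            // Query the materialized state store directly.
            KeyValueStore<String, Long> store = driver.getKeyValueStore(STORE_NAME);
            return store.get("item-1");
        } finally {
            driver.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(aggregatedTotal()); // expected: 7
    }
}
```

In a real test you would keep your Avro serde on the input and assert the store contents with your assertion library of choice; the key point is that no broker, embedded or otherwise, is needed.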