Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to test Kafka Streams applications with Spring Kafka?

I am writing a streaming application with Kafka Streams, Spring-Kafka and Spring Boot. I cannot find any information how to properly test stream processing done by Kafka Streams DSL while using Spring-Kafka. Documentation mentions EmbeddedKafkaBroker but there seems to be no information on how to handle testing for example state stores.

Just to provide some simple example of what I would like to test. I have a following bean registered (where Item is avro generated):


    @Bean
    public KTable<String, Long> itemTotalKTable(StreamsBuilder streamsBuilder) {
        return streamsBuilder
                .stream(ITEM_TOPIC,
                        Consumed.with(Serdes.String(), itemAvroSerde))
                .mapValues((id, item) -> item.getNumber())
                .groupByKey()
                .aggregate(
                        () -> 0L,
                        (id, number, agg) -> agg + number,
                        Materialized.with(Serdes.String(), Serdes.Long()));
    }

What is a proper way to test that all item numbers are aggregated?

like image 556
redfox Avatar asked Oct 23 '25 17:10

redfox


1 Answers

Spring Kafka for Kafka Streams support doesn't bring any extra API, especially in streams building and their processing.

We have opened recently for ourselves that there is a good kafka-streams-test-utils library to be used in unit tests without any Kafka broker start (even embedded).

In several our tests we have something like this:

    KStream<String, String> stream = builder.stream(INPUT);
    stream
            .transform(() -> enricher)
            .to(OUTPUT);

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config);

    ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(),
            new StringSerializer());
    driver.pipeInput(recordFactory.create(INPUT, "key", "value"));
    ProducerRecord<byte[], byte[]> result = driver.readOutput(OUTPUT);
    assertThat(result.headers().lastHeader("foo")).isNotNull();

I believe there should some API in that TopologyTestDriver to deal with the mentioned state store.

like image 129
Artem Bilan Avatar answered Oct 26 '25 14:10

Artem Bilan



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!