I have multiple producers that can send several types of events to a single Kafka topic.
I also have a consumer that must consume every type of message, with different logic for each type.
    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void handleEvent(Message<EventOne> event) {
        logger.info("event={}", event);
    }
But in this case all messages arrive at this method, not only EventOne.
If I implement two methods (one for each message type), all messages still go to only one of them.
If I implement the listener like this:
    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void handleEvent(Message<?> event) {
        logger.info("event={}", event);
    }
then I get this exception:
    org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-listener-1] ERROR org.springframework.kafka.listener.LoggingErrorHandler - Error while processing: ConsumerRecord java.lang.IllegalArgumentException: Unrecognized Type: [null]
How can I implement a consumer that handles multiple message types?
I found the solution, and it is very simple. It is not clear from the question, but I use a JSON message converter (StringJsonMessageConverter) like this:
    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }
By default, Jackson does not add class information to the JSON it produces, so on the consumer side it cannot tell which type I want to deserialize into. The solution is the annotation
    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "class")
which adds the class information to the JSON string. On the consumer side, the listener just takes the base class of my events:
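    @KafkaListener(topics = "test", containerFactory = "kafkaListenerContainerFactory")
    public void handleEvent(BasicEvent event) {
        // The converter has already resolved the concrete subtype from the "class" property.
        if (event instanceof EventOne) {
            logger.info("type=EventOne, event={}", event);
        } else {
            // Assumes EventTwo is the only other subtype of BasicEvent.
            logger.info("type=EventTwo, event={}", event);
        }
    }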
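The instanceof chain above is fine for two event types. If more types are added later, one possible alternative is a small lookup table keyed by the event class. The sketch below is only an illustration and not part of the original solution: `EventDispatcher`, `register` and `dispatch` are hypothetical names, and the event classes are empty stand-ins (in the real project `BasicEvent` would carry the `@JsonTypeInfo` annotation, which is left out here so the example runs without Jackson or Spring).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class EventDispatcher {
    // Hypothetical stand-ins for the event classes from the answer.
    public static class BasicEvent {}
    public static class EventOne extends BasicEvent {}
    public static class EventTwo extends BasicEvent {}

    // One handler per concrete event class.
    private final Map<Class<?>, Consumer<BasicEvent>> handlers = new HashMap<>();

    public <T extends BasicEvent> void register(Class<T> type, Consumer<? super T> handler) {
        // The cast is safe because dispatch looks handlers up by the event's exact class.
        handlers.put(type, event -> handler.accept(type.cast(event)));
    }

    /** Runs the handler registered for the event's exact class; returns false if there is none. */
    public boolean dispatch(BasicEvent event) {
        Consumer<BasicEvent> handler = handlers.get(event.getClass());
        if (handler == null) {
            return false;
        }
        handler.accept(event);
        return true;
    }

    public static void main(String[] args) {
        EventDispatcher dispatcher = new EventDispatcher();
        List<String> log = new ArrayList<>();
        dispatcher.register(EventOne.class, e -> log.add("handled EventOne"));
        dispatcher.register(EventTwo.class, e -> log.add("handled EventTwo"));

        dispatcher.dispatch(new EventTwo());
        dispatcher.dispatch(new EventOne());
        System.out.println(log);
    }
}
```

With something like this in place, the body of `handleEvent(BasicEvent event)` could shrink to a single `dispatcher.dispatch(event)` call. Note that the lookup matches the exact class only, so a subclass of `EventOne` would need its own registration.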
Hope this info helps someone.