
Spring boot kafka - how to tell JsonDeserializer to ignore type header?

Spring's Kafka producer embeds a type header in each message, which tells the consumer which class the message should be deserialized into. This is a problem when the producer isn't using Spring Kafka but the consumer is. In that case, JsonDeserializer cannot deserialize the message and throws an exception: "No type information in headers and no default type provided".
One way to get around this is to set a default deserialization type. This won't work when a single topic contains multiple message schemas. Another solution I've found is to set

spring.kafka.consumer.properties.spring.json.use.type.headers

to false (in the application.properties file). This doesn't change anything; the same exception is thrown again.
How do I make sure that JsonDeserializer ignores type headers?

The light one asked Oct 23 '25 15:10


2 Answers

See this option of that deserializer:

/**
 * Set to false to ignore type information in headers and use the configured
 * target type instead.
 * Only applies if the preconfigured type mapper is used.
 * Default true.
 * @param useTypeHeaders false to ignore type headers.
 * @since 2.2.8
 */
public void setUseTypeHeaders(boolean useTypeHeaders) {

It can also be configured via a property:

/**
 * Kafka config property for using type headers (default true).
 * @since 2.2.3
 */
public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";

In this case the logic is going to be like this:

this.typeMapper.setTypePrecedence(this.useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);

which means that the type for deserialization is inferred from the listener method.

See more info in docs: https://docs.spring.io/spring-kafka/reference/html/#json-serde
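The Javadoc above says that with type headers ignored, the configured target type is used instead, so a target type still has to come from somewhere. As a minimal sketch, assuming the consumer properties are built by hand, the two settings could be combined as below. The key `spring.json.value.default.type` is the string behind `JsonDeserializer.VALUE_DEFAULT_TYPE`; the class name `IgnoreTypeHeadersProps` and the type `com.example.MyValue` are hypothetical. Only JDK classes are used, so the map still has to be merged into a real consumer configuration.

```java
import java.util.HashMap;
import java.util.Map;

public class IgnoreTypeHeadersProps {

    // Builds the two JsonDeserializer settings discussed above.
    // defaultValueType is a fully qualified class name (hypothetical in this sketch).
    public static Map<String, Object> consumerProps(String defaultValueType) {
        Map<String, Object> props = new HashMap<>();
        // Ignore any type information carried in the record headers.
        props.put("spring.json.use.type.headers", false);
        // Target type to use once headers are ignored.
        props.put("spring.json.value.default.type", defaultValueType);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("com.example.MyValue"));
    }
}
```

In a Spring Boot application.properties file, the same two keys would presumably go under the `spring.kafka.consumer.properties.` prefix used in the question.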

Artem Bilan answered Oct 26 '25 06:10


This consumer configuration worked for me. Details can be found here: https://docs.spring.io/spring-kafka/reference/kafka/serdes.html

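import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {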

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Optional: default types to fall back on when a message carries no type headers.
        //props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
        //props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
        return props;
    }

    @Bean
    public ConsumerFactory<String,Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
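A single default type, as in the commented-out lines above, cannot cover a topic that carries several schemas, which is the case raised in the question. One possible approach, sketched here under assumptions, is to pick the target type from the payload itself. The discriminator field `type`, the class `PayloadTypeSelector`, and the mapped class names are all hypothetical, and the regex stands in for a real JSON parser. Spring Kafka's documentation describes a `spring.json.value.type.method` property for plugging in a static type-resolving method; adapting this selection logic to that hook is not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PayloadTypeSelector {

    // Hypothetical mapping from a discriminator value to a target class name.
    private static final Map<String, String> TYPES = Map.of(
            "order", "com.example.OrderEvent",
            "payment", "com.example.PaymentEvent");

    // Finds the first "type":"value" pair in the raw JSON text.
    // A real JSON parser would be more robust than this regex.
    private static final Pattern TYPE_FIELD =
            Pattern.compile("\"type\"\\s*:\\s*\"([^\"]+)\"");

    // Returns the class name mapped to the payload's discriminator value,
    // or throws if the field is missing or unknown.
    public static String selectTypeName(byte[] data) {
        String json = new String(data, StandardCharsets.UTF_8);
        Matcher m = TYPE_FIELD.matcher(json);
        if (m.find()) {
            String name = TYPES.get(m.group(1));
            if (name != null) {
                return name;
            }
        }
        throw new IllegalArgumentException("Cannot determine target type from payload");
    }
}
```

This only works if every producer on the topic agrees on a discriminator field, which is an assumption the question does not confirm.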
Raushan answered Oct 26 '25 06:10


