Spring's Kafka producer embeds a type header into each message that tells the consumer which class the payload should be deserialized to. This is a problem when the producer isn't using Spring Kafka but the consumer is: JsonDeserializer cannot deserialize the message and throws an exception, "No type information in headers and no default type provided".
One way around this is to set a default deserialization type, but that won't work when a single topic carries multiple message schemas.
Another solution I've found is to set
spring.kafka.consumer.properties.spring.json.use.type.headers
to false (in the application.properties file). This has no effect: the same exception is thrown again.
How do I make sure that JsonDeserializer ignores type headers?
See this option of that deserializer:
/**
 * Set to false to ignore type information in headers and use the configured
 * target type instead.
 * Only applies if the preconfigured type mapper is used.
 * Default true.
 * @param useTypeHeaders false to ignore type headers.
 * @since 2.2.8
 */
public void setUseTypeHeaders(boolean useTypeHeaders) {
It can also be configured via a consumer property:
/**
 * Kafka config property for using type headers (default true).
 * @since 2.2.3
 */
public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";
In this case the logic is going to be like this:
this.typeMapper.setTypePrecedence(this.useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
which means that the type for deserialization is inferred from the listener method.
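An inferred type comes from the listener method's parameter. A minimal sketch of what such a listener looks like (MyValue is a hypothetical payload class; with INFERRED precedence, JsonDeserializer targets this parameter type and ignores any type header):

```java
// Hypothetical listener: the payload type is inferred from the method signature
@KafkaListener(topics = "my-topic")
public void listen(MyValue payload) {
    // handle the deserialized payload
}
```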
See more info in docs: https://docs.spring.io/spring-kafka/reference/html/#json-serde
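If the property route is unavailable (setUseTypeHeaders was only added in 2.2.8, so older versions silently ignore the property), the deserializer can be configured programmatically and handed to the consumer factory. A sketch, assuming a hypothetical payload class com.example.MyValue:

```java
// Sketch: build the JsonDeserializer in code so type headers are ignored
// and the target type is fixed. MyValue is a hypothetical payload class.
JsonDeserializer<MyValue> valueDeserializer = new JsonDeserializer<>(MyValue.class);
valueDeserializer.setUseTypeHeaders(false); // ignore producer-set type headers
valueDeserializer.addTrustedPackages("com.example");

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Passing deserializer instances overrides whatever is in the property map
ConsumerFactory<String, MyValue> factory =
        new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
```

Because the deserializer instance is passed directly to DefaultKafkaConsumerFactory, its settings take precedence over any deserializer classes declared in the property map.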
This consumer configuration worked for me; details can be found here: https://docs.spring.io/spring-kafka/reference/kafka/serdes.html
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Wrap JsonDeserializer in ErrorHandlingDeserializer so deserialization
        // failures don't send the listener container into an endless retry loop
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
        //props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
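For comparison, roughly the same setup can be expressed declaratively in application.properties (a sketch; the default-type class name is a placeholder, and spring.json.use.type.headers only takes effect on Spring Kafka 2.2.8 or later):

```
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.key.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.use.type.headers=false
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.MyValue
```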