I'm using Spring Boot 2.x with spring-kafka (not spring-integration-kafka).
I have multiple beans annotated with @KafkaListener, each one consuming from a single topic. Since I have 12 topics, I currently need 12 such consumer beans.
I would like to know whether I can create those beans programmatically/dynamically, perhaps by using KafkaListenerEndpointRegistry to create the consumer containers at runtime.
Note: I need to consume messages in batches, so maybe I can use BatchMessageListener?
Current code:
    @KafkaListener(
        id = COUNTRY,
        containerFactory = KAFKA_LISTENER_FACTORY_BEAN_NAME,
        topics = {TOPIC},
        groupId = GROUP_ID,
        clientIdPrefix = CLIENT_ID,
        errorHandler = VALIDATION_ERROR_HANDLER_BEAN_NAME
    )
    @Override
    public void consume(final List<MessageDTO> messages,
            @Header(KafkaHeaders.RECEIVED_TOPIC) final List<String> topics,
            @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) final List<String> messagesKey,
            @Header(KafkaHeaders.RECEIVED_PARTITION_ID) final List<Integer> partitionIds,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final List<Long> timestamps,
            @Header(KafkaHeaders.OFFSET) final List<Long> offsets) {
        (...)
    }
Each topic's consumer has its own implementation, depending on the topic. Could you point me to a blog post, pseudocode, Git thread, or existing answer, please?
https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/context/support/GenericApplicationContext.html#registerBean-java.lang.Class-java.util.function.Supplier-org.springframework.beans.factory.config.BeanDefinitionCustomizer...-
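Create your listener object yourself and register it as a bean by supplying it through the Supplier argument of the registerBean method linked above. When the bean is instantiated, Spring will run the necessary bean post-processors (including the one that handles @KafkaListener) to set everything up.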
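A minimal sketch of that approach, assuming a spring-kafka version that supports the `__listener` SpEL pseudo-bean (2.1.2 or later) and a default container factory configured for batch listening (for example via `spring.kafka.listener.type=batch`). The names `DynamicListenerRegistrar` and `TopicListener` are hypothetical and only illustrate the idea; in your case the Supplier could return a different implementation per topic.

```java
import java.util.List;

import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Hypothetical helper: registers one listener bean per topic at runtime.
@Component
public class DynamicListenerRegistrar {

    private final GenericApplicationContext context;

    // Assumption: the running application context is a GenericApplicationContext,
    // which is the case for the contexts Spring Boot normally creates.
    public DynamicListenerRegistrar(GenericApplicationContext context) {
        this.context = context;
    }

    public void register(String id, String topic) {
        // Register the bean definition, providing the instance via a Supplier.
        context.registerBean(id, TopicListener.class, () -> new TopicListener(id, topic));
        // Registration alone does not instantiate the bean; requesting it does,
        // and that is when the bean post-processors see the @KafkaListener method.
        context.getBean(id, TopicListener.class);
    }

    // Hypothetical listener; a real one would carry the topic-specific logic.
    public static class TopicListener {

        private final String id;
        private final String topic;

        public TopicListener(String id, String topic) {
            this.id = id;
            this.topic = topic;
        }

        public String getId() {
            return id;
        }

        public String getTopic() {
            return topic;
        }

        // "__listener" refers to the bean instance that declares this method,
        // so the id and topic are resolved per instance.
        @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
        public void consume(List<String> messages) {
            // Batch of payloads; requires a batch-enabled container factory.
            messages.forEach(m -> System.out.println(topic + ": " + m));
        }
    }
}
```

Calling `register("country-listener", "country-topic")` (both values made up here) for each of the 12 topics would then replace the 12 hand-written beans. If you need your existing factory and error handler, add the `containerFactory` and `errorHandler` attributes to the annotation as in your current code.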