Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to start @KafkaListener based on start flag

I'm trying to start my KafkaListener only when a flag is set to true.

@Component
public class KafkaTopicConsumer {

//Somehow wrap the listener to only start when a property value is set to true

@KafkaListener(topics = "#{@consumerTopic}", groupId = "#{@groupName}")
public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
    logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
}

Is there a way to only make sure the listener is started when said a property such as start.consumer property is set to true? I don't want the listener starting every time the application is started only when I specify that I want it to be started. Is there a good way to approach this use case?

like image 314
Beez Avatar asked Oct 19 '25 09:10

Beez


1 Answers

First, you need to set autoStartup to false and give your container a name. Then you need to start it manually based on a flag using @EventListener.

@Component
public class KafkaTopicConsumer {
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Value("${start.consumer}")
    private boolean shouldStart;

    @KafkaListener(id = "myListener", autoStartup = "false", topics = "#{@consumerTopic}", groupId = "#{@groupName}")
    public void consumeMessage(ConsumerRecord<String, String> message) throws IOException {
        logger.info("Consumed message from topic: {} with message: {}", message.topic(), message);
    }

    @EventListener
    public void onStarted(ApplicationStartedEvent event) {
        if (shouldStart) {
            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer("myListener");
            listenerContainer.start();    
        }
    }
}

Note: @EventListener will make sure containers are properly loaded, if you use @PostConstruct it probably won't work.

EDIT:

Added the actual reading of the property using the @Value annotation.

Note: This approach has the added flexibility of allowing the start and stop methods to also be called dynamically (using JMX for example) with just a few changes. This facilitates the scenario where we want to disable a consumer and enable it later without restarting the application.

Another good approach, as correctly stated in @Makoton's answer, is to use @ConditionalOnProperty. Just to note that in your example, you can use it with @Component instead of defining the @Bean manually.

@Component
@ConditionalOnProperty(
        value = "start.consumer",
        havingValue = "true")
public class KafkaTopicConsumer { // ...

It all comes down to the level of flexibility you need.

like image 80
rph Avatar answered Oct 21 '25 00:10

rph



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!