Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Can i add topics to my @kafkalistener at runtime

I have created a bean for the topic array and at runtime i added some topics to this topic array but the consumer did not update the topic and still consuming from the first topics inside the topic array. i would like that the consumer added these new topic and start consuming from them

@Autowired
private String[] topicArray;

@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
    public void listen(...) {
        ...
    }
like image 633
Ahmed-Matteli Avatar asked Sep 10 '25 01:09

Ahmed-Matteli


1 Answers

No; the property is evaluated once, during initialization.

You cannot add topics to an existing listener container at runtime.

You can, however, make your listener bean a prototype bean and create a new container each time you want to listen to new topics.

Here's an example:

@SpringBootApplication
public class So68744775Application {

    public static void main(String[] args) {
        SpringApplication.run(So68744775Application.class, args);
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    Foo foo(String id, String[] topics) {
        return new Foo(id, topics);
    }

    @Bean
    public ApplicationRunner runner(ApplicationContext context) {
        return args -> {
            context.getBean(Foo.class, "one", new String[] { "topic1", "topic2" });
            context.getBean(Foo.class, "two", new String[] { "topic3" });
        };
    }

}

class Foo {

    private final String id;

    private final String[] topics;

    public Foo(String id, String[] topics) {
        this.id = id;
        this.topics = topics;
    }

    public String getId() {
        return this.id;
    }

    public String[] getTopics() {
        return this.topics;
    }

    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topics}")
    public void listen(String in) {
        System.out.println(in);
    }

}

Note that it is better, however, to omit the groupId so each container is in its own group (the id property). This avoids an unnecessary rebalance when the new container is added.

like image 86
Gary Russell Avatar answered Sep 12 '25 21:09

Gary Russell