I have created a bean for the topic array and at runtime i added some topics to this topic array but the consumer did not update the topic and still consuming from the first topics inside the topic array. i would like that the consumer added these new topic and start consuming from them
@Autowired
private String[] topicArray;
@KafkaListener(topics = "#{topicArray}", groupId = "MyGroup")
public void listen(...) {
...
}
No; the property is evaluated once, during initialization.
You cannot add topics to an existing listener container at runtime.
You can, however, make your listener bean a prototype bean and create a new container each time you want to listen to new topics.
Here's an example:
@SpringBootApplication
public class So68744775Application {
public static void main(String[] args) {
SpringApplication.run(So68744775Application.class, args);
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
Foo foo(String id, String[] topics) {
return new Foo(id, topics);
}
@Bean
public ApplicationRunner runner(ApplicationContext context) {
return args -> {
context.getBean(Foo.class, "one", new String[] { "topic1", "topic2" });
context.getBean(Foo.class, "two", new String[] { "topic3" });
};
}
}
class Foo {
private final String id;
private final String[] topics;
public Foo(String id, String[] topics) {
this.id = id;
this.topics = topics;
}
public String getId() {
return this.id;
}
public String[] getTopics() {
return this.topics;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topics}")
public void listen(String in) {
System.out.println(in);
}
}
Note that it is better, however, to omit the groupId
so each container is in its own group (the id
property). This avoids an unnecessary rebalance when the new container is added.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With