I am using the Kafka message broker to publish and subscribe to events, with the Spring infrastructure on top of it. My requirement is to create one consumer that subscribes to multiple topics.
The following code works perfectly fine when it subscribes to a single topic:
@KafkaListener(topics = "com.customer.nike")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
But I want it to subscribe to every topic that matches a pattern, like this:
@KafkaListener(topics = "com.customer.*.nike")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
In this code the * part keeps changing. It may be a numeric value such as 1000, 1010, and so on. I also tried SpEL for this:
@KafkaListener(topics = "#{com.customer.*.nike}")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
But this does not work for me either. Could someone help me subscribe to multiple topics?
Thanks in advance.
To subscribe to multiple topics, you can use the topicPattern attribute of @KafkaListener, which is documented as follows:
The topic pattern for this listener. The entries can be 'topic pattern', a 'property-placeholder key' or an 'expression'. The framework will create a container that subscribes to all topics matching the specified pattern to get dynamically assigned partitions. The pattern matching will be performed periodically against topics existing at the time of check. An expression must be resolved to the topic pattern (String or Pattern result types are supported).
Mutually exclusive with topics() and topicPartitions().
@KafkaListener(topicPattern = "com.customer.*")
public void receive(String payload) {
LOGGER.info("received payload='{}'", payload);
}
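Because the topic pattern is treated as a Java regular expression, an unescaped dot matches any character and "com.customer.*" matches anything that starts with that prefix. The following is a minimal sketch, using only java.util.regex, for checking which topic names a stricter pattern would select before putting it into topicPattern. The class name, the pattern, and the sample topic names are assumptions made for illustration, and the sketch assumes the whole topic name has to match the pattern.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicPatternCheck {
    // Hypothetical pattern: literal dots are escaped and the middle
    // segment must be numeric, e.g. com.customer.1000.nike.
    static final Pattern CUSTOMER_TOPICS =
            Pattern.compile("com\\.customer\\.\\d+\\.nike");

    // Returns the topic names that match the pattern in full.
    static List<String> matching(List<String> topics) {
        return topics.stream()
                .filter(t -> CUSTOMER_TOPICS.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "com.customer.1000.nike",
                "com.customer.1010.nike",
                "com.customer.nike",        // no numeric segment
                "com.customer.abc.nike");   // segment is not numeric
        System.out.println(matching(topics));
    }
}
```

If the check selects the topics you expect, the same regular expression string can be tried as the value of topicPattern in the listener above.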
For programmatic access to the topic name, you can add a method parameter annotated with @Header to extract a specific header value defined in KafkaHeaders, which in your case is RECEIVED_TOPIC:
The header containing the topic from which the message was received.
@KafkaListener(topics = "com.customer.nike")
public void receive(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
LOGGER.info("received payload='{}'", payload);
LOGGER.info("received from topic: {}", topic);
}
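Once the topic name is available in the listener, the changing segment (1000, 1010, and so on in the question) can be pulled out of it. This is a minimal sketch in plain Java with no Spring dependency. The class name, the method name, and the assumed topic layout com.customer.<number>.nike are hypothetical, so adjust the pattern to your actual naming scheme.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TopicNames {
    // Assumed layout: com.customer.<digits>.nike; the capture group
    // holds the numeric segment.
    private static final Pattern CUSTOMER_TOPIC =
            Pattern.compile("com\\.customer\\.(\\d+)\\.nike");

    // Returns the numeric segment, or empty if the topic name does not
    // follow the assumed layout.
    static Optional<String> customerSegment(String topic) {
        Matcher m = CUSTOMER_TOPIC.matcher(topic);
        return m.matches() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(customerSegment("com.customer.1000.nike"));
        System.out.println(customerSegment("com.customer.nike"));
    }
}
```

Inside the listener, the value of the topic parameter would be passed to customerSegment to decide how to handle the payload.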