I'm trying to create a multi-threaded listener, but all of the messages execute in the same thread. When run, the thread ID is always the same, even thought the KafkaListerContainerFactory is (correctly) the one that I instantiated. If I send 7 messages nearly simultaneously, I would expect the first three to process concurrently, then the next three concurrently, and then the last one. What I see is that the first processes to completion, then the second, then the third, etc. Am I misunderstanding something, or just misconfiguring?
This is my listener:
@Component
public class ExampleKafkaController {
Log log = Log.getLog(ExampleKafkaController.class);
@Autowired
private KafkaListenerContainerFactory kafkaListenerContainerFactory;
@KafkaListener(topics = "${kafka.example.topic}")
public void listenForMessage(ConsumerRecord<?, ?> record) {
log.info("Got record:\n" + record.value());
System.out.println("Kafka Thread: " + Thread.currentThread());
System.out.println(kafkaListenerContainerFactory);
log.info("Waiting...");
waitSleep(10000);
log.info("Done!");
}
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.example.topic}")
public String topic;
public void send(String payload) {
log.info("sending payload='" + payload + "' to topic='" + topic + "'");
kafkaTemplate.send(topic, payload);
}
private void waitSleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
And this is my application with the new config:
@SpringBootApplication
@ComponentScan("net.reigrut.internet.services.example.*")
@EntityScan("net.reigrut.internet.services.example.*")
@EnableKafka
@Configuration
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
ConsumerFactory<Integer,String> consumerFactory;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
System.out.println("===========>" + consumerFactory);
System.out.println(factory);
return factory;
}
}
With Kafka, concurrency is limited to the number of partitions in the topic. If there is only one partition, you will only receive messages on one thread, regardless of the container's concurrency setting.
You should provision the number of partitions to be greater than or equal to the concurrency you desire. If the number of partitions is greater than the concurrency, the partitions will be distributed across the consumer threads.
Only one consumer in a group can consume from a partition.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With