I wrote a JUnit test case to test the code in the "With Java Configuration" lesson in the Spring Kafka docs. (https://docs.spring.io/spring-kafka/reference/htmlsingle/#_with_java_configuration). The onedifference is that I am using an Embedded Kafka Server in the class, instead of a localhost server. I am using Spring Boot 2.0.2 and its Spring-Kafka dependency.
While running this test case, I see that the Consumer is not reading the message from the topic and the "assertTrue" check fails. There are no other errors.
@RunWith(SpringRunner.class)
public class SpringConfigSendReceiveMessage {
public static final String DEMO_TOPIC = "demo_topic";
@Autowired
private Listener listener;
@Test
public void testSimple() throws Exception {
template.send(DEMO_TOPIC, 0, "foo");
template.flush();
assertTrue(this.listener.latch.await(60, TimeUnit.SECONDS));
}
@Autowired
private KafkaTemplate<Integer, String> template;
@Configuration
@EnableKafka
public static class Config {
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, DEMO_TOPIC);
}
@Bean
public ConsumerFactory<Integer, String> createConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
return factory;
}
@Bean
public Listener listener() {
return new Listener();
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
}
}
class Listener {
public final CountDownLatch latch = new CountDownLatch(1);
@KafkaListener(id = "foo", topics = DEMO_TOPIC)
public void listen1(String foo) {
this.latch.countDown();
}
}
I think that this is because the @KafkaListener is using some wrong/default setting when reading from the topic. I dont see any errors in the logs.
Is this unit test case correct? How can i find the object that is created for the KafkaListener annotation and see which Kafka broker it consumes from? Any inputs will be helpful. Thanks.
The message is sent before the consumer starts.
By default, new consumers start consuming at the end of the topic.
Add
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With