I wrote a simple test using EmbeddedKafkaBroker: I create a test producer and send a message, but my @KafkaListener never gets triggered, so the test fails every time. Is there a way to test my Kafka consumer so the test actually exercises it for code coverage? I'd like my fake producer (producerTest) to trigger my "real" Kafka consumer from inside the test class and have it process the messages.
Kafka Consumer:
@Component
@EnableAutoConfiguration
@Slf4j
public class MyKafkaListener {

    @KafkaListener(topics = "${kafka.topic.name}")
    public void consume(@Payload String message, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
        try {
            log.info("Reading message: " + message);
            // do stuff, process message
        } catch (Exception e) {
            log.error("Error while reading message from topic", e);
        }
    }
}
My Test class:
@Slf4j
@ExtendWith(SpringExtension.class)
@ActiveProfiles("local")
@TestInstance(PER_CLASS)
@EmbeddedKafka(topics = { "test-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
@RunWith(SpringRunner.class)
@DirtiesContext
@Disabled
@SpringBootTest
public class MyKafkaListenerTest {

    private KafkaTemplate<String, String> producer;
    public static final String TEST_KEY = "x";
    public static final String TOPIC = "test-topic";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void myKafkaListener_success_test() throws InterruptedException, ExecutionException {
        // insert object first so I can later assert that it was modified after receiving message from producer
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker.getBrokersAsString());
        log.info("props {}", producerProps);
        Producer<String, String> producerTest = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer());
        producerTest.send(new ProducerRecord<>(TOPIC, "", TEST_KEY));
        Thread.sleep(5000);
        // Assertions.assertNull(condition to assert message has been processed);
        producerTest.close();
    }
}
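As a side note on the fixed `Thread.sleep(5000)` in the test above: waiting on a latch that the listener releases is more reliable than sleeping. Below is a minimal, framework-free sketch of that pattern; the class and variable names (`LatchWaitSketch`, `messageProcessed`) are hypothetical, and in a real test the listener (or a test-scoped wrapper around it) would call `countDown()` at the end of `consume()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: wait for the consumer to signal completion instead of
// sleeping a fixed 5 seconds.
public class LatchWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch messageProcessed = new CountDownLatch(1);

        // Stand-in for the Kafka listener thread handling the message.
        new Thread(() -> {
            // ... process message ...
            messageProcessed.countDown();
        }).start();

        // Returns as soon as the listener counts down, or false on timeout.
        boolean processed = messageProcessed.await(10, TimeUnit.SECONDS);
        System.out.println(processed ? "message processed" : "timed out");
    }
}
```

This way the test finishes as soon as the message is handled and fails fast with a clear signal when it is not, instead of always paying a fixed sleep.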
I tried debugging my code and the Kafka listener doesn't get triggered. Here's my test application.yaml:
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    jaas:
      enabled: true
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="{USERNAME}" password="{PASSWORD}";
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        request:
          timeout:
            ms: 20000
      group-id: kafka-list-app
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      retries: 10
      properties:
        sasl:
          mechanism: PLAIN
        security:
          protocol: SASL_PLAINTEXT
        request:
          timeout:
            ms: 20000
        max:
          in:
            flight:
              requests:
                per:
                  connection: 1
kafka:
  topic:
    name: ${TOPIC_NAME:profil.topic-dev}
I also always get the following error:
Connection to node -1 (kubernetes.docker.internal/127.0.0.1:51131) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
You will receive this error message when the broker doesn't use SASL but you specify either SASL_SSL or SASL_PLAINTEXT for the security.protocol property (as the application.yaml in your question does): the embedded broker starts without SASL, so its list of enabled mechanisms is empty. Setting security.protocol to PLAINTEXT (in your specific case) or SSL instead will rule out that cause.
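For example, one way to apply this is a test-profile application.yaml that overrides the SASL settings so the client speaks plaintext to the embedded broker. This is a sketch under the assumption that your test activates this profile; the property names follow Spring Boot's spring.kafka.* conventions, and spring.embedded.kafka.brokers is populated by @EmbeddedKafka:

```yaml
# Test-profile override: the embedded broker starts without SASL,
# so the client must use PLAINTEXT rather than SASL_PLAINTEXT.
spring:
  kafka:
    bootstrap-servers: ${spring.embedded.kafka.brokers}
    properties:
      security:
        protocol: PLAINTEXT
    consumer:
      properties:
        security:
          protocol: PLAINTEXT
    producer:
      properties:
        security:
          protocol: PLAINTEXT
```

Alternatively, the same properties can be supplied through @TestPropertySource on the test class, as is already done there for the bootstrap servers.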