I am trying to unit test my Kafka consumer using the MockConsumer class that ships with the kafka-clients Java API.
Below is my configuration code:
@Bean
public MockConsumer consumer(){
MockConsumer consumer = new MockConsumer(OffsetResetStrategy.LATEST);
consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));
HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("test-topic", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);
consumer.addRecord(new ConsumerRecord<String, String>("test-topic",0,
0L, "mykey", "myvalue0"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
1L, "mykey", "myvalue1"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
2L, "mykey", "myvalue2"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
3L, "mykey", "myvalue3"));
consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
4L, "mykey", "myvalue4"));
HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
endOffsets.put(new TopicPartition("test-topic", 0), 4L);
consumer.updateEndOffsets(endOffsets);
return consumer;
}
I use this MockConsumer bean in my test case as follows:
@Autowired
MockConsumer kafkaConsumer;
@Autowired
@InjectMocks
MyConsumer myConsumer; // the class under test, which contains the consumer code
@Test
public void testConsumeWithAutoAssignment() throws Exception {
myConsumer.consumeTopic("test-topic");
}
Running the test throws the following exception from the call to
kafkaConsumer.subscribe(topic)
java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
Has anyone run into this issue or found a fix?
This happens because the bean calls consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));
which tells the consumer to read from one specific partition (0) of "test-topic". Then, somewhere in code you have not shown (presumably inside consumeTopic), there is a call to subscribe(topic).
With subscribe, the consumer becomes part of a consumer group and partitions are assigned to it automatically, with rebalancing when group membership changes. You can't use both on the same consumer: manual assignment of specific partitions (assign) and subscription with automatic assignment (subscribe) are mutually exclusive, which is exactly what the IllegalStateException says.
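One way around this is to leave assign out entirely and let the test simulate the group assignment instead. The following is only a minimal sketch, assuming a kafka-clients 2.x or 3.x version in which MockConsumer accepts an OffsetResetStrategy and offers poll(Duration); the class and method names (MockConsumerSubscribeSketch, consumeWithSubscribe) are hypothetical, and EARLIEST is used here so that the preloaded records are returned from the beginning offset.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

// Hypothetical class name, for illustration only.
public class MockConsumerSubscribeSketch {

    static List<String> consumeWithSubscribe() {
        TopicPartition partition = new TopicPartition("test-topic", 0);
        MockConsumer<String, String> consumer =
                new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        // Subscribe only; assign() is never called on this consumer.
        consumer.subscribe(Collections.singletonList("test-topic"));

        // Simulate the partition assignment a consumer group would hand out.
        // MockConsumer rejects records for partitions that are not assigned,
        // so this has to happen before addRecord().
        consumer.rebalance(Collections.singletonList(partition));
        consumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L));

        for (long offset = 0; offset < 5; offset++) {
            consumer.addRecord(new ConsumerRecord<>(
                    "test-topic", 0, offset, "mykey", "myvalue" + offset));
        }

        // Collect the values returned by a single poll.
        List<String> values = new ArrayList<>();
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(100))) {
            values.add(record.value());
        }
        consumer.close();
        return values;
    }
}
```

In the setup from the question, subscribe is called inside the class under test, so the bean itself would just create the MockConsumer without assign or addRecord. The rebalance and addRecord calls could then probably be deferred with MockConsumer's schedulePollTask(Runnable), so that they run on the first poll, after the subscription exists.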