
Kafka MockConsumer throws java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

I am trying to unit test my Kafka consumer using the MockConsumer class that comes with the kafka-clients Java API. Below is my configuration code:

@Bean
public MockConsumer<String, String> consumer() {

    MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
    consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0)));

    HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
    beginningOffsets.put(new TopicPartition("test-topic", 0), 0L);
    consumer.updateBeginningOffsets(beginningOffsets);

    consumer.addRecord(new ConsumerRecord<String, String>("test-topic",0,
            0L, "mykey", "myvalue0"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            1L, "mykey", "myvalue1"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            2L, "mykey", "myvalue2"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            3L, "mykey", "myvalue3"));
    consumer.addRecord(new ConsumerRecord<String, String>("test-topic", 0,
            4L, "mykey", "myvalue4"));
    HashMap<TopicPartition, Long> endOffsets = new HashMap<>();
    endOffsets.put(new TopicPartition("test-topic", 0), 4L);
    consumer.updateEndOffsets(endOffsets);
    return consumer;
}

I then use this MockConsumer bean in my test case like this:

@Autowired
MockConsumer kafkaConsumer;

@Autowired
@InjectMocks
MyConsumer myConsumer; // the class under test, which contains the consumer code

@Test
public void testConsumeWithAutoAssignment() throws Exception {
  myConsumer.consumeTopic("test-topic");
}

I get an exception from this call:

kafkaConsumer.subscribe(topic)

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive

Please let me know if anyone has found the issue or fixed this.

asked Sep 11 '25 15:09 by Vijendra Kumar Kulhade


1 Answer

This happens because the bean calls consumer.assign(Arrays.asList(new TopicPartition("test-topic", 0))), which tells the consumer to read from one specific partition (0) of "test-topic". Somewhere else, although I can't see where in the code you posted, there is a call to subscribe(topic). With subscribe, the consumer joins a consumer group and Kafka assigns partitions to it automatically (and rebalances them when needed). You can't combine the two: a consumer either has specific partitions assigned manually (user-defined) or subscribes and has them assigned automatically.
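To make the rule concrete, here is a minimal sketch of the kind of mode check that produces this exception. It is a simplified stand-in, not Kafka's source code: the class SubscriptionModeSketch, the Mode enum, and the helper tryAssignThenSubscribe are hypothetical names invented for illustration, and the partition and topic arguments are ignored because only the order of the calls matters here.

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical, simplified model of a consumer's subscription mode.
// It only mirrors the rule described above; it is not Kafka's implementation.
public class SubscriptionModeSketch {

    enum Mode { NONE, AUTO_TOPICS, USER_ASSIGNED }

    static final String MESSAGE =
            "Subscription to topics, partitions and pattern are mutually exclusive";

    private Mode mode = Mode.NONE;

    // The first call fixes the mode; a later call asking for a different mode is rejected.
    private void setMode(Mode requested) {
        if (mode == Mode.NONE) {
            mode = requested;
        } else if (mode != requested) {
            throw new IllegalStateException(MESSAGE);
        }
    }

    // Stand-in for assign(...): the caller picks partitions explicitly.
    public void assign(Collection<Integer> partitions) {
        setMode(Mode.USER_ASSIGNED);
    }

    // Stand-in for subscribe(...): partitions would be handed out automatically.
    public void subscribe(Collection<String> topics) {
        setMode(Mode.AUTO_TOPICS);
    }

    // Reproduces the sequence from the question: assign in the bean, subscribe later.
    public static String tryAssignThenSubscribe() {
        SubscriptionModeSketch consumer = new SubscriptionModeSketch();
        consumer.assign(Arrays.asList(0));
        try {
            consumer.subscribe(Arrays.asList("test-topic"));
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryAssignThenSubscribe());
    }
}
```

With the real MockConsumer, one option would be to drop the assign(...) call from the bean and, once the code under test has called subscribe(...), simulate the group assignment with MockConsumer's rebalance(...) method. Records would then have to be added after that point, since addRecord rejects partitions that are not currently assigned. The other option is to have the code under test use assign(...) as well.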

answered Sep 13 '25 06:09 by ppatierno