Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

IllegalStateException Subscription to topics, partitions and pattern are mutually exclusive

Need to fetch messages from a Kafka topic, from a particular offset

Stuck cause of IllegalStateException exception at assign()

If I do not use assign() , then the consumer does not perform seek, as that being a Lazy operation

Actual purpose: Need to iterate messages at topic from a pre-decided offset till end. This pre-decided offset is calculated at markOffset()

static void fetchMessagesFromMarkedOffset() {
    Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
    consumer.assign(set); // <---- Exception at this place
    map.forEach((k,v) -> {
        consumer.seek(k, v-3);
    });
    ConsumerRecords<Long, String> consumerRecords = consumer.poll(100);
    consumerRecords.forEach(record -> {
        System.out.println("Record Key " + record.key());
        System.out.println("Record value " + record.value());
        System.out.println("Record partition " + record.partition());
        System.out.println("Record offset " + record.offset());
    });
    consumer.close();
}

Rest of concerned code involved

public static Set<TopicPartition> set;
public static Map<TopicPartition, Long> map;

static void markOffset() {
    Consumer<Long, String> consumer = ConsumerCreator.createConsumer();
    consumer.poll(100);
    set = consumer.assignment();
    map = consumer.endOffsets(set);
    System.out.println("Topic  Partitions: " + set);
    System.out.println("End Offsets: " + map);
}

Consumer Creation

private Consumer createConsumer(String topicName) {
    final Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "capacity-service-application");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    final Consumer consumer = new KafkaConsumer(props);
    consumer.subscribe(Collections.singletonList(topicName));
    return consumer;
}   

Exception

Exception in thread "main" java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
at org.apache.kafka.clients.consumer.internals.SubscriptionState.setSubscriptionType(SubscriptionState.java:104)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromUser(SubscriptionState.java:157)
at org.apache.kafka.clients.consumer.KafkaConsumer.assign(KafkaConsumer.java:1064)
at com.gaurav.kafka.App.fetchMessagesFromMarkedOffset(App.java:44)
at com.gaurav.kafka.App.main(App.java:30)
like image 649
Aditya Rewari Avatar asked Sep 04 '25 02:09

Aditya Rewari


1 Answers

You can't mixed manual and automatic partition assignment. You should use KafkaConsumer::subscribe or KafkaConsumer::assign but not both.

If after calling KafkaConsumer::subscribe you want to switch to manual approach you should first call KafkaConsumer::unsubscribe.

According to https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Note that it isn't possible to mix manual partition assignment (i.e. using assign) with dynamic partition assignment through topic subscription (i.e. using subscribe).

like image 91
Bartosz Wardziński Avatar answered Sep 06 '25 14:09

Bartosz Wardziński