Migrate Kafka client to latest version

I want to migrate old Java code that uses org.apache.kafka:kafka-clients:1.0.1-mapr-1803 to the latest client, but some constants and methods are missing:

public Producer kafkaProducer() {
    final Properties producerproperties = new Properties();
    producerproperties.put(ProducerConfig.STREAMS_BUFFER_TIME_CONFIG,
        kafkaProducerConfiguration.getBufferTimeInMilliSeconds());
    producerproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        kafkaProducerConfiguration.getProducerBootstrapServers());
    return new Producer<String, String>(producerproperties);
}

STREAMS_BUFFER_TIME_CONFIG ("streams.buffer.max.time.ms") is missing.

private void setHeaders(SimpleMapMessage message, ConsumerRecord<K, V> record) {
    message.setHeader("partition", String.valueOf(record.partition()));
    message.setHeader("producer", record.producer());
}

producer() is missing.

Do you know what I should replace producer() with in the latest Kafka client?

Test project: https://github.com/rcbandit111/kafka_streams_migration_poc/blob/main/src/main/java/com/test/portal/account/platform/kafka/consumer/EventConsumer.java

To reproduce the exception, change implementation 'org.apache.kafka:kafka-clients:1.1.1-mapr-1808-streams-6.1.0' to implementation 'org.apache.kafka:kafka-clients:3.7.0'.

Peter Penzov asked Nov 16 '25 07:11


1 Answer

To get the code to compile:

  1. You could simply pass "streams.buffer.max.time.ms" as a plain String instead of a constant:
producerproperties.put("streams.buffer.max.time.ms", kafkaProducerConfiguration.getBufferTimeInMilliSeconds());

(If this is still a valid config, which I doubt, it will be picked up as a property.)

  2. I'm not sure what record.producer() was supposed to achieve, but it also just returns a String. You could put an empty String there, or extract a specific header that you are sure exists.
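
A minimal sketch of both workarounds, kept free of Kafka imports so it runs on its own. The class name MigrationSketch, both helper names, the sample values, and the "producer" header key are assumptions made for illustration. A "producer" header only exists if the producing application writes one, so treat the lookup shown in the comment as a placeholder for whatever header your records actually carry.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class MigrationSketch {

    // Key copied from the question; stock kafka-clients has no constant for it.
    static final String STREAMS_BUFFER_TIME_KEY = "streams.buffer.max.time.ms";

    // Point 1: pass the MapR-specific setting as a plain string key.
    static Properties producerProperties(String bootstrapServers, long bufferTimeMs) {
        Properties props = new Properties();
        // "bootstrap.servers" is the value behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.put("bootstrap.servers", bootstrapServers);
        props.put(STREAMS_BUFFER_TIME_KEY, String.valueOf(bufferTimeMs));
        return props;
    }

    // Point 2: decode an optional header value, falling back to an empty String.
    // With kafka-clients on the classpath, the bytes could come from
    // (assuming the producing side sets a "producer" header):
    //   Header h = record.headers().lastHeader("producer");
    //   byte[] raw = (h == null) ? null : h.value();
    static String headerOrEmpty(byte[] raw) {
        return raw == null ? "" : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092", 3000L);
        System.out.println(props.getProperty(STREAMS_BUFFER_TIME_KEY));
        System.out.println("[" + headerOrEmpty(null) + "]");
        System.out.println(headerOrEmpty("svc-a".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In setHeaders, the second helper would stand in for the missing call, for example message.setHeader("producer", headerOrEmpty(raw)), so the message keeps the same header keys even when the record has no such header.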

Keep in mind, however, that there is no guarantee your code will behave as you expect after such a large jump in versions, especially when coming from a vendor-specific release. I'd recommend writing unit tests and/or reconsidering what the code is actually meant to achieve, then restructuring the application around the current methods and interfaces.

kopaka answered Nov 17 '25 21:11