I'm trying to create Kafka topics using Java. But I get a Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_SSL_PRINCIPAL_MAPPING_RULES and I can't fix it.
My goal is to create a topic so that when I run my Kafka server to display my topics using this command bin/kafka-topics.sh --list --bootstrap-server localhost:9092, I can actually see my topic in the list.
(the command is from Kafka's official website https://kafka.apache.org/quickstart)
I looked up to this problem How to create a Topic in Kafka through Java which actually inspired my code, but not only it doesn't really help, but it seems like it uses deprecated classes and methods.
I tried to use what I believed to be more recent classes such as ZooKeeperClient, KafkaZkClient and AdminZkClient, but from what I understand, the method adminZkClient.createTopic(topic, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$); is what brings the exception. I don't know what it is about that function that creates the exception, whether I forgot something from my application.properties file or something else.
Here's my code
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.kafka.common.utils.Time;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClient;
import kafka.utils.*;
@SpringBootApplication
@EnableJpaAuditing
public class ChatApplication {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
try {
String zookeeperHost = "127.0.0.1:2181";
int sessionTimeOutInMs = 15 * 1000;
int connectionTimeOutInMs = 10 * 1000;
ZooKeeperClient zooKeeperClient = new ZooKeeperClient(zookeeperHost, sessionTimeOutInMs, connectionTimeOutInMs, 2, Time.SYSTEM, "BytesInPerSec", "BytesOutPerSec");
KafkaZkClient kafkaZkClient = new KafkaZkClient(zooKeeperClient, true, Time.SYSTEM);
String topic = "superTopic";
int noOfPartitions = 2;
int noOfReplication = 1;
Properties topicConfiguration = new Properties();
AdminZkClient adminZkClient = new AdminZkClient(kafkaZkClient);
adminZkClient.createTopic(topic, noOfPartitions, noOfReplication, topicConfiguration, RackAwareMode.Disabled$.MODULE$);
} catch (Exception ex) {
ex.printStackTrace();
}
}
//SpringApplication.run(ChatApplication.class, args);
}
Here's the output I get :
06:50:06.796 [main] INFO kafka.utils.Log4jControllerRegistration$ - Registered kafka:type=kafka.Log4jController MBean
06:50:07.101 [main] INFO kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Initializing a new session to 127.0.0.1:2181.
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=kunta
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=11.0.3
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Oracle Corporation
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=/home/robscientist/STS-WORKSPACE/poc_chat/target/classes:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-web/2.1.3.RELEASE/spring-boot-starter-web-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter/2.1.3.RELEASE/spring-boot-starter-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-logging/2.1.3.RELEASE/spring-boot-starter-logging-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar:/home/robscientist/.m2/repository/ch/qos/logback/logback-core/1.2.3/logback-core-1.2.3.jar:/home/robscientist/.m2/repository/org/apache/logging/log4j/log4j-to-slf4j/2.11.2/log4j-to-slf4j-2.11.2.jar:/home/robscientist/.m2/repository/org/apache/logging/log4j/log4j-api/2.11.2/log4j-api-2.11.2.jar:/home/robscientist/.m2/repository/org/slf4j/jul-to-slf4j/1.7.25/jul-to-slf4j-1.7.25.jar:/home/robscientist/.m2/repository/javax/annotation/javax.annotation-api/1.3.2/javax.annotation-api-1.3.2.jar:/home/robscientist/.m2/repository/org/yaml/snakeyaml/1.23/snakeyaml-1.23.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-json/2.1.3.RELEASE/spring-boot-starter-json-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jsr310/2.9.8/jackson-datatype-jsr310-2.9.8.jar:/home/robscientist/.m2/repository/com/fasterxml/jackson/module/jackson-module-parameter-names/2.9.8/jackson-module-parameter-names-2.9.8.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-tomcat/2.1.3.RELEASE/spring-boot-starter-tomcat-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/apache/tomcat/embed/tomcat-embed-core/9.0.16/tomcat-embed-core-9.0.16.jar:/home/robscientist/.m2/repository/org/apache/tomcat/embed/tomcat-embed-el/9.0.16/tomcat-embed-el-9.0.16.jar:/home/robscientist/.m2/repository/org/apache/tomcat/embed/tomcat-embed-websocket/9.0.16/tomcat-embed-websocket-9.0.16.jar:/home/robscientist/.m2/repository/org/hibernate/validator/hibernate-validator/6.0.14.Final/hibernate-validator-6.0.14.Final.jar:/home/robscientist/.m2/repository/javax/validation/validation-api/2.0.1.Final/validation-api-2.0.1.Final.jar:/home/robscientist/.m2/repository/org/jboss/logging/jboss-logging/3.3.2.Final/jboss-logging-3.3.2.Final.jar:/home/robscientist/.m2/repository/com/fasterxml/classmate/1.4.0/classmate-1.4.0.jar:/home/robscientist/.m2/repository/org/springframework/spring-web/5.1.5.RELEASE/spring-web-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-webmvc/5.1.5.RELEASE/spring-webmvc-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-aop/5.1.5.RELEASE/spring-aop-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-expression/5.1.5.RELEASE/spring-expression-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/io/debezium/debezium-core/0.9.5.Final/debezium-core-0.9.5.Final.jar:/home/robscientist/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.9.8/jackson-core-2.9.8.jar:/home/robscientist/.m2/repository/io/debezium/debezium-core/0.9.5.Final/debezium-core-0.9.5.Final-tests.jar:/home/robscientist/.m2/repository/org/springframework/spring-jdbc/5.1.5.RELEASE/spring-jdbc-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-beans/5.1.5.RELEASE/spring-beans-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-core/5.1.5.RELEASE/spring-core-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-jcl/5.1.5.RELEASE/spring-jcl-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-tx/5.1.5.RELEASE/spring-tx-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-websocket/2.1.3.RELEASE/spring-boot-starter-websocket-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-messaging/5.1.5.RELEASE/spring-messaging-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-websocket/5.1.5.RELEASE/spring-websocket-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-data-jpa/2.1.3.RELEASE/spring-boot-starter-data-jpa-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-aop/2.1.3.RELEASE/spring-boot-starter-aop-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/aspectj/aspectjweaver/1.9.2/aspectjweaver-1.9.2.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-jdbc/2.1.3.RELEASE/spring-boot-starter-jdbc-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/com/zaxxer/HikariCP/3.2.0/HikariCP-3.2.0.jar:/home/robscientist/.m2/repository/javax/transaction/javax.transaction-api/1.3/javax.transaction-api-1.3.jar:/home/robscientist/.m2/repository/javax/xml/bind/jaxb-api/2.3.1/jaxb-api-2.3.1.jar:/home/robscientist/.m2/repository/javax/activation/javax.activation-api/1.2.0/javax.activation-api-1.2.0.jar:/home/robscientist/.m2/repository/org/hibernate/hibernate-core/5.3.7.Final/hibernate-core-5.3.7.Final.jar:/home/robscientist/.m2/repository/javax/persistence/javax.persistence-api/2.2/javax.persistence-api-2.2.jar:/home/robscientist/.m2/repository/org/javassist/javassist/3.23.1-GA/javassist-3.23.1-GA.jar:/home/robscientist/.m2/repository/net/bytebuddy/byte-buddy/1.9.10/byte-buddy-1.9.10.jar:/home/robscientist/.m2/repository/antlr/antlr/2.7.7/antlr-2.7.7.jar:/home/robscientist/.m2/repository/org/jboss/jandex/2.0.5.Final/jandex-2.0.5.Final.jar:/home/robscientist/.m2/repository/org/dom4j/dom4j/2.1.1/dom4j-2.1.1.jar:/home/robscientist/.m2/repository/org/hibernate/common/hibernate-commons-annotations/5.0.4.Final/hibernate-commons-annotations-5.0.4.Final.jar:/home/robscientist/.m2/repository/org/springframework/data/spring-data-jpa/2.1.5.RELEASE/spring-data-jpa-2.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/data/spring-data-commons/2.1.5.RELEASE/spring-data-commons-2.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-orm/5.1.5.RELEASE/spring-orm-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-aspects/5.1.5.RELEASE/spring-aspects-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/com/h2database/h2/1.4.197/h2-1.4.197.jar:/home/robscientist/.m2/repository/org/webjars/stomp-websocket/2.3.3/stomp-websocket-2.3.3.jar:/home/robscientist/.m2/repository/org/webjars/bootstrap/3.3.7/bootstrap-3.3.7.jar:/home/robscientist/.m2/repository/org/webjars/jquery/1.11.1/jquery-1.11.1.jar:/home/robscientist/.m2/repository/org/springframework/kafka/spring-kafka/2.2.4.RELEASE/spring-kafka-2.2.4.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/spring-context/5.1.5.RELEASE/spring-context-5.1.5.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/retry/spring-retry/1.2.4.RELEASE/spring-retry-1.2.4.RELEASE.jar:/home/robscientist/.m2/repository/org/apache/kafka/kafka-clients/2.0.1/kafka-clients-2.0.1.jar:/home/robscientist/.m2/repository/org/lz4/lz4-java/1.4.1/lz4-java-1.4.1.jar:/home/robscientist/.m2/repository/org/xerial/snappy/snappy-java/1.1.7.1/snappy-java-1.1.7.1.jar:/home/robscientist/.m2/repository/org/apache/kafka/kafka_2.12/2.2.0/kafka_2.12-2.2.0.jar:/home/robscientist/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.9.8/jackson-databind-2.9.8.jar:/home/robscientist/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.9.0/jackson-annotations-2.9.0.jar:/home/robscientist/.m2/repository/com/fasterxml/jackson/datatype/jackson-datatype-jdk8/2.9.8/jackson-datatype-jdk8-2.9.8.jar:/home/robscientist/.m2/repository/net/sf/jopt-simple/jopt-simple/5.0.4/jopt-simple-5.0.4.jar:/home/robscientist/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/home/robscientist/.m2/repository/org/scala-lang/scala-library/2.12.8/scala-library-2.12.8.jar:/home/robscientist/.m2/repository/org/scala-lang/scala-reflect/2.12.8/scala-reflect-2.12.8.jar:/home/robscientist/.m2/repository/com/typesafe/scala-logging/scala-logging_2.12/3.9.0/scala-logging_2.12-3.9.0.jar:/home/robscientist/.m2/repository/org/slf4j/slf4j-api/1.7.25/slf4j-api-1.7.25.jar:/home/robscientist/.m2/repository/com/101tec/zkclient/0.11/zkclient-0.11.jar:/home/robscientist/.m2/repository/org/apache/zookeeper/zookeeper/3.4.13/zookeeper-3.4.13.jar:/home/robscientist/.m2/repository/org/slf4j/slf4j-log4j12/1.7.25/slf4j-log4j12-1.7.25.jar:/home/robscientist/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/home/robscientist/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/home/robscientist/.m2/repository/org/apache/yetus/audience-annotations/0.5.0/audience-annotations-0.5.0.jar:/home/robscientist/.m2/repository/io/netty/netty/3.10.6.Final/netty-3.10.6.Final.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-starter-reactor-netty/2.1.3.RELEASE/spring-boot-starter-reactor-netty-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/io/projectreactor/netty/reactor-netty/0.8.5.RELEASE/reactor-netty-0.8.5.RELEASE.jar:/home/robscientist/.m2/repository/io/netty/netty-codec-http/4.1.33.Final/netty-codec-http-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-common/4.1.33.Final/netty-common-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-buffer/4.1.33.Final/netty-buffer-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-transport/4.1.33.Final/netty-transport-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-resolver/4.1.33.Final/netty-resolver-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-codec/4.1.33.Final/netty-codec-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-codec-http2/4.1.33.Final/netty-codec-http2-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-handler/4.1.33.Final/netty-handler-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-handler-proxy/4.1.33.Final/netty-handler-proxy-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-codec-socks/4.1.33.Final/netty-codec-socks-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/netty/netty-transport-native-epoll/4.1.33.Final/netty-transport-native-epoll-4.1.33.Final-linux-x86_64.jar:/home/robscientist/.m2/repository/io/netty/netty-transport-native-unix-common/4.1.33.Final/netty-transport-native-unix-common-4.1.33.Final.jar:/home/robscientist/.m2/repository/io/projectreactor/reactor-core/3.2.6.RELEASE/reactor-core-3.2.6.RELEASE.jar:/home/robscientist/.m2/repository/org/reactivestreams/reactive-streams/1.0.2/reactive-streams-1.0.2.jar:/home/robscientist/.m2/repository/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar:/home/robscientist/.m2/repository/org/projectlombok/lombok/1.18.6/lombok-1.18.6.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-devtools/2.0.0.RELEASE/spring-boot-devtools-2.0.0.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot/2.1.3.RELEASE/spring-boot-2.1.3.RELEASE.jar:/home/robscientist/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/2.1.3.RELEASE/spring-boot-autoconfigure-2.1.3.RELEASE.jar
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/tmp
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Linux
06:50:07.116 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=amd64
06:50:07.117 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=4.15.0-50-generic
06:50:07.117 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=robscientist
06:50:07.117 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/home/robscientist
06:50:07.117 [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/home/robscientist/STS-WORKSPACE/poc_chat
06:50:07.118 [main] INFO org.apache.zookeeper.ZooKeeper - Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=15000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@5bda8e08
06:50:07.120 [main] DEBUG org.apache.zookeeper.ClientCnxn - zookeeper.disableAutoWatchReset is false
06:50:07.129 [main] DEBUG kafka.utils.KafkaScheduler - Initializing task scheduler.
06:50:07.131 [main] INFO kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Waiting until connected.
06:50:07.138 [main-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
06:50:07.144 [main-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Socket connection established to localhost/127.0.0.1:2181, initiating session
06:50:07.145 [main-SendThread(localhost:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Session establishment request sent on localhost/127.0.0.1:2181
06:50:07.403 [main-SendThread(localhost:2181)] INFO org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1000016739e0016, negotiated timeout = 15000
06:50:07.409 [main-EventThread] DEBUG kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Received event: WatchedEvent state:SyncConnected type:None path:null
06:50:07.415 [main] INFO kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Connected.
06:50:07.469 [main-SendThread(localhost:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1000016739e0016, packet:: clientPath:/brokers/ids serverPath:/brokers/ids finished:false header:: 1,12 replyHeader:: 1,237,0 request:: '/brokers/ids,F response:: v{'0},s{5,5,1559484903664,1559484903664,0,5,0,0,0,1,191}
06:50:07.492 [main-SendThread(localhost:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1000016739e0016, packet:: clientPath:/brokers/ids/0 serverPath:/brokers/ids/0 finished:false header:: 2,4 replyHeader:: 2,237,0 request:: '/brokers/ids/0,F response:: #7b226c697374656e65725f73656375726974795f70726f746f636f6c5f6d6170223a7b22504c41494e54455854223a22504c41494e54455854227d2c22656e64706f696e7473223a5b22504c41494e544558543a2f2f6b756e74613a39303932225d2c226a6d785f706f7274223a2d312c22686f7374223a226b756e7461222c2274696d657374616d70223a2231353539353332323533363131222c22706f7274223a393039322c2276657273696f6e223a347d,s{191,191,1559532253652,1559532253652,1,0,0,72057690466942976,180,0,191}
06:50:07.690 [main-SendThread(localhost:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Reading reply sessionid:0x1000016739e0016, packet:: clientPath:/brokers/topics/superTopic serverPath:/brokers/topics/superTopic finished:false header:: 3,3 replyHeader:: 3,237,-101 request:: '/brokers/topics/superTopic,F response::
Exception in thread "main" java.lang.NoSuchFieldError: DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
at kafka.server.Defaults$.<init>(KafkaConfig.scala:224)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.log.Defaults$.<init>(LogConfig.scala:36)
at kafka.log.Defaults$.<clinit>(LogConfig.scala)
at kafka.log.LogConfig$.<init>(LogConfig.scala:219)
at kafka.log.LogConfig$.<clinit>(LogConfig.scala)
at kafka.zk.AdminZkClient.validateTopicCreate(AdminZkClient.scala:128)
at kafka.zk.AdminZkClient.createTopicWithAssignment(AdminZkClient.scala:86)
at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:56)
at com.predisurge.kafka.KafkaTopicCreator.createTopic(KafkaTopicCreator.java:41)
at com.predisurge.ChatApplication.main(ChatApplication.java:32)
After running the command to display my topics, I don't see the one I want to add.
IDE : STS 4, Kafka version : 2.2.0, ZK version : 3.5.5
not sure why you get this exception. Here's a solution that works for me, and just uses the KafkaAdminClient:
@SpringBootApplication
@EnableJpaAuditing
public class TopicCreator {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient kafkaAdminClient = KafkaAdminClient.create(properties);
CreateTopicsResult result = kafkaAdminClient.createTopics(
Stream.of("foo", "bar", "baz").map(
name -> new NewTopic(name, 3, (short) 1)
).collect(Collectors.toList())
);
result.all().get();
}
}
As the above code shows, for newer versions of Kafka you can create topics directly through Kafka. Does this help?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With