I am trying to connect Kafka Connect with an Elasticsearch sink. I am not running the Confluent platform, just standalone mode. This is my Elasticsearch connector configuration:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql-jdbc-mall
key.ignore=true
schema.ignore=true
connection.url=http://172.**.*.**:5601
type.name=kafka-connect
elastic.security.protocol=SSL
key.converter.schemas.enable=false
value.converter.schemas.enable=false
My connect-standalone.properties is:
bootstrap.servers=Ni****ing:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
And when I run the connector I get this error:
[2020-01-21 09:31:03,676] ERROR Failed to start task elasticsearch-sink-0 (org.apache.kafka.connect.runtime.Worker:464)
io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:58)
at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:268)
at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:440)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:311)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:336)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:214)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2020-01-21 09:31:03,677] INFO Created connector elasticsearch-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)
Update
As I don't have an /etc/schema-registry directory, I changed my connect-standalone.properties to:
bootstrap.servers=Nifi-Staging:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java
but when I use the JsonConverter I get this error:
[2020-01-21 16:12:04,939] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.NullPointerException
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:231)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:133)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-01-21 16:12:04,946] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-01-21 16:12:04,946] INFO Stopping ElasticsearchSinkTask (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:190)
io.confluent.connect.avro.AvroConverter requires schema.registry.url to be defined. Remove both schemas.enable props because they only apply to JSON (Avro always has a schema), then add the URLs instead:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://...
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://...
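For instance, with a Schema Registry running locally on its default port 8081 (a placeholder address; point it at your own Schema Registry host), those two properties would look like:

# 8081 is the Schema Registry default port; adjust the host to your deployment
key.converter.schema.registry.url=http://localhost:8081
value.converter.schema.registry.url=http://localhost:8081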
You can find a sample Connect properties file under the etc/schema-registry folder.
If you are not using Avro, change the converter to match your data. The key and value can be completely different types, too
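For example, if the topic holds plain JSON records without the schema/payload envelope, the worker converters could be set along these lines (a sketch assuming schemaless JSON; with schemas.enable=true, the JsonConverter instead expects every record to be wrapped in a {"schema": ..., "payload": ...} envelope):

# JsonConverter for schemaless JSON data
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With schemaless JSON the Elasticsearch sink also needs schema.ignore=true, which your connector config already sets.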
In addition, the Elasticsearch connection.url should be different: point it at Elasticsearch itself, typically running on port 9200, not Kibana on 5601.
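A quick way to confirm the endpoint (the host below is a placeholder for your Elasticsearch machine): Elasticsearch answers on 9200 with a JSON banner that includes its version number, which is what the connector's getServerVersion call reads; Kibana on 5601 returns no such banner, which is likely why the task died with a NullPointerException there.

curl http://<elasticsearch-host>:9200
# expect JSON containing "cluster_name" and "version": {"number": "..."}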
"I am not running the Confluent platform, just standalone mode."

I assume you mean the confluent command? That just runs kafka-connect-distributed for you, and distributed mode is actually preferred.
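If you want to try distributed mode yourself, a minimal sketch (default paths and REST port shown; yours may differ) is to start a worker and then submit the connector config through the Connect REST API instead of a .properties file:

# start a distributed worker; its REST API listens on port 8083 by default
connect-distributed /etc/kafka/connect-distributed.properties

# submit the connector config as JSON (elasticsearch-sink.json is a
# hypothetical file holding {"name": "...", "config": {...}})
curl -X POST -H "Content-Type: application/json" \
  --data @elasticsearch-sink.json \
  http://localhost:8083/connectors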