I am using Spring Boot with Kafka, and want to listen to messages on an existing and working broker and topic.
The application.yml is configured as follows:
spring:
  kafka:
    topic:
      boot: my_topic
    bootstrap-servers: hostname:9092
    properties:
      security.protocol: SSL
    consumer:
      auto-offset-reset: earliest
      group-id: my_group
      enable-auto-commit: false
      fetch-max-wait: 500
      max-poll-records: 1
The following error is being thrown:
2018-02-11 11:36:34.302 WARN 1676 --- [ntainer#0-0-C-1] o.a.k.common.network.SslTransportLayer : Failed to send SSL Close message
java.io.IOException: An existing connection was forcibly closed by the remote host
at sun.nio.ch.SocketDispatcher.write0(Native Method) ~[na:1.8.0_20]
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:51) ~[na:1.8.0_20]
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_20]
at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_20]
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470) ~[na:1.8.0_20]
at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) ~[kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.Selector.close(Selector.java:487) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:291) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) [kafka-clients-0.10.1.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) [kafka-clients-0.10.1.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:556) [spring-kafka-1.1.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
There is no possibility of changing the configuration of the Kafka broker or producer. Is there anything missing from the client-side Kafka configuration in Spring Boot that could be added to resolve this?
Thanks
It appears your broker is not configured for SSL. I get the same error if I set SSL on the client while my broker doesn't use SSL, and the following is logged:
[2018-02-12 11:04:45,700] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
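The odd size in that log line is consistent with a TLS handshake arriving on a plaintext listener. Kafka requests start with a 4-byte big-endian length, so the first four bytes of a TLS record would be read as a size. A minimal sketch of that arithmetic, assuming the client's first bytes were 0x16 0x03 0x03 0x00 (handshake content type, record version, high byte of the record length); the class and method names are made up for illustration:

```java
import java.nio.ByteBuffer;

public class TlsPrefixAsKafkaSize {

    // Read four bytes as a big-endian int, the way a length-prefixed
    // protocol would read a frame size (ByteBuffer defaults to big-endian).
    static int asFrameSize(byte[] firstFour) {
        return ByteBuffer.wrap(firstFour).getInt();
    }

    public static void main(String[] args) {
        // Assumption: start of a TLS handshake record.
        // 0x16 = handshake, 0x03 0x03 = record version, 0x00 = high length byte.
        byte[] tlsStart = {0x16, 0x03, 0x03, 0x00};

        int size = asFrameSize(tlsStart);
        int limit = 100 * 1024 * 1024; // 104857600, the limit shown in the log line

        System.out.println(size);         // 369296128
        System.out.println(size > limit); // true
    }
}
```

The result matches the "size = 369296128" in the log, which supports the idea that the listener on that port is not speaking SSL. If that is the case for the port in bootstrap-servers, the client-side options are to drop security.protocol: SSL (or set it to PLAINTEXT), or to point at a port where the broker already exposes an SSL listener, if one exists.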