I have a health thread that checks the state of my Kafka cluster every 5 seconds from my worker application. Every now and then, however, I get a TimeoutException:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
I also have tools to externally monitor my cluster (Cruise Control, Grafana), and none of them points to any problem in the cluster. My worker application is also constantly consuming messages, and none seem to fail.
Why do I occasionally get this timeout? If the broker is not down, then I am thinking something in my configs is off. I set the timeout to 5 seconds, which seems like more than enough.
My AdminClient configs:
@Bean
public AdminClient adminClient() {
    return KafkaAdminClient.create(adminClientConfigs());
}

public Map<String, Object> adminClientConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, serverAddress);
    props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    return props;
}
How I check the cluster (I then run logic on the broker list):
@Autowired
private AdminClient adminClient;

private void addCluster() throws ExecutionException, InterruptedException {
    adminClient.describeCluster().nodes().get().forEach(node -> brokers.add(node.host()));
}
Two things:
The default request timeout is 30 seconds. By setting it to a smaller value, you increase the risk of timeouts for a slow request. If one request out of 1000 (0.1%) takes more than 5 seconds, then because you query every few seconds, you'll see several failures every day.
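A quick back-of-the-envelope check of that estimate (the 5-second poll interval and the 0.1% slow-request fraction are the figures from above):

```java
public class TimeoutRate {
    // With one poll every pollIntervalSeconds, a small fraction of slow
    // requests still adds up to double-digit failures per day.
    static long failuresPerDay(long pollIntervalSeconds, double slowFraction) {
        long requestsPerDay = 24L * 60 * 60 / pollIntervalSeconds; // 17,280 at 5s
        return Math.round(requestsPerDay * slowFraction);
    }

    public static void main(String[] args) {
        System.out.println(failuresPerDay(5, 0.001)); // ~17 timeouts per day
    }
}
```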
To investigate why some calls take longer, you can do several things:
Check the Kafka client logs. describeCluster() may need to initiate a new connection to the cluster. In that case, the client will also have to send an ApiVersionsRequest and, depending on your config, may establish a TLS connection and/or perform SASL authentication. If any of these happen, it should be clear in the client logs. (You may need to bump the log level a bit to see all of this.)
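One way to bump those log levels (assuming a Spring Boot app, which the @Bean/@Autowired style suggests) is via application.properties; the logger names below are the Kafka client packages where connection setup, ApiVersions, TLS, and SASL activity are logged:

```properties
# application.properties (Spring Boot)
logging.level.org.apache.kafka.clients=DEBUG
logging.level.org.apache.kafka.common.network=DEBUG
```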
Check the broker request metrics. describeCluster() translates into a MetadataRequest sent to a broker. You can track the time requests take to be processed. See the metrics described in the docs; in your case, especially: kafka.network:type=RequestMetrics,name=*,request=Metadata