I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.
No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic
It appears to come from the code below:
RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}
But it happens only intermittently.
I have also set sharedReplyTopic to false and tried to force a longer timeout, as below:
ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
replyKafkaTemplate.setSharedReplyTopic(false);
replyKafkaTemplate.setReplyTimeout(10000);
return replyKafkaTemplate;
My container factory is defined as below:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}
If it's intermittent, the most likely cause is that the reply took too long to arrive. The message seems quite clear:
perhaps timed out, or using a shared reply topic
Each client-side instance must use its own reply topic or a dedicated partition.
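To make the two causes concrete, here is a minimal plain-Java sketch of the bookkeeping involved. It is not Spring Kafka code: PendingReplies, send, expire and onReply are hypothetical names invented for illustration, and the timeout is triggered by hand rather than by a scheduler. It only mirrors the idea of removing an entry from a map of pending futures, as in the snippet from the question.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the template's pending-reply map; not the real implementation.
class PendingReplies {

    // correlationId -> future waiting for the matching reply
    private final Map<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

    // Register a pending reply when a request is sent.
    CompletableFuture<String> send(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(correlationId, future);
        return future;
    }

    // Simulates the reply timeout firing: the entry is dropped and the caller sees a failure.
    void expire(String correlationId) {
        CompletableFuture<String> future = futures.remove(correlationId);
        if (future != null) {
            future.completeExceptionally(new TimeoutException("Reply timed out"));
        }
    }

    // Handle an incoming reply. Returns false in the "No pending reply" case.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = futures.remove(correlationId);
        if (future == null) {
            return false; // timed out earlier, or the request was sent by another instance
        }
        future.complete(payload);
        return true;
    }

    public static void main(String[] args) {
        PendingReplies pending = new PendingReplies();

        pending.send("a");
        System.out.println(pending.onReply("a", "pong")); // true: reply arrived in time

        pending.send("b");
        pending.expire("b");
        System.out.println(pending.onReply("b", "pong")); // false: reply arrived after the timeout

        System.out.println(pending.onReply("c", "pong")); // false: request came from another instance
    }
}
```

The second and third calls correspond to the two hints in the log message: a reply that arrives after its entry has been removed, and a reply on a shared topic that belongs to a different instance.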
EDIT
You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances: