Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

No pending reply: ConsumerRecord

I am trying to use ReplyingKafkaTemplate, and intermittently I keep seeing the message below.

No pending reply: ConsumerRecord(topic = request-reply-topic, partition = 8, offset = 1, CreateTime = 1544653843269, serialized key size = -1, serialized value size = 1609, headers = RecordHeaders(headers = [RecordHeader(key = kafka_correlationId, value = [-14, 65, 21, -118, 70, -94, 72, 87, -113, -91, 92, 72, -124, -110, -64, -94])], isReadOnly = false), key = null, with correlationId: [-18271255759235816475365319231847350110], perhaps timed out, or using a shared reply topic

It would stem from the code below

RequestReplyFuture<K, V, R> future = this.futures.remove(correlationId);
if (future == null) {
  if (this.sharedReplyTopic) {
    if (this.logger.isDebugEnabled()) {
      this.logger.debug(missingCorrelationLogMessage(record, correlationId));
    }
  }
  else if (this.logger.isErrorEnabled()) {
    this.logger.error(missingCorrelationLogMessage(record, correlationId));
  }
}

But happens only intemittently

I have also set the shared replyTopic to false as below and attempted to force a longer timeout

ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
        replyKafkaTemplate.setSharedReplyTopic(false);
        replyKafkaTemplate.setReplyTimeout(10000);
        return replyKafkaTemplate;

My Container is as below

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setBatchListener(false);
    factory.getContainerProperties().setPollTimeout(1000);
    factory.getContainerProperties().setIdleEventInterval(10000L);
    factory.setConcurrency(3);
    factory.setReplyTemplate(kafkaTemplate());
    return factory;
}
like image 494
Kenneth Muhia Avatar asked Oct 29 '25 00:10

Kenneth Muhia


1 Answers

If it's intermittent, it's most likely the reply took too long to arrive. The message seems quite clear

perhaps timed out, or using a shared reply topic

Each client side instance must use it's own reply topic or dedicated partition.

EDIT

You get the log if a message is received with a correlation id that does not match the entries currently in this.futures (pending replies). This can only occur under the following circumstances:

  1. The request timed out (in which case there will be a corresponding WARN log).
  2. The template is stop()ped (in which case this.futures is cleared).
  3. An already processed reply is redelivered for some reason (shouldn't happen).
  4. The reply is received before the key is added to this.futures (can't happen since it's inserted before send()ing the record).
  5. The server side sends 2 or more replies for the same request.
  6. Some other application is sending data to the same reply topic. If you can reproduce it with DEBUG logging, it would help because then we log the correlation key on the send as well.
like image 196
Gary Russell Avatar answered Oct 30 '25 15:10

Gary Russell