I have an issue using rabbitMQ to send a message from a service A to service B which also sends a notification to service C , the problem is, i had to put the @RabbitListener and the Rabbittemplate in the same method like so :
@Autowired private RabbitTemplate template;
@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
if
//...
template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);
return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
//...
}
else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
}
and it is creating and infinite loop of (+2000/min) messages and exceptions non stop.
2021-06-29 14:34:16,560 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException: Failed to send reply with payload 'InvocationResult [returnValue=<200 OK OK,Test SasCamapign inserted ,[]>, returnType=org.springframework.http.ResponseEntity<java.lang.String>, bean=tn.itserv.services.Sas_CampaignService@4232ecc, method=public org.springframework.http.ResponseEntity tn.itserv.services.Sas_CampaignService.AddSas_Campaign(tn.itserv.entities.SasCampaign)]'
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:476)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:400)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:152)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:135)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.getReplyToAddress(AbstractAdaptableMessageListener.java:576)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:472)
... 14 common frames omitted
To be honest I haven't understood 100% how it works ,should I create different Queue to every exchange ? because it works partially (From Service A to B or From B to C ) But when i use both it creates these exceptions.
I've experienced a similar problem in production that rears its head a couple times a year. The concept of 'requeue on failure' seems like a bad idea, but it's what was implemented by the previous developer.
The cause is that the message is put back in the queue on exception. To fix it, you would not throw an exception from your RabbitListener. Try surrounding with a try catch and return a response entity with a server error.
@Autowired private RabbitTemplate template;
@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
try {
if
//...
template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);
return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
//...
}
else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
} catch(Exception e) {
return new ResponseEntity<String>("Error on Message Processing", HttpStatus.INTERNAL_SERVER_ERROR);
}
}
So i realized that a @RabbitListener function should have no return value which means void instead of my ResponseEntity so my idea was like to create a new method that calls my method and has no return value:
@RabbitListener(queues=RabbitConfig.QUEUETD)
public void receiveFromService(SasCampaign sasCampaign){
AddSas_Campaign(sasCampaign);
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With