I am building a web application that helps event planners manage their events. Each event needs its own queue, so the app has to create a queue whenever an event is created. So far I can create the queues, and they appear in the RabbitMQ management console, but when I try to add a queue to a listener I get this error: java.lang.NullPointerException: Cannot invoke "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.addQueueNames(String[])" because the return value of "co.ke.mpango.backend.config.webSocket.RabbitQueueServiceImpl.getMessageListenerContainerById
Here is the code:
@Service
@Log4j2
public class RabbitQueueServiceImpl implements RabbitQueueService
{
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
@Override
public void addNewQueue(String queueName, String exchangeName, String routingKey) {
Queue queue = new Queue(queueName, true, false, false);
Binding binding = new Binding(
queueName,
Binding.DestinationType.QUEUE,
exchangeName,
routingKey,
null
);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(binding);
this.addQueueToListener(exchangeName,queueName);
}
@Override
public void addQueueToListener(String listenerId, String queueName) {
log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
if (!checkQueueExistOnListener(listenerId,queueName)) {
this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
log.info("queue : " + queueName + " added to listener with id : " + listenerId);
} else {
log.info("given queue name : " + queueName + " already exists on given listener id : " + listenerId);
}
}
@Override
public void removeQueueFromListener(String listenerId, String queueName) {
log.info("removing queue : " + queueName + " from listener : " + listenerId);
if (checkQueueExistOnListener(listenerId,queueName)) {
this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
log.info("deleting queue from rabbit management");
this.rabbitAdmin.deleteQueue(queueName);
} else {
log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
}
}
@Override
public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
try {
log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
log.info("getting queueNames");
String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
if (queueNames != null) {
log.info("checking " + queueName + " exist on active queues");
for (String name : queueNames) {
log.info("name : " + name + " with checking name : " + queueName);
if (name.equals(queueName)) {
log.info("queue name exist on listener, returning true");
return Boolean.TRUE;
}
}
return Boolean.FALSE;
} else {
log.info("there is no queue exist on listener");
return Boolean.FALSE;
}
} catch (Exception e) {
log.error("Error on checking queue exist on listener");
return Boolean.FALSE;
}
}
private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
log.info("getting message listener container by id : " + listenerId);
return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
.getListenerContainer(listenerId)
);
}
}
Added: How, then, could a listener be defined?
RabbitListenerEndpointRegistry stores all listener containers (SimpleMessageListenerContainer, DirectMessageListenerContainer) and manages their life cycle. Before you can fetch a MessageListenerContainer from the RabbitListenerEndpointRegistry, you need to register that container with the registry, like this:
SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();
endpoint.setId(LISTENER_CONTAINER_ID);
endpoint.setMessageListener(message -> {
RabbitMQMessage<QueueItem> rabbitMQMessage = (RabbitMQMessage<QueueItem>) SerializationUtils.deserialize(message.getBody());
rabbitMQService.onMessage(rabbitMQMessage); //method where your message will be processed
});
//listenerContainerFactory is instance of DirectRabbitListenerContainerFactory or SimpleRabbitListenerContainerFactory
registry.registerListenerContainer(endpoint, listenerContainerFactory, true);
//start the registry if it is created manually
//registry.start();
You can then fetch that container from the registry by LISTENER_CONTAINER_ID:
registry.getListenerContainer(LISTENER_CONTAINER_ID);
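This also explains the NullPointerException in the question: getListenerContainer returns null for an id that was never registered, and addNewQueue passes the exchange name as the listener id. Below is a minimal plain-Java sketch of a guarded lookup. ContainerLookup and its methods are hypothetical names, and a Map stands in for RabbitListenerEndpointRegistry, so this only models the behaviour and is not Spring AMQP code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the registry: each listener id maps to its queue names.
class ContainerLookup {
    private final Map<String, List<String>> containers = new HashMap<>();

    // Models registering an endpoint under an id.
    void register(String listenerId) {
        containers.putIfAbsent(listenerId, new ArrayList<>());
    }

    // The map returns null for unknown ids; Optional makes that case explicit.
    Optional<List<String>> find(String listenerId) {
        return Optional.ofNullable(containers.get(listenerId));
    }

    // Adds the queue only if the container exists and does not have it yet.
    // Returns true when the queue was added, false otherwise.
    boolean addQueue(String listenerId, String queueName) {
        return find(listenerId)
                .filter(queues -> !queues.contains(queueName))
                .map(queues -> queues.add(queueName))
                .orElse(false);
    }
}
```

In the real service the same idea would mean checking the result of getListenerContainer for null (and logging or throwing a descriptive error) before calling addQueueNames on it.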
To manage dynamic queues it is better to use DirectMessageListenerContainer. It consumes from each queue on a separate channel, so adding a queue does not disturb the existing consumers. SimpleMessageListenerContainer, by contrast, restarts its consumers after a new queue is added, and it takes some time before it is ready to receive messages from the queues again.
See more differences here: https://docs.spring.io/spring-amqp/reference/html/#choose-container