I am new to JMS. I am using the ActiveMQ console to monitor queues. I can create a queue with a message in it, but when I try to remove a particular queue, an exception is thrown. I have tried many things, all in vain. Below is my code:
code
BrokerService brokerService = new BrokerService();
try {
brokerService.start(true);
brokerService.start();
brokerService.autoStart();
brokerService.setUseJmx(true);
brokerService.addConnector("tcp://localhost:61616");
Thread.sleep(10000);
System.out.println("brokerService : "+brokerService.isStarted());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
BrokerFacadeSupport facade= new LocalBrokerFacade(brokerService);
try {
QueueViewMBean queue = facade.getQueue(queue_name);
if(queue==null) {
System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
}
int count = this.jmsTemplate.browse(queue_name, new BrowserCallback<Integer>() {
public Integer doInJms(final Session session, final QueueBrowser browser) throws JMSException {
Queue queue1 = browser.getQueue();
Enumeration enumeration = browser.getEnumeration();
int counter = 0;
while (enumeration.hasMoreElements()) {
Message msg = (Message) enumeration.nextElement();
msg.acknowledge();
ActiveMQTextMessage atm = (ActiveMQTextMessage) msg;
atm.setDroppable(true);
atm.setReadOnlyProperties(false);
atm.setReadOnlyBody(false);
atm.acknowledge();
msg.setBooleanProperty("readOnlyProperties", false);
msg.setBooleanProperty("readOnlyBody", false);
msg.setBooleanProperty("droppable", true);
Enumeration enum_ = atm.getPropertyNames();
while(enum_.hasMoreElements()) {
String name = (String) enum_.nextElement();
System.out.println("## : "+name);
}
try {
System.out.println("--"+atm.getJMSMessageID());
System.out.println();
queue.removeMessage(atm.getJMSMessageID());
}catch(Exception ex) {
ex.printStackTrace();
}
System.out.println(String.format("\tFound : %s", msg));
counter += 1;
}
return counter;
}
});
queue.purge();
}catch(Exception ex) {
ex.printStackTrace();
}
output
The output of System.out.println(String.format("\tFound : %s", msg)); is:
Found : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:CRM-PC-50101-1528866712471-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:CRM-PC-50101-1528866712471-1:1:1:1, destination = queue://testNexus, transactionId = null, expiration = 0, timestamp = 1528866713408, arrival = 0, brokerInTime = 1528866713410, brokerOutTime = 1529047482640, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@12b82970, marshalledProperties = org.apache.activemq.util.ByteSequence@49abe550, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {_type=com.crm.jms.SampleObject}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"msg":"hello world"}}
final output console
==> _type : com.crm.jms.SampleObject
--ID:Crm-PC-50101-1528866712471-1:1:1:1:1
java.lang.NullPointerException
at com.crm.jms.controller.QueueController$1.doInJms(QueueController.java:171)
at com.crm.jms.controller.QueueController$1.doInJms(QueueController.java:1)
at org.springframework.jms.core.JmsTemplate$14.doInJms(JmsTemplate.java:1033)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:484)
at org.springframework.jms.core.JmsTemplate.browseSelected(JmsTemplate.java:1027)
at org.springframework.jms.core.JmsTemplate.browse(JmsTemplate.java:989)
at com.crm.jms.controller.QueueController.deleteQueue(QueueController.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
.
.
.
.
Found : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:Crm-PC-50101-1528866712471-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:Crm-PC-50101-1528866712471-1:1:1:1, destination = queue://testNexus, transactionId = null, expiration = 0, timestamp = 1528866713408, arrival = 0, brokerInTime = 1528866713410, brokerOutTime = 1529049245130, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5b55f70f, marshalledProperties = org.apache.activemq.util.ByteSequence@3e116ae6, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {_type=com.crm.jms.SampleObject}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"msg":"hello world"}}
java.lang.NullPointerException
at com.crm.jms.controller.QueueController.deleteQueue(QueueController.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
.
.
.
.
Why do you acknowledge messages that you are only browsing (and not consuming)?
Enumeration enumeration = browser.getEnumeration();
...
Message msg = (Message) enumeration.nextElement();
msg.acknowledge();
As you noticed, there is indeed no delete()/remove() method on javax.jms.Queue, but you may be able to achieve the same result by simply consuming the desired messages (still using the pure JMS API):
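MessageConsumer consumer = session.createConsumer(myQueue, "JMSMessageID = '" + messageId + "'");
// receive() returns null if no matching message arrives within TIMEOUT ms
Message message = consumer.receive(TIMEOUT);
if (message != null) {
    message.acknowledge();
}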
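The selector string in the snippet above is built by plain concatenation. As a minimal sketch, and assuming you want to guard against an ID that happens to contain a single quote, the selector could be built by a small helper. The class name MessageIdSelector and the method forMessageId are hypothetical names introduced for illustration; they are not part of JMS or ActiveMQ. The only JMS fact relied on is that selector string literals are enclosed in single quotes and that an embedded single quote is written by doubling it.

```java
public final class MessageIdSelector {

    private MessageIdSelector() {
        // static helper, not meant to be instantiated
    }

    /**
     * Builds a JMS message selector that matches exactly one JMSMessageID.
     * Hypothetical helper: the name and signature are assumptions for this sketch.
     */
    public static String forMessageId(String messageId) {
        if (messageId == null || messageId.isEmpty()) {
            throw new IllegalArgumentException("messageId must not be null or empty");
        }
        // Selector string literals use single quotes; an embedded quote is escaped by doubling it.
        String escaped = messageId.replace("'", "''");
        return "JMSMessageID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        // Uses the message ID shown in the question's output.
        System.out.println(forMessageId("ID:CRM-PC-50101-1528866712471-1:1:1:1:1"));
    }
}
```

The returned string can then be passed as the second argument of session.createConsumer(myQueue, selector), exactly where the concatenated string was used above.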
The problem is that you are not starting a BrokerService, so facade.getQueue(queueName) returns null. Why? Because the facade object doesn't know which ActiveMQ server you are asking about.
You still get some information about your message, of course, because the jmsTemplate connects to a running broker.
The exceptions are thrown at queue.purge(); and queue.removeMessage(msg.getJMSMessageID()); because queue is null.
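The code in the question only prints a line of exclamation marks when the lookup returns null and then carries on, which is why the failure surfaces later as a bare NullPointerException. A minimal sketch of failing fast instead is shown below. QueueLookupGuard and requireFound are hypothetical names introduced for illustration, and the helper is deliberately generic so that it does not depend on any ActiveMQ class.

```java
public final class QueueLookupGuard {

    private QueueLookupGuard() {
        // static helper, not meant to be instantiated
    }

    /**
     * Returns the lookup result unchanged, or throws with a descriptive
     * message if the lookup produced null. Hypothetical helper for this sketch.
     */
    public static <T> T requireFound(T lookupResult, String queueName) {
        if (lookupResult == null) {
            throw new IllegalStateException(
                "No queue view found for '" + queueName
                + "'; check that the facade is attached to the broker that hosts it");
        }
        return lookupResult;
    }

    public static void main(String[] args) {
        try {
            // Simulates a lookup that returned null for the queue from the question.
            requireFound(null, "testNexus");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the question's code this would wrap the lookup, for example requireFound(facade.getQueue(queue_name), queue_name), so the error names the missing queue at the point where the lookup failed rather than inside the browse callback.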
I would suggest removing the BrokerService and BrokerFacadeSupport code and getting the Queue object inside the callback by calling browser.getQueue() on the QueueBrowser argument.