
Not able to remove message from JMS ActiveMQ queue

I am new to JMS. I am using the ActiveMQ console to monitor queues. I am able to create a queue with a message in it, but when I try to remove messages from a certain queue, an exception is thrown. I have tried many things, but all in vain. Below is my code:

code

   BrokerService brokerService = new BrokerService();
        try {
            brokerService.start(true);
            brokerService.start();
            brokerService.autoStart();
            brokerService.setUseJmx(true);
            brokerService.addConnector("tcp://localhost:61616");
            Thread.sleep(10000);
            System.out.println("brokerService : "+brokerService.isStarted());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


        BrokerFacadeSupport facade=   new LocalBrokerFacade(brokerService);
        try {
            QueueViewMBean queue = facade.getQueue(queue_name);
            if(queue==null) {
                System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            }
            int count = this.jmsTemplate.browse(queue_name, new BrowserCallback<Integer>() {
                public Integer doInJms(final Session session, final QueueBrowser browser) throws JMSException {
                    Queue queue1 = browser.getQueue();
                    Enumeration enumeration = browser.getEnumeration();
                    int counter = 0;
                    while (enumeration.hasMoreElements()) {
                        Message msg = (Message) enumeration.nextElement();
                        msg.acknowledge();
                        ActiveMQTextMessage atm  = (ActiveMQTextMessage) msg;
                        atm.setDroppable(true);
                        atm.setReadOnlyProperties(false);
                        atm.setReadOnlyBody(false);
                        atm.acknowledge();
                        msg.setBooleanProperty("readOnlyProperties", false);
                        msg.setBooleanProperty("readOnlyBody", false);
                        msg.setBooleanProperty("droppable", true);
                        Enumeration enum_ = atm.getPropertyNames();
                        while(enum_.hasMoreElements()) {
                            String name  = (String) enum_.nextElement();
                            System.out.println("## : "+name);
                        }
                        try {
                            System.out.println("--"+atm.getJMSMessageID());
                            System.out.println();
                            queue.removeMessage(atm.getJMSMessageID());
                        }catch(Exception ex) {
                            ex.printStackTrace();
                        }
                        System.out.println(String.format("\tFound : %s", msg));
                        counter += 1;
                    }
                    return counter;
                }
            });
            queue.purge();
        }catch(Exception ex) {
            ex.printStackTrace();
        }

output

The output of System.out.println(String.format("\tFound : %s", msg)); is:

Found : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:CRM-PC-50101-1528866712471-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:CRM-PC-50101-1528866712471-1:1:1:1, destination = queue://testNexus, transactionId = null, expiration = 0, timestamp = 1528866713408, arrival = 0, brokerInTime = 1528866713410, brokerOutTime = 1529047482640, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@12b82970, marshalledProperties = org.apache.activemq.util.ByteSequence@49abe550, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {_type=com.crm.jms.SampleObject}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"msg":"hello world"}}

Final console output

==>   _type : com.crm.jms.SampleObject
--ID:Crm-PC-50101-1528866712471-1:1:1:1:1
java.lang.NullPointerException
    at com.crm.jms.controller.QueueController$1.doInJms(QueueController.java:171)
    at com.crm.jms.controller.QueueController$1.doInJms(QueueController.java:1)
    at org.springframework.jms.core.JmsTemplate$14.doInJms(JmsTemplate.java:1033)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:484)
    at org.springframework.jms.core.JmsTemplate.browseSelected(JmsTemplate.java:1027)
    at org.springframework.jms.core.JmsTemplate.browse(JmsTemplate.java:989)
    at com.crm.jms.controller.QueueController.deleteQueue(QueueController.java:153)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
.
.
.
.
    Found : ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:Crm-PC-50101-1528866712471-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:Crm-PC-50101-1528866712471-1:1:1:1, destination = queue://testNexus, transactionId = null, expiration = 0, timestamp = 1528866713408, arrival = 0, brokerInTime = 1528866713410, brokerOutTime = 1529049245130, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5b55f70f, marshalledProperties = org.apache.activemq.util.ByteSequence@3e116ae6, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {_type=com.crm.jms.SampleObject}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = {"msg":"hello world"}}
java.lang.NullPointerException
    at com.crm.jms.controller.QueueController.deleteQueue(QueueController.java:181)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:97)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
.
.
.
.
JPG asked Oct 20 '25 07:10


2 Answers

Why do you acknowledge messages that you are only browsing (and not consuming)?

Enumeration enumeration = browser.getEnumeration();
...
Message msg = (Message) enumeration.nextElement();
msg.acknowledge();

As you noticed, there is indeed no delete()/remove() method on javax.jms.Queue, but you may be able to achieve the same result by simply consuming the desired messages (still using the pure JMS API):

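// Consume (and thereby remove) only the message whose ID matches the selector.
MessageConsumer consumer = session.createConsumer(myQueue, "JMSMessageID = '" + messageId + "'");
Message message = consumer.receive(TIMEOUT); // returns null if nothing arrives within TIMEOUT
if (message != null) message.acknowledge();
consumer.close();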
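
The selector string above is built by plain concatenation. As a minimal sketch, the helper below builds the same selector and doubles any single quote in the ID, since selector string literals follow SQL-92 quoting. The class name JmsSelectors and the method byMessageId are hypothetical names chosen for illustration; they are not part of JMS, Spring, or ActiveMQ.

```java
// Hypothetical helper (not part of JMS or ActiveMQ): builds a selector that
// matches a message by its JMSMessageID header.
final class JmsSelectors {
    private JmsSelectors() {}

    static String byMessageId(String messageId) {
        if (messageId == null || messageId.isEmpty()) {
            throw new IllegalArgumentException("messageId must not be empty");
        }
        // Selector string literals use SQL-92 quoting: a single quote inside
        // the literal is written as two single quotes.
        String escaped = messageId.replace("'", "''");
        return "JMSMessageID = '" + escaped + "'";
    }

    public static void main(String[] args) {
        // Message ID taken from the question's log output.
        System.out.println(byMessageId("ID:CRM-PC-50101-1528866712471-1:1:1:1:1"));
    }
}
```

The returned string can be passed as the second argument to session.createConsumer(myQueue, selector) in the snippet above.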
TacheDeChoco answered Oct 21 '25 21:10


The problem is that you are not starting a BrokerService, so the facade.getQueue(queueName) method returns null. Why? Because the facade object doesn't know which ActiveMQ server you are asking about.

You still get some information about your message, of course, because the jmsTemplate is connected to a running broker.

Exceptions are thrown on queue.purge(); and queue.removeMessage(msg.getJMSMessageID()); because queue is null.
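
In the question's code, the null check only prints a line and then carries on, so the failure surfaces later as a bare NullPointerException. A minimal sketch of a fail-fast guard is shown below; Lookups and requireFound are hypothetical names used only for illustration, and the null value in main merely stands in for the result of a lookup such as facade.getQueue(...).

```java
// Hypothetical helper: turns a failed lookup into an immediate, descriptive
// error instead of a NullPointerException at a later call site.
final class Lookups {
    private Lookups() {}

    static <T> T requireFound(T value, String what) {
        if (value == null) {
            throw new IllegalStateException(
                    what + " was not found; is this the broker that owns it?");
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            Object queueView = null; // stands in for a facade.getQueue(...) result
            requireFound(queueView, "queue 'testNexus'");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Wrapping the lookup this way, for example requireFound(facade.getQueue(queue_name), "queue " + queue_name), would stop before any message is touched and name the missing queue in the error.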

I would suggest removing the BrokerService and broker facade code and getting the Queue object inside the callback by calling browser.getQueue() on the QueueBrowser argument.

Marcin Bukowiecki answered Oct 21 '25 20:10