The ActiveMQ implementation below is in our code. Sometimes the system becomes very slow and stops working. When I checked a thread dump using JavaMelody, I saw that many threads had been in the Runnable state for a long time and were not being terminated.
ActiveMQ version - activemq-all-5.3.0.jar
Please refer to the code below:
Broker :
public class ActiveMQ extends HttpServlet {
private static final long serialVersionUID = -1234568008764323456L;
private static final Logger logger = Logger.getLogger(ActiveMQ.class.getName());
public Listener listener;
private String msgBrokerUrl = "tcp://localhost:61602";
public BrokerService broker = null;
public TransportConnector connector = null;
@Override
public void init() throws ServletException {
    try {
        broker = new BrokerService();
        broker.setPersistent(false);
        broker.setUseJmx(false);
        connector = broker.addConnector(msgBrokerUrl);
        broker.setUseShutdownHook(true);
        System.out.println("BROKER LOADED");
        broker.start();
        broker.deleteAllMessages();
        listener = new Listener();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
Listener:
public class Listener implements MessageListener {
private String msgQueueName = "jms/queue/MessageQueue";
public Session session;
public Destination adminQueue;
public static String id;
public ActiveMQConnection connection;
public MessageConsumer consumer = null;
public Listener() {
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                new URI("failover://(" + "tcp://localhost:61602" + "?wireFormat.cacheEnabled=false"
                        + "&wireFormat.maxInactivityDuration=0&wireFormat.tightEncodingEnabled=true)?maxReconnectDelay=1000"));
        connection = (ActiveMQConnection) connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        adminQueue = session.createQueue(msgQueueName);
        id = new Timestamp(new Date().getTime()).toString();
        consumer = this.session.createConsumer(this.adminQueue, "ID='" + id + "'");
        consumer.setMessageListener(this);
    } catch (JMSException e) {
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
@SuppressWarnings("unchecked")
@Override
public void onMessage(Message message) {
    TextMessage msg = (TextMessage) message;
    try {
        String xmlMsg = msg.getText();
        // business logic
    } catch (JMSException ex) {
        ex.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
}
Producer :
public class Producer {
private static String url = "tcp://localhost:61602";
private static String msgQueueName = "jms/queue/MessageQueue";
public ConnectionFactory connectionFactory = null;
public Connection connection = null;
public Session session = null;
public Destination destination = null;
public Producer() {
    connectionFactory = new ActiveMQConnectionFactory(url);
}
public void sendResponse(String xml, DataBean objDataBean) {
    sendToQueue(xml, msgQueueName, objDataBean);
}
private void sendToQueue(String xml, String msgQueueName, DataBean obj) {
    try {
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(msgQueueName);
        MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage(xml);
        message.setJMSExpiration(1000);
        message.setStringProperty(obj.getMsgKey(), obj.getMsgValue());
        producer.send(message);
        xml = null;
        session.close();
        connection.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
public static void main(String[] args) {
    for (int msg = 0; msg < 20; msg++) {
        DataBean obj = getData();
        new Producer().sendResponse(xml, obj);
    }
}
}
Hanging Threads Exception details :
Type 1 :
ActiveMQ Transport: tcp:///127.0.0.1:41818
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)
Type 2 :
ActiveMQ Transport: tcp://localhost/127.0.0.1:61602
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:58)
java.io.DataInputStream.readInt(DataInputStream.java:387)
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:272)
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
java.lang.Thread.run(Thread.java:745)
Please could you give some hints on this issue for further investigation.
Edit: I read a few posts on the internet and concluded that I must update the ActiveMQ jar file and implement a timeout. But when I started reading about timeout settings, I got confused about whether I should set the timeout on the producer and consumer, on the failover transport, on the message, or on the broker service. A timeout at each component has a different purpose, so where should I implement the timeout, considering the code and stack traces above?
Creating a connection is very expensive, and when you close it, the port is retained for up to 3 minutes to ensure it is shut down cleanly.
To avoid performance problems, create connections only when you really have to. I suggest you create the connection once and keep it open unless you get an error. This can improve performance by 2 to 3 orders of magnitude.
This is a good performance tuning pattern which applies in many cases.
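To make the "create once, keep open unless you get an error" idea concrete, here is a minimal sketch using only the JDK. The class ReusableResource and its nested interfaces are hypothetical names invented for this illustration; they are not part of ActiveMQ or JMS. The resource is created lazily on first use, reused on every later call, and closed and dropped only when an action fails, so the next call creates a fresh one.

```java
/**
 * Hypothetical helper (not an ActiveMQ or JMS class): holds one lazily
 * created resource and reuses it until an action on it fails.
 */
final class ReusableResource<T> {
    /** Creates the resource; may throw, as a connection factory can. */
    interface Factory<T> { T create() throws Exception; }
    /** Releases the resource. */
    interface Closer<T> { void close(T resource) throws Exception; }
    /** Work to perform with the shared resource. */
    interface Action<T> { void run(T resource) throws Exception; }

    private final Factory<T> factory;
    private final Closer<T> closer;
    private T current; // null until first use, and again after a failure

    ReusableResource(Factory<T> factory, Closer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** Runs the action on the shared resource, creating it only if needed. */
    synchronized void use(Action<T> action) throws Exception {
        if (current == null) {
            current = factory.create();
        }
        try {
            action.run(current);
        } catch (Exception e) {
            // The resource may be broken: drop it so the next call recreates it.
            discard();
            throw e;
        }
    }

    /** Closes and forgets the current resource, if any. */
    synchronized void discard() {
        if (current != null) {
            try {
                closer.close(current);
            } catch (Exception ignored) {
                // Best effort: the resource is being thrown away anyway.
            }
            current = null;
        }
    }
}
```

In the Producer from the question, the factory could plausibly be a lambda that calls connectionFactory.createConnection() and then start() on the result, and the closer could call Connection.close(). Under that assumption sendToQueue would no longer open and close a connection per message. Because use is synchronized, concurrent senders are serialized in this sketch, and discard() should also be called once when the application shuts down.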