4 Replies Latest reply: Apr 7, 2011 1:37 PM by 850110 RSS

    Very high cpu utilization with mq broker

    850110
      Hi all,

      I see a very high cpu utilization (400% on 8 cpu server) when I connect consumers to OpenQ. It increase close to 100% for every consumer I add. Slowly, the consumer comes to a halt, as the producers are sending messages at a good rate too.

      Environment Setup

      Glassfish version 2.1

      com.sun.messaging.jmq Version Information Product Compatibility Version: 4.3 Protocol Version: 4.3 Target JMS API Version: 1.1

      Cluster set up using persistent storage. snippet from broker log.

      Java Runtime: 1.6.0_14 Sun Microsystems Inc. /home/user/foundation/jdk-1.6/jre [06/Apr/2011:12:48:44 EDT] IMQ_HOME=/home/user/foundation/sges/imq [06/Apr/2011:12:48:44 EDT] IMQ_VARHOME=/home/user/foundation/installation/node-agent-server1/server1/imq [06/Apr/2011:12:48:44 EDT] Linux 2.6.18-164.10.1.el5xen i386 server1 (8 cpu) user [06/Apr/2011:12:48:44 EDT] Java Heap Size: max=394432k, current=193920k [06/Apr/2011:12:48:44 EDT] Arguments: -javahome /home/user/foundation/jdk-1.6 -Dimq.autocreate.queue=false -Dimq.autocreate.topic=false -Dimq.cluster.masterbroker=mq://server1:37676/ -Dimq.cluster.brokerlist=mq://server1:37676/,mq://server2:37676/ -Dimq.cluster.nowaitForMasterBroker=true -varhome /home/user/foundation/installation/node-agent-server1/server1/imq -startRmiRegistry -rmiRegistryPort 37776 -Dimq.imqcmd.user=admin -passfile /tmp/asmq5711749746025968663.tmp -save -name clusterservercom -port 37676 -bgnd -silent [06/Apr/2011:12:48:44 EDT] [B1004]: Starting the portmapper service using tcp [ 37676, 50, * ] with min threads 1 and max threads of 1 [06/Apr/2011:12:48:45 EDT] [B1060]: Loading persistent data...

      I followed step in http://middlewaremagic.com/weblogic/?p=4884 to narrow it down to Threads that was causing high cpu. Both were around 94%.



      Following is the stack for those threads.

      "Thread-jms[224]" prio=10 tid=0xd635f400 nid=0x5665 runnable [0xd18fe000] java.lang.Thread.State: RUNNABLE at com.sun.messaging.jmq.jmsserver.data.TransactionList.isConsumedInTransaction(TransactionList.java:697) at com.sun.messaging.jmq.jmsserver.core.Session.detatchConsumer(Session.java:918) - locked <0xf3d35730> (a java.util.Collections$SynchronizedMap) at com.sun.messaging.jmq.jmsserver.core.Session.detatchConsumer(Session.java:810) at com.sun.messaging.jmq.jmsserver.data.handlers.ConsumerHandler.destroyConsumer(ConsumerHandler.java:577) at com.sun.messaging.jmq.jmsserver.data.handlers.ConsumerHandler.handle(ConsumerHandler.java:422) at com.sun.messaging.jmq.jmsserver.data.PacketRouter.handleMessage(PacketRouter.java:181) at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.readData(IMQIPConnection.java:1489) at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.process(IMQIPConnection.java:644) at com.sun.messaging.jmq.jmsserver.service.imq.OperationRunnable.process(OperationRunnable.java:170) at com.sun.messaging.jmq.jmsserver.util.pool.BasicRunnable.run(BasicRunnable.java:493) at java.lang.Thread.run(Thread.java:619) Locked ownable synchronizers: - None

      "Thread-jms[214]" prio=10 tid=0xd56c8000 nid=0x566c waiting for monitor entry [0xd2838000] java.lang.Thread.State: BLOCKED (on object monitor) at com.sun.messaging.jmq.jmsserver.data.TransactionInformation.isConsumedMessage(TransactionList.java:2544) - locked <0xdbeeb538> (a com.sun.messaging.jmq.jmsserver.data.TransactionInformation) at com.sun.messaging.jmq.jmsserver.data.TransactionList.isConsumedInTransaction(TransactionList.java:697) at com.sun.messaging.jmq.jmsserver.core.Session.detatchConsumer(Session.java:918) - locked <0xe4c9abf0> (a java.util.Collections$SynchronizedMap) at com.sun.messaging.jmq.jmsserver.core.Session.detatchConsumer(Session.java:810) at com.sun.messaging.jmq.jmsserver.data.handlers.ConsumerHandler.destroyConsumer(ConsumerHandler.java:577) at com.sun.messaging.jmq.jmsserver.data.handlers.ConsumerHandler.handle(ConsumerHandler.java:422) at com.sun.messaging.jmq.jmsserver.data.PacketRouter.handleMessage(PacketRouter.java:181) at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.readData(IMQIPConnection.java:1489) at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.process(IMQIPConnection.java:644) at com.sun.messaging.jmq.jmsserver.service.imq.OperationRunnable.process(OperationRunnable.java:170) at com.sun.messaging.jmq.jmsserver.util.pool.BasicRunnable.run(BasicRunnable.java:493) at java.lang.Thread.run(Thread.java:619) Locked ownable synchronizers: - None

      "Thread-jms[213]" prio=10 tid=0xd65be800 nid=0x5670 runnable [0xd1a28000] java.lang.Thread.State: RUNNABLE at com.sun.messaging.jmq.jmsserver.data.TransactionList.isConsumedInTransaction(TransactionList.java:697) at com.sun.messaging.jmq.jmsserver.core.Session.detatchConsumer(Session.java:918) - locked <0xe4c4bad8> (a java.util.Collections$SynchronizedMap) at com.sun.messaging.jmq.jmsserver.core.Session.detatchConsumer(Session.java:810) at com.sun.messaging.jmq.jmsserver.data.handlers.ConsumerHandler.destroyConsumer(ConsumerHandler.java:577) at com.sun.messaging.jmq.jmsserver.data.handlers.ConsumerHandler.handle(ConsumerHandler.java:422) at com.sun.messaging.jmq.jmsserver.data.PacketRouter.handleMessage(PacketRouter.java:181) at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.readData(IMQIPConnection.java:1489) at com.sun.messaging.jmq.jmsserver.service.imq.IMQIPConnection.process(IMQIPConnection.java:644) at com.sun.messaging.jmq.jmsserver.service.imq.OperationRunnable.process(OperationRunnable.java:170) at com.sun.messaging.jmq.jmsserver.util.pool.BasicRunnable.run(BasicRunnable.java:493) at java.lang.Thread.run(Thread.java:619) Locked ownable synchronizers: - None


      Any ideas will be appreciated.


      --
        • 1. Re: Very high cpu utilization with mq broker
          ak
          The stacktraces showed above indicates the broker was processing a number of closing consumer requests. Closing consumer can be CPU intensive on broker side if there are a lot of messages for the consumer or open transactions. Does your application consume messages in transactions ? If yes, if you lower the value of imq.txn.reapLimit (default 500) - a broker property to limit maximum completed transactions waiting for cleanup - do you see any difference ? Please also note that 4.3 is old. There have been a number of bugs fixed in later releases that affect consumer closing. The latest release is 4.5. You should upgrade your version of MQ.
          • 2. Re: Very high cpu utilization with mq broker
            850110
            Thanks ak, for the response.

            Yes, the messages are consumed in transactions. I set imq.txn.reapLimit=200 in Start Arguments in jvm configuration.

            I verified that it is being set in the log.txt file for the broker:
            -Dimq.autocreate.queue=false -Dimq.autocreate.topic=false -Dimq.txn.reapLimit=250

            It did not make any difference. Do I need to set this property somewhere else ?

            As far as upgrading MQ is concerned, I am using glassfish 2.1. And I think MQ 4.3 is packaged with it. Can you suggest a safe way to upgrade to OpenMQ 4.5 in a running environment. I can bring down the cluster temporarily. Can I just change the jar file somwhere to use MQ4.5 ?


            Here is the snippet of the consumer code :

            I create Connection in @postConstruct and close it in @preDestroy, so that I don't have to do it everytime.


            private ResultMessage[] doRetrieve(String username, String password, String jndiDestination, String filter, int maxMessages, long timeout, RetrieveType type)
            throws InvalidCredentialsException, InvalidFilterException, ConsumerException {

            // ******************************************************************
            // Resources
            // ******************************************************************

            Session session = null;


            try {

            if (log.isTraceEnabled()) log.trace("Creating transacted session with JMS broker.");
            session = connection.createSession(true, Session.SESSION_TRANSACTED);

            // **************************************************************
            // Locate bound destination and create consumer
            // **************************************************************

            if (log.isTraceEnabled()) log.trace("Searching for named destination: " + jndiDestination);
            Destination destination = (Destination) ic.lookup(jndiDestination);

            if (log.isTraceEnabled()) log.trace("Creating consumer for named destination " + jndiDestination);
            MessageConsumer consumer = (filter == null || filter.trim().length() == 0) ? session.createConsumer(destination) : session.createConsumer(destination, filter);

            if (log.isTraceEnabled()) log.trace("Starting JMS connection.");
            connection.start();

            // **************************************************************
            // Consume messages
            // **************************************************************

            if (log.isDebugEnabled()) log.trace("Creating retrieval containers.");
            List<ResultMessage> processedMessages = new ArrayList<ResultMessage>(maxMessages);
            BytesMessage jmsMessage = null;

            for (int i = 0 ; i < maxMessages ; i++) {

            // **********************************************************
            // Attempt message retrieve
            // **********************************************************

            if (log.isTraceEnabled()) log.trace("Attempting retrieval: " + i);
            switch (type) {
            case BLOCKING :
            jmsMessage = (BytesMessage) consumer.receive();
            break;
            case IMMEDIATE :
            jmsMessage = (BytesMessage) consumer.receiveNoWait();
            break;
            case TIMED :
            jmsMessage = (BytesMessage) consumer.receive(timeout);
            break;
            }

            // **********************************************************
            // Process retrieved message
            // **********************************************************

            if (jmsMessage != null) {

            if (log.isTraceEnabled()) log.trace("Message retrieved\n" + jmsMessage);

            // ******************************************************
            // Extract message
            // ******************************************************

            if (log.isTraceEnabled()) log.trace("Extracting result message container from JMS message.");
            byte[] extracted = new byte[(int) jmsMessage.getBodyLength()];
            jmsMessage.readBytes(extracted);



            // ******************************************************
            // Decompress message
            // ******************************************************

            if (jmsMessage.propertyExists(COMPRESSED_HEADER) && jmsMessage.getBooleanProperty(COMPRESSED_HEADER)) {
            if (log.isTraceEnabled()) log.trace("Decompressing message.");
            extracted = decompress(extracted);
            }

            // ******************************************************
            // Done processing message
            // ******************************************************

            if (log.isTraceEnabled()) log.trace("Message added to retrieval container.");

            String signature = jmsMessage.getStringProperty(DIGITAL_SIGNATURE);
            processedMessages.add(new ResultMessage(extracted, signature));

            } else
            if (log.isTraceEnabled()) log.trace("No message was available.");

            }

            // **************************************************************
            // Package return container
            // **************************************************************

            if (log.isTraceEnabled()) log.trace("Packing retrieved messages to return.");
            ResultMessage[] collectorMessages = new ResultMessage[processedMessages.size()];
            for (int i = 0 ; i < collectorMessages.length ; i++)
            collectorMessages[i] = processedMessages.get(i);

            if (log.isTraceEnabled()) log.trace("Returning " + collectorMessages.length + " messages.");
            return collectorMessages;

            } catch (NamingException ex) {
            sessionContext.setRollbackOnly();
            log.error("Unable to locate named queue: " + jndiDestination, ex);
            throw new ConsumerException("Unable to locate named queue: " + jndiDestination, ex);
            } catch (InvalidSelectorException ex) {
            sessionContext.setRollbackOnly();
            log.error("Invalid filter: " + filter, ex);
            throw new InvalidFilterException("Invalid filter: " + filter, ex);
            } catch (IOException ex) {
            sessionContext.setRollbackOnly();
            log.error("Message decompression failed.", ex);
            throw new ConsumerException("Message decompression failed.", ex);
            } catch (GeneralSecurityException ex) {
            sessionContext.setRollbackOnly();
            log.error("Message decryption failed.", ex);
            throw new ConsumerException("Message decryption failed.", ex);
            } catch (JMSException ex) {
            sessionContext.setRollbackOnly();
            log.error("Unable to consumer messages.", ex);
            throw new ConsumerException("Unable to consume messages.", ex);
            } catch (Throwable ex) {
            sessionContext.setRollbackOnly();
            log.error("Unexpected error.", ex);
            throw new ConsumerException("Unexpected error.", ex);
            } finally {
            try {
            if (session != null) session.close();
            } catch (JMSException ex) {
            log.error("Unexpected error.", ex);
            }
            }

            }




            Thanks for your help.

            Edited by: vineet on Apr 7, 2011 10:06 AM
            • 3. Re: Very high cpu utilization with mq broker
              ak
              Broker properties can be set in GlassFish with start-args attribute (using the imqbrokerd's -vmargs option for each property) in jms-service element in domain.xml. If with substantially low imq.txn.reapLimit setting, there is no difference, then unlikely isConsumedInTransaction() is the one for the high CPU usage unless your application had a lot of in-flight transactions at the time the consumers closing.

              You can upgrade GlassFish 2.1 to a GlassFish 2.1 patch that contains version of 4.4u2p2
              create Connection in @postConstruct and close it in @preDestroy, so that I don't have to do it everytime.
              Please see following issue on caching connection in EJB
              http://java.net/jira/browse/GLASSFISH-15558

              Please also note that closing and recreating consumer (closing session or connection closes its consumers) on each Session bean invocation is very costly when there are a lot of messages for the consumer.
              • 4. Re: Very high cpu utilization with mq broker
                850110
                Thanks again !

                These threads were the ones with highest cpu utilization consistently.

                I tested the consumer also by creating non-tansacted session. That did not help either. So, it's not the transacted vs non-transacted sessions.

                But you are right that closing the session, and recreating consumer for every invocation might be resource intensive. But not closing the session every invocation, causes JMS to run out of connection. Also, in a transacted session, the messages are sent, only if the session is committed or closed..

                I cannot really upgrade glassfish version either. I was hoping there would be some article explaining how to configure glassfish v2 to use a different OpenMQ than the one that was packaged. I have installed OpenMQ 4.5, but not sure how to integrate it with glassfish, so it starts and stops with glassfish.

                I will look more into how can I reduce the consumers closing and recreating at every invocation.

                Thanks...