1 Reply Latest reply: May 10, 2011 1:22 AM by ak RSS

    Consumer consuming slowly - High UNACK message

    850110
      I have glassfish 2.1 with OpenMQ 4.3.

      I have other posts related to the similar project. But I am seeing all these different symptoms, so I am putting it in different threads.


      I am running into this problem, where the consumers are consuming messages very slowly ( 200/min). It starts off fine (3000/min) but in 20-25 hours it suddenly slows down to the slow rate. If anybody has any suggestion on how to diagnose the exact problem, I will appreciate that. I am also trying out using OpenMQ 4.5 version by just replacing the bin/lib files. But a solution with OpenMQ 4.3 is definitly preferred..

      I also see high number of High Unack messages


      -----------------------------------------------------------------------------------------------------
      Name Type State Producers Consumers Msgs
      Total Wildcard Total Wildcard Count Remote UnAck Avg Size
      -----------------------------------------------------------------------------------------------------
      BillingQueue Queue RUNNING 0 - 0 - 0 0 0 0.0
      testStatsQueue Queue RUNNING 0 - 0 - 72 0 77 1441.6111
      EsafQueue Queue RUNNING 0 - 0 - 0 0 0 0.0
      InitialDestination Topic RUNNING 0 0 5 0 440 80 20 1328.0
      Queue Queue RUNNING 0 - 0 - 0 0 0 0.0
      mq.sys.dmq Queue RUNNING 0 - 0 - 0 0 0 0.0




      Here is the snippet of the consumer code :

      I create Connection in @postConstruct and close it in @preDestroy, so that I don't have to do it everytime.


      private ResultMessage[] doRetrieve(String username, String password, String jndiDestination, String filter, int maxMessages, long timeout, RetrieveType type)
      throws InvalidCredentialsException, InvalidFilterException, ConsumerException {

      // ******************************************************************
      // Resources
      // ******************************************************************

      Session session = null;


      try {

      if (log.isTraceEnabled()) log.trace("Creating transacted session with JMS broker.");
      session = connection.createSession(true, Session.SESSION_TRANSACTED);

      // **************************************************************
      // Locate bound destination and create consumer
      // **************************************************************

      if (log.isTraceEnabled()) log.trace("Searching for named destination: " + jndiDestination);
      Destination destination = (Destination) ic.lookup(jndiDestination);

      if (log.isTraceEnabled()) log.trace("Creating consumer for named destination " + jndiDestination);
      MessageConsumer consumer = (filter == null || filter.trim().length() == 0) ? session.createConsumer(destination) : session.createConsumer(destination, filter);

      if (log.isTraceEnabled()) log.trace("Starting JMS connection.");
      connection.start();

      // **************************************************************
      // Consume messages
      // **************************************************************

      if (log.isDebugEnabled()) log.trace("Creating retrieval containers.");
      List<ResultMessage> processedMessages = new ArrayList<ResultMessage>(maxMessages);
      BytesMessage jmsMessage = null;

      for (int i = 0 ; i < maxMessages ; i++) {

      // **********************************************************
      // Attempt message retrieve
      // **********************************************************

      if (log.isTraceEnabled()) log.trace("Attempting retrieval: " + i);
      switch (type) {
      case BLOCKING :
      jmsMessage = (BytesMessage) consumer.receive();
      break;
      case IMMEDIATE :
      jmsMessage = (BytesMessage) consumer.receiveNoWait();
      break;
      case TIMED :
      jmsMessage = (BytesMessage) consumer.receive(timeout);
      break;
      }

      // **********************************************************
      // Process retrieved message
      // **********************************************************

      if (jmsMessage != null) {

      if (log.isTraceEnabled()) log.trace("Message retrieved\n" + jmsMessage);

      // ******************************************************
      // Extract message
      // ******************************************************

      if (log.isTraceEnabled()) log.trace("Extracting result message container from JMS message.");
      byte[] extracted = new byte[(int) jmsMessage.getBodyLength()];
      jmsMessage.readBytes(extracted);

      // ******************************************************
      // Decompress message
      // ******************************************************

      if (jmsMessage.propertyExists(COMPRESSED_HEADER) && jmsMessage.getBooleanProperty(COMPRESSED_HEADER)) {
      if (log.isTraceEnabled()) log.trace("Decompressing message.");
      extracted = decompress(extracted);
      }

      // ******************************************************
      // Done processing message
      // ******************************************************

      if (log.isTraceEnabled()) log.trace("Message added to retrieval container.");

      String signature = jmsMessage.getStringProperty(DIGITAL_SIGNATURE);
      processedMessages.add(new ResultMessage(extracted, signature));

      } else
      if (log.isTraceEnabled()) log.trace("No message was available.");

      }

      // **************************************************************
      // Package return container
      // **************************************************************

      if (log.isTraceEnabled()) log.trace("Packing retrieved messages to return.");
      ResultMessage[] collectorMessages = new ResultMessage[processedMessages.size()];
      for (int i = 0 ; i < collectorMessages.length ; i++)
      collectorMessages = processedMessages.get(i);

      if (log.isTraceEnabled()) log.trace("Returning " + collectorMessages.length + " messages.");
      return collectorMessages;

      } catch (NamingException ex) {
      sessionContext.setRollbackOnly();
      log.error("Unable to locate named queue: " + jndiDestination, ex);
      throw new ConsumerException("Unable to locate named queue: " + jndiDestination, ex);
      } catch (InvalidSelectorException ex) {
      sessionContext.setRollbackOnly();
      log.error("Invalid filter: " + filter, ex);
      throw new InvalidFilterException("Invalid filter: " + filter, ex);
      } catch (IOException ex) {
      sessionContext.setRollbackOnly();
      log.error("Message decompression failed.", ex);
      throw new ConsumerException("Message decompression failed.", ex);
      } catch (GeneralSecurityException ex) {
      sessionContext.setRollbackOnly();
      log.error("Message decryption failed.", ex);
      throw new ConsumerException("Message decryption failed.", ex);
      } catch (JMSException ex) {
      sessionContext.setRollbackOnly();
      log.error("Unable to consumer messages.", ex);
      throw new ConsumerException("Unable to consume messages.", ex);
      } catch (Throwable ex) {
      sessionContext.setRollbackOnly();
      log.error("Unexpected error.", ex);
      throw new ConsumerException("Unexpected error.", ex);
      } finally {
      try {
      if (session != null) session.close();
      } catch (JMSException ex) {
      log.error("Unexpected error.", ex);
      }
      }

      }

      Edited by: vineet on May 9, 2011 7:53 PM
        • 1. Re: Consumer consuming slowly - High UNACK message
          ak
          The issues that you have posted relating to consumer message delivery in the above test case might be all related to bugs 6961586 and 6972137 which have been fixed in 4.4u2p2 and 4.5. The symtoms caused by these 2 bugs are not limited to the ones in the bug descriptions, e.g. the root cause of bug 6972137 directly affect consumer "busy" state notification. Please upgrade or try 4.5. Since your application runs in GlassFish server, please follow the notes provided in the following thread to try 4.5 with GlassFish 2.x
          How to check the update number for OpenMQ