1 Reply Latest reply on May 10, 2011 6:22 AM by 800561

    Consumer consuming slowly - High UNACK message

    vmiharia
      I have glassfish 2.1 with OpenMQ 4.3.

      I have other posts related to the similar project. But I am seeing all these different symptoms, so I am putting it in different threads.


      I am running into this problem, where the consumers are consuming messages very slowly ( 200/min). It starts off fine (3000/min) but in 20-25 hours it suddenly slows down to the slow rate. If anybody has any suggestion on how to diagnose the exact problem, I will appreciate that. I am also trying out using OpenMQ 4.5 version by just replacing the bin/lib files. But a solution with OpenMQ 4.3 is definitly preferred..

      I also see high number of High Unack messages


      -----------------------------------------------------------------------------------------------------
      Name Type State Producers Consumers Msgs
      Total Wildcard Total Wildcard Count Remote UnAck Avg Size
      -----------------------------------------------------------------------------------------------------
      BillingQueue Queue RUNNING 0 - 0 - 0 0 0 0.0
      testStatsQueue Queue RUNNING 0 - 0 - 72 0 77 1441.6111
      EsafQueue Queue RUNNING 0 - 0 - 0 0 0 0.0
      InitialDestination Topic RUNNING 0 0 5 0 440 80 20 1328.0
      Queue Queue RUNNING 0 - 0 - 0 0 0 0.0
      mq.sys.dmq Queue RUNNING 0 - 0 - 0 0 0 0.0




      Here is the snippet of the consumer code :

      I create Connection in @postConstruct and close it in @preDestroy, so that I don't have to do it everytime.


      private ResultMessage[] doRetrieve(String username, String password, String jndiDestination, String filter, int maxMessages, long timeout, RetrieveType type)
      throws InvalidCredentialsException, InvalidFilterException, ConsumerException {

      // ******************************************************************
      // Resources
      // ******************************************************************

      Session session = null;


      try {

      if (log.isTraceEnabled()) log.trace("Creating transacted session with JMS broker.");
      session = connection.createSession(true, Session.SESSION_TRANSACTED);

      // **************************************************************
      // Locate bound destination and create consumer
      // **************************************************************

      if (log.isTraceEnabled()) log.trace("Searching for named destination: " + jndiDestination);
      Destination destination = (Destination) ic.lookup(jndiDestination);

      if (log.isTraceEnabled()) log.trace("Creating consumer for named destination " + jndiDestination);
      MessageConsumer consumer = (filter == null || filter.trim().length() == 0) ? session.createConsumer(destination) : session.createConsumer(destination, filter);

      if (log.isTraceEnabled()) log.trace("Starting JMS connection.");
      connection.start();

      // **************************************************************
      // Consume messages
      // **************************************************************

      if (log.isDebugEnabled()) log.trace("Creating retrieval containers.");
      List<ResultMessage> processedMessages = new ArrayList<ResultMessage>(maxMessages);
      BytesMessage jmsMessage = null;

      for (int i = 0 ; i < maxMessages ; i++) {

      // **********************************************************
      // Attempt message retrieve
      // **********************************************************

      if (log.isTraceEnabled()) log.trace("Attempting retrieval: " + i);
      switch (type) {
      case BLOCKING :
      jmsMessage = (BytesMessage) consumer.receive();
      break;
      case IMMEDIATE :
      jmsMessage = (BytesMessage) consumer.receiveNoWait();
      break;
      case TIMED :
      jmsMessage = (BytesMessage) consumer.receive(timeout);
      break;
      }

      // **********************************************************
      // Process retrieved message
      // **********************************************************

      if (jmsMessage != null) {

      if (log.isTraceEnabled()) log.trace("Message retrieved\n" + jmsMessage);

      // ******************************************************
      // Extract message
      // ******************************************************

      if (log.isTraceEnabled()) log.trace("Extracting result message container from JMS message.");
      byte[] extracted = new byte[(int) jmsMessage.getBodyLength()];
      jmsMessage.readBytes(extracted);

      // ******************************************************
      // Decompress message
      // ******************************************************

      if (jmsMessage.propertyExists(COMPRESSED_HEADER) && jmsMessage.getBooleanProperty(COMPRESSED_HEADER)) {
      if (log.isTraceEnabled()) log.trace("Decompressing message.");
      extracted = decompress(extracted);
      }

      // ******************************************************
      // Done processing message
      // ******************************************************

      if (log.isTraceEnabled()) log.trace("Message added to retrieval container.");

      String signature = jmsMessage.getStringProperty(DIGITAL_SIGNATURE);
      processedMessages.add(new ResultMessage(extracted, signature));

      } else
      if (log.isTraceEnabled()) log.trace("No message was available.");

      }

      // **************************************************************
      // Package return container
      // **************************************************************

      if (log.isTraceEnabled()) log.trace("Packing retrieved messages to return.");
      ResultMessage[] collectorMessages = new ResultMessage[processedMessages.size()];
      for (int i = 0 ; i < collectorMessages.length ; i++)
      collectorMessages = processedMessages.get(i);

      if (log.isTraceEnabled()) log.trace("Returning " + collectorMessages.length + " messages.");
      return collectorMessages;

      } catch (NamingException ex) {
      sessionContext.setRollbackOnly();
      log.error("Unable to locate named queue: " + jndiDestination, ex);
      throw new ConsumerException("Unable to locate named queue: " + jndiDestination, ex);
      } catch (InvalidSelectorException ex) {
      sessionContext.setRollbackOnly();
      log.error("Invalid filter: " + filter, ex);
      throw new InvalidFilterException("Invalid filter: " + filter, ex);
      } catch (IOException ex) {
      sessionContext.setRollbackOnly();
      log.error("Message decompression failed.", ex);
      throw new ConsumerException("Message decompression failed.", ex);
      } catch (GeneralSecurityException ex) {
      sessionContext.setRollbackOnly();
      log.error("Message decryption failed.", ex);
      throw new ConsumerException("Message decryption failed.", ex);
      } catch (JMSException ex) {
      sessionContext.setRollbackOnly();
      log.error("Unable to consumer messages.", ex);
      throw new ConsumerException("Unable to consume messages.", ex);
      } catch (Throwable ex) {
      sessionContext.setRollbackOnly();
      log.error("Unexpected error.", ex);
      throw new ConsumerException("Unexpected error.", ex);
      } finally {
      try {
      if (session != null) session.close();
      } catch (JMSException ex) {
      log.error("Unexpected error.", ex);
      }
      }

      }

      Edited by: vineet on May 9, 2011 7:53 PM