4 Replies Latest reply: Jun 16, 2010 3:50 PM by Tom B RSS

    Help me to optimize the below message sender code

    698590
      Hi,

      Can anyone please help me optimize the message sender code below? It will be called heavily (at least a million times a day).

      public class JMSMQManager
      {
           public static String JAVA_NAMING_FACTORY_INITIAL_VALUE          =null;
           public static String JAVA_NAMING_PROVIDER_URL_VALUE          =null;     
           public static String CONNECTION_FACTORY_VALUE          =null;
           public static String MQ_QUEUE_VALUE                    =null;
           private static Properties jndiProps =null;
      private String errorMessage;
      private Queue queue;
      private QueueConnection queueConnection;
      private QueueConnectionFactory queueConnectionFactory;
      private QueueSender queueSender;
      private QueueSession queueSession;
                
      public JMSMQManager
      {
      }

      void loadProperties();
      {         
           InputStream inpPropsFile =null;
                try
                {     
                     /**
                     * jndiProps has to be intitialized only once.
                     */               
                     if(jndiProps==null){
                          synchronized(this){
                               jndiProps = new Properties();
                               inpPropsFile = MetricsManager.class.getClassLoader().getResourceAsStream(Appcontants.JNDI_PROPS_FILE);
                               jndiProps.load(inpPropsFile);
                               JAVA_NAMING_FACTORY_INITIAL_VALUE = jndiProps.getProperty(JAVA_NAMING_FACTORY_INITIAL_KEY);
                               JAVA_NAMING_PROVIDER_URL_VALUE      = jndiProps.getProperty(JAVA_NAMING_PROVIDER_URL_KEY);                    
                               CONNECTION_FACTORY_VALUE = jndiProps.getProperty(CONNECTION_FACTORY_KEY);
                               MQ_QUEUE_VALUE          = jndiProps.getProperty(MQ_QUEUE_KEY);                

                          }
                     }               
                     
                }catch (Exception e) {
                     log.error("Error");
                }finally{
                     if(inpPropsFile!=null){
                          try{
                               inpPropsFile.close();
                          }catch(Exception e){                         
                          }
                     }
                }
      }

      public void init() throws NamingException, JMSException
      {
      loadProperties();

      try{     
                     InitialContext mqInitialContext = getInitialContext();
                     if(mqInitialContext!=null){
                          queueConnectionFactory = (QueueConnectionFactory) mqInitialContext.lookup(Appcontants.CONNECTION_FACTORY_VALUE);
           queueConnection = queueConnectionFactory.createQueueConnection();
           queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
           queue = (Queue)mqInitialContext.lookup(MQ_QUEUE_VALUE);
           queueSender = queueSession.createSender(queue);
           queueConnection.start();
                     }
                }catch (Exception ne) {
                     log.error("Error");
                }
      }

      private InitialContext getInitialContext()
      {       
                //TODO: Need to cache the lookup and the initial context.will do once the queue is stabilized in all the environments.                
                InitialContext mqInitialContext=null;
                try{
                     Hashtable<String,String> env = new Hashtable<String,String>();
                     env.put(Context.INITIAL_CONTEXT_FACTORY, JAVA_NAMING_FACTORY_INITIAL_VALUE);
                     env.put(Context.PROVIDER_URL, JAVA_NAMING_PROVIDER_URL_VALUE);
                     mqInitialContext = new InitialContext(env);

                }catch (Exception ne) {
                     errorMessage ="Error occured in initializing the context";
                     log.error(errorMessage);
                }
      return mqInitialContext;
      }


      public void sendMsg(Serializable msg) throws JMSException {
      try
      {          
           init();
      ObjectMessage message = queueSession.createObjectMessage();
      message.setObject(msg);
      queueSender.send(message);
      }
      catch (JMSException jmse) {
           jmse.printStackTrace();
           log.error("Exception during instantiation.", jmse);     
      }
      catch (Exception e) {
           e.printStackTrace();
           log.error("Exception during instantiation.", e);
      }finally{
           cleanup();
      }
           }


      protected void cleanup()
      {
      try
      {
           if(queueSender!=null){
                queueSender.close();
           }      
      }
      catch(Exception e)
      {
      e.printStackTrace();
      }
      try
      {
           if(queueSession!=null){
      queueSession.close();           
           }
      }
      catch(Exception e)
      {
      e.printStackTrace();
      }
      try
      {
           if(queueConnection!=null){
                queueConnection.close();
           }
      }
      catch(Exception e)
      {
      e.printStackTrace();
      }
      }



      /**
      * public method to get hold of the single instance of this class
      */
      public static JMSMQManager getInstance() throws Exception
      {     
      return new JMSMQManager();
      }
      }
        • 1. Re: Help me to optimize the below message sender code
          687626
          For each sendMsg() call you are calling init(), which does the JNDI lookup. This is costly. Instead, modify the code so that the JNDI lookup is done and the JMS objects (connection, session, producer, etc.) are created only once and are reused for all subsequent sendMsg() calls. Note that sessions and message producers are single-threaded, so you can create a pool of sessions and producers to serve sendMsg() calls from multiple threads.

          Edited by: atheek1 on Jun 11, 2010 6:19 AM
          • 2. Re: Help me to optimize the below message sender code
            Tom B
            See below for a simple sample producer pool that I put together for an old performance guide white-paper.

            Tom
            import javax.jms.*;
            import javax.naming.InitialContext;
            import javax.naming.NamingException;
            import java.util.HashMap;
            import java.util.Iterator;
            import java.util.LinkedList;
            import weblogic.jms.extensions.WLSession;
            
            /**
             * This example shows a way to create a pool of JMS queue producers.
             * Producers are created on an as needed basis and are cached based on queue
             * JNDI name.  Each producer gets its own jms session
             * and jms connection.  Producers are removed when the pool
             * is closed, or when the JMS provider calls their exception listener.
             *
             * @author Copyright (c) 2002 by BEA Systems, Inc. All Rights Reserved.
             */
            
            public final class SenderPool {
            
              private class SenderPoolElt implements ExceptionListener {
                String queueName;   // JNDI name of queue
                InitialContext ctx;
                QueueConnectionFactory qcf;
                QueueConnection qc;
                QueueSession qsession;
                QueueSender qsender;
                Queue queue;
                TextMessage textMessage;
                
                // called by messaging system on connection or session failure
                public void onException(JMSException e) {
                  e.printStackTrace();
                  remove(this);
                  close();
                }
            
                void close() {
                  if (ctx != null) try { ctx.close(); } catch (Exception ignore) {};
                  if (qc != null)  try { qc.close();  } catch (Exception ignore) {};
                }
              }
            
              // a hash map of linked lists of producers that use the same queue
              private HashMap pLists = new HashMap();
            
              private boolean closed;
            
              public SenderPool() {
              }
            
              // get a sender from the pool or create one
              private SenderPoolElt get(String queueName)
              throws JMSException, NamingException {
                synchronized(pLists) {
                  LinkedList producers = (LinkedList)pLists.get(queueName);
                  if (producers != null && producers.size() > 0)
                    return (SenderPoolElt)producers.removeFirst();
                }
                SenderPoolElt spe = new SenderPoolElt();
                try {
                  spe.ctx = new InitialContext();
                  spe.qcf = (QueueConnectionFactory)
                               spe.ctx.lookup("javax.jms.QueueConnectionFactory");
                  spe.qc = spe.qcf.createQueueConnection();
                  spe.qsession = spe.qc.createQueueSession(false, 0);
                  spe.queueName = queueName;
                  spe.queue = (Queue)spe.ctx.lookup(queueName);
                  spe.qsender = spe.qsession.createSender(spe.queue); 
                  spe.textMessage = spe.qsession.createTextMessage();
                  ((WLSession)spe.qsession).setExceptionListener(spe);
                  spe.qc.setExceptionListener(spe);
                } catch ( JMSException je ) {
                  spe.close();
                  throw je;
                } catch ( NamingException ne ) {
                  spe.close();
                  throw ne;
                } 
                return spe;
              }
            
              // called by spe when it gets an exception
              private void remove(SenderPoolElt spe) {
                synchronized(pLists) {
                  LinkedList producers = (LinkedList)pLists.get(spe.queueName);
                  if (producers != null) producers.remove(spe);
                }
              }    
            
              /**
               * Send a text message using a producer in the pool
               * that is associated with the given queue.
               */
              public String send(String queueJNDIName, String text, boolean persistent)
              throws JMSException, NamingException {
                synchronized(pLists) {
                  if (closed) throw new JMSException("Producer pool closed");
                }
            
                SenderPoolElt spe;
            
                try {
                  spe = get(queueJNDIName);
                } catch (Exception e) {
                  // just for the heck of it, try one more time
                  spe = get(queueJNDIName);
                }
            
                spe.textMessage.clearBody();
                spe.textMessage.setText(text);
                spe.qsender.setDeliveryMode((persistent)?DeliveryMode.PERSISTENT
                                                        :DeliveryMode.NON_PERSISTENT);
            
                try {
                  spe.qsender.send(spe.textMessage);
                } catch (JMSException e) {
                  spe.close();
                  throw e;
                }
            
                String messageId = spe.textMessage.getJMSMessageID();
                spe.textMessage.clearBody();
            
                boolean isClosed = false;  
            
                synchronized(pLists) {
                  if (closed) {
                    // may have closed during send, must clean up spe, but don't
                    // want to close it under lock
                    isClosed = true;
                  } else {
                    // put sender in pool now that we are done    
                    LinkedList producers = (LinkedList)pLists.get(spe.queueName);
                    if (producers == null) {
                      producers = new LinkedList();
                      pLists.put(spe.queueName, producers);
                    }
                    producers.add(spe);
                  }
                }
            
                if (isClosed) spe.close();
                return messageId;
              }
            
              /**
               * Close all producers in the pool.
               */
              public void close() {
                // don't want to call spe.close under a lock, so we use a state flag
                synchronized(pLists) {
                  if (closed) return;
                  closed = true;
                }
                for (Iterator iter = pLists.values().iterator(); iter.hasNext();) {
                  LinkedList producers = (LinkedList)iter.next();
                  do {
                    if (producers.size() == 0) break;
                    ((SenderPoolElt)producers.removeFirst()).close(); 
                  } while (true);
                }
                pLists.clear(); // save the garbage collector some thinking
                pLists = null;
              }
            }
            
            Here is an example of using pool “mySenderPool” within an asynchronous consumer.  It forwards text messages persistently. 
            
            public void onMessage(Message m) {
              Exception e = null;
              try {
                mySenderPool.send(“MyApplicationQueue”, ((TextMessage)m).getText(), true);
              } catch (NamingException ne) {
                e = ne;
              } catch (JMSException je) {
                e = ne;
              } catch (ClassCastException cce) {
                // expected a text message
                e = cce;
              }
              if (e != null) {
                e.printStackTrace();
                // on MDB:    if tx required, set rollback-only on context
                //            else throw a runtime exception to force a recover
                // on client: if transacted session.rollback()
                //            else session.cleanup()
              }
            }
            • 3. Re: Help me to optimize the below message sender code
              698590
              Thanks for your reply. I finally optimized the code as shown below. Here MQJMSManager is a singleton, and its constructor is called only once to initialize the queue, which includes starting the connection. The application uses logMsg to send messages, and logMsg will be called at least a million times a day. What do you think about this design? Does it have any major drawbacks?

              The issue is that I am getting “MQJMS2007: failed to send message to MQ queue” error for 10% of the messages.

              public class MQJMSManager implements ExceptionListener
              {   
              private static MQJMSManager mQJMSManagerObj = null;
              private transient QueueSender queueSender = null;
              private transient QueueSession queueSession = null;
              private QueueConnection queueConnection = null;

              private Logger log = LogFactory.getLogger(MQJMSManager.class);

              public void onException(JMSException e) {
                   log.error("ExceptionListener.onException called");
              e.printStackTrace();
              cleanup();
              }

              public MQJMSManager() throws AppException{   
                   
              Queue myQueue = null;
                   try {
                        queueConnection = MQServiceLocator.getInstance().getQueueConnectionFactory(true).createQueueConnection();
                        myQueue = MQServiceLocator.getInstance().getQueue(true);
                        queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                        queueSender = queueSession.createSender(metricsQueue);
                        queueConnection.start();
                        
                   }catch (JMSException je) {
                        throw new AppException("Failed to intialize the MQ Connection",je);          
                   }catch (Exception ex) {
                        throw new AppException("Failed to intialize the MQ Connection",je);
                   }
              }
                   

              public void logMsg(Serializable msgObj) throws JMSException {
              try
              {
              ObjectMessage message = queueSession.createObjectMessage();
              message.setObject(msgObj);
              queueSender.send(message);
              }
              catch (JMSException jmse) {
                   log.error("Exception during instantiation.", jmse);
              }
              catch (Exception e) {
                   log.error("Exception during instantiation.", e);
              }
              }

              protected void cleanup() {
                   try {
                        queueSender.close();
                   } catch (Exception e) {
                        e.printStackTrace();
                        log.error("Exception occured in closing the queue sender", e);     
                   }
                   try {
                        queueSession.close();
                   } catch (Exception e) {
                        e.printStackTrace();
                        log.error("Exception occured in closing the queue session", e);     
                   }
                   try {
                        queueConnection.close();
                   } catch (Exception e) {
                        e.printStackTrace();
                        log.error("Exception occured in closing the queue connection", e);     
                   }
              }
              /**
              * public method to get hold of the single instance of this class
              */
              public static MQJMSManager getInstance() throws Exception
              {
              if (mQJMSManagerObj == null) {
                   synchronized(MQJMSManager .class){
                        mQJMSManagerObj = new MQJMSManager();
                   }
              }
              return mQJMSManagerObj;
              }
              }
              • 4. Re: Help me to optimize the below message sender code
                Tom B
                JMS in general doesn't allow multi-threading sessions or their producers, so if you are multi-threading that might be the problem. The pooling sample above handles multi-threading by ensuring that concurrent requests end up with a dedicated session and producer. It looks like you could make slight modifications to the pooling sample to get what you need. If this doesn't help, I recommend digging into the IBM MQ documentation.

                Regards,

                Tom