0 Replies Latest reply: Jun 12, 2013 10:20 PM by user11153996 RSS

    Single queue: concurrent processing of messages in multiple consumers

    user11153996

      Hi,

      I am new to jms . The goal is to  process messages concurrently from a queue in an asynchronous listener's onMessage method  by attaching a listener instance to multiple consumer's with each consumer using its own session and running in a separate thread, that way the messages are passed on to the different consumers for concurrent processing. 

       

      1) Is it ossible to process messsages concurrently from a single queue by creating multiple consumers ?

      2)  I came up with the below code, but would like to get your thoughts on whether the below code looks correct for what I want to accomplish.  

       

       

       

       

       

      import java.io.IOException;

      import java.io.InputStreamReader;

      import java.util.Properties;

       

      import javax.jms.Connection;

      import javax.jms.ConnectionFactory;

      import javax.jms.Destination;

      import javax.jms.JMSException;

      import javax.jms.Message;

      import javax.jms.MessageConsumer;

      import javax.jms.MessageListener;

      import javax.jms.Session;

      import javax.jms.TextMessage;

       

      import org.slf4j.Logger;

      import org.slf4j.LoggerFactory;

       

      import com.walmart.platform.jms.client.JMSConnectionFactory;

       

      public class QueueConsumer implements Runnable, MessageListener {

       

        public static void main(String[] args) {

            

       

         // Create an instance of the client

          QueueConsumer consumer1 = new QueueConsumer();

          QueueConsumer consumer2 = new QueueConsumer();

          try {

          consumer1.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");   //US.Q.CHECKOUT-ORDER.1.0.JSON   is the queue name

          consumer2.init("oms","US.Q.CHECKOUT-ORDER.1.0.JSON");

          }catch( JMSException ex ){

          ex.printStackTrace();

          System.exit(-1);

          }

       

          // Start the client running

          Thread newThread1 = new Thread(consumer1);

          Thread newThread2 = new Thread(consumer1);

          newThread1.start();newThread2.start();

        

          InputStreamReader aISR = new InputStreamReader(System.in);

                char aAnswer = ' ';

                do {

                    try {

        aAnswer = (char) aISR.read();

                          }

      catch (IOException e)

        {

        // TODO Auto-generated catch block

        e.printStackTrace();

         }

      } while ((aAnswer != 'q') && (aAnswer != 'Q'));

               

                newThread1.interrupt();

                newThread2.interrupt();

                try {

        newThread1.join();newThread2.join();

        } catch (InterruptedException e) {

        // TODO Auto-generated catch block

        e.printStackTrace();

        }

               

                System.out

        .println("--------------------exiting main thread------------------------"+Thread.currentThread().getId());

              System.exit(0);

      }

       

      // values will be read from a resource properties file

      private static String connectionFactoryName = null;

      private static String queueName = null;

       

      // thread safe object ref

        private static ConnectionFactory qcf = null;

        private static Connection queueConnection = null;

       

      // not thread safe

        private Session ses = null;

        private Destination queue = null;

        private MessageConsumer msgConsumer = null;

       

        public static final Logger logger = LoggerFactory

        .getLogger(QueueConsumer.class);

       

       

       

        public QueueConsumer() {

        super();

        }

       

        public void onMessage(Message msg) {

        if (msg instanceof TextMessage) {

        try {

       

        System.out

        .println("listener is "+Thread.currentThread().getId()+"--------------------Message recieved from queue is ------------------------"

        + ((TextMessage) msg).getJMSMessageID());

       

       

        } catch (JMSException ex) {

        ex.printStackTrace();

       

        }

        }

       

        }

       

        public void run() {

        // Start listening

        try {

        queueConnection.start();

        } catch (JMSException e) {

       

        e.printStackTrace();

       

        System.exit(-1);

        }

        while (!Thread.currentThread().isInterrupted()) {

        synchronized (this) {

        try {

        wait();

        } catch (InterruptedException ex) {

        break;

        }

        }

        }

       

        }

       

       

       

      /**

        * This method is called to set up and initialize the necessary Session,

        * destination and message listener

        * @param queue2

        * @param factoryName

        *

        */

        public void init(String factoryName, String queue2) throws JMSException {

        try {

       

        qcf = new JMSConnectionFactory(factoryName);

       

        /* create the connection */

        queueConnection = qcf.createConnection();

       

        /*

        * Create a session that is non-transacted and is client

        * acknowledged

        */

        ses = queueConnection.createSession(false,

        Session.CLIENT_ACKNOWLEDGE);

        queue = ses.createQueue(queue2);

        logger.info("Subscribing to destination: " + queue2);

       

        msgConsumer = ses.createConsumer(queue);

       

        /* set the listener  */

        msgConsumer.setMessageListener(this);

       

        System.out.println("Listening on queue " +queue2);

       

        } catch (Exception e) {

        e.printStackTrace();

        System.exit(-1);

        }

       

        }

       

       

       

        private static void setConnectionFactoryName(String name) {

        connectionFactoryName = name;

        }

       

        private static String getQueueName() {

        return queueName;

        }

       

        private static void setQueueName(String name) {

        queueName = name;

        }

       

      }