5 Replies Latest reply: Jul 2, 2010 12:07 PM by 807581

    consumerFlowLimit set to 0 working incorrectly

    807581
      I've got a simple program which creates a queue and sets the consumerFlowLimit on that queue. Then it creates a producer for the queue which loads the queue with 10 messages. The payload of each message is the order it was created in. So the first message is a text message "1", the second message is a text message "2", etc.

      The program then creates two consumers. The first is called "c5" because it reads a single message, prints the message, then sleeps for 5 seconds. The second is named "c1" because it does the same, but sleeps for only one second.

      When I set consumerFlowLimit on the queue to -1 (unlimited), I get what I expect. The 10 messages get randomly "pre-assigned" to the consumers, and I get output (something) like this:

      c1 read message: 2 Mon Jul 14 08:37:53 PDT 2008
      c5 read message: 1 Mon Jul 14 08:37:53 PDT 2008
      c1 read message: 5 Mon Jul 14 08:37:54 PDT 2008
      c1 read message: 7 Mon Jul 14 08:37:55 PDT 2008
      c1 read message: 9 Mon Jul 14 08:37:56 PDT 2008
      c5 read message: 3 Mon Jul 14 08:37:58 PDT 2008
      c5 read message: 4 Mon Jul 14 08:38:03 PDT 2008
      c5 read message: 6 Mon Jul 14 08:38:08 PDT 2008
      c5 read message: 8 Mon Jul 14 08:38:13 PDT 2008
      c5 read message: 10 Mon Jul 14 08:38:18 PDT 2008

      What I want, however, is for the messages to be pulled off and printed in order, from 1 to 10. So I don't want the broker pre-assigning any of the messages -- I want the messages assigned to consumers only when they ask for them. So I set consumerFlowLimit to 0. But a setting of 0 behaves exactly like a setting of -1. I think this is a bug and want to know if I should file it as such. I see the same behavior with both the 1.3 and 1.4 brokers.

      I do know that I'm successfully setting consumerFlowLimit, not only because I query and print it after I set it, but because when I set it to 1, it works as expected: only one message gets pre-assigned, and the output looks like this:

      c5 read message: 1 Mon Jul 14 09:12:32 PDT 2008
      c1 read message: 2 Mon Jul 14 09:12:32 PDT 2008
      c1 read message: 4 Mon Jul 14 09:12:33 PDT 2008
      c1 read message: 5 Mon Jul 14 09:12:34 PDT 2008
      c1 read message: 6 Mon Jul 14 09:12:35 PDT 2008
      c1 read message: 7 Mon Jul 14 09:12:36 PDT 2008
      c5 read message: 3 Mon Jul 14 09:12:37 PDT 2008
      c1 read message: 8 Mon Jul 14 09:12:37 PDT 2008
      c1 read message: 10 Mon Jul 14 09:12:38 PDT 2008
      c5 read message: 9 Mon Jul 14 09:12:42 PDT 2008


      Thanks if you can help.
        • 1. Re: consumerFlowLimit set to 0 working incorrectly
          807581
          I believe this message in the 4.1 release notes contains the answer to my question:

          There has been some confusion about how to configure the broker for round-robin delivery. The solution is simple and configurable.

          1. Set the destination attribute maxNumActiveConsumers to -1. This turns on round-robin delivery.
          2. Set the destination attribute consumerFlowLimit to 1. This specifies the number of messages delivered to a single consumer before delivery progresses to the next consumer. For different chunking, set this attribute to the desired value. By default, one hundred messages are delivered to each consumer.
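
As a rough illustration of the chunking the release note describes, here is a toy sketch that deals numbered messages to consumers in round-robin chunks. It is not broker code: the class name `ChunkedRoundRobin` and the method `assign` are made up for this example, `chunkSize` merely stands in for consumerFlowLimit, and the model ignores how fast each consumer drains its messages.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of round-robin delivery in chunks; an illustration, not OpenMQ code.
final class ChunkedRoundRobin {

    // Deals messages 1..messageCount to consumerCount consumers,
    // chunkSize messages at a time (chunkSize plays the role of consumerFlowLimit).
    static List<List<Integer>> assign(int messageCount, int consumerCount, int chunkSize) {
        if (consumerCount < 1 || chunkSize < 1) {
            throw new IllegalArgumentException("consumerCount and chunkSize must be >= 1");
        }
        List<List<Integer>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int msg = 1; msg <= messageCount; msg++) {
            int chunkIndex = (msg - 1) / chunkSize;           // which chunk this message falls in
            perConsumer.get(chunkIndex % consumerCount).add(msg); // chunks rotate over consumers
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 2, 1)); // [[1, 3, 5, 7, 9], [2, 4, 6, 8, 10]]
        System.out.println(assign(10, 2, 3)); // [[1, 2, 3, 7, 8, 9], [4, 5, 6, 10]]
    }
}
```

With a chunk size of 1 the two consumers simply alternate; larger values hand each consumer a run of consecutive messages before moving on.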

          However, I have a new question. How do I set maxNumActiveConsumers for a destination? It's not settable via the JMX interface...

          Thanks.
          • 2. Re: consumerFlowLimit set to 0 working incorrectly
            807581
            I wish someone from Sun were interested in this thread, because I'm convinced this is a pretty serious bug. See how the last message is blocked for 4 seconds because it's been pre-assigned to the c5 consumer? The c1 thread is ready to handle this message, but it can't get at it. There is no way to avoid this, and that's a huge problem for a simple producer/consumer scenario. Consumers sit idle when there is work available on the queue.

            Would posting my sample code help?

            consumerFlowLimit: 1
            maxNumActiveConsumers: -1

            Thread c5 running
            Thread c1 running
            c1 read message: 2 Tue Jul 15 10:50:33 PDT 2008
            c5 read message: 1 Tue Jul 15 10:50:33 PDT 2008
            c1 read message: 3 Tue Jul 15 10:50:34 PDT 2008
            c1 read message: 5 Tue Jul 15 10:50:35 PDT 2008
            c1 read message: 6 Tue Jul 15 10:50:36 PDT 2008
            c1 read message: 7 Tue Jul 15 10:50:37 PDT 2008
            c5 read message: 4 Tue Jul 15 10:50:38 PDT 2008
            c1 read message: 8 Tue Jul 15 10:50:38 PDT 2008
            c1 read message: 10 Tue Jul 15 10:50:39 PDT 2008
            c5 read message: 9 Tue Jul 15 10:50:43 PDT 2008
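
The stall in the log above can be reproduced with a small simulation. This is a sketch under an assumed mechanism, not OpenMQ's actual dispatch logic: each consumer is given a one-message buffer that the broker refills the moment the consumer reads from it. The class `PrefetchModel` and its method are hypothetical, and ties within the same second are resolved c1 first, so the exact interleaving differs slightly from the real run.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of one-message prefetch; an assumed mechanism, not OpenMQ code.
final class PrefetchModel {

    // Returns lines "t=<second> <consumer> read <message>" for consumers
    // c1 (sleeps 1 s per message) and c5 (sleeps 5 s per message).
    static List<String> simulate(int messageCount) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= messageCount; i++) {
            queue.add(i);
        }
        String[] names = {"c1", "c5"};
        int[] sleepSeconds = {1, 5};
        int[] freeAt = {0, 0};               // second at which each consumer next calls receive()
        Integer[] buffered = new Integer[2]; // the single prefetched message per consumer

        // Initial delivery: one message each, c5 first because it is started first.
        buffered[1] = queue.poll();
        buffered[0] = queue.poll();

        List<String> log = new ArrayList<>();
        for (int t = 0; buffered[0] != null || buffered[1] != null; t++) {
            for (int c = 0; c < 2; c++) {
                if (freeAt[c] == t && buffered[c] != null) {
                    log.add("t=" + t + " " + names[c] + " read " + buffered[c]);
                    buffered[c] = queue.poll();      // broker refills the buffer immediately
                    freeAt[c] = t + sleepSeconds[c]; // consumer is busy until then
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(10).forEach(System.out::println);
        // The last line printed is "t=10 c5 read 10": the final message sits in
        // c5's buffer from second 5 onward while c1 is idle from second 7.
    }
}
```

In this model the queue is empty by second 5, yet the last read happens at second 10, which matches the shape of the stall shown in the log.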
            • 3. Re: consumerFlowLimit set to 0 working incorrectly
              807581
              Challenge to Sun: Explain why there are no settings that allow this program to finish in 8 seconds every time it runs. 1 producer queues 10 messages. 2 consumers drain the queue. One sleeps 1 second after it dequeues a message, the other sleeps 5 seconds. Two messages should be dequeued at the 0-second and 5-second marks, and one message at each of the other one-second marks. 10 messages in 8 seconds. Should be easy, but OpenMQ can't do it.
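
The arithmetic behind the 8-second figure can be checked with a short sketch of the idealised schedule, in which a message leaves the queue only when a consumer is actually free. The class name `OnDemandSchedule` is invented for this illustration, and the model assumes zero delivery latency and whole-second timing.

```java
import java.util.ArrayList;
import java.util.List;

// Idealised schedule with no prefetch: the next message is handed out
// only when a consumer asks for it. An illustration, not OpenMQ code.
final class OnDemandSchedule {

    // Returns lines "t=<second> <consumer> read <message>" for consumers
    // c1 (sleeps 1 s per message) and c5 (sleeps 5 s per message).
    static List<String> simulate(int messageCount) {
        String[] names = {"c1", "c5"};
        int[] sleepSeconds = {1, 5};
        int[] freeAt = {0, 0}; // second at which each consumer is ready again
        List<String> log = new ArrayList<>();
        int next = 1;          // next message number still on the queue
        for (int t = 0; next <= messageCount; t++) {
            for (int c = 0; c < 2 && next <= messageCount; c++) {
                if (freeAt[c] <= t) {
                    log.add("t=" + t + " " + names[c] + " read " + next++);
                    freeAt[c] = t + sleepSeconds[c];
                }
            }
        }
        return log;
    }

    public static void main(String[] args) {
        simulate(10).forEach(System.out::println);
        // Two reads at t=0 and t=5, one read at every other second,
        // and the last line printed is "t=7 c1 read 10".
    }
}
```

The tenth message is read at second 7, so all ten reads fall within the eight one-second slots 0 through 7, and the messages come off the queue in order from 1 to 10.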

              Command line looks like this once you compile it:

              java -cp build\classes;lib\imqjmx.jar;lib\jms.jar;lib\imq.jar my.example.Flow 1 -1

              The parameters are for the consumerFlowLimit and maxNumActiveConsumers attributes. Set them any way you like. Nothing will get you to 8 seconds consistently.

              ***********************************************************************************

              package my.example;

              import java.util.Date;

              import javax.jms.Connection;
              import javax.jms.DeliveryMode;
              import javax.jms.Destination;
              import javax.jms.MessageConsumer;
              import javax.jms.MessageProducer;
              import javax.jms.Session;
              import javax.jms.TextMessage;
              import javax.management.Attribute;
              import javax.management.MBeanServerConnection;
              import javax.management.ObjectName;
              import javax.management.remote.JMXConnector;

              import com.sun.messaging.AdminConnectionFactory;
              import com.sun.messaging.ConnectionConfiguration;
              import com.sun.messaging.jms.management.server.DestinationAttributes;
              import com.sun.messaging.jms.management.server.DestinationType;
              import com.sun.messaging.jms.management.server.MQObjectName;

              public class Flow {

                  private static com.sun.messaging.ConnectionFactory
                          connectionFactory = new com.sun.messaging.ConnectionFactory();

                  private String brokerName;
                  private int brokerPort;
                  private long flowLimit;
                  private int activeConsumers;

                  public Flow(String brokerName, int brokerPort, long flowLimit, int activeConsumers) {
                      this.brokerName = brokerName;
                      this.brokerPort = brokerPort;
                      this.flowLimit = flowLimit;
                      this.activeConsumers = activeConsumers;
                  }

                  public void start() {

                      try {
                          connectionFactory.setProperty(ConnectionConfiguration.imqAddressList,
                                  this.brokerName + ":" + this.brokerPort);

                          Connection connection = connectionFactory.createConnection();

                          connection.start();
                          Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                          String queueName = "myQueue";
                          Destination myQueue = session.createQueue(queueName);

                          MessageProducer producer = session.createProducer(myQueue);
                          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                          AdminConnectionFactory acf = new AdminConnectionFactory();
                          JMXConnector jmxc = acf.createConnection("admin", "admin");
                          MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
                          ObjectName destConfigName = MQObjectName.createDestinationConfig(DestinationType.QUEUE, queueName);
                          Attribute attr = new Attribute(DestinationAttributes.CONSUMER_FLOW_LIMIT, flowLimit);
                          mbsc.setAttribute(destConfigName, attr);
                          attr = new Attribute(DestinationAttributes.MAX_NUM_ACTIVE_CONSUMERS, activeConsumers);
                          mbsc.setAttribute(destConfigName, attr);
                          Long longAttrValue = (Long) mbsc.getAttribute(destConfigName, DestinationAttributes.CONSUMER_FLOW_LIMIT);
                          Integer attrValue = (Integer) mbsc.getAttribute(destConfigName, DestinationAttributes.MAX_NUM_ACTIVE_CONSUMERS);
                          jmxc.close();

                          System.out.println("consumerFlowLimit: " + longAttrValue);
                          System.out.println("maxNumActiveConsumers: " + attrValue);

                          // Pre-load the queue with 10 messages numbered from 1 to 10
                          for (int i = 1; i <= 10; i++) {
                              TextMessage message = session.createTextMessage();
                              message.setText(String.valueOf(i));
                              producer.send(message);
                          }

                          System.out.println("");

                          // Kick off two consumers
                          new Thread(new Consumer("c5", 5)).start();
                          new Thread(new Consumer("c1", 1)).start();

                      } catch (Exception e) {
                          System.out.println("Exception occurred: " + e);
                      }
                  }

                  public static void main(String[] args) {
                      String limit = "1";
                      String activeConsumers = "-1";

                      if (args.length > 0) {
                          limit = args[0];
                      }

                      if (args.length > 1) {
                          activeConsumers = args[1];
                      }

                      Flow flow = new Flow("localhost", 7676, Long.parseLong(limit), Integer.parseInt(activeConsumers));
                      flow.start();
                  }

                  class Consumer implements Runnable {
                      private String name;
                      private int sleepSeconds;

                      public Consumer(String name, int sleepSeconds) {
                          this.name = name;
                          this.sleepSeconds = sleepSeconds;
                      }

                      public void run() {
                          try {
                              String queueName = "myQueue";

                              Connection connection = connectionFactory.createConnection();
                              Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                              Destination myQueue = session.createQueue(queueName);
                              MessageConsumer consumer = session.createConsumer(myQueue);
                              connection.start();
                              while (true) {
                                  TextMessage message = (TextMessage) consumer.receive();
                                  System.out.println(this.name + " read message: " + message.getText() + "\t" + (new Date()));
                                  Thread.sleep(sleepSeconds * 1000);
                              }
                          } catch (Exception e) {
                              System.out.println("Exception occurred: " + e);
                          }
                      }
                  }
              }
              • 4. Re: consumerFlowLimit set to 0 working incorrectly
                807581
                Posting to bring this thread back to life: We are experiencing the same issue and it is a very serious bug.

                The current implementation of OpenMQ guarantees that messages are delivered inefficiently when using multiple consumers. What some of us are asking for is a proper round-robin implementation where the consumer is given the next available message without any buffering or pre-assigning by the broker.
                • 5. Re: consumerFlowLimit set to 0 working incorrectly
                  807581
                  1). The prefetch issue on Consumer.receive() with imqConsumerFlowLimit=1, CR6727929, has been fixed in 4.5 (available in build12)

                  2). Setting consumerFlowLimit to 0 is equivalent to -1 (doc CR 6966416 has been filed to clarify this)