6 Replies Latest reply: Aug 10, 2011 8:06 AM by pmackin

Messaging - consumer problem

64633 Newbie
We are trying to use the Coherence messaging pattern with queues, but cannot get it to work satisfactorily when we test adding and removing cluster nodes.
Messages get stuck in the coherence.messagingpattern.messages cache and are never removed.

I start 6 nodes, each of which both produces and consumes messages. The producer on each node sends one message every 500 ms.

If I then remove one node at a time (by killing the Java process, with no unsubscribe) and let the cluster rebalance itself between each shutdown until four nodes remain, I almost always end up with messages stuck in the coherence.messagingpattern.messages cache that are never delivered anywhere.
Adding a couple of nodes usually makes the coherence.messagingpattern.messages cache grow very rapidly, as if no node is consuming the messages published by the new nodes.
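
To make "stuck" measurable rather than a visual impression, one option is to stop the producers and then sample the size of the message cache a few times. The following is only a minimal sketch: the class name StuckMessageCheck and the method looksStuck are hypothetical helpers, not part of the messaging pattern, and the size source is passed in so the check itself does not depend on Coherence. It assumes publishing has been stopped first, since otherwise a non-decreasing size only shows that producers are outpacing consumers.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper: decides whether a sampled cache size "looks stuck". */
public class StuckMessageCheck {

    /**
     * Samples sizeSource `samples` times, pausing pauseMillis between readings.
     * Returns true only if every reading is non-zero and no reading is lower
     * than the one before it, i.e. nothing appears to have been consumed.
     */
    public static boolean looksStuck(IntSupplier sizeSource, int samples, long pauseMillis) {
        int previous = sizeSource.getAsInt();
        if (previous == 0) {
            return false; // cache is empty, nothing can be stuck
        }
        for (int i = 1; i < samples; i++) {
            try {
                Thread.sleep(pauseMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // interrupted: not enough evidence to call it stuck
            }
            int current = sizeSource.getAsInt();
            if (current < previous) {
                return false; // size went down, so something was consumed
            }
            previous = current;
        }
        return true;
    }
}
```

On a cluster member, the size source could be wired as `() -> CacheFactory.getCache("coherence.messagingpattern.messages").size()`, assuming the cache name is the one observed above; a call such as `looksStuck(source, 10, 1000)` would then watch the cache for roughly ten seconds.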

What am I doing wrong?
I have tested the same code on 3.5.3, 3.6.1 and 3.7.0.

What am I missing?
Is it a bug in the messaging pattern or do I have a bug in my code?

See the code below for the test. I have also made a Maven project for the test: http://www.jakeri.net/tmp/messaging-test.zip


/Jakob

package net.jakeri.coherence.test;

import java.util.concurrent.Executors;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.patterns.messaging.DefaultMessagingSession;
import com.oracle.coherence.patterns.messaging.MessagingSession;
import com.oracle.coherence.patterns.messaging.Subscriber;

public class Server {

     // Name of the queue shared by the sender and the listener on every node.
     public static final String QUEUE = "JAKERI_TEST";

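     /**
      * Starts a listener thread that consumes from the queue, then publishes
      * one message every 500 ms from the main thread.
      *
      * @param args unused
      * @throws InterruptedException if the sender is interrupted while sleeping
      */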
     public static void main(String[] args) throws InterruptedException {
          System.out.println("Starting...");         
          System.out.println("About to start listener...");
          Executors.newSingleThreadExecutor().submit(new Listener());         
          System.out.println("About to start sender...");
          MessagingSession ms = DefaultMessagingSession.getInstance();
          Identifier queue = ms.createQueue(QUEUE);
         
          for (int i = 0;; i++) {
               Thread.sleep(500);
               String message = "Message_" + i;
               ms.publishMessage(queue, message);
               if (i % 10 == 0)
                    System.out.println("Sent: " + message);
          }
     }

     private static class Listener implements Runnable {

          @Override
          public void run() {

               MessagingSession ms = DefaultMessagingSession.getInstance();
               Identifier queue = ms.createQueue(QUEUE);
               Subscriber s = ms.subscribe(queue);

               while (true) {
                    String m = (String) s.getMessage();
                    System.out.println("Got: " + m);
               }
          }
     }
}
