6 Replies Latest reply: Aug 10, 2011 10:06 AM by pmackin RSS

    Messaging - consumer problem

      We are trying to utilize Coherence messaging pattern with queues but can not get it to work satisfactory when testing increasing and decreasing cluster nodes.
      I get messages stuck in coherence.messagingpattern.messages cache that is never removed.

      I start 6 nodes that both produces and consumes messages. Producer send one message every 500 ms on each node.

      If I then decrease (killing java process, no unsubscribe) with one node and let the cluster rebalance itself between each shutdown until I get 4 four nodes I almost every time get messages stuck in coherence.messaging.messages-cache that will not be delivered anywhere.
      Increasing with a couple of nodes usually make the coherence.messagingpattern.messages increase very rapidly as if no nodes are consuming messages from the new nodes.

      What am I doing wrong?
      I have tested same code on both 3.5.3, 3.6.1 and 3.7.0

      What am I missing?
      Is it a bug in the messaging pattern or do I have a bug in my code?

      See code below for test. I have also made a [maven project|http://www.jakeri.net/tmp/messaging-test.zip] for test.


      package net.jakeri.coherence.test;
      import java.util.concurrent.Executors;
      import com.oracle.coherence.common.identifiers.Identifier;
      import com.oracle.coherence.patterns.messaging.DefaultMessagingSession;
      import com.oracle.coherence.patterns.messaging.MessagingSession;
      import com.oracle.coherence.patterns.messaging.Subscriber;
      public class Server {
           public static String QUEUE = "JAKERI_TEST";
            * @param args
            * @throws InterruptedException
           public static void main(String[] args) throws InterruptedException {
                System.out.println("About to start listener...");
                Executors.newSingleThreadExecutor().submit(new Listener());         
                System.out.println("About to start sender...");
                MessagingSession ms = DefaultMessagingSession.getInstance();
                Identifier queue = ms.createQueue(QUEUE);
                for (int i = 0;; i++) {
                     String message = "Messsage_" + i;
                     ms.publishMessage(queue, message);
                     if (i % 10 == 0)
                          System.out.println("Sent: " + message);
           private static class Listener implements Runnable {
                public void run() {
                     MessagingSession ms = DefaultMessagingSession.getInstance();
                     Identifier queue = ms.createQueue(QUEUE);
                     Subscriber s = ms.subscribe(queue);
                     while (true) {
                          String m = (String) s.getMessage();
                          System.out.println("Got: " + m);