This discussion is archived
3 Replies Latest reply: Jun 16, 2011 4:48 AM by 64633 RSS

Messaging Pattern Bug

823083 Newbie
Currently Being Moderated
Hello,

I have developed some code to introduce a listener concept so that listeners can be created for specific destinations. Under the covers all that is happening is that we are creating a thread and doing a subscriber.getMessage() until an interrupt is recieved to signal the subscriber to stop listening. I think there is a potential bug when a call to subscriber.getMessage() is interrupted. The following is the scenario:

Create a Q1
Create Subscriber 1 (using listener concept as above)
Publish msg to Q1
Subscriber succesfully consumes msg
Remove listener by interupting thread which is doing subscriber.getMessage() and call unsubscribe on subscriber.
Create Subscriber 2 (using listener concept as above)
Publish msg to Q1
Subscriber2 does not consumes msg, In fact the message is delivered to old subscription even though unsubscribe has been done on it.

My environment is as follows:
- Coherence 3.6.0
- Coherence-3.6-common 1.7.3
- Coherence-3.6-messaging 2.7.4

My code is as follows: (excuse the large size):

package coherence.messaging;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import junit.framework.Assert;

import org.junit.Test;

import com.oracle.coherence.common.identifiers.Identifier;
import com.oracle.coherence.patterns.messaging.DefaultMessagingSession;
import com.oracle.coherence.patterns.messaging.Destination;
import com.oracle.coherence.patterns.messaging.Message;
import com.oracle.coherence.patterns.messaging.MessagingSession;
import com.oracle.coherence.patterns.messaging.Queue;
import com.oracle.coherence.patterns.messaging.QueueSubscription;
import com.oracle.coherence.patterns.messaging.Subscriber;
import com.oracle.coherence.patterns.messaging.Subscription;
import com.oracle.coherence.patterns.messaging.SubscriptionIdentifier;
import com.oracle.coherence.patterns.messaging.Topic;
import com.oracle.coherence.patterns.messaging.TopicSubscription;
import com.oracle.coherence.patterns.messaging.exceptions.SubscriberInterruptedException;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

public class MultipleSubscribersIssueTest {

     private static final int NTHREADS = 1;

     /**
     * One subscriber consumes one message off a queue destination
     */
     @Test
     public void testAbnormalTermination() {
          
          //Get a session
          MessagingSession messagingSession = DefaultMessagingSession.getInstance();
          
          //Create a Q
          Identifier queueIdentifier = messagingSession.createQueue(QUEUE_DESTINATION_1);          
          outputAllMessagingData();
          
          //Subscribe to the Q
          Subscriber subscriber = messagingSession.subscribe(QUEUE_DESTINATION_1);          
          outputAllMessagingData();                         
          
          //Create Listener 1
          Listener listener1 = new Listener(subscriber);
          //Start the listening thread
          ExecutorService listenerExecutor = Executors.newFixedThreadPool(NTHREADS);
          listenerExecutor.execute(listener1);          
          outputAllMessagingData();
          
          //Publish one message to the Q
          messagingSession.publishMessage(queueIdentifier, "Message 1");
          outputAllMessagingData();
          
          //Calls interrupt on the listening thread, to terminate the blocking get call
          // and to unsubscribe to the Q
          listenerExecutor.shutdownNow();          
          outputAllMessagingData();

          //Another Subscriber to the same Q
          Subscriber subscriber2 = messagingSession.subscribe(QUEUE_DESTINATION_1);
          
          //Create Listener 2
          Listener listener2 = new Listener(subscriber2);          
          //Start the listening thread
          ExecutorService listenerExecutor2 = Executors.newFixedThreadPool(NTHREADS);
          listenerExecutor2.execute(listener2);
          outputAllMessagingData();

          //Publish another message to the Q
          //But it is delivered to the old subscription
          messagingSession.publishMessage(queueIdentifier, "Message 2");
          outputAllMessagingData();

          //Calls interrupt on the listening thread, to terminate the blocking get call
          // and to unsubscribe to the Q
          listenerExecutor.shutdownNow();          
          outputAllMessagingData();
          
          //Assert
          Assert.assertEquals(1, CacheFactory.getCache("coherence.messagingpattern.subscriptions").keySet().size());
     }
     
     class Listener implements Runnable {
          
          private Subscriber subscriber;
          
          //Pass in a subscriber
          public Listener(Subscriber subscriber) {
               this.subscriber = subscriber;
          }
          
          @Override
          public void run() {
               Thread.currentThread().setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
               public void uncaughtException(Thread t, Throwable e) {
                    //Never gets here
               System.out.println("Silent logger activated");
               }
               });

               while(Thread.currentThread().isInterrupted() == false) {
                    try {
                         //Blocking call - consume message off the Q
                         String message = (String)subscriber.getMessage();
                         System.out.println(message);
                    } catch (SubscriberInterruptedException ex2) {
                         subscriber.unsubscribe();
                         //subscriber.release(); //this doesn't clear up subscription either
                         Thread.currentThread().interrupt();
                    }
               }
               System.out.println("Exited while loop in thread");
          }
     };
     
     /**
     * Output all the messaging data stored in the cache
     */
     @SuppressWarnings("unchecked")
     public static void outputAllMessagingData() {
          //Retrieve Destination Details
          NamedCache messagingDestinations = CacheFactory.getCache("coherence.messagingpattern.destinations");
          System.out.println(messagingDestinations.getCacheName() + " has " + messagingDestinations.size() + " Objects");
          
          Set<Object> messagingDestinationKeys = (Set<Object>) messagingDestinations.keySet();
          for (Object messagingDestinationKey : messagingDestinationKeys) {
               Destination destination = (Destination) messagingDestinations.get(messagingDestinationKey);
               if(destination instanceof Queue) {
                    Queue queue = (Queue) destination;
                    System.out.println("Queue Name: " + queue.getName());
                    System.out.println("Queue Waiting: " + queue.getNumMessagesToDeliver());
                    System.out.println("Queue Redelivered: " + queue.getNumMessagesToRedeliver());
                    System.out.println("Queue Received: " + queue.getNumMessagesReceived());
                    System.out.println("Queue Delivered: " + queue.getNumMessagesDelivered());
                    System.out.println("Queue Subscribers: " + queue.getNumWaitingSubscriptions());
                    System.out.println("Queue Has Subscribers: " + queue.hasSubscriptions());
               }
               else {
                    Topic topic = (Topic) destination;
                    System.out.println("Topic Name: " + topic.getName());
                    System.out.println("Topic Subscriptions: " + topic.getSubscriptionIdentifiers().size());
                    
                    Set<SubscriptionIdentifier> subscriptionIds = topic.getSubscriptionIdentifiers();
                    for(SubscriptionIdentifier id : subscriptionIds) {
                         System.out.println("Topic Subscription Id: " + id.getSubscriberIdentifier());
                    }
               }
          }
          
          //Retrieve Message Details
          NamedCache messages = CacheFactory.getCache("coherence.messagingpattern.messages");
          System.out.println();
          System.out.println(messages.getCacheName() + " has " + messages.size() + " Objects");
          
          Set<Object> messageKeys = messages.keySet();
          for (Object messageKey : messageKeys) {
               Message message = (Message) messages.get(messageKey);
               System.out.println("Message Key : " + message.getKey());
               System.out.println("Queue Identifier: " + message.getDestinationIdentifier());               
               System.out.println("Message Identifier: " + message.getMessageIdentifier());
               System.out.println("Message Payload: " + message.getPayload());
          }

          //Retrieve Subscriber Details
          NamedCache subscriptions = CacheFactory.getCache("coherence.messagingpattern.subscriptions");
          System.out.println();
          System.out.println(subscriptions.getCacheName() + " has " + subscriptions.size() + " Objects");
          
          Set<Object> subscriptionKeys = subscriptions.keySet();
          for (Object subscriptionKey : subscriptionKeys) {
               Subscription subscription = (Subscription) subscriptions.get(subscriptionKey);
               if(subscription instanceof QueueSubscription) {
                    QueueSubscription queueSubscription = (QueueSubscription) subscription;
                    System.out.println("Queue Subscription Name : " + queueSubscription.getName());
                    System.out.println("Queue Subscription Identifier : " + queueSubscription.getIdentifier());
                    System.out.println("Queue Subscription Num Messages : " + queueSubscription.getNumMessages());
                    System.out.println("Queue Subscription Num Messages Ack : " + queueSubscription.getNumMessagesAcknowledged());
                    System.out.println("Queue Subscription Messages Recv : " + queueSubscription.getNumMessagesReceived());
               }
               else {
                    TopicSubscription topicSubscription = (TopicSubscription) subscription;
                    System.out.println("Topic Subscription Name : " + topicSubscription.getName());
                    System.out.println("Topic Subscription Identifier : " + topicSubscription.getIdentifier());
                    System.out.println("Topic Subscription Num Messages : " + topicSubscription.getNumMessages());
                    System.out.println("Topic Subscription Num Messages Ack : " + topicSubscription.getNumMessagesAcknowledged());
                    System.out.println("Topic Subscription Messages Recv : " + topicSubscription.getNumMessagesReceived());
               }
          }
          
          System.out.println();
     }


}
  • 1. Re: Messaging Pattern Bug
    823083 Newbie
    Currently Being Moderated
    Looking through the code it looks like the issue is in class AbstractSubscriber. The method unsubscribe() calls through to isActive(). The state when interrupted is State.Interrupted and therefore the following code in isActive() does not evaluate to true:

    public boolean isActive()
    {
    return getState() == State.Starting || getState() == State.Active;
    }
  • 2. Re: Messaging Pattern Bug
    823083 Newbie
    Currently Being Moderated
    Any resolution to this?
  • 3. Re: Messaging Pattern Bug
    64633 Newbie
    Currently Being Moderated
    Have you got any solutions for this?
    I believe we are experiencing somewhat the same problem. Our coherence.messagingpattern.messages increases and no one seems to consume them even though our consumer are waiting for messages.

    We are (usually) experiencing problem when nodes in the cluster are restarted and partitions have to be distributed to other nodes.