This discussion is archived
6 Replies Latest reply: Sep 8, 2010 7:03 AM by 805009 RSS

Durable Subscriber not working

807581 Newbie
Currently Being Moderated
Hi,
I am very new to JMS. I have 2 classes for implementing Publisher and Durable Subscriber with MessageListener. When I try to publish the messages, it publishes them to the topic. But when I try to start the subscriber, it is never able to read the messages. Even the onMessage() method of MessageListener is not invoked. Tried with session.commit() in publisher but no success. Please help.
Regards,
Anupam
public class LXKJMSPublisher {
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicPublisher topicPublisher = null;
int exitResult = 0;
String                topicName = "topic1";
public LXKJMSPublisher() {
TopicConnectionFactory topicConnectionFactory = null;
try {
topicConnectionFactory =
LXKJMSUtilities.getTopicConnectionFactory();
topicConnection =
topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = LXKJMSUtilities.getTopic(topicName, topicSession);
topicPublisher = topicSession.createPublisher(topic);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
     System.exit(1);
}
}
public void publishMessages(HashMap hmInfo, String sEvent) {
TextMessage message = null;
StringBuilder sbMsgDetails = new StringBuilder(360);
try {
message = topicSession.createTextMessage();
if(LXKUtil.isNotBlank(sEvent) &&
          "PartRelease".equals(sEvent) && hmInfo != null){
     sbMsgDetails.append((String)hmInfo.get("PartName"));
}
message.setText(sbMsgDetails.toString());
// message.setText("MESSAGE NUMBER 1");
if(LXKUtil.isBlank(sEvent)){
     throw new JMSException("Event could not be found.");
}
if(hmInfo == null){
     throw new JMSException("Message details to be published not found.");
}
System.out.println("PUBLISHER: Publishing message: "
+ message.getText());
topicPublisher.publish(message);
// Send a non-text control message indicating end of messages.
topicPublisher.publish(topicSession.createMessage());
topicSession.commit();
} catch (JMSException e) {
     System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
}
}
public void finish() {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
public static void main(String[] args) {
     LXKJMSPublisher pub = new LXKJMSPublisher();
HashMap hmtopic = new HashMap(4);
hmtopic.put("PartName", "12344");
pub.publishMessages(hmtopic, "PartRelease");
     pub.finish();
     
     LXKJMSUtilities.exit(pub.exitResult);
}
}
public class LXKJMSDurableSubscriber {
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicSubscriber topicSubscriber = null;
TextListener topicListener = null;
int exitResult = 0;
long                timeStamp = System.currentTimeMillis();
String                topicName = "topic1";
String                subscriber = null;
private class TextListener implements MessageListener, ExceptionListener {         
final LXKJMSUtilities.DoneLatch monitor =
new LXKJMSUtilities.DoneLatch();
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage msg = (TextMessage) message;

try {
System.out.println("SUBSCRIBER: Reading message: "
+ msg.getText());
} catch (JMSException e) {
System.out.println("Exception in onMessage(): "
+ e.toString());
}
} else {
monitor.allDone();
}
}
public void onException(JMSException exception)
     {
     System.err.println("something bad happended: " + exception);
     }
}
public LXKJMSDurableSubscriber() {
TopicConnectionFactory topicConnectionFactory = null;

try {
topicConnectionFactory =
LXKJMSUtilities.getTopicConnectionFactory();
topicConnection =
topicConnectionFactory.createTopicConnection();
topicConnection.setClientID(Long.toString(timeStamp));
topicSession = topicConnection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
topic = LXKJMSUtilities.getTopic(topicName, topicSession);
} catch (Exception e) {
System.out.println("Connection problem: " + e.toString());
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException ee) {}
}
     System.exit(1);
}
}
public void startSubscriber() {
try {
System.out.println("Starting subscriber");
topicConnection.stop();
topicSubscriber = topicSession.createDurableSubscriber(topic,
          subscriber);
topicListener = new TextListener();
topicSubscriber.setMessageListener(topicListener);
topicConnection.start();
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
}
}
public void closeSubscriber() {
try {
topicListener.monitor.waitTillDone();
System.out.println("Closing subscriber");
topicSubscriber.close();
} catch (JMSException e) {
System.out.println("Exception occurred: " + e.toString());
exitResult = 1;
}
}
public void finish() {
if (topicConnection != null) {
try {
topicSession.unsubscribe(subscriber);
topicConnection.close();
} catch (JMSException e) {
exitResult = 1;
}
}
}
public static void main(String[] args) {
     LXKJMSDurableSubscriber dse = new LXKJMSDurableSubscriber();
if (args.length != 1) {
     System.out.println("Usage: java LXKJMSDurableSubscriber <subscriber_name>");
     System.exit(1);
     }
String subscriberName = new String(args[0]);
dse.subscriber = subscriberName;
System.out.println("Subscriber name is ==> " + dse.subscriber);
     dse.startSubscriber();
     dse.closeSubscriber();
     dse.finish();      
     LXKJMSUtilities.exit(dse.exitResult);
}
}
  • 1. Re: Durable Subscriber not working
    807581 Newbie
    Currently Being Moderated
    Let me add a few lines as to what is required for me. I want to invoke the Publisher class as a bean from jsp and publish some messages to a topic. The messages shall be published whenever the jsp is invoked. At the time the publisher publishes messages, the subscriber will not be online. I have a separate Subscriber class which will be invoked by user whenever he wants to check the messages that have been published to the topic. It should fetch the messages from the topic and display them to the user. Currently, when I invoke the publisher class and try to check if the messages have been published to the topic through openmq admin console Topic properties, I can see the Current Number of Messages as 0. And invoking the subscriber does not display any messages.

    Could you please let me know the correct approach to fulfill my requirement?
  • 2. Re: Durable Subscriber not working
    805009 Newbie
    Currently Being Moderated
    It may help if I clarify how a durable subscription works. This is a long-lived persistent entity which exists in the JMS server even if there are no clients connected, and which accumulates all persistent messages that are sent to the topic after the subscriber is created. To create a durable subscription, call session.createDurableSubscriber(). To destroy a durable subscription call session.unsubscribe(). After you destroy the subscription it will no longer accumulate messages.

    So in your application you need to make sure you call session.createDurableSubscriber() before the first message is sent to the topic. Otherwise (if there are no other subscribers) messages will simply be thrown away. If you want to, you can create the durable subscriber and then close the connection (without calling unsubscribe()) and shut down the application; the durable subscription will continue to accumulate messages.

    Your subscriber application can "activate" the durable subscription (by calling createDurableSubscriber()) at any time and then consume messages from it. It can close the connection (without calling unsubscribe()) and shut down the application whenever it want to; the durable subscription will continue to accumulate messages.

    Only when you no longer need to accumulate messages in the durable subscription should you call unsubscribe(). This will destroy it and it will no longer accumulate messages.

    Looking at your code, make sure the durable subcription is created before the first message is sent to the topic. And remove the call to unsubscribe() unless you really want to destroy the subscription.

    Nigel
  • 3. Re: Durable Subscriber not working
    807581 Newbie
    Currently Being Moderated
    Hi Nigel,

    Thanks a lot for the information. I did exactly as you mentioned. I removed the unsubscribe calls from the Subscriber class and before the first message was published, I invoked the Subscriber class and after that I invoked my Publisher class. It worked and I could see the count of messages in the topic from admin console. And since I had closed the subscriber, the Durable Subscriptions tab of the topic properties showed subscriber as INACTIVE and also the client ID and number of messages for the subscriber. But I have a problem. Even though I call my Subscriber class again to check the messages by calling the createDurableSubscriber() method, I can see that the subscriber is still inactive. I have used the same Subscriber name and client ID but still it remains inactive and is unable to read the messages. Is there something I'm still missing? If I set the client ID as a unique timestamp, will it do any harm? When I do that I could see subscribers created with same name but different client IDs. I am still not sure about this client ID concept.

    Regards,
    Anupam

    public void startSubscriber() {
    try {
    System.out.println("Starting subscriber");
    topicConnection.stop();
    topicSubscriber = topicSession.createDurableSubscriber(topic,
              subscriber);
    topicListener = new TextListener();
    topicSubscriber.setMessageListener(topicListener);
    topicConnection.start();

    } catch (JMSException e) {
    System.out.println("Exception occurred: " + e.toString());
    exitResult = 1;
    }
    }

    public void closeSubscriber() {
    try {
    //topicListener.monitor.waitTillDone();
    System.out.println("Closing subscriber");
    topicSubscriber.close();
    } catch (JMSException e) {
    System.out.println("Exception occurred: " + e.toString());
    exitResult = 1;
    }
    }
    public static void main(String[] args) {
         LXKJMSDurableSubscriber dse = new LXKJMSDurableSubscriber();

    if (args.length != 1) {
         System.out.println("Usage: java LXKJMSDurableSubscriber <subscriber_name>");
         System.exit(1);
         }

    String subscriberName = new String(args[0]);
    dse.subscriber = subscriberName;
    System.out.println("Subscriber name is ==> " + dse.subscriber);

         dse.startSubscriber();
         dse.closeSubscriber();
              
         LXKJMSUtilities.exit(dse.exitResult);
    }
  • 4. Re: Durable Subscriber not working
    805009 Newbie
    Currently Being Moderated
    When you "activate" an existing durable subscription you need to use the same clientID and durable subscription name that you used when creating it the first time.

    Nigel
  • 5. Re: Durable Subscriber not working
    807581 Newbie
    Currently Being Moderated
    Nigel,

    I have found out the problem now. I need to comment out subscriber.close() to read the messages. So now I am able to publish as well consume the messages. During the process I came to know few things like for a subscriber to consume messages, he should be subscribed to the topic even before messages were published.
    Since now I have to implement this in my application I have few questions to ask to make my concept clear. Suppose there are 3 subscribers sb1, sb2 and sb3 already subscribed to a topic say topic1. The publisher goes and publishes a message to the topic. So each of the subscriber sb1, sb2 and sb3 would be able to see the message that has been published. Now if sb4 subscribes himself to the topic later on after message has been published, I can see that he is not able to see any message allocated to his bucket. For this I have few questions -

    a) Is it possible through some means of configuration to always make a few number of subscribers to be automatically subscribed to a specific topic when the broker goes online? Or it is something that can only be handled by code?

    b) Is there any API to find the subscribers that are subscribed to a topic currently?

    c) What is the solution if a subscriber subscribes to a topic after a message has been published and he wants to see the message?

    Regards,
    Anupam
  • 6. Re: Durable Subscriber not working
    805009 Newbie
    Currently Being Moderated
    a) Is it possible through some means of configuration to always make a few number of subscribers to be automatically subscribed to a specific topic when the broker goes online? Or it is something that can only be handled by code?
    You should use the JMS API to create a durable subscription. Note that you only need to do this once. A durable subscription will survive a broker restart.
    Is there any API to find the subscribers that are subscribed to a topic currently?
    Not using the JMS API.

    With Message Queue you can do it using the (non-standard) JMX API.
    What is the solution if a subscriber subscribes to a topic after a message has been published and he wants to see the message?
    Use a queue instead?

    Nigel