jms error on receive

bluefrog
bluefrog Member Posts: 1,512
edited Oct 12, 2011 3:54AM in Java Message Service (JMS)
Hi,

This is a duplicate of a post on the Oracle AQ forum:
9926612

It is related to JMS, however, since I am getting a very specific JMS error, so I thought
I'd post in this forum as I have had no replies in the AQ forum.

The database is 11.2.0.1 on Windows 32-bit. The following describes the problem:
----------------------------------
I am attempting to receive a message from a queue that has a SYS.AQ$_JMS_TEXT_MESSAGE payload type.
The message was successfully placed onto the queue with this JMS session:
9925977

The message exists on the queue;
SQL> col Msg format a20
SQL> select msg_id, msg_state, qt.user_data.text_vc Msg
  2  from aq$jms_qtt_text qt
SQL> / 
 
MSG_ID                           MSG_STATE        MSG
-------------------------------- ---------------- ------------
E9375239910A42518AA0E10DCADC3CB4 READY            Test Message
The queue was created as follows;
exec dbms_aqadm.create_queue_table (queue_table=>'jms_qtt_text', queue_payload_type=>'SYS.AQ$_JMS_TEXT_MESSAGE');
exec dbms_aqadm.create_queue (Queue_name=>'jms_text_que',Queue_table=>'jms_qtt_text');
exec dbms_aqadm.start_queue (queue_name=>'jms_text_que');
I am using the following Java class and Java stored procedure;
SQL> create or replace and compile java source named BasicAQDequeue as
  2  import java.sql.Connection;
  3  import java.sql.Driver;
  4  import java.sql.DriverManager;
  5  import javax.jms.Queue;
  6  import javax.jms.QueueConnection;
  7  import javax.jms.QueueConnectionFactory;
  8  import javax.jms.QueueReceiver;
  9  import javax.jms.QueueSession;
 10  import javax.jms.TextMessage;
 11  import javax.jms.Session;
 12  import oracle.jms.AQjmsFactory;
 13  import oracle.jdbc.*;
 14  import oracle.jdbc.aq.*;
 15  import oracle.jms.*;
 16
 17  public class BasicAQDequeue {
 18
 19      private QueueReceiver     consumer;
 20      private Queue             queue;
 21      private QueueConnection   consumerConnection;
 22      private QueueSession      consumerSession;
 23      private static final String   QUEUE_NAME  = "JMS_TEXT_QUE";
 24      private static final String   QUEUE_OWNER = "XDB_DEV4";
 25      private static final int      ACK_MODE    = QueueSession.AUTO_ACKNOWLEDGE;
 26      private static final boolean  TRANSACTED  = false;
 27
 28      /* Creates and initializes the test. */
 29      public BasicAQDequeue() throws Exception {
 30          super();
 31          // init consumer session - use Oracle's default server-side connection
 32          Connection conn     = DriverManager.getConnection("jdbc:default:connection:");
 33          consumerConnection  = AQjmsQueueConnectionFactory.createQueueConnection(conn);
 34          consumerSession     = consumerConnection.createQueueSession(TRANSACTED,ACK_MODE);
 35          queue               = consumerSession.createQueue(QUEUE_NAME);
 36          consumer            = consumerSession.createReceiver(queue);
 37      }
 38      public static void main (String args[]) throws Exception
 39        {
 40          BasicAQDequeue qTest  = new BasicAQDequeue();
 41          qTest.run();
 42          qTest.cleanup();
 43        }
 44      /** Closes the JMS connections.throws Exception */
 45      private void cleanup() throws Exception {
 46          // cleanup consumer
 47          System.out.println("Start cleanup.");
 48          consumerConnection.stop();
 49          consumer.close();
 50          consumerSession.close();
 51          consumerConnection.close();
 52          System.out.println("Done.");
 53      }
 54      /* The message send loop. Throws Exception */
 55      private void run() throws Exception {
 56          TextMessage message = (TextMessage) consumer.receive(10);
 57  //        consumerSession.commit(); // This line causes the error
 58          System.out.println("Message received: " + message.getText());
 59      }
 60  }
 61  / 
 
Java created.
 
SQL> create or replace procedure BasicAQDequeue as
  2  language java name 'BasicAQDequeue.main(java.lang.String[])';
  3  / 
 
Procedure created.
 
SQL>
I am getting the following error when I run a test;
SQL> begin
  2    BasicAQDequeue;
  3  end;
  4  / 
Exception in thread "Root Thread" java.lang.NullPointerException
        at BasicAQDequeue.run(BASICAQDEQUEUE:57)
        at BasicAQDequeue.main(BASICAQDEQUEUE:40)
This suggests that the message variable is null, i.e. no message was received.
So the question in this scenario is: why has the "receive" method not worked?

This is my main question, and I would appreciate getting it answered ahead of my next question.

When I uncomment line 57;
57  //        consumerSession.commit(); // This line causes the error
I get the following error;
SQL> l
  1  begin
  2    BasicAQDequeue;
  3* end;
SQL> / 
Exception in thread "Root Thread" javax.jms.IllegalStateException:
JMS-227: Illegal attempt to commit on a non transacted JMS Session
The question for this scenario is why is the commit not acceptable?
I appreciate there are two questions in this post, but the code is the same.

Any help would be much appreciated.

thanks

Edited by: bluefrog on Oct 11, 2011 11:14 AM
Answers

  • ramp
    ramp Member Posts: 2,358
    edited Oct 11, 2011 7:29AM
    So the question in this scenario is why has the "receive" method not worked?
    You have used the receive(long timeout) method, which reads a message if there is one, or returns null if no message arrives within the timeout period, which is set to 10 ms in your code. Obviously your posted message has not made it to the queue when your client connects. Maybe your problems are elsewhere and the message was never posted at all. I suggest you debug using the receive() method (no args), which blocks until a message is available to be read.
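
To make the timeout behaviour concrete: a timed receive that expires hands back null, and calling a method on that null result is what produces a NullPointerException. The sketch below is not JMS code. It uses java.util.concurrent.BlockingQueue.poll, which has the same "return null when the wait expires" contract, as a stand-in so that it runs without a JMS provider. The names TimedReceiveDemo and receiveOrFallback are made up for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in demo: BlockingQueue.poll(timeout) plays the role of a timed receive.
// TimedReceiveDemo and receiveOrFallback are hypothetical names, not JMS API.
class TimedReceiveDemo {

    // Waits up to timeoutMillis for a message; never returns null to the caller.
    static String receiveOrFallback(BlockingQueue<String> queue, long timeoutMillis) {
        String message;
        try {
            message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            message = null;
        }
        if (message == null) {
            // Without this check, a call such as message.length() would throw NullPointerException.
            return "(no message within " + timeoutMillis + " ms)";
        }
        return message;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(receiveOrFallback(queue, 10)); // queue is empty, so the wait expires
        queue.add("Test Message");
        System.out.println(receiveOrFallback(queue, 10)); // a message is waiting
    }
}
```

The same null check applies to the result of consumer.receive(10) before getText() is called on it.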


    why is the commit not acceptable?
    You use a non-transacted session. Look at lines 26 and 34 in the code you posted: the 'TRANSACTED' variable is set to false and passed to createQueueSession(). The API clearly states that commit() throws an IllegalStateException when it is called on a session that is not transacted.
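
The rule can be pictured with a small toy model. This is only a sketch: ToySession is a hypothetical class written for this illustration, not javax.jms.Session, and it throws java.lang.IllegalStateException where a real provider throws the checked javax.jms.IllegalStateException. In real JMS code the equivalent guard would test Session.getTransacted() before calling commit(); in a non-transacted AUTO_ACKNOWLEDGE session no commit is needed, because each receive is acknowledged automatically.

```java
// Toy model only: ToySession is a hypothetical class, not javax.jms.Session.
class ToySession {
    private final boolean transacted;

    ToySession(boolean transacted) {
        this.transacted = transacted;
    }

    // Mirrors the idea of Session.getTransacted(): the mode is fixed at creation.
    boolean getTransacted() {
        return transacted;
    }

    // A commit is only legal when the session was created as transacted.
    void commit() {
        if (!transacted) {
            // Stand-in for the JMS-227 error seen in the thread.
            throw new IllegalStateException("commit on a non-transacted session");
        }
    }

    // Commits only when the session mode allows it; returns whether a commit ran.
    boolean commitIfTransacted() {
        if (!transacted) {
            return false;
        }
        commit();
        return true;
    }
}

class CommitGuardDemo {
    public static void main(String[] args) {
        ToySession auto = new ToySession(false); // like TRANSACTED = false
        ToySession tx = new ToySession(true);    // like TRANSACTED = true

        System.out.println("non-transacted committed: " + auto.commitIfTransacted());
        System.out.println("transacted committed: " + tx.commitIfTransacted());

        try {
            auto.commit(); // unguarded commit on the non-transacted session
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```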

    Edited by: ramp on Oct 11, 2011 4:59 PM
    ramp
  • bluefrog
    bluefrog Member Posts: 1,512
    Hi Ramp,

    thanks for the help.

    I made sure, however, that the message exists on the queue before I ran the receive test,
    so I don't think it's because the message is not on the queue.

    This is some proof;
    SQL> alter session set nls_date_format = 'dd/mm/yyyy hh24:mi';
    
    Session altered.
    
    SQL> select sysdate from dual;
    
    SYSDATE
    ----------------
    11/10/2011 12:57
    
    SQL> col Msg format a20
    SQL> select msg_id, msg_state, qt.user_data.text_vc Msg from aq$jms_qtt_text qt;
    
    MSG_ID                           MSG_STATE        MSG
    -------------------------------- ---------------- --------------------
    E9375239910A42518AA0E10DCADC3CB4 READY            Test Message
    
    SQL> begin BasicAQDequeue; end;
      2  /
    begin BasicAQDequeue; end;
    *
    ERROR at line 1:
    ORA-29532: Java call terminated by uncaught Java exception: java.lang.NullPointerException
    ORA-06512: at "XDB_DEV4.BASICAQDEQUEUE", line 1
    ORA-06512: at line 1
    As you can see, the message is in a READY state, which means the receive should work.

    I agree on the TRANSACTED issue, though.
  • ramp
    ramp Member Posts: 2,358
    I was rooting for the most obvious cause :)

    I have not seen many uses of session.createQueue(), which returns a Queue identity, but I assume that's the AQ equivalent of looking up the Queue from an InitialContext, as is usually the norm. Other than that, your example is pretty straightforward and I do not see a reason why it should not work, assuming you checked for typos in the queue name and are connecting to the correct environment (your connection factory does not have the details).
  • bluefrog
    bluefrog Member Posts: 1,512
    Hi,

    The connection factory uses the default server-side connection (jdbc:default:connection:) since the Java stored procedure is running within the database, so no logon connection details need to be supplied.

    This is proof that the connection does in fact work.
    If I comment out the "receive" method call and replace it with a prepared select statement as follows;
    create or replace and compile java source named BasicAQDequeue as
    import java.sql.Connection;
    import java.sql.Driver;
    import java.sql.DriverManager;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSession;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.Session;
    import oracle.jms.AQjmsFactory;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import oracle.jdbc.*;
    import oracle.jdbc.aq.*;
    import oracle.jms.*;
    
    public class BasicAQDequeue {
    
        private QueueReceiver     consumer;
        private Queue             queue;
        private QueueConnection   consumerConnection;
        private QueueSession      consumerSession;
        private Connection        conn;
        private static final String   QUEUE_NAME  = "JMS_TEXT_QUE";
        private static final int      ACK_MODE    = QueueSession.AUTO_ACKNOWLEDGE;
        private static final boolean  TRANSACTED  = true;
    
        /* Creates and initializes the test. */
        public BasicAQDequeue() throws Exception {
            super();
            // init consumer session - use Oracle's default server-side connection
            conn                = DriverManager.getConnection("jdbc:default:connection:");
            consumerConnection  = AQjmsQueueConnectionFactory.createQueueConnection(conn);
            consumerSession     = consumerConnection.createQueueSession(TRANSACTED,ACK_MODE);
            queue               = consumerSession.createQueue(QUEUE_NAME);
            consumer            = consumerSession.createReceiver(queue);
        }
        public static void main (String args[]) throws Exception
          {
            BasicAQDequeue qTest  = new BasicAQDequeue();
            qTest.run();
            qTest.cleanup();
          }
        /** Closes the JMS connections.throws Exception */
        private void cleanup() throws Exception {
            // cleanup consumer
            System.out.println("Start cleanup.");
            consumerConnection.stop();
            consumer.close();
            consumerSession.close();
            consumerConnection.close();
            System.out.println("Done.");
        }
        /* The message send loop. Throws Exception */
        private void run() throws Exception {
            TextMessage message;
    
            // Test to see if connection works
            PreparedStatement pStmt = conn.prepareStatement("select count(*) from emp");
            ResultSet rSet = pStmt.executeQuery();
            rSet.next();
            System.out.println("Number of Employees is: " + rSet.getString(1));
            rSet.close();
    
    
    //        message = (TextMessage)consumer.receive(1000);
    //        consumerSession.commit(); // This line causes the error
    //        System.out.println("Message received: " + message.getText());
        }
    }
    /
    As you can see, the result is shown;
    SQL> create or replace procedure BasicAQDequeue as
      2  language java name 'BasicAQDequeue.main(java.lang.String[])';
      3  /
    
    Procedure created.
    
    SQL> exec dbms_java.set_output(10000);
    
    PL/SQL procedure successfully completed.
    
    SQL> set serveroutput on
    SQL> begin
      2    BasicAQDequeue;
      3  end;
      4  /
    Number of Employees is: 1
    Start cleanup.
    Done.
    
    PL/SQL procedure successfully completed.
    
    SQL> select count(*) from emp;
    
      COUNT(*)
    ----------
             1
  • bluefrog wrote:
    Hi Ramp,

    thanks for the help.

    I made sure, however, that the message exists on the queue before I ran the receive test,
    so I don't think it's because the message is not on the queue.

    It is still possible that you're not giving the receiver enough time to deliver the message. Try increasing the timeout from 10 ms to several seconds.

    Nigel
    Nigel Deakin-Oracle
  • bluefrog
    bluefrog Member Posts: 1,512
    Hi

    I've tried receive(10000) and the no-argument receive(), which blocks indefinitely, but the outcome is still the same.

    In addition, I've tried dequeuing the message using PL/SQL, and the message appears;
    set serveroutput on
    declare
      queue_options         dbms_aq.dequeue_options_t;
      message_properties    dbms_aq.message_properties_t;
      message_id            raw(2000);
      my_message            SYS.AQ$_JMS_TEXT_MESSAGE;
      msg_text varchar2(32767);
    
    begin
        dbms_aq.dequeue(queue_name          => 'JMS_TEXT_QUE'
                       ,dequeue_options     => queue_options
                       ,message_properties  => message_properties
                       ,payload             => my_message
                       ,msgid               => message_id);
    
        commit;
    
        my_message.get_text(msg_text);
    
        dbms_output.put_line('JMS message: '||msg_text);
    end;
    /
  • Nigel Deakin-Oracle
    Nigel Deakin-Oracle Member Posts: 115
    Accepted Answer
    Thanks for trying that.

    Here's a new idea. Did you remember to call consumerConnection.start() anywhere? Messages will not be delivered to a consumer until its connection (javax.jms.Connection) is started.

    Nigel
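
A JMS connection is created in stopped mode, and consumers on it receive nothing until start() is called, even when a message is sitting READY on the queue. The toy model below is only a sketch of that gate under simplified assumptions: ToyConnection is a hypothetical class written for this illustration, not the JMS or Oracle AQ API, and it ignores sessions, timeouts and transactions. Its receiveNoWait() borrows the name of the real MessageConsumer method but is a stand-in.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: ToyConnection is hypothetical and is not the JMS or Oracle AQ API.
class ToyConnection {
    private final Queue<String> queue = new ArrayDeque<>();
    private boolean started = false; // like a JMS connection, it begins stopped

    // Enqueuing is allowed whether or not the connection is started.
    void send(String text) {
        queue.add(text);
    }

    void start() {
        started = true;
    }

    void stop() {
        started = false;
    }

    // Returns the next message, or null when nothing is deliverable.
    // While the connection is stopped, nothing is deliverable.
    String receiveNoWait() {
        return started ? queue.poll() : null;
    }
}

class StartGateDemo {
    public static void main(String[] args) {
        ToyConnection connection = new ToyConnection();
        connection.send("Test Message"); // the message is on the queue

        System.out.println("before start: " + connection.receiveNoWait());

        connection.start();
        System.out.println("after start: " + connection.receiveNoWait());
    }
}
```

In this model the first receive yields null and the second yields the message, which matches the symptom in the thread: a READY message that is never delivered until the connection is started.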
  • bluefrog
    bluefrog Member Posts: 1,512
    edited Oct 12, 2011 3:54AM
    Great!
    Thanks so much, it now works!
    create or replace and compile java source named BasicAQDequeue as
    import java.sql.Connection;
    import java.sql.Driver;
    import java.sql.DriverManager;
    import javax.jms.Queue;
    import javax.jms.QueueConnection;
    import javax.jms.QueueConnectionFactory;
    import javax.jms.QueueReceiver;
    import javax.jms.QueueSender;
    import javax.jms.QueueSession;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    import javax.jms.Session;
    import oracle.jms.AQjmsFactory;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import oracle.jdbc.*;
    import oracle.jdbc.aq.*;
    import oracle.jms.*;

    public class BasicAQDequeue {

        private QueueReceiver     consumer;
        private Queue             queue;
        private QueueConnection   consumerConnection;
        private QueueSession      consumerSession;
        private Connection        conn;
        private static final String   QUEUE_NAME  = "JMS_TEXT_QUE";
        private static final int      ACK_MODE    = QueueSession.AUTO_ACKNOWLEDGE;
        private static final boolean  TRANSACTED  = true;

        /* Creates and initializes the test. */
        public BasicAQDequeue() throws Exception {
            super();
            // init consumer session - use Oracle's default server-side connection
            conn                = DriverManager.getConnection("jdbc:default:connection:");
            consumerConnection  = AQjmsQueueConnectionFactory.createQueueConnection(conn);
            // start the connection so that messages are delivered to the consumer
            consumerConnection.start();
            consumerSession     = consumerConnection.createQueueSession(TRANSACTED,ACK_MODE);
            queue               = consumerSession.createQueue(QUEUE_NAME);
            consumer            = consumerSession.createReceiver(queue);
        }
        public static void main (String args[]) throws Exception
          {
            BasicAQDequeue qTest  = new BasicAQDequeue();
            qTest.run();
            qTest.cleanup();
          }
        /* Closes the JMS consumer, session and connection. */
        private void cleanup() throws Exception {
            // cleanup consumer
            System.out.println("Start cleanup.");
            consumerConnection.stop();
            consumer.close();
            consumerSession.close();
            consumerConnection.close();
            System.out.println("Done");
        }
        /* Receives one message and commits the dequeue (the session is transacted). */
        private void run() throws Exception {

            TextMessage message;
            // waits up to 1000 ms; receive returns null if no message arrives in time
            message = (TextMessage)consumer.receive(1000);
            consumerSession.commit();
            System.out.println("Message received: " + message.getText());
        }
    }
    /
    
    Java created.
    
    SQL> create or replace procedure BasicAQDequeue as
      2  language java name 'BasicAQDequeue.main(java.lang.String[])';
      3  /
    
    Procedure created.
    
    SQL> 
    SQL> exec dbms_java.set_output(10000);
    
    PL/SQL procedure successfully completed.
    
    SQL> 
    SQL> set serveroutput on
    SQL> begin
      2    BasicAQDequeue;
      3  end;
      4  /
    Message received: Test Message 1
    Start cleanup.
    Done
    
    PL/SQL procedure successfully completed.
    Edited by: bluefrog on Oct 12, 2011 8:52 AM