3 Replies Latest reply: Jan 28, 2013 7:11 AM by Maarten Smeets

    dequeue same message twice in topic mode.

    894458
      I have written a concurrent consumer program to consume messages from an AQ queue. The queue is a multiple-consumer queue, and I created just one subscriber on it. I then started 10 threads to dequeue messages from that queue. After all messages had been consumed, I found that some of them had been consumed twice. Are there any tips for this problem?
      Thanks very much.

      The dequeue log is as follows:
      First:
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.run: Before dispatchOneMsg to dispatch a message to the listener
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: entry
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.37) : after sync, timeout=0
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync, timeout=0
      AQ deliveryMode: 1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : acquired session lock, usecount=1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.37) : acquired consumer lock, usecount=1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: lockReceive returns, got the lock
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.receiveForListener: entry
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: entry
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: visibility : 2
      AQ deliveryMode: 1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: sec_timeout: 0 close_check_interval: 120
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: After getDbConnection
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: q_name: DEV1_TRANSACTION.UNIT_TEST_Q p_data_type: DEV1_TRANSACTION.GC_NOTIFY_T
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: Payload type-1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: use_ociaq_lib = false
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsSession.getDequeueStmt: dequeue statement: begin dbms_aqin.aq$_dequeue_in(queue_name => ?, subscriber => ?, msgid => ?, correlation => ?, dequeue_mode => ?, navigation => ?, visibility => ?, wait => ?, enqueue_time => ?, state => ?, out_msgid => ?, out_correlation => ?, priority => ?, delay => ?, expiration => ?, attempts => ?, exception_queue => ?, remote_recipients => ?, sender_name => ?, sender_addr => ?, sender_protocol => ?, original_msgid => ?, payload_type => ?, raw_user_data => ?, object_user_data => ?, deq_cond => ?, signature => ?, out_sign => ?, transformation => ?, delivery_mode => ?, out_delivery_mode => ?); end;

      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: consumer_name: SUBSCRIBER1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: dq_mode: 3navig: 1visibility: 2
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: adt_message retrieved
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: set AQ enqueue_time as 1318514224000
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: msg_id: AEDE32CF4AAEB32EE040010A1AD76B10 corrid: null priority: 1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: msg_delay(secs): 0
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: exptime(secs): -1
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: attempts: 0
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: excp_q: null
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: recv_time: Thu Oct 13 22:01:20 CST 2011
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: exit-2
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.receiveForListener: exit
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Received the message: AEDE32CF4AAEB32EE040010A1AD76B10
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsSimpleScheduler.feedData: Got a non null message, the sleep time is reset to 0
      Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Before calling onMessage method
      2011-10-13 22:01:21 INFO (MockAqListener.java:49)[Thread-10] - Thread-10:3Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10
      Thread-10 [Thu Oct 13 22:01:21 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync, timeout=0
      Thread-10 [Thu Oct 13 22:01:21 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : Thread Thread-10 try to require lock mulitple times, grant it again.
      Thread-10 [Thu Oct 13 22:01:21 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : acquired session lock, usecount=2
      2011-10-13 22:01:21 INFO (MockAqListener.java:49)[Thread-19] - Thread-19:2Message ID:AEDE32CF4A86B32EE040010A1AD76B10 Message :AEDE32CF4A85B32EE040010A1AD76B10
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] AQjmsSession.restartConsumers: entry
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : released session lock, EXIT, usecount=1
      2011-10-13 22:01:22 INFO (MockAqListener.java:55)[Thread-10] - Thread-10:3Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10Commit
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] AQjmsListenerWorker.dispatchOneMsg: After calling onMessage method
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] unLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.37) : released consumer lock, usecount=0
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : released session lock, EXIT, usecount=0
      Thread-10 [Thu Oct 13 22:01:22 CST 2011] AQjmsListenerWorker.dispatchOneMsg: unlock the session and EXIT



      Second:

      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsListenerWorker.dispatchOneMsg: lockReceive returns, got the lock
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.receiveForListener: entry
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: entry
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: visibility : 2
      AQ deliveryMode: 1
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: sec_timeout: 0 close_check_interval: 120
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: After getDbConnection
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: q_name: DEV1_TRANSACTION.UNIT_TEST_Q p_data_type: DEV1_TRANSACTION.GC_NOTIFY_T
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: Payload type-1
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: use_ociaq_lib = false
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsSession.getDequeueStmt: dequeue statement: begin dbms_aqin.aq$_dequeue_in(queue_name => ?, subscriber => ?, msgid => ?, correlation => ?, dequeue_mode => ?, navigation => ?, visibility => ?, wait => ?, enqueue_time => ?, state => ?, out_msgid => ?, out_correlation => ?, priority => ?, delay => ?, expiration => ?, attempts => ?, exception_queue => ?, remote_recipients => ?, sender_name => ?, sender_addr => ?, sender_protocol => ?, original_msgid => ?, payload_type => ?, raw_user_data => ?, object_user_data => ?, deq_cond => ?, signature => ?, out_sign => ?, transformation => ?, delivery_mode => ?, out_delivery_mode => ?); end;
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: consumer_name: SUBSCRIBER1
      Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: dq_mode: 3navig: 1visibility: 2
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: adt_message retrieved
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: set AQ enqueue_time as 1318514224000
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: msg_id: AEDE32CF4AAEB32EE040010A1AD76B10 corrid: null priority: 1
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: msg_delay(secs): 0
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: exptime(secs): -1
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: attempts: 0
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: excp_q: null
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: recv_time: Thu Oct 13 22:01:29 CST 2011
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: exit-2
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.receiveForListener: exit
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Received the message: AEDE32CF4AAEB32EE040010A1AD76B10
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsSimpleScheduler.feedData: Got a non null message, the sleep time is reset to 0
      Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Before calling onMessage method
      2011-10-13 22:01:30 INFO (MockAqListener.java:49)[Thread-11] - Thread-11:5Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : after sync, timeout=0
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : Thread Thread-11 try to require lock mulitple times, grant it again.
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : acquired session lock, usecount=2
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsSession.restartConsumers: entry
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : after sync
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : released session lock, EXIT, usecount=1
      2011-10-13 22:01:30 INFO (MockAqListener.java:55)[Thread-11] - Thread-11:5Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10Commit
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.dispatchOneMsg: After calling onMessage method
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] unLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.43) : released consumer lock, usecount=0
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : after sync
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : released session lock, EXIT, usecount=0
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.dispatchOneMsg: unlock the session and EXIT
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.run: After dispatchOneMsg, Before get the next sleep time
      Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.run: sleep 0 millisecond.
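
In the two excerpts above, Thread-10 and Thread-11 both report "Received the message: AEDE32CF4AAEB32EE040010A1AD76B10". With a long trace this is tedious to spot by eye, so here is a minimal sketch of a scanner that groups those "Received the message" lines by message ID and keeps only the IDs that were received more than once. The class name `DuplicateDequeueFinder` and the method `findDuplicates` are made up for illustration, and the pattern assumes the trace line format shown in this post.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DuplicateDequeueFinder {
    // Assumed line shape (taken from the trace in this post):
    // Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Received the message: <msgid>
    private static final Pattern RECEIVED = Pattern.compile(
        "^\\s*(Thread-\\d+) \\[.*?\\] AQjmsListenerWorker\\.dispatchOneMsg: Received the message: (\\S+)");

    /** Maps each message ID that was received more than once to the threads that received it, in log order. */
    static Map<String, List<String>> findDuplicates(List<String> logLines) {
        Map<String, List<String>> threadsByMsgId = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = RECEIVED.matcher(line);
            if (m.find()) {
                // group(1) is the thread name, group(2) is the message ID
                threadsByMsgId.computeIfAbsent(m.group(2), k -> new ArrayList<>()).add(m.group(1));
            }
        }
        // Drop every message that was received exactly once.
        threadsByMsgId.values().removeIf(threads -> threads.size() < 2);
        return threadsByMsgId;
    }
}
```

Feeding it the lines of the trace file (for example via `java.nio.file.Files.readAllLines`) gives one entry per duplicated message ID together with the threads involved.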
        • 1. Re: dequeue same message twice in topic mode.
          710157
          A shot in the dark, but you are committing after the dequeue, right?
          • 2. Re: dequeue same message twice in topic mode.
            921593
            I have the same problem. Did you find a solution for this?

            Thanks in advance!!!
            • 3. Re: dequeue same message twice in topic mode.
              Maarten Smeets
              This issue looks like Bug: 13729601

              (from http://www.oracle.com/technetwork/middleware/docs/aiasoarelnotesps5-1455925.html)

              The dequeuer returns the same message in multiple threads in high concurrency environments when Oracle database 11.2 is used. This means that some messages are dequeued more than once. For example, in Oracle SOA Suite, if Service 1 suddenly raises a large number of business events that are subscribed to by Service 2, duplicate instances of Service 2 triggered by the same event may be seen in an intermittent fashion. The same behavior is not observed with a 10.2.0.5 database or in an 11.2 database with event 10852 level 16384 set to disable the 11.2 dequeue optimizations.

              Workaround: Perform the following steps:

              Log in to the 11.2 database:
              CONNECT / AS SYSDBA

              Specify the following SQL command in SQL*Plus to disable the 11.2 dequeue optimizations:
              SQL> alter system set event='10852 trace name context forever, level 16384' scope=spfile;

              Restart the database.
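
Separately from the database-side workaround quoted above, and not part of the Oracle release note, a listener could defensively skip message IDs it has already handled. The following is only a sketch under assumptions: the class `SeenMessageGuard` and its method `firstDelivery` are hypothetical names, the guard works only within a single JVM, and the set of seen IDs grows without bound unless it is pruned.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SeenMessageGuard {
    // Thread-safe set: add() is atomic, so only one of several concurrent
    // callers presenting the same ID gets true back.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /** Returns true only for the first caller that presents a given message ID. */
    boolean firstDelivery(String messageId) {
        return seen.add(messageId);
    }
}
```

In an `onMessage` implementation, the ID from `message.getJMSMessageID()` would be passed to `firstDelivery`, and the business logic skipped when it returns false. This hides duplicates from the application but does not stop the database from delivering them, so it is a complement to the event 10852 setting rather than a replacement.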

              Edited by: Maarten Smeets on Jan 28, 2013 2:10 PM