This discussion is archived
3 Replies Latest reply: Jan 28, 2013 5:11 AM by 732544 RSS

dequeue same message twice in topic mode.

894458 Newbie
Currently Being Moderated
I have written a concurrent consumer program to consume AQ's message. The queue is a multiple consumer queue, and i just created one subscriber on it. Then i started 10 threads to dequeue messages from that queue. when all messages are consumed I found that some messages are consumed twice. Is there any tips for this problem?
Thanks very much.

The dequeue log is as follows:
First:
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.run: Before dispatchOneMsg to dispatch a message to the listener
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: entry
Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.37) : after sync, timeout=0
Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync, timeout=0
AQ deliveryMode: 1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : acquired session lock, usecount=1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] [Thread-10] getLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.37) : acquired consumer lock, usecount=1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: lockReceive returns, got the lock
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.receiveForListener: entry
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: entry
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: visibility : 2
AQ deliveryMode: 1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: sec_timeout: 0 close_check_interval: 120
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: After getDbConnection
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: q_name: DEV1_TRANSACTION.UNIT_TEST_Q p_data_type: DEV1_TRANSACTION.GC_NOTIFY_T
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: Payload type-1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: use_ociaq_lib = false
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsSession.getDequeueStmt: dequeue statement: begin dbms_aqin.aq$_dequeue_in(queue_name => ?, subscriber => ?, msgid => ?, correlation => ?, dequeue_mode => ?, navigation => ?, visibility => ?, wait => ?, enqueue_time => ?, state => ?, out_msgid => ?, out_correlation => ?, priority => ?, delay => ?, expiration => ?, attempts => ?, exception_queue => ?, remote_recipients => ?, sender_name => ?, sender_addr => ?, sender_protocol => ?, original_msgid => ?, payload_type => ?, raw_user_data => ?, object_user_data => ?, deq_cond => ?, signature => ?, out_sign => ?, transformation => ?, delivery_mode => ?, out_delivery_mode => ?); end;

Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: consumer_name: SUBSCRIBER1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: dq_mode: 3navig: 1visibility: 2
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: adt_message retrieved
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: set AQ enqueue_time as 1318514224000
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: msg_id: AEDE32CF4AAEB32EE040010A1AD76B10 corrid: null priority: 1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: msg_delay(secs): 0
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: exptime(secs): -1
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: attempts: 0
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: excp_q: null
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: recv_time: Thu Oct 13 22:01:20 CST 2011
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.dequeue: exit-2
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsConsumer.receiveForListener: exit
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Received the message: AEDE32CF4AAEB32EE040010A1AD76B10
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsSimpleScheduler.feedData: Got a non null message, the sleep time is reset to 0
Thread-10 [Thu Oct 13 22:01:20 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Before calling onMessage method
2011-10-13 22:01:21 INFO (MockAqListener.java:49)[Thread-10] - Thread-10:3Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10
Thread-10 [Thu Oct 13 22:01:21 CST 2011] [Thread-10] getLock (OJMS.Session..-4ab69733:132fd95ca2e:-8000.28) : after sync, timeout=0
Thread-10 [Thu Oct 13 22:01:21 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : Thread Thread-10 try to require lock mulitple times, grant it again.
Thread-10 [Thu Oct 13 22:01:21 CST 2011] [Thread-10] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : acquired session lock, usecount=2
2011-10-13 22:01:21 INFO (MockAqListener.java:49)[Thread-19] - Thread-19:2Message ID:AEDE32CF4A86B32EE040010A1AD76B10 Message :AEDE32CF4A85B32EE040010A1AD76B10
Thread-10 [Thu Oct 13 22:01:22 CST 2011] AQjmsSession.restartConsumers: entry
Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync
Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : released session lock, EXIT, usecount=1
2011-10-13 22:01:22 INFO (MockAqListener.java:55)[Thread-10] - Thread-10:3Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10Commit
Thread-10 [Thu Oct 13 22:01:22 CST 2011] AQjmsListenerWorker.dispatchOneMsg: After calling onMessage method
Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] unLock (OJMS.MessageConsumer.-4ab69733:12e:-8000.67) : released session lock, EXIT, usecount=1
Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : after sync
Thread-10 [Thu Oct 13 22:01:22 CST 2011] [Thread-10] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.28) : released session lock, EXIT, usecount=0
Thread-10 [Thu Oct 13 22:01:22 CST 2011] AQjmsListenerWorker.dispatchOneMsg: unlock the session and EXIT



Second:

Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsListenerWorker.dispatchOneMsg: lockReceive returns, got the lock
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.receiveForListener: entry
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: entry
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: visibility : 2
AQ deliveryMode: 1
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: sec_timeout: 0 close_check_interval: 120
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: After getDbConnection
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: q_name: DEV1_TRANSACTION.UNIT_TEST_Q p_data_type: DEV1_TRANSACTION.GC_NOTIFY_T
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: Payload type-1
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: use_ociaq_lib = false
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsSession.getDequeueStmt: dequeue statement: begin dbms_aqin.aq$_dequeue_in(queue_name => ?, subscriber => ?, msgid => ?, correlation => ?, dequeue_mode => ?, navigation => ?, visibility => ?, wait => ?, enqueue_time => ?, state => ?, out_msgid => ?, out_correlation => ?, priority => ?, delay => ?, expiration => ?, attempts => ?, exception_queue => ?, remote_recipients => ?, sender_name => ?, sender_addr => ?, sender_protocol => ?, original_msgid => ?, payload_type => ?, raw_user_data => ?, object_user_data => ?, deq_cond => ?, signature => ?, out_sign => ?, transformation => ?, delivery_mode => ?, out_delivery_mode => ?); end;
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: consumer_name: SUBSCRIBER1
Thread-11 [Thu Oct 13 22:01:28 CST 2011] AQjmsConsumer.dequeue: dq_mode: 3navig: 1visibility: 2
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: adt_message retrieved
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: set AQ enqueue_time as 1318514224000
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: msg_id: AEDE32CF4AAEB32EE040010A1AD76B10 corrid: null priority: 1
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: msg_delay(secs): 0
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: exptime(secs): -1
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: attempts: 0
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: excp_q: null
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: recv_time: Thu Oct 13 22:01:29 CST 2011
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.dequeue: exit-2
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsConsumer.receiveForListener: exit
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Received the message: AEDE32CF4AAEB32EE040010A1AD76B10
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsSimpleScheduler.feedData: Got a non null message, the sleep time is reset to 0
Thread-11 [Thu Oct 13 22:01:29 CST 2011] AQjmsListenerWorker.dispatchOneMsg: Before calling onMessage method
2011-10-13 22:01:30 INFO (MockAqListener.java:49)[Thread-11] - Thread-11:5Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : after sync, timeout=0
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : Thread Thread-11 try to require lock mulitple times, grant it again.
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] getLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : acquired session lock, usecount=2
Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsSession.restartConsumers: entry
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : after sync
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : released session lock, EXIT, usecount=1
2011-10-13 22:01:30 INFO (MockAqListener.java:55)[Thread-11] - Thread-11:5Message ID:AEDE32CF4AAEB32EE040010A1AD76B10 Message :AEDE32CF4AADB32EE040010A1AD76B10Commit
Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.dispatchOneMsg: After calling onMessage method
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] unLock (OJMS.MessageConsumer.-4ab69733:132fd95ca2e:-8000.43) : released consumer lock, usecount=0
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] releaseLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : after sync
Thread-11 [Thu Oct 13 22:01:30 CST 2011] [Thread-11] unLock (OJMS.Session.-4ab69733:132fd95ca2e:-8000.26) : released session lock, EXIT, usecount=0
Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.dispatchOneMsg: unlock the session and EXIT
Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.run: After dispatchOneMsg, Before get the next sleep time
Thread-11 [Thu Oct 13 22:01:30 CST 2011] AQjmsListenerWorker.run: sleep 0 millisecond.
  • 1. Re: dequeue same message twice in topic mode.
    710157 Newbie
    Currently Being Moderated
    A shot in the dark, but you are committing after the dequeue, right?
  • 2. Re: dequeue same message twice in topic mode.
    921593 Newbie
    Currently Being Moderated
    I have the same problem, did you get any solution for this???

    Thanks in advance!!!
  • 3. Re: dequeue same message twice in topic mode.
    732544 Newbie
    Currently Being Moderated
    This issue looks like Bug: 13729601

    (from http://www.oracle.com/technetwork/middleware/docs/aiasoarelnotesps5-1455925.html)

    The dequeuer returns the same message in multiple threads in high concurrency environments when Oracle database 11.2 is used. This means that some messages are dequeued more than once. For example, in Oracle SOA Suite, if Service 1 suddenly raises a large number of business events that are subscribed to by Service 2, duplicate instances of Service 2 triggered by the same event may be seen in an intermittent fashion. The same behavior is not observed with a 10.2.0.5 database or in an 11.2 database with event10852 level 16384 set to disable the 11.2 dequeue optimizations.

    Workaround: Perform the following steps:

    Log in to the 11.2 database:
    CONNECT /AS SYSDBA

    Specify the following SQL command in SQL*Plus to disable the 11.2 dequeue optimizations:
    SQL> alter system set event='10852 trace name context forever,
    level 16384'scope=spfile;

    Restart the database.

    Edited by: Maarten Smeets on Jan 28, 2013 2:10 PM

Legend

  • Correct Answers - 10 points
  • Helpful Answers - 5 points