Forum Stats

  • 3,837,785 Users
  • 2,262,300 Discussions
  • 7,900,393 Comments

Discussions

Propagating from a queue

3004
3004 Member Posts: 204,171 Green Ribbon
I need a clarification regarding Advanced Queuing. The problem is as follows:

I created a queue table 'qt' and created a queue 'q1' for it.
I added subscriber 's1' for queue 'q1'.
Then I created another queue 'q2' and subscribed it, as 's2', to queue 'q1'.
After scheduling the propagation for queue 'q1', i enqueued a message to 'q1'.

The message ought to be delivered to subscriber 's1' and to queue 'q2'.

As far as I understood, the aq$qt view should show 2 records: one for queue 'q1' and the other for queue 'q2'.

But I find 2 records for queue 'q1' and no records for 'q2' in the aq$qt view.
Also, I am able to dequeue only the one with consumer_name 's1', and not the other message.

How do I dequeue the message?

Below is the script for creating the queues, enqueuing, and dequeuing:

--create the object type with two attributes: subject and text.
--The "/" on its own line is required: SQL*Plus treats CREATE TYPE as a
--PL/SQL unit, so the ";" alone does not terminate and execute it.
create type message_type as object (subject VARCHAR2(30),text VARCHAR2(80));
/

-- Create the multi-consumer queue table 'qt' whose payload is message_type.
-- (Queues q1 and q2 are created in this table and started right after.)
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'qt',
    queue_payload_type => 'message_type',
    multiple_consumers => TRUE,   -- allows more than one consumer per message
    compatible         => '8.1'
  );
END;
/
--create queues q1 and q2 in queue table qt, then start both.
--No "/" follows these lines: "execute" is a SQL*Plus command that runs
--immediately and is not stored in the SQL buffer, so a "/" here would
--re-run whatever statement is still in the buffer instead.
execute dbms_aqadm.create_queue(queue_name => 'q1', queue_table =>'qt');
execute dbms_aqadm.create_queue(queue_name => 'q2', queue_table =>'qt');
execute dbms_aqadm.start_queue ( queue_name => 'q1');
execute dbms_aqadm.start_queue ( queue_name => 'q2');

-- Register agent 's1' (no address, no protocol) as a subscriber of queue q1.
BEGIN
  dbms_aqadm.add_subscriber(
    queue_name => 'q1',
    subscriber => sys.aq$_agent('s1', NULL, NULL)
  );
END;
/
-- Register agent 's2' with address 'q2' as a subscriber of queue q1.
-- NOTE(review): the address is presumably what routes q1's messages to
-- queue q2 through propagation -- confirm against the DBMS_AQADM docs.
BEGIN
  dbms_aqadm.add_subscriber(
    queue_name => 'q1',
    subscriber => sys.aq$_agent('s2', 'q2', NULL)
  );
END;
/
-- Schedule propagation for source queue q1, starting now.
-- destination, duration and next_time are passed as NULL explicitly.
-- NOTE(review): a NULL destination presumably means the local database,
-- and latency is presumably in seconds -- confirm.
BEGIN
  dbms_aqadm.schedule_propagation(
    queue_name  => 'q1',
    destination => NULL,
    start_time  => SYSDATE,
    duration    => NULL,
    next_time   => NULL,
    latency     => 10
  );
END;
/
-- Enqueue one message_type payload ('M1', 'message1') into queue q1
-- using default enqueue options and message properties, then commit.
DECLARE
  l_enq_options  dbms_aq.enqueue_options_t;
  l_msg_props    dbms_aq.message_properties_t;
  l_msg_id       RAW(16);
  l_payload      message_type := message_type('M1', 'message1');
BEGIN
  dbms_aq.enqueue(
    queue_name         => 'q1',
    enqueue_options    => l_enq_options,
    message_properties => l_msg_props,
    payload            => l_payload,
    msgid              => l_msg_id
  );
  COMMIT;
END;
/
--dequeuing message
-- Dequeues one message from queue q1 on behalf of consumer 's1' without
-- waiting, commits, and prints the message id. If ORA-25228 is raised,
-- 'no messages' is printed instead.
-- The output is only visible when server output is enabled in the client
-- (e.g. SET SERVEROUTPUT ON in SQL*Plus).
declare
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message message_type;
no_messages exception;
-- bind ORA-25228 to a named exception so it can be handled below
Pragma exception_init(no_messages,-25228);
begin
dequeue_options.consumer_name := 's1';
-- NOTE(review): 1 presumably corresponds to dbms_aq.first_message -- confirm
dequeue_options.NAVIGATION := 1;
-- return immediately instead of blocking when nothing can be dequeued
dequeue_options.wait :=dbms_aq.no_wait;
dbms_aq.dequeue(queue_name => 'q1',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
commit;
-- the concatenation operator is written "||" with no space between the bars
dbms_output.put_line ('MessageId := ' || RAWTOHEX(message_handle));
exception
when no_messages then
dbms_output.put_line('no messages');
end;
/

Comments

  • 3004
    3004 Member Posts: 204,171 Green Ribbon
    Make sure that the values of system parameters job_queue_processes and aq_tm_processes are non-zero. Job queue processes are used to propagate messages. AQ_TM_PROCESSES are used to do background AQ jobs.
This discussion has been closed.