7 Replies Latest reply on Nov 7, 2013 11:22 AM by steph0h

    DBMS_AQ.REGISTER() only for multiple consumers?

    steph0h

      Hello,

       

      My system is Oracle database 11G.

      What I'm trying to achieve is registering a procedure for dequeueing messages from a single consumer queue.

      I found multiple examples on the internet, but these examples all seem to work on queue tables defined for multiple consumers.

       

      I can register my callback procedure like this:

       

      declare

        reginfo1 sys.aq$_reg_info;

        reginfolist sys.aq$_reg_info_list;

      begin

        reginfo1:=sys.aq$_reg_info('PI.STEPTEST_QUEUE'

                                  ,1

                                  ,'plsql://PI.STEP_DEQUEUE'

                                  ,HEXTORAW('FF')

                                  );

        reginfolist:=sys.aq$_reg_info_list(reginfo1);

        dbms_aq.register(reginfolist,1);

      end;

      /


      But unfortunately the messages won't get dequeued by the callback procedure.

       

      When I register the callback like this I get an error:

       

      declare

        reginfo1 sys.aq$_reg_info;

        reginfolist sys.aq$_reg_info_list;

      begin

        reginfo1:=sys.aq$_reg_info('PI.STEPTEST_QUEUE:ADMIN'

                                  ,1

                                  ,'plsql://PI.STEP_DEQUEUE'

                                  ,HEXTORAW('FF')

                                  );

        reginfolist:=sys.aq$_reg_info_list(reginfo1);

        dbms_aq.register(reginfolist,1);

      end;

      /

       

      The error is:

       

      ORA-25256: Empfänger kann nicht mit Single-Consumer- oder Exception-Queue angegeben werden

      ORA-06512: in "SYS.DBMS_AQ", Zeile 737

      ORA-06512: in Zeile 11

      25256. 00000 -  "consumer cannot be specified with a single-consumer queue or an exception queue "

      *Cause:    An attempt was made to specify a consumer in the subscription

                 string when registering for notification on a single-consumer

                 queue or an exception queue.

      *Action:   Do not specify the consumer in the subscription string.

       

      ... which gives me the impression, that it should also work for single conumer queue tables.

       

      So: Does registering callbacks indeed only work for multiple consumer queue tables?

       

      Thanks,

      Stephan

        • 1. Re: DBMS_AQ.REGISTER() only for multiple consumers?
          Hugo_4711

          Hi Stephan,

           

          registering a callback procedure works for single consumer queues.

          Take a look at: https://forums.oracle.com/thread/2594159

           

          Check your queue definition and your procedure. The first way you used to register your callback looks ok, so maybe the problem lies in

          your procedure.

          Hope that helps

           

          Just seen it - you didn't commit in your registration.

          1 person found this helpful
          • 2. Re: DBMS_AQ.REGISTER() only for multiple consumers?
            WGabriel

            Hello,

             

            you got the error in the second code part, because you have specified a consumer name (":ADMIN"), which

            is not allowed. It is not possible to give you a further hint because you haven't neither shown your AQ setup nor the PL/SQL callback, unfortunately.

             

            Kind regards,

            WoG 

            • 3. Re: DBMS_AQ.REGISTER() only for multiple consumers?
              steph0h

              Thanks for the hint!

               

              Below is the complete example, which still does not work: I can enqueue data, I can manually dequeue data with the callback-procedure, but messages won't get dequeued automatically, although the procedure is registered for the queue. What am I missing?

               

              Thanks,

              Stephan

              -----------------

               

              -- testqueue -------------------------------------------------------------------

              create type step_type as object (x number);

              exec DBMS_AQADM.STop_QUEUE('STEPTEST_QUEUE');

              exec DBMS_AQADM.drop_QUEUE('STEPTEST_QUEUE');

              exec dbms_aqadm.drop_queue_table('PI.STEPTEST_QUETAB');

              exec dbms_aqadm.create_queue_table('PI.STEPTEST_QUETAB','STEP_TYPE');

              begin
              dbms_aqadm.create_queue(
              queue_name => 'STEPTEST_QUEUE'
              , queue_table => 'PI.STEPTEST_QUETAB'
              , queue_type => DBMS_AQADM.NORMAL_QUEUE
              , max_retries => 0
              , retry_delay => 0
              , retention_time => 1800
              , dependency_tracking => FALSE
              , comment => 'STEPTEST'
              );
              DBMS_AQADM.START_QUEUE('STEPTEST_QUEUE');
              end;
              /

              create table step_dequeued_vals(x number,t timestamp);

              create or replace procedure step_dequeue is
                l_payload step_type;
                dequeue_options dbms_aq.dequeue_options_t;
                message_properties dbms_aq.message_properties_t;
                message_handle RAW(16);
              begin
                DBMS_AQ.deQUEUE (
                queue_name => 'STEPTEST_QUEUE'
                , dequeue_options => dequeue_options
                , message_properties => message_properties
                , payload => l_payload
                , msgid => message_handle
                );
                insert into step_dequeued_vals (x,t )values(l_payload.x,current_timestamp);
                commit;
              end;
              /


              -- register --------------------------------------------------------------------

              declare
                reginfo1 sys.aq$_reg_info;
                reginfolist sys.aq$_reg_info_list;
              begin
                reginfo1:=sys.aq$_reg_info('PI.STEPTEST_QUEUE'
                                          ,DBMS_AQ.NAMESPACE_AQ
                                          ,'plsql://PI.STEP_DEQUEUE?PR=0'
                                          ,HEXTORAW('FF')
                                          );
                reginfolist:=sys.aq$_reg_info_list(reginfo1);
                dbms_aq.register(reginfolist,1);
              --  dbms_aq.unregister(reginfolist,1);
                commit;
              end;
              /

              -- enqueue ---------------------------------------------------------------------
              declare
                l_payload step_type;
                enqueue_options dbms_aq.enqueue_options_t;
                message_properties dbms_aq.message_properties_t;
                message_handle RAW(16);
              begin
                l_payload := step_type(42);
                DBMS_AQ.ENQUEUE (
                queue_name => 'STEPTEST_QUEUE'
                , enqueue_options => enqueue_options
                , message_properties => message_properties
                , payload => l_payload
                , msgid => message_handle
                );
                dbms_output.put_line(message_handle);
                commit;
              end;
              /

              select * from AQ$STEPTEST_QUETAB;

              select * from step_dequeued_vals;

              • 4. Re: DBMS_AQ.REGISTER() only for multiple consumers?
                Hugo_4711

                Hi Stephan,

                 

                the parameters of your callback should be:

                 

                  PROCEDURE step_dequeue

                  (context  RAW

                  ,reginfo  sys.AQ$_reg_info

                  ,descr    sys.AQ$_descriptor

                  ,payload  VARCHAR2    -- this could also be datatype RAW

                  ,payloadl NUMBER

                  )

                 

                Otherwise it wouldn't work.

                 

                And not to forget:

                 

                Before you dequeue you need the following lines:

                 

                    dequeue_options.msgid                := descr.msg_id;
                    dequeue_options.consumer_name := descr.consumer_name;

                • 5. Re: DBMS_AQ.REGISTER() only for multiple consumers?
                  steph0h


                  Hi Hugo,

                   

                  Thanks for your correctiosn - I applied tem, but still no luck. I even tried an example given in https://forums.oracle.com/thread/52051?start=0&tstart=0

                  I think I'll give up on this and use DBMS_AQ.LISTEN instead. Although I would have preferred using REGISTER.

                   

                  regards,

                  Stephan

                  • 6. Re: DBMS_AQ.REGISTER() only for multiple consumers?
                    Hugo_4711

                    Hi Stephan,

                     

                    don't give up ...

                     

                    Is your database an Enterprise Edition?

                     

                    I built the following example for you, tested it - and it works on an 11.2.0.2 Enterprise Edition.

                     

                    Create  Type hugo4711_payload as object

                    ( text_2          varchar2(4000));

                    /

                     

                    CREATE TABLE hugo4711_log (text1 VARCHAR2(4000));

                    /

                    --

                    BEGIN

                      DBMS_AQADM.CREATE_QUEUE_TABLE(

                         Queue_table        => 'TESTUSER.HUGO4711_TAB'

                        ,Queue_payload_type => 'HUGO4711_PAYLOAD'

                        ,multiple_consumers => FALSE

                        ,Sort_list          => 'ENQ_TIME'

                        ,COMMENT            => 'Testqueue');

                    END;

                    /

                    --

                    -- Queue anlegen

                    --

                    BEGIN

                      DBMS_AQADM.CREATE_QUEUE(

                         Queue_name          => 'TESTUSER.HUGO4711_QUEUE',

                         Queue_table         => 'TESTUSER.HUGO4711_TAB',

                         Queue_type          => dbms_aqadm.NORMAL_QUEUE);

                    END;

                    /

                     

                    --

                    BEGIN

                    dbms_aqadm.start_queue( queue_name => 'TESTUSER.HUGO4711_QUEUE' );

                    END;

                    /

                     

                    CREATE OR REPLACE PROCEDURE

                       hugo4711_handle_message

                       (

                          context           RAW,

                          reginfo           sys.aq$_reg_info,

                          descr             sys.aq$_descriptor,

                          payload           VARCHAR2,

                          payloadl          NUMBER

                       )

                    AS

                       dequeue_options   DBMS_AQ.dequeue_options_t;

                       enqueue_options   DBMS_AQ.enqueue_options_t;

                       message_prop      DBMS_AQ.message_properties_t;

                       message_hdl       raw(16);

                       v_payload         hugo4711_payload;

                    BEGIN

                       dequeue_options.msgid := descr.msg_id;

                       dequeue_options.consumer_name := descr.consumer_name;

                     

                       dbms_aq.dequeue(descr.queue_name, dequeue_options, message_prop, v_payload, message_hdl);

                     

                       INSERT INTO hugo4711_log (text1) VALUES (v_payload.text_2);

                     

                       COMMIT;

                    END;

                     

                     

                    DECLARE

                       reginfo1 sys.aq$_reg_info;

                       reginfolist sys.aq$_reg_info_list;

                    BEGIN

                       reginfo1 := sys.aq$_reg_info('TESTUSER.HUGO4711_QUEUE', DBMS_AQ.NAMESPACE_AQ, 'plsql://hugo4711_handle_message?PR=0',HEXTORAW('FF'));

                     

                       reginfolist := sys.aq$_reg_info_list(reginfo1);

                    sys.dbms_aq.register(reginfolist, 1);

                     

                       commit;

                    END;

                    /

                     

                    begin

                    dbms_lock.sleep(10);

                    end;

                    /

                     

                     

                    DECLARE

                        v_payload         hugo4711_payload;

                     

                        enqueue_options dbms_aq.enqueue_options_t;

                        message_properties dbms_aq.message_properties_t;

                        msgid              raw(16);

                     

                    BEGIN

                       v_payload := hugo4711_payload(null);

                     

                       v_payload.text_2 := 'Test Test Test';

                     

                       dbms_aq.enqueue(queue_name => 'TESTUSER.HUGO4711_QUEUE',

                                          enqueue_options => enqueue_options,

                                          message_properties => message_properties,

                                          payload => v_payload,

                                          msgid => msgid);

                       commit;

                    END;

                    /

                     

                    begin

                    dbms_lock.sleep(60);

                    end;

                    /

                     

                    SELECT * from hugo4711_log;

                    /


                    Hope this example works in your environment.


                    • 7. Re: DBMS_AQ.REGISTER() only for multiple consumers?
                      steph0h

                      Found it by help of our DBA. It was some settting of a system parameter that prevents db-jobs from running (this is a developement database). Enabling db-jobs again also made callback-functions working. This seems to be a known bug.