7 Replies Latest reply on Jul 4, 2018 1:29 PM by 838673

    Oracle AQ dequeue is not asynchronous

    838673

      I have 2 schemas on the same instance. Both have their own queue configured with tables, an enqueue and dequeue procedure. All is working functionally.

      In one schema (schema 1), the dequeue procedure takes about a second to complete. In the other (schema 2), the dequeue procedure takes about 2 minutes to complete.

      My understanding was that Oracle AQ was asynchronous. However, I have noticed that while the longer procedure is running, messages in the other queue have to wait for it to finish before they are processed, even though they have already been dequeued. I thought AQ processes were asynchronous?

      I've looked into this in more detail using the queue tables.

      If the longer process from schema 2 is dequeued first, it is removed from its local queue table, but can be seen in sys.AQ_SRVNTFN_TABLE_1. If I then run 3 enqueue processes in schema 1, when they are dequeued, the 3 rows remain in the local queue table and are also copied into the sys.AQ_SRVNTFN_TABLE_1 table. Once the longer job has finished, the 3 rows in schema 1 are removed from their queue table, the dequeue procedure is processed, and all 4 rows disappear from the sys.AQ_SRVNTFN_TABLE_1 table.

      Is there something I need to set up to ensure both queues can run asynchronously?

      Thanks for any help offered.
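
To reproduce the observation above, a query along these lines shows what is currently sitting in the notification queue table. It is only a sketch: it assumes the session has SELECT access to the SYS-owned table and that the table follows the standard AQ queue-table layout (q_name, msgid, state, enq_time), where state 0 means ready and 2 means processed.

```sql
-- Assumption: run as a user with SELECT privilege on the SYS-owned table
-- (for example a DBA account); column names follow the standard AQ
-- queue-table layout.
SELECT q_name,
       msgid,
       state,      -- 0 = ready, 1 = waiting, 2 = processed, 3 = expired
       enq_time
FROM   sys.aq_srvntfn_table_1
ORDER  BY enq_time;
```

Running it before, during and after the long job should show whether rows for both queues pile up while the slow callback is executing.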

        • 1. Re: Oracle AQ dequeue is not asynchronous
          WGabriel

          Hello,

           

          First of all, you should also tell us something about your enqueue and dequeue options for both AQs.

           

          e.g. 
              dequeue_options.wait        := DBMS_AQ.NO_WAIT; 
              dequeue_options.visibility  := DBMS_AQ.ON_COMMIT; 
          

           

          Kind regards,

          WGabriel
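
For context, the two options quoted above are fields of DBMS_AQ.DEQUEUE_OPTIONS_T. The block below is a minimal sketch of how they are typically set for a manual dequeue. The queue name and payload type are the ones posted later in this thread. The consumer name MY_SUBSCRIBER is a placeholder, since the real subscriber name is not given. With NO_WAIT, an empty queue raises ORA-25228, which the sketch traps.

```sql
DECLARE
  r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            aq_ps_sns_type;             -- payload type named in this thread
  e_no_message         EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_no_message, -25228);     -- timeout / nothing to dequeue
BEGIN
  r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;    -- return at once if the queue is empty
  r_dequeue_options.visibility    := DBMS_AQ.ON_COMMIT;  -- dequeue is part of the current transaction
  r_dequeue_options.consumer_name := 'MY_SUBSCRIBER';    -- hypothetical subscriber name

  DBMS_AQ.DEQUEUE(
    queue_name         => 'aq_ps_sns',
    dequeue_options    => r_dequeue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );

  COMMIT;  -- with ON_COMMIT, the message only leaves the queue here
EXCEPTION
  WHEN e_no_message THEN
    NULL;  -- queue was empty; nothing to do
END;
/
```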

          • 2. Re: Oracle AQ dequeue is not asynchronous
            838673

            OK: reading around this, I think you have already identified the issue, but I'm just trying to figure out what settings I need.

            So my objective is to have all queues running asynchronously and immediately, with no waits and no dependencies on either jobs in the same queue or jobs in other queues.

             

            In my example above, schema 1 enqueue and dequeue looks like this:

            enqueue:

            o_payload := aq_ps_sns_type(
               sysdate
              ,v_com_id
              ,null
              ,p_user_id
              ,p_message
              ,p_arn
              ,p_publish_as_json
              ,TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3'));

            DBMS_AQ.ENQUEUE(
              queue_name         => 'aq_ps_sns',
              enqueue_options    => r_enqueue_options,
              message_properties => r_message_properties,
              payload            => o_payload,
              msgid              => v_message_handle
            );

            COMMIT;

            dequeue:

            r_dequeue_options.msgid         := descr.msg_id;
            r_dequeue_options.consumer_name := descr.consumer_name;

            DBMS_AQ.DEQUEUE(
              queue_name         => descr.queue_name,
              dequeue_options    => r_dequeue_options,
              message_properties => r_message_properties,
              payload            => o_payload,
              msgid              => v_message_handle
            );

             

            For the longer job in schema 2:

            enqueue:

            o_payload := aq_campaign_emails_type(
               null
              ,null
              ,null
              ,null
              ,null
              ,null
              ,null
              ,TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3')
              ,p_id
              ,null);

            DBMS_AQ.ENQUEUE(
              queue_name         => 'PLSQL_OBJECTS_V3.aq_campaign_emails',
              enqueue_options    => r_enqueue_options,
              message_properties => r_message_properties,
              payload            => o_payload,
              msgid              => v_message_handle
            );
            COMMIT;

            dequeue:

            r_dequeue_options.msgid         := descr.msg_id;
            r_dequeue_options.consumer_name := descr.consumer_name;
            r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;

            DBMS_AQ.DEQUEUE(
              queue_name         => descr.queue_name,
              dequeue_options    => r_dequeue_options,
              message_properties => r_message_properties,
              payload            => o_payload,
              msgid              => v_message_handle
            );
            commit;

             

            Thanks for the help!

            • 3. Re: Oracle AQ dequeue is not asynchronous
              838673

              Adding to my reply above, I am guessing to get all jobs in the queues running asynchronously, all the dequeue procedures would need:

               

              r_dequeue_options.wait        := DBMS_AQ.NO_WAIT;  

              r_dequeue_options.visibility  := DBMS_AQ.IMMEDIATE;

               

              and the enqueue procedures would need:

               

              r_enqueue_options.visibility  := DBMS_AQ.IMMEDIATE;

              • 4. Re: Oracle AQ dequeue is not asynchronous
                WGabriel

                Hello,

                 

                It is not clear how your two schemas are correlated via the AQs. How did you set up your dequeue process?

                Do you use a separate dequeue process for each schema?

                 

                Kind regards,

                WGabriel
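
The dequeue procedures posted in this thread take the (context, reginfo, descr, payload, payloadl) parameter list of a PL/SQL notification callback, so they are presumably attached to their queues with DBMS_AQ.REGISTER. The actual registration is never shown. The block below is a sketch of what such a call usually looks like, using the schema 2 queue and procedure names from the thread. The subscriber name MY_SUBSCRIBER and the context value are placeholders.

```sql
BEGIN
  DBMS_AQ.REGISTER(
    SYS.AQ$_REG_INFO_LIST(
      SYS.AQ$_REG_INFO(
        -- queue and consumer as "schema.queue:consumer"; consumer name is hypothetical
        'PLSQL_OBJECTS_V3.AQ_CAMPAIGN_EMAILS:MY_SUBSCRIBER',
        DBMS_AQ.NAMESPACE_AQ,
        -- callback procedure invoked when a message is enqueued
        'plsql://PLSQL_OBJECTS_V3.AQ_CAMPAIGN_EMAILS_DEQUEUE',
        -- arbitrary user context handed back to the callback (placeholder value)
        HEXTORAW('FF')
      )
    ),
    1  -- number of entries in the registration list
  );
END;
/
```

Each queue would have its own registration of this kind, one per schema, which is the setup detail the question above is asking about.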

                • 5. Re: Oracle AQ dequeue is not asynchronous
                  838673

                  Thanks for the help with this.

                  So both schemas are within the same Oracle instance (and can both see each other, if that is relevant). They therefore share the same SYS account, which I think is where jobs are being bottlenecked, in the sys.AQ_SRVNTFN_TABLE_1 table.

                  Both schemas have their own queue, queue table, subscriber, and enqueue and dequeue processes, and they work independently of each other from a functional standpoint. I've altered the options on each of the dequeue procedures as outlined below, but because all the jobs from both queues appear to be processed by the same SYS account in a synchronous manner, one queue has a long wait while jobs from the other queue are processed. My understanding is that queues can run asynchronously, but I can't get this to work.

                  Here are my two current dequeue procedures, running in different schemas, but on the same instance:

                   

                  dequeue in schema 1 (short job)

                   

                  create or replace PROCEDURE aq_ps_sns_dequeue(
                    context  RAW,
                    reginfo  SYS.AQ$_REG_INFO,
                    descr    SYS.AQ$_DESCRIPTOR,
                    payload  RAW,
                    payloadl NUMBER
                  ) AS
                    r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
                    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                    v_message_handle     RAW(16);
                    o_payload            aq_ps_sns_type;

                    rec_info environment_info%rowtype;

                    --buzz a handset if native
                    v_destination_buzz  varchar2(2000);
                    v_buzz              clob;
                  BEGIN
                    rec_info := GETENVINFO;

                    r_dequeue_options.msgid         := descr.msg_id;
                    r_dequeue_options.consumer_name := descr.consumer_name;
                    r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;
                    r_dequeue_options.visibility    := DBMS_AQ.IMMEDIATE;

                    DBMS_AQ.DEQUEUE(
                      queue_name         => descr.queue_name,
                      dequeue_options    => r_dequeue_options,
                      message_properties => r_message_properties,
                      payload            => o_payload,
                      msgid              => v_message_handle
                    );

                    INSERT INTO data_objects_sbc.log_ps_sns_stats ( message )
                    VALUES ( 'Log [' || o_payload.p_notes || '] ' ||
                      o_payload.p_start||'/'||o_payload.p_com_id||'/'||o_payload.p_store_id||'/'||
                      o_payload.p_staff_id||'/'||o_payload.p_payload||'/'||o_payload.p_token||'/'||
                      o_payload.p_debug||'/'||o_payload.p_notes||
                      ' dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                      'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
                    COMMIT;

                    v_destination_buzz := rec_info.gv_helpdesk_url||'/REST/redleaf.publish_to_sns_endpoint?p_message='||o_payload.p_payload||
                      '&p_targetarn='||o_payload.p_token||'&p_publish_as_json='||o_payload.p_debug;
                    v_buzz := remote_http_pck.get_http_ssl_clob(p_url => v_destination_buzz);

                    INSERT INTO data_objects_sbc.log_ps_sns_stats ( message )
                    VALUES (v_destination_buzz||'/'||v_buzz||
                      ' sent at [' || TO_CHAR( SYSTIMESTAMP,
                      'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
                    COMMIT;

                  exception
                    when others then
                      INSERT INTO data_objects_sbc.log_ps_sns_stats ( message )
                      VALUES ( 'Log [error] ' ||
                        'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']' );
                      commit;
                  END;

                   

                  dequeue in schema 2 (long job)

                   

                  create or replace PROCEDURE plsql_objects_v3.aq_campaign_emails_dequeue(
                    context  RAW,
                    reginfo  SYS.AQ$_REG_INFO,
                    descr    SYS.AQ$_DESCRIPTOR,
                    payload  RAW,
                    payloadl NUMBER
                  ) AS
                    r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
                    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                    v_message_handle     RAW(16);
                    o_payload            aq_campaign_emails_type;

                    cursor c_email_clob (cp_id in number) is
                      select *
                      from data_objects_sbc.campaign_clobs
                      where id = cp_id;
                    vc_email_clob c_email_clob%rowtype;
                  BEGIN
                    r_dequeue_options.msgid         := descr.msg_id;
                    r_dequeue_options.consumer_name := descr.consumer_name;
                    r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;
                    r_dequeue_options.visibility    := DBMS_AQ.IMMEDIATE;

                    DBMS_AQ.DEQUEUE(
                      queue_name         => descr.queue_name,
                      dequeue_options    => r_dequeue_options,
                      message_properties => r_message_properties,
                      payload            => o_payload,
                      msgid              => v_message_handle
                    );
                    commit;

                    open c_email_clob(o_payload.p_attribtue1);
                    fetch c_email_clob into vc_email_clob;
                    close c_email_clob;

                    if vc_email_clob.email = 2 then
                      MARKET_TEMPLATE_PCK.EMAIL_CAMPAIGN(p_template_id      =>  nvl(vc_email_clob.name,vc_email_clob.query_id)
                                                        ,p_location_id      =>  GET_LOC_ID_FROM_STORE_ID(p_store_id => vc_email_clob.hastorenos ,p_com_id => vc_email_clob.com_id)
                                                        ,p_force_send       =>  1
                                                        ,p_xml              =>  vc_email_clob.csv_clob
                                                        ,p_com_id           =>  vc_email_clob.com_id
                                                        ,p_bee_id           =>  vc_email_clob.cus_camapign_id
                                                        );
                    end if;

                    INSERT INTO data_objects_sbc.log_campaign_emails_stats ( message )
                    VALUES ( 'Log (committed) [' || o_payload.p_notes || '] ' ||
                      'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                      'DD-MON-YYYY HH24:MI:SS.FF3' ) || '] for id: '||o_payload.p_attribtue1 ||'['||nvl(vc_email_clob.name,vc_email_clob.query_id)||']');
                    COMMIT;

                  exception
                    when others then
                      INSERT INTO data_objects_sbc.log_campaign_emails_stats ( message )
                      VALUES ( 'Log [error] ' ||
                        'dequeued at [' || TO_CHAR( SYSTIMESTAMP,
                        'DD-MON-YYYY HH24:MI:SS.FF3' ) || ']'||dbms_utility.format_error_stack||'/'||dbms_utility.format_error_backtrace );
                      commit;
                  END;
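
This thread does not establish why the callbacks appear to run one after another. If the notification layer really is invoking them serially, one workaround sometimes used is to keep the callback itself very short and hand the slow work to a DBMS_SCHEDULER job, so that the callback returns almost immediately. The sketch below illustrates that idea under those assumptions. The procedure names aq_campaign_emails_notify and process_campaign_email and the subscriber name MY_SUBSCRIBER are hypothetical, and the owning schema is assumed to have the CREATE JOB privilege.

```sql
-- Hypothetical worker: dequeues one message by id and does the slow work.
CREATE OR REPLACE PROCEDURE process_campaign_email(p_msgid_hex IN VARCHAR2) AS
  r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            aq_campaign_emails_type;      -- payload type from the thread
BEGIN
  r_dequeue_options.msgid         := HEXTORAW(p_msgid_hex);
  r_dequeue_options.consumer_name := 'MY_SUBSCRIBER'; -- hypothetical subscriber name
  r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;

  DBMS_AQ.DEQUEUE(
    queue_name         => 'PLSQL_OBJECTS_V3.aq_campaign_emails',
    dequeue_options    => r_dequeue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );

  -- ... the long-running email campaign logic would go here ...

  COMMIT;
END;
/

-- Hypothetical callback: only schedules the worker, then returns.
CREATE OR REPLACE PROCEDURE aq_campaign_emails_notify(
  context  RAW,
  reginfo  SYS.AQ$_REG_INFO,
  descr    SYS.AQ$_DESCRIPTOR,
  payload  RAW,
  payloadl NUMBER
) AS
BEGIN
  DBMS_SCHEDULER.CREATE_JOB(
    job_name   => DBMS_SCHEDULER.GENERATE_JOB_NAME('AQ_EMAIL_'),
    job_type   => 'PLSQL_BLOCK',
    -- msg_id is passed as hex text, so the generated block contains only hex characters
    job_action => 'BEGIN process_campaign_email(''' || RAWTOHEX(descr.msg_id) || '''); END;',
    enabled    => TRUE,   -- run as soon as a job slave is free
    auto_drop  => TRUE    -- remove the job definition once it has run
  );
END;
/
```

Whether this actually removes the wait depends on the cause, which is still open at this point in the thread.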

                  • 6. Re: Oracle AQ dequeue is not asynchronous
                    WGabriel

                    Hello,

                     

                    Please also add the enqueue options for both AQs (presumably these are identical to your dequeue options).

                    Your transaction handling should be optimized, but more on that later.

                     

                    Kind regards,

                    WGabriel
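
The reply above does not say which change to the transaction handling is meant. One common pattern, shown here only as a sketch, is to dequeue with ON_COMMIT visibility so that the dequeue and the processing share one transaction, and to roll back on failure so that the message stays on the queue for a retry instead of being lost. The procedure name aq_ps_sns_dequeue_tx is hypothetical. The payload type is the one from this thread.

```sql
CREATE OR REPLACE PROCEDURE aq_ps_sns_dequeue_tx(   -- hypothetical name
  context  RAW,
  reginfo  SYS.AQ$_REG_INFO,
  descr    SYS.AQ$_DESCRIPTOR,
  payload  RAW,
  payloadl NUMBER
) AS
  r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  v_message_handle     RAW(16);
  o_payload            aq_ps_sns_type;
BEGIN
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  r_dequeue_options.wait          := DBMS_AQ.NO_WAIT;
  r_dequeue_options.visibility    := DBMS_AQ.ON_COMMIT;  -- dequeue joins this transaction

  DBMS_AQ.DEQUEUE(
    queue_name         => descr.queue_name,
    dequeue_options    => r_dequeue_options,
    message_properties => r_message_properties,
    payload            => o_payload,
    msgid              => v_message_handle
  );

  -- ... process o_payload here, without intermediate commits ...

  COMMIT;      -- message removal and processing results become permanent together
EXCEPTION
  WHEN OTHERS THEN
    ROLLBACK;  -- undo the dequeue so the message remains available for retry
    RAISE;     -- let the caller see the original error
END;
/
```

In the procedures posted earlier, the dequeue uses IMMEDIATE visibility, so the message is gone before the work starts and an error during processing cannot put it back.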

                    • 7. Re: Oracle AQ dequeue is not asynchronous
                      838673

                      enqueue in schema 1 (short job)

                       

                       

                      create or replace PROCEDURE aq_ps_sns_enqueue
                      (p_com_token          in varchar2 default null
                      ,p_com_id             in number default null
                      ,p_user_id            in number default null
                      ,p_arn                in varchar2 default null
                      ,p_message            in varchar2 default null
                      ,p_publish_as_json    in varchar2 default null
                      )
                      is
                        r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
                        r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                        v_message_handle     RAW(16);
                        o_payload            aq_ps_sns_type;

                        cursor c_com(cp_token in varchar2) is
                          select id
                          from data_objects_sbc.companies
                          where com_token = cp_token;
                        v_com_id number;
                      BEGIN
                        open c_com(p_com_token);
                        fetch c_com into v_com_id;
                        close c_com;

                        o_payload := aq_ps_sns_type(
                           sysdate
                          ,v_com_id
                          ,null
                          ,p_user_id
                          ,p_message
                          ,p_arn
                          ,p_publish_as_json
                          ,TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3'));

                        r_enqueue_options.visibility := DBMS_AQ.IMMEDIATE;

                        DBMS_AQ.ENQUEUE(
                          queue_name         => 'aq_ps_sns',
                          enqueue_options    => r_enqueue_options,
                          message_properties => r_message_properties,
                          payload            => o_payload,
                          msgid              => v_message_handle
                        );

                        COMMIT;

                        --htp.p('completed');
                      EXCEPTION
                        WHEN OTHERS THEN
                          htp.p('aq_ps_sns_enqueue error:');
                          htp.p(dbms_utility.format_error_stack);
                          htp.p(dbms_utility.format_error_backtrace);
                      END aq_ps_sns_enqueue;

                       

                      enqueue in schema 2 (long job)

                       

                       

                      create or replace PROCEDURE plsql_objects_v3.aq_campaign_emails_enqueue
                      (p_id       in number default null)
                      is
                        r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
                        r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
                        v_message_handle     RAW(16);
                        o_payload            aq_campaign_emails_type;
                      BEGIN
                        o_payload := aq_campaign_emails_type(
                           null
                          ,null
                          ,null
                          ,null
                          ,null
                          ,null
                          ,null
                          ,TO_CHAR(SYSTIMESTAMP, 'DD-MON-YYYY HH24:MI:SS.FF3')
                          ,p_id
                          ,null);

                        r_enqueue_options.visibility := DBMS_AQ.IMMEDIATE;

                        DBMS_AQ.ENQUEUE(
                          queue_name         => 'PLSQL_OBJECTS_V3.aq_campaign_emails',
                          enqueue_options    => r_enqueue_options,
                          message_properties => r_message_properties,
                          payload            => o_payload,
                          msgid              => v_message_handle
                        );
                        COMMIT;

                        --htp.p('completed');
                      EXCEPTION
                        WHEN OTHERS THEN
                          htp.p('aq_campaign_emails_enqueue error:');
                          htp.p(dbms_utility.format_error_stack);
                          htp.p(dbms_utility.format_error_backtrace);
                      END aq_campaign_emails_enqueue;