1 Reply Latest reply on May 1, 2012 6:12 PM by 319958

    PL/SQL AQ notification works in XE but not in our Enterprise Edition

    319958
      Hello,
      I am trying to get a Queue up and working and have ran through several examples, but I cannot get the Notify to work in our development environment but it works just fine in my local XE instance.

      Below is the code that WORKS in XE but does not call the procedure in the Enterprise Edition of Oracle:\

      Please note I may manually DBMS_AQ.DEQUEUE the queued item -- that works just fine, but the intent is the have it notify the procedure (this is still hashing out this technology so keep that in mind during critiquing).

      What is different between XE and Enterprise Edition in these regards or what is it I should check?
      (I got the job_queue_processes increased as noted in the below)
      GRANT EXECUTE ON dbms_aq TO aq;
      GRANT EXECUTE ON dbms_aqadm TO aq;
      GRANT CONNECT, RESOURCE, aq_administrator_role TO aq;
      begin
        dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','aq',FALSE);
        dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','aq',FALSE);
        commit;
      end;
      /
      
      drop procedure demo_queue_callback_procedure;
      BEGIN
              
              DBMS_AQADM.STop_QUEUE ( queue_name => 'aq.streams_queue_1' );
              DBMS_AQADM.DROP_QUEUE ( queue_name => 'aq.streams_queue_1' );
              DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => 'aq.streams_queue_1_table' 
                                           ,FORCE       =>TRUE );
      END;
      /
      BEGIN
              DBMS_AQADM.CREATE_QUEUE_TABLE ( 
                        queue_table        => 'aq.streams_queue_1_table'
                       ,queue_payload_type => 'aq.APP_INFO' 
                       ,multiple_consumers => TRUE );
          
             DBMS_AQADM.CREATE_QUEUE (
                queue_name  => 'aq.streams_queue_1',
                queue_table => 'aq.streams_queue_1_table'
                );
          
             DBMS_AQADM.START_QUEUE (
               queue_name => 'aq.streams_queue_1'
               );
         
         END;
      /
      
      
      CREATE TABLE aq.queue_message_table
         ( message VARCHAR2(4000)  , ins_date date default sysdate);
      
      
       CREATE or replace PROCEDURE aq.demo_queue_callback_procedure(
                           context  RAW,
                           reginfo  SYS.AQ$_REG_INFO,
                           descr    SYS.AQ$_DESCRIPTOR,
                           payload  RAW,
                           payloadl NUMBER
                           ) AS
          
            r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
            r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
            v_message_handle     RAW(16);
            o_payload            aq.APP_INFO;
         
         BEGIN
         
            r_dequeue_options.msgid := descr.msg_id;
            r_dequeue_options.consumer_name := descr.consumer_name;
         
            DBMS_AQ.DEQUEUE(
               queue_name         => descr.queue_name,
               dequeue_options    => r_dequeue_options,
               message_properties => r_message_properties,
               payload            => o_payload,
               msgid              => v_message_handle
               );
         
            INSERT INTO aq.queue_message_table ( message )
            VALUES ( '[' || o_payload.parameter || ';' || o_payload.value || ';' || to_char(o_payload.dte,'ddMONyy:hh:mi:ss') ||']' );
            COMMIT;
         
         END;
      /
      
      BEGIN
          
             DBMS_AQADM.ADD_SUBSCRIBER (
                queue_name => 'aq.streams_queue_1',
                subscriber => SYS.AQ$_AGENT(
                                 'aq_queue_subscriber',
                                 NULL,
                                 NULL )
                );
         
             DBMS_AQ.REGISTER (
                SYS.AQ$_REG_INFO_LIST(
                   SYS.AQ$_REG_INFO(
                      'streams_queue_1:aq_queue_subscriber',
                      DBMS_AQ.NAMESPACE_AQ,
                      'plsql://aq.demo_queue_callback_procedure',
                      HEXTORAW('FF')
                      )
                   ),
                1
                );
         END;
         /
         
         
       DECLARE
          
             r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
             r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
             v_message_handle     RAW(16);
             o_payload            aq.app_info;
             vreclst DBMS_AQ.AQ$_RECIPIENT_LIST_T;
          
          BEGIN
          
            o_payload := aq.app_info('msg4',4,sysdate);
           --vreclst(1):= sys.aq$_agent('ANYONE','', null);
            --r_message_properties.recipient_list:= vreclst;
          
            DBMS_AQ.ENQUEUE(
               queue_name         => 'aq.streams_queue_1',
               enqueue_options    => r_enqueue_options,
               message_properties => r_message_properties,
               payload            => o_payload,
               msgid              => v_message_handle
               );
         
           COMMIT;
         
         END;
      /
         
      select * from aq.queue_message_table   ;
      
      SELECT *
          FROM   aq.aq$streams_queue_1_table;
          
          
      set serveroutput on    
          DECLARE
          
             r_dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
             r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
             v_message_handle     RAW(16);
             o_payload            aq.app_info;
          
          BEGIN
          
            --r_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;
                r_dequeue_options.consumer_name := 'aq_QUEUE_SUBSCRIBER';
            DBMS_AQ.DEQUEUE(
               queue_name         => 'aq.streams_queue_1',
               dequeue_options    => r_dequeue_options,
               message_properties => r_message_properties,
               payload            => o_payload,
               msgid              => v_message_handle
               );
         
            DBMS_OUTPUT.PUT_LINE(
               '*** message is [' || o_payload.parameter || ';' || o_payload.value || '] ***'
               );
         commit;
         END;
      /    
          
          select * from dba_queue_schedules ;
          
          select * from dba_objects;
       SELECT *
         FROM v$parameter
        WHERE name = 'aq_tm_processes'
           OR name = 'job_queue_processes';
      job_queue_processes = 10;
      aq_tm_processes = 0;
      
      
      
      SELECT *
         FROM v$parameter
        WHERE name = 'aq_tm_processes'
           OR name = 'job_queue_processes';
      in XE
      aq_tm_processes     => 0
      job_queue_processes => 10
      
      Ora Type:
      Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production     
      PL/SQL Release 11.2.0.1.0 - Production                                           
      CORE     11.2.0.1.0     Production                                                         
      TNS for IBM/AIX RISC System/6000: Version 11.2.0.1.0 - Production                
      NLSRTL Version 11.2.0.1.0 - Production   
      
      
      in XE
      Oracle Database 11g Express Edition Release 11.2.0.2.0 - Production              
      PL/SQL Release 11.2.0.2.0 - Production                                           
      CORE     11.2.0.2.0     Production                                                         
      TNS for 32-bit Windows: Version 11.2.0.2.0 - Production                          
      NLSRTL Version 11.2.0.2.0 - Production 
      the
      job_queue_processes => 4