2 Replies Latest reply: Jul 2, 2014 11:32 PM by Renu-Oracle RSS

    AQ notification callback procedure Dequeue messages problem

    2b7f791a-82f0-4e41-b385-bccad37e7500

      Hi All,

      I am using OCCI AQ and I have a problem with the notification callback procedure. Registration works and I do receive the notification, but I cannot get the Message object from the NotifyResult.

      My code crashes when I try to get the Message:

      Message message = nr->getMessage();

       

      Please let me know what I am doing wrong.

      Thanks.

       

      my PL/SQL

      -- Create the queue table that backs the notification queue.
      -- It is created for multiple consumers with a SYS.ANYDATA payload.
      DECLARE
         c_queue_table   CONSTANT VARCHAR2(64) := 'MyServer.obj_qtab';
         c_payload_type  CONSTANT VARCHAR2(64) := 'SYS.ANYDATA';
      BEGIN
         DBMS_AQADM.CREATE_QUEUE_TABLE(
            queue_table        => c_queue_table,
            queue_payload_type => c_payload_type,
            multiple_consumers => TRUE);
      END;

       

      -- Create the queue itself inside the queue table 'MyServer.obj_qtab'.
      DECLARE
         c_queue_name   CONSTANT VARCHAR2(64) := 'MyServer.obj_queue';
         c_queue_table  CONSTANT VARCHAR2(64) := 'MyServer.obj_qtab';
      BEGIN
         DBMS_AQADM.CREATE_QUEUE(
            queue_name  => c_queue_name,
            queue_table => c_queue_table);
      END;

       

      -- Register the agent named 'MyServer' as a subscriber of the queue
      -- 'MyServer.obj_queue'.
      BEGIN
         DBMS_AQADM.ADD_SUBSCRIBER(
            queue_name => 'MyServer.obj_queue',
            -- The named parameter is "subscriber"; the original text had it
            -- split as "subscribe      r", which does not parse.
            -- NOTE(review): the agent address (second argument) is set to the
            -- queue's own name. Confirm this is intended; a purely local
            -- subscriber is usually declared with a NULL address.
            subscriber => sys.aq$_agent('MyServer', 'MyServer.obj_queue', NULL));
      END;

       

      My c++

        bool  SubscribeNotifications()

        {

        std::vector <Subscription> subs;

        string host = "host";

        int port = 1521;

        string serviceName ="ORCL";

        string tnsString = "(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST =" + host + ")(PORT = " + to_string(port) + ")))(CONNECT_DATA =(SERVICE_NAME = " + serviceName + ")(DIAG_ADR_ENABLED=OFF)(DIAG_DDE_ENABLED=FALSE)))";

       

        Environment *env = Environment::createEnvironment (Environment::EVENTS); 

       

        // Following block uses OCI to change the lisneteing agent port number:

        //Allocate error handle.

        OCIError* errhp = NULL;

        OCIHandleAlloc( (dvoid *) env->getOCIEnvironment(), (dvoid **) &errhp, OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0);

       

        //set attribute on environment handle:

        ub4 portNumber = 50011;

        sword status = OCIAttrSet (env->getOCIEnvironment(), OCI_HTYPE_ENV, (dvoid *)&portNumber, sizeof(portNumber), OCI_ATTR_SUBSCR_PORTNO, errhp);

        if (status != OCI_SUCCESS)

        {

            cout << "Failed to set the agent listening port" << endl;

            return false;

        }

       

        cout<<"Demo for advanced queuing enqueue/dequeue"<<endl; 

        Connection *con=env->createConnection("user", "password", tnsString); 

        cout<<"Connected to DB"<<endl; 

        Subscription *sub = new Subscription(env);

        sub->setSubscriptionNamespace(oracle::occi::aq::Subscription::NS_AQ);

        sub->setProtocol(oracle::occi::aq::Subscription::PROTO_CBK);

        sub->setCallbackContext((void *)this);

        sub->setSubscriptionName("MyServer.obj_queue:MyServer");

        sub->setPresentation(oracle::occi::aq::Subscription::PRES_DEFAULT);

        sub->setNotifyCallback(&DBWorker::event_message);

       

        subs.push_back(*sub);

        env->enableSubscription(*sub);

        con->registerSubscriptions(subs);

      return true;

      }

       

        static unsigned int event_message(Subscription &sub, NotifyResult *nr)

        {

        try{

        cout<<"Message Recieved"<<endl;

        cout<<"ConsumerName="<<nr->getConsumerName()<<endl;

       

        Message message = nr->getMessage();

        cout<<"CorrelationId="<< message.getCorrelationId()<<endl;

        cout<<"ConsumerName="<< nr->getConsumerName()<<endl;

        cout<< "QueueName="<<  nr->getQueueName().c_str()<<endl;

        Agent agent = message.getSenderId();

        AnyData any(sub.getEnvironment());

        any = message.getAnyData();

        oracle::occi::TypeCode type = any.getType();

        if(type == OCCI_TYPECODE_VARCHAR2)

        {

        std::string msg = any.getAsString();

        std::cout << "Message: "<< msg << std::endl;

        }

        else

        {

        std::cout << "Invalid Format.." << std::endl;

        }

       

      }catch(SQLException ex)

        {

        cout<<"Exception thrown for updateRow"<<endl;

        cout<<"Error number: "<<  ex.getErrorCode() << endl;

        cout<<ex.getMessage() << endl;

        }

        return 0;

        }