Forum Stats

  • 3,853,759 Users
  • 2,264,266 Discussions
  • 7,905,444 Comments

Discussions

Advanced Queuing ORA-32167: No payload set on the Message

2685222
2685222 Member Posts: 3
edited Feb 24, 2016 12:11PM in Advanced Queueing

My application needs to be notified whenever a record is added or updated to a table in the database.

OCCI provides subscription class for asynchronous notification. I implement asynchronous notification with the Subscription class.

Everything is fine. I am using OTT and eventHandler is recieving notification. But when I an trying to get Object from Message,

  I am getting ORA-32167: No payload set on the Message.

Any idea how I can retrieve the payload from my event handler? Please I need help.

What I did it:

GRANT EXECUTE ON dbms_aq TO C##BO_LINGRAND;

GRANT RESOURCE TO C##BO_LINGRAND;

GRANT CONNECT TO C##BO_LINGRAND;

GRANT EXECUTE ANY PROCEDURE TO C##BO_LINGRAND;

GRANT aq_administrator_role TO C##BO_LINGRAND;

GRANT aq_user_role TO C##BO_LINGRAND;

GRANT EXECUTE ON dbms_aqadm TO C##BO_LINGRAND;

GRANT EXECUTE ON dbms_aq TO C##BO_LINGRAND;

GRANT EXECUTE ON dbms_aqin TO C##BO_LINGRAND;

------------------- created type --------------------

CREATE TYPE notification_type AS OBJECT (

record_id          VARCHAR2(80),

subject            VARCHAR2(120),

message            VARCHAR2(120)

);

------------------- created queue table --------------------

BEGIN

dbms_aqadm.create_queue_table (

    queue_table => 'notification_queue_table',

    queue_payload_type => 'notification_type',

    comment => 'multi-consumer',

    multiple_consumers => TRUE,

    compatible => '8.1.0'

  );

END;

------------------- created queue --------------------

BEGIN

dbms_aqadm.create_queue (

queue_name => 'notification_queue',

queue_table=> 'notification_queue_table'

);

END;

------------------- started --------------------

BEGIN

dbms_aqadm.start_queue(queue_name => 'notification_queue');

END;

------------------- created table users --------------------

CREATE TABLE Users (

  user_id number(20),

  email VARCHAR(80) NOT NULL,

  password CHAR(41) NOT NULL

);

------------------- created trigger --------------------

CREATE OR REPLACE TRIGGER users_tablechange_notification

AFTER UPDATE OR INSERT OR DELETE ON USERS

FOR EACH ROW

DECLARE

enqopt dbms_aq.enqueue_options_t;

mprop dbms_aq.message_properties_t;

enq_msgid RAW(16);

rcpt_list dbms_aq.aq$_recipient_list_t;

triger_payload notification_type;

BEGIN

rcpt_list(0) := sys.aq$_agent('RECEIVER_NOTIFICATIONS', null, null);

mprop.recipient_list := rcpt_list;

triger_payload := notification_type('record_ID', 'UPDATED', 'USERS');

dbms_aq.enqueue(

  queue_name => 'C#BO_LINGRAND.notification_queue',

  enqueue_options => enqopt,

  message_properties => mprop,

  payload => triger_payload,

  msgid => enq_msgid);

END;

------------------- AQ_Connection.h --------------------

#pragma once

#include <occi.h>

using namespace oracle::occi;

class AQ_Connection

{

public:

  AQ_Connection();

  ~AQ_Connection();

  bool create_callback();

  static unsigned int eventHandler(oracle::occi::aq::Subscription &sub, oracle::occi::aq::NotifyResult *nr);

private:

Environment *env;

  oracle::occi::aq::Subscription *newSubscription;

  Bytes *payload;

  std::vector<oracle::occi::aq::Subscription> vSubscriptionList;

};

extern Connection* conn;


------------------- AQ_Connection.cpp--------------------

#include "stdafx.h"

#include "AQ_Connection.h"

#include <iostream>

using namespace std;

#include "Notification.h"

#include "RegisterMappings.h"

#include <Windows.h>

Connection* conn;

AQ_Connection::AQ_Connection()

{

}

bool AQ_Connection::create_callback()

{

try{

  env = Environment::createEnvironment(Environment::Mode(Environment::EVENTS | Environment::OBJECT));

  RegisterMappings(env);

  string host = "127.0.0.1";

  int port = 1521;

  string serviceName = "orcl";

  string tnsString = "(DESCRIPTION =(ADDRESS_LIST =(ADDRESS = (PROTOCOL = TCP)(HOST =" + host + ")(PORT = " + to_string(port) + ")))(CONNECT_DATA =(SERVICE_NAME = " + serviceName + ")))";

  conn = env->createConnection("C##BO_LINGRAND", "xxxxxxxxxxxx", tnsString);

  newSubscription = new oracle::occi::aq::Subscription(env);

  newSubscription->setSubscriptionNamespace(oracle::occi::aq::Subscription::NS_AQ);

  newSubscription->setProtocol(oracle::occi::aq::Subscription::PROTO_CBK);

  payload = new Bytes(env);

  newSubscription->setPayload(*payload);

  newSubscription->setCallbackContext((void *)this);

  newSubscription->setSubscriptionName("C##BO_LINGRAND.NOTIFICATION_QUEUE:RECEIVER_NOTIFICATIONS");

  newSubscription->setPresentation(oracle::occi::aq::Subscription::PRES_DEFAULT);

  newSubscription->setNotifyCallback(AQ_Connection::eventHandler);

  vSubscriptionList.push_back(*newSubscription);

  conn->registerSubscriptions(vSubscriptionList);

  }

  catch (oracle::occi::SQLException &e){

  std::cout << e.what();

  }

return true;

}

unsigned int AQ_Connection::eventHandler(oracle::occi::aq::Subscription &sub, oracle::occi::aq::NotifyResult *nr)

{

try{

  oracle::occi::aq::Consumer hConsumer(conn);

  hConsumer.setConsumerName("RECEIVER_NOTIFICATIONS");

  hConsumer.setMessageIdToDequeue(nr->getMessageId());

  hConsumer.setDequeueMode(oracle::occi::aq::Consumer::DEQ_REMOVE);

  hConsumer.setQueueName("C##BO_LINGRAND.NOTIFICATION_QUEUE");

  oracle::occi::aq::Message *pMsg = &hConsumer.receive(oracle::occi::aq::Message::OBJECT, "NOTIFICATION_TYPE", "C##BO_LINGRAND");

  cout << "+++++++++++++++++++++++++++Message++++++++++++++++++++++++" << endl;

  if (pMsg)

  {

  Notification *pObj = (Notification*)pMsg->getObject();

  cout << "+++++++++++++++++++++++++++Notification++++++++++++++++++++++++" << endl;

  }

  }

  catch (oracle::occi::SQLException &e){

  std::cout << e.what();

  }

  catch (...){

  std::cout << GetLastError();

  }

return 1;

}

AQ_Connection::~AQ_Connection()

{

}

I used OTT to generate Notification

------------------- generated by OTT Notification.h --------------------

#ifndef NOTIFICATION_ORACLE

# define NOTIFICATION_ORACLE

#ifndef OCCI_ORACLE

# include <occi.h>

#endif

class Notification;

/************************************************************/

//  generated declarations for the NOTIFICATION_TYPE object type.

/************************************************************/

class Notification : public oracle::occi::PObject {

protected:

   OCCI_STD_NAMESPACE::string RECIRDID;

   OCCI_STD_NAMESPACE::string SUBJECT;

   OCCI_STD_NAMESPACE::string MESSAGE;

public:

   void *operator new(size_t size);

   void *operator new(size_t size, const oracle::occi::Connection * sess, const OCCI_STD_NAMESPACE::string& table);

   void *operator new(size_t, void *ctxOCCI_);

   void *operator new(size_t size, const oracle::occi::Connection *sess,

      const OCCI_STD_NAMESPACE::string &tableName,

      const OCCI_STD_NAMESPACE::string &typeName,

      const OCCI_STD_NAMESPACE::string &tableSchema,

      const OCCI_STD_NAMESPACE::string &typeSchema);

   OCCI_STD_NAMESPACE::string getSQLTypeName() const;

   void getSQLTypeName(oracle::occi::Environment *env, void **schemaName, unsigned int &schemaNameLen, void **typeName, unsigned int &typeNameLen) const;

   Notification();

   Notification(void *ctxOCCI_);

   static void *readSQL(void *ctxOCCI_);

   virtual void readSQL(oracle::occi::AnyData& streamOCCI_);

   static void writeSQL(void *objOCCI_, void *ctxOCCI_);

   virtual void writeSQL(oracle::occi::AnyData& streamOCCI_);

   ~Notification();

};

#endif

------------------- generated by OTT Notification.cpp --------------------

#include "stdafx.h"

#ifndef NOTIFICATION_ORACLE

# include "Notification.h"

#endif

/*****************************************************************/

//  generated method implementations for the NOTIFICATION_TYPE object type.

/*****************************************************************/

void *Notification::operator new(size_t size)

{

  return oracle::occi::PObject::operator new(size);

}

void *Notification::operator new(size_t size, const oracle::occi::Connection * sess, const OCCI_STD_NAMESPACE::string& table)

{

  return oracle::occi::PObject::operator new(size, sess, table, (char *) "C##BO_LINGRAND.NOTIFICATION_TYPE");

}

void *Notification::operator new(size_t size, void *ctxOCCI_)

{

return oracle::occi::PObject::operator new(size, ctxOCCI_);

}

void *Notification::operator new(size_t size,

    const oracle::occi::Connection *sess,

    const OCCI_STD_NAMESPACE::string &tableName,

    const OCCI_STD_NAMESPACE::string &typeName,

    const OCCI_STD_NAMESPACE::string &tableSchema,

    const OCCI_STD_NAMESPACE::string &typeSchema)

{

  return oracle::occi::PObject::operator new(size, sess, tableName,

        typeName, tableSchema, typeSchema);

}

OCCI_STD_NAMESPACE::string Notification::getSQLTypeName() const

{

  return OCCI_STD_NAMESPACE::string("C##BO_LINGRAND.NOTIFICATION_TYPE");

}

void Notification::getSQLTypeName(oracle::occi::Environment *env, void **schemaName,

    unsigned int &schemaNameLen, void **typeName, unsigned int &typeNameLen) const

{

  PObject::getSQLTypeName(env, &Notification::readSQL, schemaName,

        schemaNameLen, typeName, typeNameLen);

}

Notification::Notification()

{

}

Notification::Notification(void *ctxOCCI_) : oracle::occi::PObject (ctxOCCI_)

{

}

void *Notification::readSQL(void *ctxOCCI_)

{

  Notification *objOCCI_ = new(ctxOCCI_) Notification(ctxOCCI_);

  oracle::occi::AnyData streamOCCI_(ctxOCCI_);

  try

  {

    if (streamOCCI_.isNull())

      objOCCI_->setNull();

    else

      objOCCI_->readSQL(streamOCCI_);

  }

  catch (oracle::occi::SQLException& excep)

  {

    delete objOCCI_;

    excep.setErrorCtx(ctxOCCI_);

    return (void *)NULL;

  }

  return (void *)objOCCI_;

}

void Notification::readSQL(oracle::occi::AnyData& streamOCCI_)

{

   RECIRDID = streamOCCI_.getString();

   SUBJECT = streamOCCI_.getString();

   MESSAGE = streamOCCI_.getString();

}

void Notification::writeSQL(void *objectOCCI_, void *ctxOCCI_)

{

  Notification *objOCCI_ = (Notification *) objectOCCI_;

  oracle::occi::AnyData streamOCCI_(ctxOCCI_);

  try

  {

    if (objOCCI_->isNull())

      streamOCCI_.setNull();

    else

      objOCCI_->writeSQL(streamOCCI_);

  }

  catch (oracle::occi::SQLException& excep)

  {

    excep.setErrorCtx(ctxOCCI_);

  }

  return;

}

void Notification::writeSQL(oracle::occi::AnyData& streamOCCI_)

{

   streamOCCI_.setString(RECIRDID);

   streamOCCI_.setString(SUBJECT);

   streamOCCI_.setString(MESSAGE);

}

Notification::~Notification()

{

  int i;

}

----------------------------------generated by OTT RegisterMappings.h -------------------------------

#ifndef REGISTERMAPPINGS_ORACLE

# define REGISTERMAPPINGS_ORACLE

#ifndef OCCI_ORACLE

# include <occi.h>

#endif

#ifndef NOTIFICATION_ORACLE

# include "Notification.h"

#endif

void RegisterMappings(oracle::occi::Environment* envOCCI_);

#endif

----------------------------------generated by OTT RegisterMappings.cpp -------------------------------

#include "stdafx.h"

#ifndef REGISTERMAPPINGS_ORACLE

# include "RegisterMappings.h"

#endif

void RegisterMappings(oracle::occi::Environment* envOCCI_)

{

  oracle::occi::Map *mapOCCI_ = envOCCI_->getMap();

  mapOCCI_->put("C##BO_LINGRAND.NOTIFICATION_TYPE", &Notification::readSQL, &Notification::writeSQL);

}

----------------------------------Main.cpp -------------------------------

#include "stdafx.h"

#include <iostream>

using namespace std;

#include "AQ_Connection.h"

int _tmain(int argc, _TCHAR* argv[])

{

AQ_Connection *aq_connection = new AQ_Connection();

  aq_connection->create_callback();

  cout << "Press q or Q to stop the server." << std::endl;

while (true)

  {

  char ch;

  cin >> ch;

  if (ch == 'q')

  {

  break;

  }

  }

return 0;

}

Answers

  • WGabriel
    WGabriel Member Posts: 202 Bronze Badge
    edited Feb 24, 2016 4:59AM

    Hello,

    the setup of your AQ seems to be ok. Your payload is only a constant string:

    triger_payload := notification_type('record_ID', 'UPDATED', 'USERS');
    

    Presumably you later want to insert meaningful data like a true record ID (via trigger syntax :OLD.<value>, :NEW.<value>) and the corresponding trigger action.

    Did you check the inserted AQ data after your pure DML action (including a COMMIT, because this is not executed in your trigger)?

    Kind regards,

    WoG

  • 2685222
    2685222 Member Posts: 3
    edited Feb 24, 2016 12:11PM

    Hi,

    yes, you right, this is test version and a payload is temporarily. I will change it when i will received a notification on subscriber side.
    I debug c++ app and found that, mapped Notification object was called by connection and readSQL was successfully executed

    void Notification::readSQL(oracle::occi::AnyData& streamOCCI_)

    {

       RECIRDID = streamOCCI_.getString();

       SUBJECT = streamOCCI_.getString();

       MESSAGE = streamOCCI_.getString();

    }

    RECIRDID, SUBJECT and MESSAGE has same value like in payload.

    I am still working. If you interesting I can send to you vs201 Test project

    Thanks



This discussion has been closed.