Forum Stats

  • 3,825,197 Users
  • 2,260,480 Discussions
  • 7,896,437 Comments

Discussions

OEP 12c example with streaming data from oracle table

990087
990087 Member Posts: 10
edited Mar 31, 2017 5:02PM in Complex Event Processing

Hi,

Can anyone direct me to where I can find an example of using Oracle complex event processing 12c (12.1.3.1) that can detect changes in an oracle database table?

Answers

  • User757087-Oracle
    User757087-Oracle Member Posts: 30
    edited Mar 31, 2017 5:02PM

    Ideally, you would do this with a product called Oracle GoldenGate: http://www.oracle.com/technetwork/middleware/goldengate/overview/index.html

    It does this very efficiently: it has minimal impact on the performance of the source database because it reads the changes from the log files.

    But if you want to use OEP, you could do it in a way similar to the adapter in the helloworld sample. You could set up the data source in the OEP configuration and have an adapter run an SQL query over and over again to find new rows. It would be something like this:

    <!-- Sample data-source definition named "scbDS"; the adapter class below
         requests a resource with the same name via @Resource(name = "scbDS"). -->
    <data-source>

                    <name>scbDS</name>

      <data-source-params>

                 <jndi-names>

                     <element>scbDS</element>

                 </jndi-names>

                 <global-transactions-protocol>None</global-transactions-protocol>

             </data-source-params>

                    <!-- Connection pool sizing: initial capacity 1, increment 1, maximum 15. -->
                    <connection-pool-params>

                            <!-- NOTE(review): presumably the statement used to validate pooled
                                 connections; confirm against the data-source schema documentation. -->
                            <test-table-name> SQL SELECT 1 FROM DUAL</test-table-name>

                            <initial-capacity>1</initial-capacity>

                            <max-capacity>15</max-capacity>

                            <capacity-increment>1</capacity-increment>

                    </connection-pool-params>

                    <driver-params>

      <use-xa-data-source-interface>false</use-xa-data-source-interface>

                            <driver-name>oracle.jdbc.OracleDriver</driver-name>

                            <!-- Sample connection URL (server1, 1521, db11g); replace with your own database. -->
                            <url>jdbc:oracle:thin:@server1:1521:db11g</url>

                            <!-- Sample credentials stored in plain text; replace with your own. -->
                            <properties>

                                    <element>

                                            <value>rto</value>

                                            <name>user</name>

                                    </element>

                                    <element>

                                            <value>welcome1</value>

                                            <name>password</name>

                                    </element>

                            </properties>

                    </driver-params>

            </data-source>

    package com.oracle.demo.adapter;

    import java.io.BufferedReader;

    import java.io.File;

    import java.io.FileReader;

    import java.io.IOException;

    import java.sql.Connection;

    import java.sql.PreparedStatement;

    import java.sql.ResultSet;

    import java.sql.SQLException;

    import java.text.ParseException;

    import java.text.SimpleDateFormat;

    import java.util.Date;

    import javax.annotation.Resource;

    import javax.sql.DataSource;

    import org.apache.commons.logging.Log;

    import org.apache.commons.logging.LogFactory;

    import com.bea.wlevs.ede.api.InitializingBean;

    import com.bea.wlevs.ede.api.ResumableBean;

    import com.bea.wlevs.ede.api.RunnableBean;

    import com.bea.wlevs.ede.api.StreamSender;

    import com.bea.wlevs.ede.api.StreamSource;

    import com.oracle.advtech.demo.bean.Offer;

    public class RTDEventDBAdapter implements StreamSource, RunnableBean,

      ResumableBean, InitializingBean {

      private Log log = LogFactory.getLog(RTDEventDBAdapter.class);

      private long timeStamp;

      private File dbFile = null;

      private int dbCount = 50;

      private String dbFilename = "Y";

      private StreamSender eventSender;

      private DataSource ds;

      private boolean suspended = false;

      private String selectStmt = "select * from RTDEvent where STATUS=? and ROWNUM <= ? order by SEQUENCEID asc";

      private String updateStmt = "update RTDEvent set STATUS= ? where SEQUENCEID= ?";

      private String selectCustStmt = "select * from T_CUST where CCNO=?";

      @Resource(name = "scbDS")

      public void setDataSource(DataSource dataSource) {

      this.ds = dataSource;

      }

      @Override

      public void run() {

      try {

      while (!isSuspended()) {

      //log.info("Reading RTDEvent table...");

      Connection conn = null;

      PreparedStatement pstmt1 = null;

      PreparedStatement pstmt2 = null;

      PreparedStatement pstmt3 = null;

      ResultSet rs = null;

      ResultSet rs2 = null;

      try {

      conn = ds.getConnection();

      conn.setAutoCommit(true);

      pstmt1 = conn.prepareStatement(selectStmt);

      pstmt1.setString(1, "0");

      pstmt1.setInt(2, checkFile());

      rs = pstmt1.executeQuery();

      while (rs.next()) {

      log.info("Got Row from RTDEvent table...");

      Offer event = new Offer();

      event.setSeqNo(rs.getInt("SEQUENCEID"));

      event.setnCust(rs.getString("NCUST_ID"));

      event.setCcNo(rs.getString("CARD_NO"));

      if (rs.getString("LONGITUDE") != null) {

      event.setLongitude(Double.valueOf(

      rs.getString("LONGITUDE")).doubleValue());

      } else {

      event.setLongitude(0);

      }

      if (rs.getString("LATITUDE") != null) {

      event.setLatitude(Double.valueOf(

      rs.getString("LATITUDE")).doubleValue());

      } else {

      event.setLatitude(0);

      }

      event.setStatus(rs.getString("STATUS"));

      event.setmLocLabel(rs

      .getString("MERCHANT_LOCATION_LABEL"));

      event.setModel_mcc1(rs.getString("MODEL_MCC1"));

      event.setModel_mcc2(rs.getString("MODEL_MCC2"));

      event.setModel_mcc3(rs.getString("MODEL_MCC3"));

      event.setClse1(rs.getString("MODEL_CLSE_1"));

      event.setClse2(rs.getString("MODEL_CLSE_2"));

      event.setClse3(rs.getString("MODEL_CLSE_3"));

      event.setClseClst1(rs.getString("MODEL_CLSE_CLUSTER_1"));

      event.setClseClst2(rs.getString("MODEL_CLSE_CLUSTER_2"));

      event.setClseClst3(rs.getString("MODEL_CLSE_CLUSTER_3"));

      // event.setTxTime(Long.parseLong(rs.getString("TX_TIME")));

      event.setTxTime(rs.getString("TX_TIME"));

      event.setTxDate(rs.getString("TX_DATE"));

      event.setTxType(rs.getString("TX_TYPE"));

      event.setmId(String.valueOf(rs

      .getInt("RTO_MERCHANT_ID")));

      event.setBranchId(String.valueOf(rs

      .getInt("RTO_MERCHANT_BRANCH_ID")));

      event.setMcc(rs.getString("MERCHANT_MCC"));

      event.setRegionId(String.valueOf(rs.getInt("REGION_ID")));

      event.setZipCode(rs.getString("ZIPCODE"));

      event.setCardTy(rs.getString("CARD_TY"));

      // Construct the txnDateTime for display it on dashboard

      String offerDateString = event.getTxDate();

      String offerTimeString = event.getTxTime();

      if (offerDateString != null

      && offerDateString.trim().length() > 0

      && offerTimeString != null

      && offerTimeString.trim().length() > 0) {

      String txDateTimeStr = offerDateString + " "

      + offerTimeString;

      SimpleDateFormat format_in = new SimpleDateFormat(

      "ddMMyyyy HHmmss");

      SimpleDateFormat format_out = new SimpleDateFormat(

      "dd/MM/yyyy HH:mm:ss");

      try {

      Date tempDate = format_in.parse(txDateTimeStr);

      event.setTxnDateTime(format_out

      .format(tempDate));

      } catch (ParseException e) {

      e.printStackTrace();

      }

      }

      pstmt2 = conn.prepareStatement(updateStmt);

      pstmt2.setString(1, "1");

      pstmt2.setInt(2, rs.getInt("SEQUENCEID"));

      pstmt2.executeUpdate();

      pstmt2.close();

      // 0=Not processed

      // 1=Pushed to RTD

      // 2=Message Sent Out

      // 3=Error at RTD, to be reprocessed

      pstmt3 = conn.prepareStatement(selectCustStmt);

      pstmt3.setString(1, event.getCcNo());

      rs2 = pstmt3.executeQuery();

      if (rs2.next()) {

      event.setMobileNo(rs2.getString("MOBILENO"));

      System.out.println("RTD EventDB Adapter Sending Event: " + event.toString());

      eventSender.sendInsertEvent(event);

      }

      if (rs2 != null) {

      rs2.close();

      }

      if (pstmt3 != null) {

      pstmt3.close();

      }

      }

      // try {Thread.sleep(10000);} catch (InterruptedException

      // e1) {}

      } catch (SQLException e) {

      e.printStackTrace();

      } catch (Exception e) {

      e.printStackTrace();

      } finally {

      try {

      if (rs != null) {

      rs.close();

      }

      if (rs2 != null && !rs2.isClosed()) {

      rs2.close();

      }

      if (pstmt1 != null) {

      pstmt1.close();

      }

      if (pstmt2 != null && !pstmt2.isClosed()) {

      pstmt2.close();

      }

      if (pstmt3 != null && !pstmt3.isClosed()) {

      pstmt3.close();

      }

      if (conn != null) {

      conn.close();

      }

      } catch (SQLException e) {

      e.printStackTrace();

      }

      }

      Thread.sleep(10 * 1000);

      }

      } catch (InterruptedException e) {

      e.printStackTrace();

      }

      }

      @Override

      public void setEventSender(StreamSender sender) {

      this.eventSender = sender;

      }

      public synchronized void suspend() throws Exception {

      suspended = true;

      }

      private synchronized boolean isSuspended() {

      return suspended;

      }

      @Override

      public void beforeResume() throws Exception {

      suspended = false;

      }

      private int checkFile() throws Exception {

      if (dbFile != null) {

      long timeStamp = dbFile.lastModified();

      if (this.timeStamp != timeStamp) {

      this.timeStamp = timeStamp;

      onChange(dbFile);

      }

      }

      return dbCount;

      }

      private void onChange(File file) throws Exception {

      BufferedReader in = null;

      String line;

      try {

      this.timeStamp = dbFile.lastModified();

      in = new BufferedReader(new FileReader(dbFile));

      line = in.readLine();

      if (line != null) {

      dbCount = Integer.parseInt(line);

      }

      } catch (Exception ee) {

      ee.printStackTrace();

      } finally {

      if (in != null) {

      try {

      in.close();

      } catch (IOException e) {

      e.printStackTrace();

      }

      }

      }

      }

      @Override

      public void afterPropertiesSet() throws Exception {

      if (System.getProperty("DBCOUNT_FILE") != null) {

      dbFilename = System.getProperty("DBCOUNT_FILE");

      dbFile = new File(dbFilename);

      onChange(dbFile);

      }

      }

    }

This discussion has been closed.