This discussion is archived
9 Replies Latest reply: Jan 8, 2007 3:31 PM by mdrake RSS

Sax Loader example

mdrake Expert
Currently Being Moderated
The following sample code shows a technique that can be used to maximize throughput when uploading a large XML file. The technique relies on breaking the large document into a series of fragments and inserting each fragment as a separate XML document.

This technique is appropriate when the XML file consists of a collection of documents that have been combined into a single large document. Since parsing the file and creating the smaller documents is typically less expensive than inserting those documents into the database, a single reader process can feed multiple writer processes. The number of writer processes can be tuned to maximize resource utilization and throughput.

The source code for the classes can be found below.
  • 1. Sax Loader Main
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.saxLoader;
    import com.oracle.st.xmldb.pm.common.baseApp.BaseApplication;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.lang.InterruptedException;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.text.DecimalFormat;
    import java.util.Hashtable;
    import java.util.Vector;
    import oracle.xml.parser.v2.SAXParser;
    import oracle.xml.parser.v2.XMLDocument;
    import org.w3c.dom.Document;
    import org.xml.sax.SAXException;
    public class SaxProcessor extends BaseApplication 
    {
      public static final boolean DEBUG = false;
      public static final String XML_NAMESPACE_NAMESPACE       = "http://www.w3.org/2000/xmlns/";
      public static final String XML_SCHEMA_INSTANCE_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance";
      public static final String SCHEMA_INSTANCE_PREFIX        = "schemaInstancePrefix";
      public static final String SCHEMA_LOCATION               = "schemaLocation";
      public static final String NO_NAMESPACE_SCHEMA_LOCATION  = "noNamespaceSchemaLocation";
      public static final String MAX_DOCUMENTS                  = "maxDocumentsToLoad";
      private Vector documentQueue = new Vector();
      private Vector loaderStatistics = new Vector();
      private Hashtable threadPool = new Hashtable();
      private int threadCount;
      private int readCount = 0;
      private int writeCount = 0;
      private int removedCount = 0;
      private void setWriterCount(int count)
      {
        this.threadCount = count;
      }
      
      private int getWriterCount()
      {
        return this.threadCount;
      }
      private synchronized boolean writersActive()
      {
        return getActiveThreadCount() > 0;
      }
      protected synchronized int getActiveThreadCount()
      {
        return this.threadPool.size();
      }
      Thread saxReader;
      private boolean parserActive = false;
      protected synchronized void setParserActive()
      {
        this.parserActive = true;
      }
      protected synchronized void setParsingComplete()
      {
        this.parserActive = false;
        notifyAll();
      }
      public synchronized boolean parsingComplete()
      {
        return !this.parserActive;
      }
      public synchronized void logWriteOperation()
      {
          this.writeCount = this.writeCount + 1;
      }
      public synchronized boolean processingComplete()
      {
        boolean result = (parsingComplete()) && (this.documentQueue.size() == 0);
        if (DEBUG)
        { 
          this.log("Parser Running = " + !parsingComplete() + ". Queue Size = " + this.documentQueue.size() + ". Result = " + result);
        }
        return result;
      }
      private boolean documentQueueFull()
      {
        return (this.documentQueue.size() >= (getWriterCount() * 2));
      }
      protected synchronized void addToQueue(Document xml)
      throws SAXException
      {
        if (documentQueueFull())
        {
          try
          {
            wait();
            if ( !writersActive() )
            {
              throw new SAXException("SaxLoader: No active Writer threads.");
            }
          }
          catch (InterruptedException ie)
          {
            log(ie);
          }
        }
        if (DEBUG)
        {
          this.log(xml);
        }
        this.documentQueue.addElement(xml);
        this.readCount = this.readCount + 1;
        if (readCount == Integer.parseInt(getSetting(MAX_DOCUMENTS,"-1")))
        {
          setParsingComplete();
        }
        notifyAll();
      }
      public synchronized Document getNextDocument(String thread)
      {
        Document xml = null;
        while (!parsingComplete() && (this.documentQueue.size() == 0))
        {
           try 
           {
              wait();
           } 
           catch (InterruptedException ioe)
           {
           }
        }
        if (this.documentQueue.size() > 0)
        {
          xml = (Document) this.documentQueue.remove(0);
          this.removedCount = this.removedCount + 1;
          notifyAll();
        };
        return xml;
      }
      public SaxProcessor() throws SQLException, IOException, SAXException
      {
         super();
      }   
      public void doSomething(String [] args)
      {
         try {
           setWriterCount(Integer.parseInt(getSetting("ThreadCount","4")));
           // createFileWriters();
           this.saxReader = createSaxProcessor();
           this.saxReader.start();
           createDatabaseWriters();
           waitForCompletion();
           writeStatistics();
         }
         catch (Exception e)
         {
            this.log("DocumentProcessor",e);
            this.setParsingComplete();
         }
      } 
      private synchronized void waitForCompletion()
      {
        while (!parsingComplete() )
        {
          try 
          {
            wait();
          } 
          catch (InterruptedException ioe)
          {
          }
        }
      }
      private void createFileWriters()
      {
         DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
         df.applyPattern("000000");
         for (int i = 0 ; i < getWriterCount(); i++)
         {
           String threadName = "Writer_" + df.format(i+1);
           FileWriter writer = new FileWriter(this,threadName);
           writer.start();
           this.threadPool.put(threadName,writer);
        }    
      }
      private void createDatabaseWriters() throws SQLException
      {
         DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance();
         df.applyPattern("000000");
         int commitCharge  = Integer.parseInt(getSetting("CommitCharge","50"));
         String targetTable      = getSetting("Table");
         String errorTable      = getSetting("ErrorTable");
         for (int i = 0 ; i < getWriterCount(); i++)
         {
            String threadName = "Writer_" + df.format(i+1);
            Connection conn = getNewConnection();
            conn.setAutoCommit(false);
            DatabaseWriter writer = new DatabaseWriter(this,threadName,conn);
            writer.setParameters(targetTable, errorTable, commitCharge);
            writer.start();
            this.threadPool.put(threadName,writer);
        }      
      }
      private Thread createSaxProcessor() throws SQLException
      {
         String threadName = "SaxReader";
         SourceProcessor saxReader = new SourceProcessor(threadName);
         saxReader.setSaxProcessor(this);
         saxReader.setTargetElement(getSetting("Element"));
         saxReader.setSchemaInformation(getSetting(SaxProcessor.SCHEMA_INSTANCE_PREFIX,null),
                                        getSetting(SaxProcessor.NO_NAMESPACE_SCHEMA_LOCATION,null),
                                        getSetting(SaxProcessor.SCHEMA_LOCATION,null));
         saxReader.setTargetFilename(getSetting("SourceXML","DIR"));
         return saxReader; 
      }
      public synchronized void writeStatistics()
      throws Exception
      {
          while (writersActive())
          {
             wait();
             log("Active Children = " + getActiveThreadCount());
          }
    
          this.log("Documents Queued = " + this.readCount + ". Docuuments De-Queued = " + this.removedCount + ". Documents Written = " + this.writeCount + ".");
    
          while (this.loaderStatistics.size() > 0)
           {
              LoaderStatistics stats = (LoaderStatistics) this.loaderStatistics.remove(0);
              this.log(stats.getThreadname() +  "," + stats.getStartTime() + "," + stats.getEndTime() + "," + stats.getFileCount() + "," + stats.getBytesProccesed());
           }
      }
    
      public synchronized void log (String s)
      {
        this.log.println(s);
      }
    
      public synchronized void log(Document xml)
      {
        try 
        {
          StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw);
          ((XMLDocument) xml).print(pw);
          printXML(xml,pw);
          this.log.println(sw.toString());
        }
        catch (IOException ioe)
        {
          this.log(ioe);
        }
      }
    
      public synchronized void log(String thread, Document xml)
      {
        this.log.println("Thread " + thread +  " processed document " + this.writeCount + ".");
        try 
        {
          StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw);
          printXML(xml,pw);
          pw.close();
          this.log.println(sw.toString());
        }
        catch (IOException ioe)
        {
          this.log(ioe);
        }
      }
    
      public synchronized void log(String thread, Document xml, Exception e)
      {
        this.log.println("Thread " + thread + " encountered Execption -");
        this.log(e);
        this.log.println("While processing the following Document -");
        try 
        {
          StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw);
          printXML(xml,pw);
          pw.close();
          this.log.println(sw.toString());
        }
        catch (IOException ioe)
        {
          this.log(e);
        }
      }
    
      protected synchronized void printXML(Document xml, PrintWriter pw)
      throws IOException
      {
         ((XMLDocument) xml).print(pw);
      }
      public synchronized void log(Object object)
      {
         this.log.println(object);
         return;
      }
    
      public synchronized void log(String thread, Exception e)
      {
         this.log("Unexpected Exception raised in thread " + thread);
         this.log(e);
         return;
      }
      public synchronized void log(Exception e)
      {
         e.printStackTrace(this.log);
         return;
      }
    
      public synchronized void log(String thread, String file, Exception e)
      {
         this.log("Unexpected Exception raised in thread " + thread + " while processing File " + file);
         this.log(e);
         return;
      }
    
      public synchronized void recordStatistics(LoaderStatistics stats)
      {
        this.loaderStatistics.addElement(stats);
        this.threadPool.remove(stats.getThreadname());
        notify();
      }  
      public PrintStream getLog()
      {
        return this.log;
      }
      public static void main( String [] args )
      {
        try
        {
          SaxProcessor app = new SaxProcessor();
          app.initializeConnection();
          app.doSomething( args );
        }
        catch( Exception e )
        {
           e.printStackTrace();
        }
      }    
    }
  • 2. Source Processor
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.saxLoader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.io.StringWriter;
    import java.util.Enumeration;
    import java.util.Hashtable;
    import oracle.xml.parser.v2.SAXParser;
    import oracle.xml.parser.v2.XMLDocument;
    import oracle.xml.parser.v2.XMLElement;
    import org.w3c.dom.Attr;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.xml.sax.Attributes;
    import org.xml.sax.ContentHandler;
    import org.xml.sax.Locator;
    import org.xml.sax.SAXException;
    public class SourceProcessor extends Thread implements ContentHandler 
    {
      public static final boolean  DEBUG = false;
      private String xsiPrefix = null;
      private String noNamespaceSchemaLocation = null;
      private String schemaLocation=null;
      boolean explicitSchemaLocation = false;
      private boolean explicitSchemaLocation()
      {
          return this.explicitSchemaLocation;
      }
      private String targetFilename = null;
      private String targetElement = null;
      protected String getTargetElementName() 
      {
        return this.targetElement;
      }
      private boolean isTargetElement(String elementName)
      {
        return ((currentNode == null) && (elementName.equals(getTargetElementName())));
      }
      private XMLDocument currentDocument;
      private void setDocument(XMLDocument newDocument)
      {
          this.currentDocument = newDocument;
          setCurrentNode(newDocument);
      }
      private XMLDocument getDocument()
      {
          return this.currentDocument;
      }  
      private Node currentNode;
      private void setCurrentNode(Node node)
      {
          this.currentNode = node;
      }
      private Node getCurrentNode()
      {
          return this.currentNode;
      }
      private Hashtable namespaceToPrefix = null;
      private Hashtable prefixToNamespace = null;
      private SaxProcessor processor;
      public SourceProcessor(String threadName)
      {
        super(threadName);
        this.namespaceToPrefix = new Hashtable();
        this.prefixToNamespace = new Hashtable();
      }
      public void setSaxProcessor(SaxProcessor processor) 
      {
        this.processor = processor;
      }
      public void setTargetElement(String targetElement)
      {
        this.targetElement = targetElement;
      }
      public void setTargetFilename(String filename)
      {
        this.targetFilename = filename;
      }
      public void setSchemaInformation(String xsiPrefix, String noNamespaceSchemaLocation, String schemaLocation)
      {
        this.xsiPrefix = xsiPrefix;
        this.noNamespaceSchemaLocation = noNamespaceSchemaLocation;
        this.schemaLocation = schemaLocation;
        this.explicitSchemaLocation = this.xsiPrefix != null;
      }
    
      public void startDocument() throws SAXException
      {
        if (DEBUG)
        {
            this.processor.log("startDocument : Processing Document");      
        }
      }
      private void createNewDocument()
      throws SAXException
      {
        setDocument(new XMLDocument());   
      }
      public void endDocument()
      throws SAXException
      { 
        this.processor.setParsingComplete();
      }
      public void startElement(String namespaceURI, String localName, String elementName, Attributes attrs) 
      throws SAXException
      {
        if (DEBUG)
        {
            this.processor.log("startElement : Namespace = " + namespaceURI + ",localName = " + localName + ",Name = " + elementName);  
        }
        if (getDocument() == null)
        {
          if (isTargetElement(localName)) 
          {
            if (DEBUG) { 
              processor.log("startElement() : Starting new document : Found instance of " + localName);
            }
            createNewDocument();
          }     
          else
          {
            if (DEBUG) 
              {
                  if (!explicitSchemaLocation())
                  {
                    for (int i=0; i < attrs.getLength(); i++)
                    {
                      if (DEBUG)
                      {
                        this.processor.log("processAttributes : Local Name = " + attrs.getLocalName(i) + ", Q Name = " + attrs.getQName(i) + ",Type = " + attrs.getType(i) + ", URI = " + attrs.getURI(i) +  ", Value = " + attrs.getValue(i));
                      }
                      if (attrs.getURI(i).equals(SaxProcessor.XML_SCHEMA_INSTANCE_NAMESPACE))
                      { 
                        if (attrs.getLocalName(i).equals(SaxProcessor.NO_NAMESPACE_SCHEMA_LOCATION))
                        {
                          this.noNamespaceSchemaLocation = attrs.getValue(i);
                        }
                        if (attrs.getLocalName(i).equals(SaxProcessor.SCHEMA_LOCATION))
                        {
                          this.schemaLocation = attrs.getValue(i);
                        }
                      }
                    }
                  }
                  processor.log("startElement() : Skipping element " + localName);
              }
          }
        }
        if (getCurrentNode() != null)
        {
          XMLElement nextElement = createNewElement(namespaceURI, localName, elementName, attrs);
          getCurrentNode().appendChild(nextElement);
          setCurrentNode(nextElement);
        }
      }
      public void endElement(String namespaceURI, String localName, String qName) throws SAXException
      {     
        if (DEBUG)
        {
            this.processor.log("endElement : Namespace = " + namespaceURI + ",localName = " + localName + ",qName = " + qName);  
        }
        if (getDocument() != null)
        {
          setCurrentNode(getCurrentNode().getParentNode());
          if (getCurrentNode().equals(getDocument()))
          {
            if (this.xsiPrefix != null)
            {
              // getDocument().getDocumentElement().setAttribute("xmlns:" + this.xsiPrefix,SaxProcessor.XML_SCHEMA_INSTANCE_NAMESPACE);;
              if (this.noNamespaceSchemaLocation != null)        
              {
                getDocument().getDocumentElement().setAttributeNS(SaxProcessor.XML_SCHEMA_INSTANCE_NAMESPACE,this.xsiPrefix + ":" + SaxProcessor.NO_NAMESPACE_SCHEMA_LOCATION,this.noNamespaceSchemaLocation);;
              }
              if (this.schemaLocation != null)        
              {
                getDocument().getDocumentElement().setAttributeNS(SaxProcessor.XML_SCHEMA_INSTANCE_NAMESPACE,this.xsiPrefix + ":" + SaxProcessor.SCHEMA_LOCATION,this.schemaLocation);;
              }
            }
            if (DEBUG) 
            {
              this.processor.log("Completed Document");
              try {
                this.processor.printXML(getDocument(),new PrintWriter(this.processor.getLog()));
              }
              catch (IOException ioe)
              {
                throw new SAXException(ioe);
              }
            }
            setCurrentNode(null);
            this.processor.addToQueue(getDocument());
            if (this.processor.parsingComplete())
            {
              throw new ProcessingCompleteException();
            }
            setDocument(null);
          }      
        }
      } 
      private XMLElement createNewElement(String namespaceURI, String localName, String elementName, Attributes attrs)
      {
        XMLElement newElement = null;
        if (namespaceURI != null)
        {
          if (this.namespaceToPrefix.containsKey(namespaceURI))
          {
            /* Namespace in already in Scope - create Element from Qualified Name */
            newElement = (XMLElement) getDocument().createElement(elementName);      
          }
          else 
          {
            /* Namespace is not already in Scope - create Element with namespace */
            newElement = (XMLElement) getDocument().createElementNS(namespaceURI,elementName);      
            newElement.setPrefix((String) this.namespaceToPrefix.get(namespaceURI));
          }
        }
        else
        {
          newElement = (XMLElement) getDocument().createElement(localName);
        }
        addAttributes(newElement,attrs);
        if (getCurrentNode().equals(getDocument()))
        {
          addNamespaceDeclarations(newElement);
        }
        return newElement;
      }
      private void addAttributes(Element element, Attributes attrs)
      {
        for (int i=0; i < attrs.getLength(); i++)
        {
          if (DEBUG)
          {
            this.processor.log("processAttributes : Local Name = " + attrs.getLocalName(i) + ", Q Name = " + attrs.getQName(i) + ",Type = " + attrs.getType(i) + ", URI = " + attrs.getURI(i) +  ", Value = " + attrs.getValue(i));
          }
          if (attrs.getURI(i).equals("http://www.w3.org/2000/xmlns/"))
          { 
          }
          else
          {
            element.setAttribute(attrs.getQName(i),attrs.getValue(i));
          }
        }
      }
      private void addNamespaceDeclarations(Element element)
      {
        Enumeration keys = this.namespaceToPrefix.keys();
        while (keys.hasMoreElements())
        {
          String namespace = (String) keys.nextElement();
          String prefix = (String) namespaceToPrefix.get(namespace);
          Attr attr = null;
          if (prefix.equals(""))
          {
            attr = getDocument().createAttribute("xmlns");
            attr.setValue(namespace);
            element.setAttributeNode(attr);
          }
          else
          {
            if (!prefix.equals(element.getPrefix()))
              {
                attr = getDocument().createAttribute("xmlns:" + prefix);
                attr.setValue(namespace);
                element.setAttributeNode(attr);
              }
          }
        }
      }
      public void characters(char[] p0, int p1, int p2) throws SAXException
      {
        if (getDocument() != null)
        {
          StringWriter sw = new StringWriter();
          sw.write(p0,p1,p2);
          String value = sw.toString();
          Node textNode = getDocument().createTextNode( value );
          getCurrentNode().appendChild( textNode );
        }
      }
      public void startPrefixMapping(String prefix, String uri)
      throws SAXException
      {
        if (DEBUG)
        {
          this.processor.log("startPrefixMapping() : Prefix = " + prefix + ", URI = " + uri);
        }
        if (uri.equals(SaxProcessor.XML_SCHEMA_INSTANCE_NAMESPACE))
        {
           if (explicitSchemaLocation())
           {
              return;
           }
           else
           {
              this.xsiPrefix = prefix;
           }       
        }
        this.namespaceToPrefix.put(uri,prefix);
        this.prefixToNamespace.put(prefix,uri);
      }
      public void endPrefixMapping(String prefix) 
      throws SAXException
      {
        if (DEBUG)
        {
          this.processor.log("endPrefixMapping() : Prefix = " + prefix);
        }
        Enumeration e = prefixToNamespace.keys();
        while (e.hasMoreElements()) {
            String thisPrefix = (String) e.nextElement();
            if (thisPrefix.equals(prefix))
            {
              String namespace = (String) prefixToNamespace.remove(thisPrefix);
              namespaceToPrefix.remove(namespace);
            }
        }
      }
    
      public void ignorableWhitespace(char[] p0, int p1, int p2) throws SAXException
      {
        // throw new SAXException ("Un-Implemented Method: ingnoreableWhitespace");
      }
    
      public void processingInstruction(String p0, String p1) throws SAXException
      {
        throw new SAXException ("Un-Implemented Method: processingInstruction");
      }
    
      public void setDocumentLocator(Locator p0)
      {
        // throw new SAXException ("Un-Implemented Method: setDocumentLocator");
      }
    
      public void skippedEntity(String p0) throws SAXException
      {
        throw new SAXException ("Un-Implemented Method: skippedEntity");
      }
      public void run()
      {
        try 
        {
          SAXParser parser = new SAXParser();
          parser.setAttribute(SAXParser.STANDALONE,Boolean.valueOf(true));
          parser.setValidationMode(SAXParser.NONVALIDATING);
          parser.setContentHandler(this);
          this.processor.setParserActive();
          parser.parse(new FileInputStream(this.targetFilename));
        }
        catch (ProcessingCompleteException pce)
        {
        }
        catch (Exception e)
        {
          this.processor.log(this.getName(),e);
        }
      }
    }
  • 3. Database Writer
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.saxLoader;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.io.Writer;
    import java.sql.CallableStatement;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.text.DateFormat;
    import java.util.Calendar;
    import java.util.GregorianCalendar;
    import java.util.Locale;
    import oracle.sql.CLOB;
    import org.w3c.dom.Document;
    public class DatabaseWriter extends Thread
    {
      public static final boolean DEBUG = false;
      private Locale locale = new Locale(Locale.ENGLISH.toString(), "US");
      private SaxProcessor processor;
      private Connection connection;
      private String threadName;
      private String targetTable;
      private String errorTable;
      private int commitCharge;
      private double byteCount;
      private int fileCount;
      private Calendar startTime;
      private Calendar endTime;
    
      public DatabaseWriter(SaxProcessor processor, String name, Connection dbConnection)
      {
         this.processor = processor;
         this.threadName = name;
         this.connection = dbConnection;
         this.byteCount = 0;
         this.fileCount = 0;
      }
    
       public void setParameters(String targetTable, String errorTable, int commitCharge)
       {
          this.targetTable  = targetTable;
          this.errorTable   = errorTable;
          this.commitCharge = commitCharge;
      }
    
       private void updateByteCount(double bytes)
       {
          this.byteCount = this.byteCount + bytes;
          this.fileCount ++;
       }
    
       private int getFileCount()
       {
         return this.fileCount;
       }
    
       private double getByteCount()
       {
         return this.byteCount;
       }
    
       private void setStartTime()
       {
          this.startTime = new GregorianCalendar(this.locale);
       }
    
      private void setEndTime()
      {
          this.endTime = new GregorianCalendar(this.locale);
      }
    
      private String getStartTime()
      {
         DateFormat df = DateFormat.getDateTimeInstance(DateFormat.MEDIUM,DateFormat.LONG);
         return df.format(this.startTime.getTime());
      }
    
      private String getEndTime()
      {
         DateFormat df = DateFormat.getDateTimeInstance(DateFormat.MEDIUM,DateFormat.LONG);
         return df.format(this.endTime.getTime());
      }
    
      private Calendar getElapsedTime()
      {
        return null;
      }
    
      private int getTransferRate()
      {
         return 0;
      }
      
      private Connection getConnection()
      throws Exception
      {
        return this.connection;
      }
    
      private String getTargetTable()
      {
        return this.targetTable;
      }
    
      private String getErrorTable()
      {
        return this.errorTable;
      }
    
      private CallableStatement  prepareSQLStatement(String sqlText)
      throws Exception
      {
          CallableStatement sqlStatement = null; 
          try
          {
             sqlStatement = getConnection().prepareCall(sqlText);
          }
          catch (SQLException SQLe)
          {
            this.processor.log(this.threadName,SQLe);      
            if (sqlStatement != null)
            {
              sqlStatement.close();
            }
          }
           return sqlStatement;
        }
    
        public void run()
        {
          Exception fatalError = null;
          setStartTime();    
          this.processor.log("Thread " + this.threadName + " started at " + getStartTime());
          String insertStatementText   = "insert into " + getTargetTable() + " values (xmlParse(DOCUMENT ? WELLFORMED))";
          String errorStatementText = "insert into " + getErrorTable() + " values (xmlParse(DOCUMENT ? WELLFORMED))";
          this.processor.log("Thread " + this.threadName + " using Insert Statement : " + insertStatementText);
          this.processor.log("Thread " + this.threadName + " using Error Statement  : " + errorStatementText);      
          try 
          {
            CallableStatement insertStatementSQL = prepareSQLStatement(insertStatementText);
            CallableStatement errorStatementSQL  = prepareSQLStatement(errorStatementText);
            CLOB clob = CLOB.createTemporary( getConnection(), true, CLOB.DURATION_SESSION );
            try 
            {
              while (!this.processor.processingComplete() )
              {
                 Document nextDocument = this.processor.getNextDocument(this.threadName);
                 if ( nextDocument != null )
                 {
                   try {
                     int bytesWritten = uploadDocument(insertStatementSQL,clob,nextDocument);
                     updateByteCount(bytesWritten);
                   }
                   catch (SQLException SQLe)
                   {
                     this.processor.log(this.threadName, nextDocument, SQLe);
                     uploadDocument(errorStatementSQL,clob,nextDocument);
                   }
                   if ((getFileCount() % this.commitCharge) == 0)
                   {
                     getConnection().commit();
                     this.processor.log("Thread " + this.threadName + " inserted " + getFileCount() + " records.");
                   }
                 }
               }
               getConnection().commit();
               insertStatementSQL.close();
               errorStatementSQL.close();
               getConnection().close(); 
            }
            catch (Exception e)
            {
               this.processor.log(this.threadName,e);
               fatalError = e;
               getConnection().rollback();
            }
            finally 
            {
              if (!getConnection().isClosed())
              {
                  getConnection().rollback();
                  getConnection().close();
              }
            }
          }
          catch (Exception e)
          {
            this.processor.log(this.threadName,e);
          }
          setEndTime();
          this.processor.log("Thread " + this.threadName + " completed at " + getStartTime());
          this.processor.recordStatistics(new LoaderStatistics(this.threadName,this.startTime,this.endTime,getFileCount(),getByteCount(),fatalError));  
        }
        private int uploadDocument(CallableStatement sqlStatement, CLOB clob, Document xml)
        throws Exception, SQLException, IOException
        {
            clob.truncate(0);
            Writer out = clob.setCharacterStream(0);
            this.processor.printXML(xml,new PrintWriter(out));
            out.close();
            
            this.processor.logWriteOperation();
            
            sqlStatement.setClob(1,clob);
            sqlStatement.execute();         
    
            if (DEBUG) 
            {
              this.processor.log(this.threadName,xml);
            }
            int length = (int) clob.length();
            return length ;
        }
    }
  • 4. Processing Complete
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.saxLoader;
    import org.xml.sax.SAXException;
    
    /**
     * Control-flow signal used to stop a SAX parse early. SourceProcessor.endElement throws it
     * after queuing a document once the SaxProcessor reports parsing complete, and
     * SourceProcessor.run catches it and treats it as a normal stop.
     */
    public class ProcessingCompleteException extends SAXException 
    {
      /** Message carried by every instance. */
      private static final String MESSAGE = "Processing Complete";
    
      /** Creates the signal with its fixed message and no wrapped exception. */
      public ProcessingCompleteException()
      {
        super(MESSAGE);
      }
    }
  • 5. LoaderStatistics
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.saxLoader;
    import java.util.Calendar;
    import java.text.DateFormat;
    import java.text.DecimalFormat;
    /**
     * Read-only summary of one writer thread's run: its name, start and end
     * times, the number of files processed, the number of bytes processed and
     * the exception (if any) that terminated it.
     */
    public class LoaderStatistics 
    {
       /** Grouping pattern used when formatting the byte count. */
       private static final String BYTES_PATTERN = "###,###,##0";

       private final String m_ThreadName;
       private final Calendar m_StartTime;
       private final Calendar m_EndTime;
       private final int m_FileCount;
       private final double m_BytesProcessed;
       private final Exception m_FatalException;

       /**
        * @param name  name of the thread these statistics describe
        * @param start time the thread started
        * @param end   time the thread finished
        * @param files number of files/records the thread processed
        * @param bytes number of bytes the thread processed
        * @param e     exception that terminated the thread, or {@code null}
        */
       public LoaderStatistics(String name, Calendar start, Calendar end, int files, double bytes,Exception e)
       {
           m_ThreadName = name;
           m_StartTime = start;
           m_EndTime = end;
           m_FileCount = files;
           m_BytesProcessed = bytes;
           m_FatalException = e;
       }

       /**
        * Original (mis-cased) accessor, kept for compatibility.
        *
        * @return the thread name
        */
       public String getThreadname()
       {
           return m_ThreadName;
       }

       /**
        * @return the thread name (same value as {@link #getThreadname()})
        */
       public String getThreadName()
       {
           return m_ThreadName;
       }

       /**
        * @return the start time, formatted as a MEDIUM date and LONG time in the default locale
        */
       public String getStartTime()
       {
          return formatTimestamp(m_StartTime);
       }

       /**
        * @return the end time, formatted as a MEDIUM date and LONG time in the default locale
        */
       public String getEndTime()
       {
          return formatTimestamp(m_EndTime);
       }

       /**
        * @return the number of files/records processed
        */
       public int getFileCount()
       {
           return m_FileCount;
       }

       /**
        * Original (misspelled) accessor, kept for compatibility.
        *
        * @return the byte count formatted with grouping separators
        */
       public String getBytesProccesed()
       {
           return getBytesProcessed();
       }

       /**
        * @return the byte count as a whole number with the default locale's grouping separators
        */
       public String getBytesProcessed()
       {
           // Build the DecimalFormat directly: casting NumberFormat.getInstance() to
           // DecimalFormat is not guaranteed to succeed for every locale.
           DecimalFormat df = new DecimalFormat(BYTES_PATTERN);
           return df.format(m_BytesProcessed);
       }

       /**
        * @return the exception that terminated the thread, or {@code null} if it completed normally
        */
       public Exception getFatalException()
       {
           return m_FatalException;
       }

       /** Formats a timestamp; DateFormat is not thread-safe, so a fresh instance is used per call. */
       private static String formatTimestamp(Calendar when)
       {
          DateFormat df = DateFormat.getDateTimeInstance(DateFormat.MEDIUM,DateFormat.LONG);
          return df.format(when.getTime());
       }
    }
  • 6. FileWriter
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.saxLoader;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.io.Writer;
    import java.text.DateFormat;
    import java.util.Calendar;
    import java.util.GregorianCalendar;
    import java.util.Locale;
    import oracle.xml.parser.v2.XMLDocument;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.NodeList;
    public class FileWriter extends Thread
    {
    
      public static final boolean DEBUG = false;
    
      private Locale m_Locale = new Locale(Locale.ENGLISH.toString(), "US");
      
      private SaxProcessor processor;
      private String threadName;
    
      private double byteCount;
      private int fileCount;
    
      private Calendar startTime;
      private Calendar endTime;
    
      public FileWriter(SaxProcessor processor, String name)
      {
         this.processor = processor;
         this.threadName = name;
         this.byteCount = 0;
         this.fileCount = 0;
      }
    
       private void updateByteCount(double bytes)
       {
          this.byteCount = this.byteCount + bytes;
          this.fileCount ++;
       }
    
       private int getFileCount()
       {
         return this.fileCount;
       }
    
       private double getByteCount()
       {
         return this.byteCount;
       }
    
       private void setStartTime()
       {
          this.startTime = new GregorianCalendar(m_Locale);
       }
    
      private void setEndTime()
      {
          this.endTime = new GregorianCalendar(m_Locale);
      }
    
      private String getStartTime()
      {
         DateFormat df = DateFormat.getDateTimeInstance(DateFormat.MEDIUM,DateFormat.LONG);
         return df.format(this.startTime.getTime());
      }
    
      private String getEndTime()
      {
         DateFormat df = DateFormat.getDateTimeInstance(DateFormat.MEDIUM,DateFormat.LONG);
         return df.format(this.endTime.getTime());
      }
    
      private Calendar getElapsedTime()
      {
        return null;
      }
    
      private int getTransferRate()
      {
         return 0;
       }
      
        public void run()
        {
          Exception fatalError = null;
          setStartTime();
          
          this.processor.log("Thread " + this.threadName + " started at " + getStartTime());
          
          try 
          {
    
              while (!this.processor.processingComplete() )
              {
                 XMLDocument nextDocument = (XMLDocument) this.processor.getNextDocument(this.threadName);
                 if (nextDocument != null) {
                   int bytesWritten = writeDocument(nextDocument);
                   updateByteCount(bytesWritten);
                 }
               
              }
          }
          catch (Exception e)
          {
            this.processor.log(this.threadName,e);
          }
          setEndTime();
          this.processor.log("Thread " + this.threadName + " completed at " + getStartTime());
          this.processor.recordStatistics(new LoaderStatistics(this.threadName,this.startTime,this.endTime,getFileCount(),getByteCount(),fatalError));  
        }
    
        private int writeDocument(XMLDocument xml)
        throws Exception, IOException
        {
            Element root = xml.getDocumentElement();
            NodeList nodes = root.getElementsByTagName("docnum");
            String filename = nodes.item(0).getFirstChild().getNodeValue();
            filename = filename + ".xml";
            
            FileOutputStream out = new FileOutputStream(filename);
            xml.print(new PrintWriter(out));
            out.close();
            
            this.processor.logWriteOperation();    
    
            if (DEBUG) 
            {
              this.processor.log(this.threadName,xml);
            }
            return 0;
         }
    }
  • 7. Base Application
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.common.baseApp;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.io.Writer;
    import java.sql.Connection;
    import java.sql.SQLException;
    import oracle.sql.CLOB;
    import org.xml.sax.SAXException;
    /**
     * A Class class.
     * <P>
     * @author Mark D. Drake
     * Please complete these missing tags
     * @rref
     * @copyright
     * @concurrency
     * @see
     */
    
    public abstract class BaseApplication extends ConnectionProvider
    {
        public static final boolean DEBUG = true;
    
        public abstract void doSomething( String [] args ) throws Exception;
    
        public CLOB createCLOB( Connection conn, InputStream is )
        throws SQLException, IOException
        {
            CLOB clob = CLOB.createTemporary( conn, false, CLOB.DURATION_SESSION);
            writeToClob(clob,is);
            return clob;
        }
    
        public CLOB writeToClob( CLOB clob, InputStream is)
        throws SQLException, IOException 
        {
            InputStreamReader reader = new InputStreamReader( is );
            Writer writer = clob.setCharacterStream(0);
            
            char [] buffer = new char [ clob.getChunkSize() ];
            for( int charsRead = reader.read( buffer );
            charsRead > - 1;
            charsRead = reader.read( buffer ) )
            {
                writer.write( buffer, 0, charsRead );
            }
            writer.close();
            return clob;
        }
        /**
         * Please complete the missing tags for main
         * @param
         * @return
         * @throws
         * @pre
         * @post
         */
        public BaseApplication() 
        {
            super();
        }
    }
  • 8. ConnectionProvider
    mdrake Expert
    Currently Being Moderated
    package com.oracle.st.xmldb.pm.common.baseApp;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;
    import java.io.PrintStream;
    import java.io.PrintWriter;
    import java.io.Reader;
    import java.io.StringWriter;
    import java.io.Writer;
    import java.sql.DriverManager;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.Properties;
    import oracle.jdbc.OracleConnection;
    import oracle.jdbc.pool.OracleOCIConnectionPool;
    import oracle.jdbc.oci.OracleOCIConnection;
    import oracle.xml.parser.v2.DOMParser;
    import oracle.xml.parser.v2.XMLDocument;
    import oracle.xml.parser.v2.XMLElement;
    import org.xml.sax.SAXException;
    import org.w3c.dom.Element;
    import org.w3c.dom.Text;
    import org.w3c.dom.NodeList;
    import java.io.InputStream;
    public class ConnectionProvider extends Object
    {
        public static final boolean DEBUG = true;
    
        protected OracleConnection connection;
    
        protected XMLDocument connectionDefinition;
    
        protected OracleOCIConnectionPool connectionPool;
    
        public static final String CONNECTION = "Connection";
        public static final String DRIVER = "Driver";
        public static final String HOSTNAME = "Hostname";
        public static final String PORT = "Port";
        public static final String SID = "SID";
        public static final String SERVICENAME = "ServiceName";
        public static final String SERVERMODE = "Server";
        public static final String SCHEMA = "Schema";
        public static final String PASSWORD = "Password";
        public static final String POOL = "Pool";
        public static final String THIN_DRIVER = "thin";
        public static final String OCI_DRIVER = "oci8";
    
        public static final String DEFAULT_CONNECTION_DEFINITION = "c:\\temp\\connection.xml";
        public static final String DEFAULT_DRIVER = OCI_DRIVER;
        public static final String DEFAULT_HOSTNAME = "localhost";
        public static final String DEFAULT_PORT = "1521";
        public static final String DEFAULT_SERVERMODE = "DEDICATED";
    
        public static final String TARGET_DIRECTORY = "targetDirectory";
    
        protected PrintStream log;
        
        public ConnectionProvider() {
            
        }
        
        public void initializeConnection()     
        throws SAXException, IOException, SQLException
        {
            this.initializeConnection(System.out);
        }
        
        public void initializeConnection(PrintStream log)
        throws SAXException, IOException, SQLException
        {
          DriverManager.registerDriver( new oracle.jdbc.driver.OracleDriver() );
          this.log = log;
          loadConnectionSettings();
          this.connection = openConnection();
        }
        
        public ConnectionProvider getConnectionProvider() {
            return this;
        }
        
        public void initalizeConnection(String connectionLocation, PrintStream log)
        throws SAXException, IOException, SQLException
        {
          DriverManager.registerDriver( new oracle.jdbc.driver.OracleDriver() );
          this.log = log;
          loadConnectionSettings(connectionLocation);
          this.connection = openConnection();
        }
        public void setLogger(PrintStream log)
        {
          this.log = log;
        }
        private void setConnectionSettings(XMLDocument doc)
        {
          this.connectionDefinition = doc;
        }
    
        private void dumpConnectionSettings()
        throws IOException
        {
          StringWriter sw = new StringWriter();
          PrintWriter pw = new PrintWriter(sw);
          this.connectionDefinition.print(pw);
          pw.close();
          sw.close();
        }
        private void setConnectionPool(OracleOCIConnectionPool pool)
        {
          this.connectionPool = pool;
        }
        public OracleConnection getConnection()
        throws SQLException
        {
          if (this.isPooled())
          {
            return (OracleOCIConnection) this.connectionPool.getConnection();
          }
          else
          {
            return this.connection;
          }
       }
        
       public void closeConnection(Connection conn)
       throws Exception
       {
          if (isPooled())
          {
           conn.close();
          }
        } 
        public Connection getConnection(String schema, String passwd)
        throws Exception
        {
          if (isPooled())
          {
            return (OracleOCIConnection) this.getConnection(schema,passwd);
          }
          else
          {
            return this.connection;
          }
        }
        public String getSetting(String nodeName)
        {
          return getSetting(nodeName, null);
        }
     
        public String getSetting(String nodeName, String defaultValue)
        {
          String textValue = null;
          XMLElement root = (XMLElement) this.connectionDefinition.getDocumentElement();
          NodeList children = root.getChildrenByTagName(nodeName);
          if (children.getLength() != 0)
          {
            Element  element = (Element) children.item(0);
            Text text = (Text) element.getFirstChild();
            if (text != null)
            {
                return text.getData();
            }
          }
          return defaultValue;
        }   
        protected String getDriver()
        {
            return getSetting(DRIVER,DEFAULT_DRIVER);
        }
        protected String getHostname()
        {
            return getSetting(HOSTNAME,DEFAULT_HOSTNAME);
        }
        protected String getPort()
        {
            return getSetting(PORT,DEFAULT_PORT);
        }
        protected String getServerMode()
        {
            return getSetting(SERVERMODE,DEFAULT_SERVERMODE);
        }
        protected String getServiceName()
        {
            return getSetting(SERVICENAME);
        }
        protected String getSID()
        {
            return getSetting(SID);
        }
        protected boolean isPooled()
        {
            String usePool =  getSetting(POOL,Boolean.FALSE.toString());
            return !usePool.equalsIgnoreCase(Boolean.FALSE.toString());
        }
        protected String getSchema()
        {
            return getSetting(SCHEMA);
        }
        protected String getPassword()
        {
            return getSetting(PASSWORD);
        }
        
        public void loadConnectionSettings()
        throws IOException, SAXException
        {
          String filename = System.getProperty( "com.oracle.st.xmldb.pm.ConnectionParameters", this.DEFAULT_CONNECTION_DEFINITION ) ;
          loadConnectionSettings(filename);
        }
        public void loadConnectionSettings(String filename)
        throws IOException, SAXException
        {
          if (DEBUG)
          {
            System.out.println("Using connection Parameters from : " + filename);
          }
          Reader reader = new FileReader(new File(filename));
          DOMParser parser = new DOMParser();
          parser.parse(reader);
          XMLDocument doc = parser.getDocument();
          setConnectionSettings(doc);
          if (DEBUG)
          {
            dumpConnectionSettings();
          }
        }
        protected String getDatabaseURL()
        {
            if( getDriver() != null)
            {
              if( getDriver().equalsIgnoreCase( THIN_DRIVER ) )
              {
                  return "jdbc:oracle:thin:@" + getHostname() + ":" + getPort() + ":" + getSID();
              }
              else
              {
                  return "jdbc:oracle:oci8:@(description=(address=(host=" + getHostname() + ")(protocol=tcp)(port=" + getPort() + "))(connect_data=(service_name=" + getServiceName() + ")(server=" + getServerMode() + ")))";
              }
            }
            else
            {
              return null;
            }
        }   
        private OracleConnection openConnection()
        throws SQLException
        {
            String user = getSchema();
            String password = getPassword();
            String connectionString = user + "/" + password + "@" + getDatabaseURL();
            OracleConnection conn = null;
            if( DEBUG )
            {
                this.log.println( "ConnectionProvider.establishConnection(): Connecting as " + connectionString );
            }
            try
            {
                conn = (OracleConnection) DriverManager.getConnection( getDatabaseURL(), user, password );
                if( DEBUG )
                {
                    this.log.println( "ConnectionProvider.establishConnection(): Database Connection Established" );
                }
            }
            catch( SQLException sqle )
            {
                int err = sqle.getErrorCode();
                this.log.println( "ConnectionProvider.establishConnection(): Failed to connect using " + connectionString );
                sqle.printStackTrace(this.log);
                throw sqle;
            }
            return conn;
        }
        public OracleConnection getNewConnection()
        throws SQLException
        {
          return openConnection();
        }
      
        public OracleOCIConnectionPool createConnectionPool()
        throws Exception
        {
          OracleOCIConnectionPool pool;
          Properties poolConfig  = new Properties( );
          poolConfig.put (OracleOCIConnectionPool.CONNPOOL_MIN_LIMIT, "1");
          poolConfig.put (OracleOCIConnectionPool.CONNPOOL_MAX_LIMIT, "1");
          poolConfig.put (OracleOCIConnectionPool.CONNPOOL_INCREMENT, "0");
          return new OracleOCIConnectionPool(getSchema(),getPassword(), getDatabaseURL(),poolConfig);
        }
       public XMLDocument getConnectionSettings()
       {
         return this.connectionDefinition;
       }
    }
  • 9. Configuration File
    mdrake Expert
    Currently Being Moderated
    The SaxLoader is driven by an XML configuration file such as the following:
    <?xml version='1.0' encoding='windows-1252'?>
    <Connection>
      <Driver>Thin</Driver>
      <Hostname>localhost</Hostname>
      <Port>1521</Port>
      <ServiceName>orcl.xp.mark.drake.oracle.com</ServiceName>
      <SID>orcl</SID>
      <ServerMode>DEDICATED</ServerMode>
      <Schema>RDF</Schema>
      <Password>RDF</Password>
      <SourceXML>C:\temp\uniprot1.xml</SourceXML>
      <Element>Description</Element>
      <Table>RDF_DOCUMENT_TABLE</Table>
      <ErrorTable>RDF_ERROR_TABLE</ErrorTable>
      <schemaInstancePrefix>xsi</schemaInstancePrefix>
      <schemaLocation/>
      <noNamespaceSchemaLocation/>
      <CommitCharge>10</CommitCharge>
      <ThreadCount>4</ThreadCount>
    </Connection>
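    Driver can be Thin or OCI (any value other than Thin selects OCI). With Thin, the connection is made to Hostname:Port using the SID; with OCI, it uses Hostname, Port, ServiceName and the server mode (default DEDICATED).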

    Schema and Password are the database schema name and password for the JDBC connection

    SourceXML is the name of the file to be processed.

    Element is the name of the element that forms the root of each fragment to be loaded.


    Table is the name of the table that the fragment should be inserted into

    schemaInstancePrefix is the prefix used for the XML Schema instance namespace (xsi in the example above).

    noNamespaceSchemaLocation or schemaLocation is the SchemaURL for the target table.


    CommitCharge is how many records a thread should insert before issuing a commit.

    ThreadCount is the number of parallel writer threads that should be started.