This discussion is archived
2 Replies Latest reply: Feb 16, 2011 1:02 PM by Bob Hanckel

Push Replication PublishingTransformer example.

769299 Explorer
Hi,

I couldn't find a "PublishingTransformer" example in the Push Replication docs or sample code, so I've guessed as best I can at what one needs. I want to extract objects from a source cluster and convert them to an alternative object format before placing them in a cache on a second cluster. Everything in both clusters is in POF format.

Anyway, here's my code, which extracts a TradingPosition and converts it to a SimplePosition before it's published to the destination cache. I'm not too bothered about those objects; I'd just like someone to confirm that the basic approach I've taken is correct. I've tested this with the FilePublisher so that I can check the output, and it seems to run OK. So, does the following look all right?

package com.csg.gpc.coherence.processor;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import com.csg.gpc.domain.model.position.simple.SimplePosition;
import com.csg.gpc.domain.model.position.trading.TradingPosition;
import com.oracle.coherence.patterns.pushreplication.EntryOperation;
import com.oracle.coherence.patterns.pushreplication.PublishableEntry;
import com.oracle.coherence.patterns.pushreplication.PublishingTransformer;
import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.util.Binary;
import com.tangosol.util.Converter;

public class TradingPositionPublishingTransformer implements
          PublishingTransformer {
     
     @Override
     public Iterator<EntryOperation> transform(String cacheName, String publisherName,
               Iterator<EntryOperation> entryOperations) {

          List<EntryOperation> savedOperations = new ArrayList<EntryOperation>();
          
          while (entryOperations.hasNext()) {
               // Get the next Entry Operation and the Entry associated with it.
               EntryOperation entryOp = entryOperations.next();
               PublishableEntry pe = entryOp.getPublishableEntry();
               
               // Get the Backing Map and Converters associated with the Entry.
               BackingMapManagerContext bmContext = pe.getContext();
               Converter fromInternalConverter = bmContext.getValueFromInternalConverter();
               Converter toInternalConverter = bmContext.getValueToInternalConverter();

               // Get a Domain object (Position) by converting the Entry from the Map's
               // binary format. Convert the Position to a new Position type.
               TradingPosition tp = (TradingPosition)fromInternalConverter.convert(pe.getBinaryValue());
               SimplePosition sp = new SimplePosition(tp.getKey(), tp.getNetPosition(), tp.getAveragePrice());

               // Create a new PublishableEntry using the new Position type.
               PublishableEntry npe = new PublishableEntry(
                         pe.getBinaryKey(),
                         (Binary)toInternalConverter.convert(sp),
                         pe.getOriginalBinaryValue(),
                         bmContext
                         );

               // Create a new Entry Operation using the new Position object type
               // and add it to the outbound List.
               EntryOperation newEntryOp = new EntryOperation(
                         entryOp.getSiteName(),
                         entryOp.getClusterName(),
                         entryOp.getCacheName(),
                         entryOp.getOperation(),
                         npe
                         );

               savedOperations.add(newEntryOp);
          }
          
          return savedOperations.iterator();
     }
}


EDIT: I've found that my example code above works OK for "Insert" operations on the source cache, but fails on "Delete" operations when the source cache is cleared. It's the PR class "EntryOperationProcessor" that fails, on line 177, with a null pointer exception. This is the line in the source:

String originSiteName = (String) srcDecorations
.get(PublishingCacheStore.SourceClusterGlobalNameDecorationKey);

I simply copy the original entry's "sitename", and I also print this out to make sure it's set, so I don't think it's that the name is missing, but something to do with the "srcDecorations.get()" call. I've no idea what's going on here. Maybe Brian or someone else who has done this can comment?
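
One way to narrow this down is a minimal sketch, assuming srcDecorations is a plain java.util.Map (the thread does not confirm its type). With an ordinary Map, a missing key makes get() return null, and casting null to String does not throw, so a null pointer exception on that line would point at srcDecorations itself being null. The class name, the key string, and the helper methods below are hypothetical stand-ins, not part of the Push Replication API.

```java
import java.util.HashMap;
import java.util.Map;

public class DecorationLookupDemo {

    // Hypothetical key, standing in for
    // PublishingCacheStore.SourceClusterGlobalNameDecorationKey.
    public static final String SITE_KEY = "source-cluster-global-name";

    // Same shape as the failing line: a cast applied to the result of get().
    public static String originSiteName(Map<String, Object> srcDecorations) {
        return (String) srcDecorations.get(SITE_KEY);
    }

    // Defensive variant (an assumption, not the library's behaviour):
    // treat a missing decorations map as "no origin recorded".
    public static String originSiteNameOrNull(Map<String, Object> srcDecorations) {
        return srcDecorations == null ? null : (String) srcDecorations.get(SITE_KEY);
    }

    // Reports whether the unguarded lookup throws a NullPointerException.
    public static boolean throwsNpe(Map<String, Object> srcDecorations) {
        try {
            originSiteName(srcDecorations);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> empty = new HashMap<>();
        System.out.println(originSiteName(empty));       // prints "null": missing key, no exception
        System.out.println(throwsNpe(empty));            // prints "false"
        System.out.println(throwsNpe(null));             // prints "true": the map itself is null
        System.out.println(originSiteNameOrNull(null));  // prints "null"
    }
}
```

If the real srcDecorations behaves the same way, printing the site name of the EntryOperation would not reveal the problem, because the failure happens before the key is ever looked up.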

Cheers,

Steve

Edited by: stevephe on 15-Feb-2011 04:29
  • 1. Re: Push Replication PublishingTransformer example.
    769299 Explorer
    Well, I got the code working in the end, including Delete, by simply doing no translation on the source EntryOperation when a delete is detected. As the source and target types share the same composite key format, I can get away with this. I'm not sure what you'd do if your transformed objects had a different key type. I wasted hours yesterday trying to get something sensible happening during deletes, but hit a brick wall; leaving the EntryOperation well alone is the only way I could get it to work.

    Anyway, here's the updated, fully working code in case it's of use to someone. Or if someone from Oracle ever visits these forums, perhaps they could pass on some advice about the approach...

    package com.csg.gpc.coherence.processor;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import com.csg.gpc.domain.model.position.simple.SimplePosition;
    import com.csg.gpc.domain.model.position.trading.TradingPosition;
    import com.oracle.coherence.patterns.pushreplication.EntryOperation;
    import com.oracle.coherence.patterns.pushreplication.PublishableEntry;
    import com.oracle.coherence.patterns.pushreplication.PublishingTransformer;
    import com.tangosol.net.BackingMapManagerContext;
    import com.tangosol.util.Binary;
    import com.tangosol.util.Converter;

    public class TradingPositionPublishingTransformer implements
              PublishingTransformer {

         @Override
         public Iterator<EntryOperation> transform(String cacheName,
                   String publisherName, Iterator<EntryOperation> entryOperations) {

              List<EntryOperation> savedOperations = new ArrayList<EntryOperation>();

              while (entryOperations.hasNext()) {
                   // Get the next Entry Operation and the Entry associated with it.
                   EntryOperation entryOp = entryOperations.next();

                   PublishableEntry newEntry = null;
                   EntryOperation newEntryOp = null;
                   switch (entryOp.getOperation()) {

                        // If the operation is a Delete, we can use the entry "as is", as
                        // the incoming Position contains the key of the destination
                        // object to delete.
                        case Delete:
                             newEntryOp = entryOp;
                             break;
         
                        // If the operation is an Insert or Update, we need to convert the
                        // incoming object to the destination format.
                        case Insert:
                        case Update: {
                             PublishableEntry entry = entryOp.getPublishableEntry();
                             BackingMapManagerContext bmContext = entry.getContext();
                             Converter fromInternalValueConverter =
                                  bmContext.getValueFromInternalConverter();
                             Converter toInternalValueConverter =
                                  bmContext.getValueToInternalConverter();

                             // Get a Domain object (Position) by converting the Entry from
                             // the Map's binary format.
                             TradingPosition tp = (TradingPosition) fromInternalValueConverter
                                       .convert(entry.getBinaryValue());
         
                             // Convert the Position to a new Position type for insertion
                             // into the remote cluster.
                             SimplePosition sp = new SimplePosition(tp.getKey(), tp
                                       .getNetPosition(), tp.getAveragePrice());
         
                             // Create a new PublishableEntry using the new Position type.
                             newEntry = new PublishableEntry(entry.getBinaryKey(),
                                       (Binary) toInternalValueConverter.convert(sp), entry
                                                 .getOriginalBinaryValue(), entry.getContext());

                             // Create an Entry Operation using the new Position object type.
                             newEntryOp = new EntryOperation(entryOp.getSiteName(),
                                       entryOp.getSourceClusterGlobalName(),
                                       entryOp.getCacheName(), entryOp.getOperation(), newEntry);
                             
                             break;
                        }
                   }

                   // Add the Entry Operation to the List that will be returned by this method.
                   savedOperations.add(newEntryOp);
              }

              return savedOperations.iterator();
         }
    }
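
The control flow above (forward deletes untouched, convert the value on inserts and updates) can be exercised without a cluster. The following is a minimal sketch using hypothetical stand-in types: Op, Operation, and the converter function are illustrations only and are not the Coherence or Push Replication classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class PassThroughOnDeleteSketch {

    // Stand-in for the operation kinds named in the thread; not the Coherence type.
    public enum Op { Insert, Update, Delete }

    // Hypothetical stand-in for EntryOperation: an operation kind, a key and a value.
    public static final class Operation {
        public final Op op;
        public final String key;
        public final Object value;

        public Operation(Op op, String key, Object value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    // Deletes are forwarded as the same object; inserts and updates get a
    // new Operation carrying the converted value under the original key.
    public static Iterator<Operation> transform(Iterator<Operation> operations,
            Function<Object, Object> converter) {
        List<Operation> out = new ArrayList<>();
        while (operations.hasNext()) {
            Operation operation = operations.next();
            if (operation.op == Op.Delete) {
                out.add(operation);
            } else {
                out.add(new Operation(operation.op, operation.key,
                        converter.apply(operation.value)));
            }
        }
        return out.iterator();
    }

    public static void main(String[] args) {
        List<Operation> source = new ArrayList<>();
        source.add(new Operation(Op.Insert, "k1", 42));
        source.add(new Operation(Op.Delete, "k1", null));

        Iterator<Operation> result = transform(source.iterator(), v -> "simple:" + v);
        while (result.hasNext()) {
            Operation operation = result.next();
            System.out.println(operation.op + " " + operation.key + " " + operation.value);
        }
    }
}
```

Because the delete branch never touches the value, the converter is never handed the null value of a removed entry, which matches the workaround described in this reply. It only holds while the source and destination caches share the same key format.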
  • 2. Re: Push Replication PublishingTransformer example.
    Bob Hanckel Explorer
    Hi Stevephe,

    Sorry for not responding sooner; we have been heads-down working to a tight dev schedule.
    I'll give folks a heads-up to check on this forum.

    I would have to look at this in detail but my guess is that the code was trying to extract
    decorations from a deleted entry whose value no longer existed. We will look over the
    code but it sounds like you found a bug.

    Regards,

    Bob
