16 Replies. Latest reply: Jul 25, 2012 7:26 AM by 798480

Any Event Transformation examples?

769299 Explorer
I've had a look at the latest Incubator 10 stuff for Coherence 3.7. I previously posted in a thread ("Push Replication PublishingTransformer example") about issues I'd run into trying to do this in a prior release of Push Replication. I take it that, now that most of the core of PR is housed in the Event framework, the Transformer should go in there instead?

I notice in the namespace schema for event (http://coherence.oracle.com/display/INC10/event-namespace-schema) that there is an unclickable link "<event:transformer-scheme />". I guess this is where an object of an appropriate type should go now?

My basic requirement remains the same: to "transform" a cached item in a source cache (in a source cluster) to a new format for placement in the destination cache (resident in a different cluster.)

Are there any working examples of how to do this?


Cheers,

Steve
  • 1. Re: Any Event Transformation examples?
    824909 Newbie
    Hi,

    I am also searching for an example configuration for FilteringPublishingTransformer. I want to purge some of the data in the source cache itself, so I was looking at whether a transformer can help. For example, I want to remotely replicate changes where the value or key contains a certain id.

    I am not sure whether there is any other interception point where this can be achieved. Please share any examples you have.

    Thanks,
    Tarun
  • 2. Re: Any Event Transformation examples?
    824909 Newbie
    The closest I got in using a declarative publisher filter is


    <event:transformer-scheme>
       <event:filtering-transformer-scheme>
          <event:filter>
             <instance:class classname="CustomFilterForReplication"></instance:class>
          </event:filter>
       </event:filtering-transformer-scheme>
    </event:transformer-scheme>


    import java.io.IOException;

    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.util.Filter;


    public class CustomFilterForReplication implements Filter, PortableObject{

         public CustomFilterForReplication()
         {
              
         }
         
         
         @Override
         public boolean evaluate(Object arg0) {
              // TODO Auto-generated method stub
              System.out.println("fdsddsfdsfd");
              return true;
         }


         @Override
         public void readExternal(PofReader arg0) throws IOException {
              // TODO Auto-generated method stub
              
         }


         @Override
         public void writeExternal(PofWriter arg0) throws IOException {
              // TODO Auto-generated method stub
              
         }

    }


    It still fails at runtime with the following exception. Does anybody have any idea?

    Problem : com.oracle.coherence.common.builders.VarArgsParameterizedBuilder@15a94f[<instance:class classname='CustomFilterForReplication'/>]The filter specified in <event:filter>
    <instance:class classname='CustomFilterForReplication'/>
    </event:filter> does not implement the com.tangosol.util.Filter interface
    Advice : Please consult the documentation regarding use of the Push Replication namespace

    at com.oracle.coherence.patterns.eventdistribution.configuration.EventDistributionNamespaceContentHandler$23.onElement(EventDistributionNamespaceContentHandler.java:4

    Edited by: 821906 on Feb 27, 2012 12:59 PM
  • 3. Re: Any Event Transformation examples?
    Brian Oliver Explorer
    Hi,

    Yesterday we posted a patch release which updates and simplifies the use of filters with projects like the Event Distribution and Push Replication Pattern.

    Additionally we updated all of the documentation around filters, especially those here:

    [http://coherence.oracle.com/display/INC10/event-transformer-scheme]

    and here:

    [http://coherence.oracle.com/display/INC10/filter-namespace]

    We also added some example configurations, which can be found here:

    [http://coherence.oracle.com/display/INC10/event-filtering-transformer-scheme]

    and furthermore in the functional test source tree of the Push Replication Pattern.

    Hope this helps.

    -- Brian
  • 4. Re: Any Event Transformation examples?
    824909 Newbie
    Brian,

    With the updated code and examples, filtering on String-based values is working fine.

    I am now looking to filter events that contain application objects, so I need to wire in a custom filter with logic to inspect the contents of the application object.

    While going through the documentation, I came across:

    http://coherence.oracle.com/display/INC10/event-entry-filter

    Elements
    One of the following elements is required in the event:entry-filter element:

    Element: <class-scheme>
    Required/Optional: Optional
    Element Type: ParameterizedBuilder<Filter>
    Default: (n/a)
    Description: A <class-scheme> that produces a Filter to be applied to Entries.

    or one of the filters defined by the Filter Namespace.


    Could you please provide an example of wiring in a custom filter that is declared as per the ParameterizedBuilder<Filter> requirement?

    I used the following and it threw an error.


    Problem : The child element <class-name>CustomFilterForReplication</class-name> of the <filter:any>
    <class-name>CustomFilterForReplication</class-name>
    </filter:any> element did not produce the expected type interface com.tangosol.util.Filter when processed.
    Advice : Please consult the documentation regarding use of the 'filter' namespace


    Here is the filter I am using. I tried extending from EntryFilter, but that doesn't work either.

    import java.io.IOException;
    import java.util.Map.Entry;

    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.util.Filter;
    import com.tangosol.util.filter.EntryFilter;


    public class CustomFilterForReplication implements Filter, PortableObject{

         public CustomFilterForReplication()
         {
              
         }
         


         @Override
         public void readExternal(PofReader arg0) throws IOException {
              // TODO Auto-generated method stub
              
         }


         @Override
         public void writeExternal(PofWriter arg0) throws IOException {
              // TODO Auto-generated method stub
              
         }


         



         @Override
         public boolean evaluate(Object arg0) {
              // TODO Auto-generated method stub
              
              System.out.println("Object"+arg0);
              return false;
         }

    }

    Thanks,
    Tarun


  • 5. Re: Any Event Transformation examples?
    798480 Newbie
    Hi,

    Thanks for the update.

    What I was looking for in the documentation (as I guess Steve is too) is an example that uses the class-scheme in the event:transformer-scheme element. I will use this implementation to remove/reset certain attributes on the object that is being distributed. Are there any examples of this kind of setup, and an example class that it would work with?

    Any help or advice would be much appreciated.

    Thanks,
    Kunal
  • 6. Re: Any Event Transformation examples?
    798480 Newbie
    So, on trying to implement EventIteratorTransformer to make my CustomEventIteratorTransformer I need to override:
    public Iterator<Event> transform(Iterator<Event> arg0)
    Now, time for being really basic:

    From the event object obtained from the iterator where can I find the event information like the event type, the object that will be distributed (i.e. the thing I ultimately want to edit) etc?

    Any help is much appreciated.
    Thanks.
  • 7. Re: Any Event Transformation examples?
    798480 Newbie
    I have found an Uppercase transformer in the MutatingTransformer section as shown below. What class does the UppercaseEntryEventTransformer extend/implement? How can it be extended to deal with POF based entries? Is it possible to see the sample code for this transformer?
    <event:transformer-scheme>
       <event:mutating-transformer-scheme>
          <event:event-transformer>
             <class-scheme>
                <class-name>com.oracle.coherence.patterns.eventdistribution.transformers.UppercaseEntryEventTransformer</class-name>
             </class-scheme>
          </event:event-transformer>
       </event:mutating-transformer-scheme>
    </event:transformer-scheme>
  • 8. Re: Any Event Transformation examples?
    Brian Oliver Explorer
    Hi Kunal,

    The first thing to remember about Push Replication and Event Transformation is that we're not dealing with Cache Entries. While Cache Entries are inserted, updated and removed from Caches, these are not the things actually replicated or distributed. Instead, the "events" that represent what happened to the Cache Entries are replicated and distributed. This means you need to think in terms of filtering/transforming events instead of the entries themselves. For example, updating a Cache Entry produces an UpdateEvent that encapsulates the key, the previous value, the new value and information about the site (including cluster) on which the event occurred.
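To make the "events, not entries" idea concrete, here is a minimal sketch of what such an update event carries. The `UpdateEvent` class below is a hypothetical, simplified stand-in written for illustration; its field and method names are assumptions and it is not the Incubator's own event class.

```java
public class UpdateEventSketch {

    /** Hypothetical, simplified model of an update event; NOT the Incubator's class. */
    static final class UpdateEvent {
        final String site;          // where the change happened (site/cluster)
        final Object key;           // key of the cache entry that changed
        final Object previousValue; // value before the update
        final Object newValue;      // value after the update

        UpdateEvent(String site, Object key, Object previousValue, Object newValue) {
            this.site = site;
            this.key = key;
            this.previousValue = previousValue;
            this.newValue = newValue;
        }

        /** Returns a copy carrying a different new value; key and origin are unchanged. */
        UpdateEvent withNewValue(Object replacement) {
            return new UpdateEvent(site, key, previousValue, replacement);
        }

        @Override
        public String toString() {
            return "UpdateEvent{site=" + site + ", key=" + key
                    + ", previous=" + previousValue + ", new=" + newValue + "}";
        }
    }

    public static void main(String[] args) {
        // The event, not the cache entry, is what gets transformed and distributed.
        UpdateEvent original = new UpdateEvent("NYC", "trade-1", "draft", "final");
        UpdateEvent transformed = original.withNewValue("FINAL");
        System.out.println(original);
        System.out.println(transformed);
    }
}
```

The point of the sketch is only that a transformation acts on the event describing the change, so the key and origin travel along with whatever value you decide to send.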

    The second thing to remember is that Push Replication and Event Transformation occur in "batches" and not on single Events at a time. That is, generally you don't filter or transform an individual event; instead you filter and transform an entire batch of events. This ensures that replication and distribution occur as efficiently as possible. It is also why you see the interfaces being based on Iterators and EventIterators, i.e. you are given an EventIterator containing Events to transform, and your job is to return a new EventIterator, which will be consulted to replicate the events.
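As a rough illustration of batch-style transformation, the sketch below takes an iterator over a batch and returns a new iterator over the events that should go out. The `Event` and `SimpleEvent` types and the "tmp-" key rule are hypothetical stand-ins invented for this example, so that it compiles without the Incubator JARs; only the `Iterator` in, `Iterator` out shape is taken from the discussion above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

public class BatchTransformSketch {

    /** Hypothetical stand-in for an event type; NOT the Incubator's Event interface. */
    interface Event {
        String describe();
    }

    /** Hypothetical event carrying a key and a new value. */
    static final class SimpleEvent implements Event {
        final String key;
        final String value;

        SimpleEvent(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String describe() {
            return key + "=" + value;
        }
    }

    /**
     * Consumes the incoming batch and returns a new iterator over the events to
     * distribute. Events whose key starts with "tmp-" are dropped (an assumed rule);
     * all other SimpleEvents have their value uppercased.
     */
    static Iterator<Event> transform(Iterator<Event> events) {
        List<Event> result = new ArrayList<>();
        while (events.hasNext()) {
            Event event = events.next();
            if (event instanceof SimpleEvent) {
                SimpleEvent simple = (SimpleEvent) event;
                if (simple.key.startsWith("tmp-")) {
                    continue; // filtered out of the batch
                }
                result.add(new SimpleEvent(simple.key, simple.value.toUpperCase(Locale.ROOT)));
            } else {
                result.add(event); // unknown event types pass through untouched
            }
        }
        return result.iterator();
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new SimpleEvent("trade-1", "abc"),
                new SimpleEvent("tmp-2", "ignored"),
                new SimpleEvent("trade-3", "xyz"));
        Iterator<Event> out = transform(batch.iterator());
        while (out.hasNext()) {
            System.out.println(out.next().describe());
        }
    }
}
```

Because the whole batch is visible at once, filtering and transforming can happen in a single pass, which is presumably what makes the iterator-based contract attractive for distribution.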

    In most cases, however, you may simply want to transform each event. This is easily achieved by declaring a mutating transformer:
    <event:transformer-scheme>
       <event:mutating-transformer-scheme>
          <event:event-transformer>
             <class-scheme>
                <class-name>YOUR-TRANSFORMER-CLASS-NAME-HERE</class-name>
             </class-scheme>
          </event:event-transformer>
       </event:mutating-transformer-scheme>
    </event:transformer-scheme>
    Let's take a look at what this defines.

    i). The "transformer-scheme" defines the Event Iterator Transformer. In this case we want it to be an Event Mutating Transformer Scheme.

    ii). The "mutating-transformer-scheme" is an implementation of an Event Iterator Transformer (as required by the "transformer-scheme").

    This implementation requires an Event Transformer to be provided, which takes an individual Event and returns a "transformed" Event.

    iii). The "event-transformer" is an implementation of an Event Transformer. This requires your class name, which implements an Event Transformer.
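Conceptually, the relationship between the three levels described above can be sketched as a per-event transformer wrapped into an iterator transformer. The code below is only an illustration of that idea under stated assumptions: `EventTransformer` here is a hypothetical generic stand-in, plain Strings play the role of events, and nothing in it is claimed to match how the Incubator implements its mutating transformer.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

public class MutatingTransformerSketch {

    /** Hypothetical stand-in for a per-event transformer; generic for brevity. */
    interface EventTransformer<E> {
        E transform(E event);
    }

    /** Wraps a batch iterator so each event is transformed lazily as it is consumed. */
    static <E> Iterator<E> mutating(final Iterator<E> source,
                                    final EventTransformer<E> transformer) {
        return new Iterator<E>() {
            @Override
            public boolean hasNext() {
                return source.hasNext();
            }

            @Override
            public E next() {
                // Apply the single-event transformer to each element of the batch.
                return transformer.transform(source.next());
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException("batches are read-only in this sketch");
            }
        };
    }

    public static void main(String[] args) {
        // Strings stand in for events purely for illustration.
        List<String> batch = Arrays.asList("insert:a", "update:b");
        Iterator<String> out = mutating(batch.iterator(),
                event -> event.toUpperCase(Locale.ROOT));
        while (out.hasNext()) {
            System.out.println(out.next());
        }
    }
}
```

The design choice worth noting is that you only write the single-event logic; the wrapping into a batch-level iterator is generic and can be reused for any per-event transformer.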

    I hope this helps.

    - Brian
  • 9. Re: Any Event Transformation examples?
    Brian Oliver Explorer
    Here is the source code of the String Transformer (from the Push Replication Functional Tests)
    package com.oracle.coherence.patterns.eventdistribution.transformers;
            
    import com.oracle.coherence.common.events.EntryEvent;
    import com.oracle.coherence.common.events.Event;
    
    import com.oracle.coherence.patterns.eventdistribution.EventTransformer;
    import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntryEvent;
    
    /**
     * An {@link UppercaseEntryEventTransformer} transforms {@link EntryEvent} {@link String}-based values to uppercase.
     *
     * @author Brian Oliver
     */
    public class UppercaseEntryEventTransformer implements EventTransformer
    {
        /**
         * {@inheritDoc}
         */
        @Override
        public Event transform(Event event)
        {
            if (event instanceof DistributableEntryEvent)
            {
                DistributableEntryEvent evtDistributable = (DistributableEntryEvent) event;
    
                Object                  oValue           = evtDistributable.getEntry().getValue();
    
                if (oValue != null && oValue instanceof String)
                {
                    String sValue    = (String) oValue;
                    String sNewValue = sValue.toUpperCase();
    
                    evtDistributable.getEntry().setValue(sNewValue);
    
                    return evtDistributable;
                }
                else
                {
                    return event;
                }
            }
            else
            {
                return event;
            }
        }
    }
  • 10. Re: Any Event Transformation examples?
    798480 Newbie
    Thanks Brian. That post is very helpful and actually underscores what I have managed to find out from looking at the code in the eventdistribution JAR for EntryOptimizingEventTransformer. The code gave me a nice start and also pointed me in the direction of using the <event:mutating-transformer-scheme>.

    Your code-post also highlights some extra checks that I should be doing on the event before mutating it.

    I will post my solution here later today in case you spot any glaring errors; otherwise, my prototype seems to be working fine.

    Thanks,
    Ku
  • 11. Re: Any Event Transformation examples?
    798480 Newbie
    I managed to get something working on Friday evening. Here is my event transformer; I have a question about using POF within it.
    package com.bnpp.reflex.eventdistribution.transformers;
    
    import com.bnpp.reflex.blotter.RflxTrade;
    import com.oracle.coherence.common.events.Event;
    import com.oracle.coherence.patterns.eventdistribution.EventTransformer;
    import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntry;
    import com.oracle.coherence.patterns.eventdistribution.events.DistributableEntryEvent;
    import com.tangosol.net.BackingMapManagerContext;
    import com.tangosol.util.Converter;
    
    public class RiskRemovalEventTransformer implements EventTransformer {
         /**
          * {@inheritDoc}
          */
         @Override
         public Event transform(Event event) {
              if (event instanceof DistributableEntryEvent) {
                   DistributableEntryEvent distributableEntryEvent = (DistributableEntryEvent) event;
    
                   try // try strip the risk, if it fails we don't want to lose the event
                   {
                        DistributableEntry entry = distributableEntryEvent.getEntry();
                        
                        // remove the original value to make the event lighter upon distribution
                        entry.setOriginalBinaryValue(null);
    
                        BackingMapManagerContext bmContext = entry.getContext();
                        Converter fromInternalValueConverter = bmContext
                                  .getValueFromInternalConverter();
    
                        Object obj = fromInternalValueConverter.convert(entry.getBinaryValue());
    
                        if (obj instanceof RflxTrade) {
                             RflxTrade trade = (RflxTrade) obj;
                             trade.setPV(0);
                             trade.setPV01(0);
    
                             entry.setValue(trade);
    
                             return distributableEntryEvent;
                        } else {
                             return event;
                        }
                   } catch (Exception e) {
                        System.err.println("Whilst stripping risk, Exception: "
                                  + e.getMessage());
                        return event;
                   }
              } else {
                   return event;
              }
         }
    }
    Do I have to call getContext and create a converter to obtain the object that I hope to mutate? Is this the most elegant way? Or, instead of using the converter, should I just use:
    obj = entry.getValue();

    I have implemented the Centralized PR example. On starting up the caches, everything seems to be running fine. When I insert the first value into leaf1, I see it correctly distributed to the hub and then to leaf2, but then the storage nodes on the hub and leaves continually log information to my logfile (~100 entries/sec). Is this normal? The storage/proxy configs are listed below.
    2012-04-16 16:30:20.609/260911.034 Oracle Coherence GE 3.7.1.0 <D6> (thread=EventChannelController:Thread-5, member=2): Commenced distribution with EventChannel EventChannelController.Identifier{symbolicName=Leaf2Channel, externalName=NYC.hub:cluster:0x8021:reflex-blotter-trade-cache:Leaf2Channel} Class:com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController Method:onDistribute
    2012-04-16 16:30:20.609/260911.034 Oracle Coherence GE 3.7.1.0 <D6> (thread=EventChannelController:Thread-5, member=2): Completed distribution with EventChannelController.Identifier{symbolicName=Leaf2Channel, externalName=NYC.hub:cluster:0x8021:reflex-blotter-trade-cache:Leaf2Channel} Class:com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController Method:onDistribute
    2012-04-16 16:30:20.609/260911.034 Oracle Coherence GE 3.7.1.0 <D6> (thread=EventChannelController:Thread-5, member=2): Commenced distribution with EventChannel EventChannelController.Identifier{symbolicName=Leaf2Channel, externalName=NYC.hub:cluster:0x8021:reflex-blotter-trade-cache:Leaf2Channel} Class:com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController Method:onDistribute
    2012-04-16 16:30:20.609/260911.034 Oracle Coherence GE 3.7.1.0 <D6> (thread=EventChannelController:Thread-5, member=2): Completed distribution with EventChannelController.Identifier{symbolicName=Leaf2Channel, externalName=NYC.hub:cluster:0x8021:reflex-blotter-trade-cache:Leaf2Channel} Class:com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController Method:onDistribute
    HUB
    Proxy
    <?xml version="1.0"?>
    
    <cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
         http://xmlns.oracle.com/coherence/coherence-cache-config/1.1/coherence-cache-config.xsd"
         xmlns:event="class://com.oracle.coherence.patterns.eventdistribution.configuration.EventDistributionNamespaceContentHandler"
         xmlns:element="class://com.oracle.coherence.environment.extensible.namespaces.XmlElementProcessingNamespaceContentHandler"
         xmlns:filter="class://com.oracle.coherence.environment.extensible.namespaces.FilterNamespaceContentHandler"
         element:introduce-cache-config="coherence-messagingpattern-cache-config.xml"
    >
    
         <defaults>
              <serializer>
                   <instance>
                        <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
                        <init-params>
                             <init-param>
                                  <param-type>string</param-type>
                                  <param-value>blotter-pof-config.xml</param-value>
                             </init-param>
                        </init-params>
                   </instance>
              </serializer>
         </defaults>
    
         <caching-scheme-mapping>
              <cache-mapping> <!-- This is for the non-distributed caches -->
                   <cache-name>reflex-*</cache-name>
                   <scheme-name>reflex-default</scheme-name>
              </cache-mapping>
              <cache-mapping> <!-- This is for the event replicated cache -->
                   <cache-name>reflex-blotter-trade-cache</cache-name>
                   <scheme-name>reflex-distribution</scheme-name>
              </cache-mapping>
         </caching-scheme-mapping>
    
         <caching-schemes>
    
              <distributed-scheme>
                   <scheme-name>reflex-default</scheme-name>
                   <service-name>ReflexDefault</service-name>
    
                   <!-- serializer block added to default section above -->
                   <backing-map-scheme>
                        <local-scheme />
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
    
              <distributed-scheme>
                   <scheme-name>reflex-distribution</scheme-name>
                   <service-name>ReflexDistribution</service-name>
    
                   <!-- serializer block added to default section above -->
    
                   <backing-map-scheme>
                        <read-write-backing-map-scheme>
                             <internal-cache-scheme>
                                  <local-scheme>
                                       <unit-calculator>BINARY</unit-calculator>
                                  </local-scheme>
                             </internal-cache-scheme>
                             <cachestore-scheme>
                                  <class-scheme>
                                       <class-name>com.oracle.coherence.patterns.pushreplication.PublishingCacheStore</class-name>
                                       <init-params>
                                            <init-param>
                                                 <param-type>java.lang.String</param-type>
                                                 <param-value>{cache-name}</param-value>
                                            </init-param>
                                       </init-params>
                                  </class-scheme>
                             </cachestore-scheme>
                        </read-write-backing-map-scheme>
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
    
              <proxy-scheme>
                   <service-name>ExtendTcpProxyService</service-name>
                   <thread-count>50</thread-count>
                   <acceptor-config>
                        <tcp-acceptor>
                             <local-address>
                                  <address>localhost</address>
                                  <port>56441</port>
                             </local-address>
                        </tcp-acceptor>
                   </acceptor-config>
                   <autostart>true</autostart>
              </proxy-scheme>
         </caching-schemes>
    </cache-config>
    Storage
    <?xml version="1.0"?>
    
    <cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
         http://xmlns.oracle.com/coherence/coherence-cache-config/1.1/coherence-cache-config.xsd"
         xmlns:event="class://com.oracle.coherence.patterns.eventdistribution.configuration.EventDistributionNamespaceContentHandler"
         xmlns:element="class://com.oracle.coherence.environment.extensible.namespaces.XmlElementProcessingNamespaceContentHandler"
         xmlns:filter="class://com.oracle.coherence.environment.extensible.namespaces.FilterNamespaceContentHandler"
         element:introduce-cache-config="coherence-messagingpattern-cache-config.xml"
    >
    
         <defaults>
              <serializer>
                   <instance>
                        <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
                        <init-params>
                             <init-param>
                                  <param-type>string</param-type>
                                  <param-value>blotter-pof-config.xml</param-value>
                             </init-param>
                        </init-params>
                   </instance>
              </serializer>
         </defaults>
    
         <caching-scheme-mapping>
              <cache-mapping> <!-- This is for the non-distributed caches -->
                   <cache-name>reflex-*</cache-name>
                   <scheme-name>reflex-default</scheme-name>
              </cache-mapping>
    
              <cache-mapping> <!-- This is for the event replicated cache -->
                   <cache-name>reflex-blotter-trade-cache</cache-name>
                   <scheme-name>reflex-distribution</scheme-name>
    
                   <event:distributor>
                        <event:distributor-name>{cache-name}</event:distributor-name>
                        <event:distributor-external-name>{site-name}-{cluster-name}-{cache-name}
                        </event:distributor-external-name>
    
                        <event:distributor-scheme>
                             <event:coherence-based-distributor-scheme />
                        </event:distributor-scheme>
                        <event:distribution-channels>
                             <event:distribution-channel>
                                  <event:channel-name>Leaf1Channel</event:channel-name>
                                  <event:starting-mode system-property="channel.starting.mode">enabled
                                  </event:starting-mode>
    
                                  <event:channel-scheme>
                                       <event:remote-cluster-channel-scheme>
                                            <event:remote-invocation-service-name>leaf-site1
                                            </event:remote-invocation-service-name>
                                            <distribution-role>HUB</distribution-role>
                                            <event:remote-channel-scheme>
                                                 <event:local-cache-channel-scheme>
                                                      <event:target-cache-name>{cache-name}
                                                      </event:target-cache-name>
                                                 </event:local-cache-channel-scheme>
                                            </event:remote-channel-scheme>
                                       </event:remote-cluster-channel-scheme>
                                  </event:channel-scheme>
                                  <event:transformer-scheme>
                                       <event:mutating-transformer-scheme>
                                            <event:event-transformer>
                                                 <class-scheme>
                                                      <class-name>com.bnpp.reflex.eventdistribution.transformers.RiskRemovalEventTransformer</class-name>
                                                 </class-scheme>
                                            </event:event-transformer>
                                       </event:mutating-transformer-scheme>
                                  </event:transformer-scheme>
                             </event:distribution-channel>
    
                             <event:distribution-channel>
                                  <event:channel-name>Leaf2Channel</event:channel-name>
                                  <event:starting-mode system-property="channel.starting.mode">enabled
                                  </event:starting-mode>
    
                                  <event:channel-scheme>
                                       <event:remote-cluster-channel-scheme>
                                            <event:remote-invocation-service-name>leaf-site2
                                            </event:remote-invocation-service-name>
                                            <distribution-role>HUB</distribution-role>
                                            <event:remote-channel-scheme>
                                                 <event:local-cache-channel-scheme>
                                                      <event:target-cache-name>{cache-name}
                                                      </event:target-cache-name>
                                                 </event:local-cache-channel-scheme>
                                            </event:remote-channel-scheme>
                                       </event:remote-cluster-channel-scheme>
                                  </event:channel-scheme>
                                  <event:transformer-scheme>
                                       <event:mutating-transformer-scheme>
                                            <event:event-transformer>
                                                 <class-scheme>
                                                      <class-name>com.bnpp.reflex.eventdistribution.transformers.RiskRemovalEventTransformer</class-name>
                                                 </class-scheme>
                                            </event:event-transformer>
                                       </event:mutating-transformer-scheme>
                                  </event:transformer-scheme>
                             </event:distribution-channel>
    
                        </event:distribution-channels>
                   </event:distributor>
              </cache-mapping>
         </caching-scheme-mapping>
    
         <caching-schemes>
              <!-- This is for the standard caches that we will NOT be distributing -->
              <distributed-scheme>
                   <scheme-name>reflex-default</scheme-name>
                   <service-name>ReflexDefault</service-name>
    
                   <!-- serializer block added to default section above -->
    
                   <backing-map-scheme>
                        <local-scheme>
                             <unit-calculator>BINARY</unit-calculator>
                        </local-scheme>
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
    
              <!-- The following scheme is required for each remote-site when using a 
                   RemoteInvocationPublisher -->
              <remote-invocation-scheme>
                   <service-name>leaf-site1</service-name>
                   <initiator-config>
                        <tcp-initiator>
                             <remote-addresses>
                                  <socket-address>
                                       <address>localhost</address>
                                       <port>56001</port>
                                  </socket-address>
                             </remote-addresses>
                             <connect-timeout>2s</connect-timeout>
                        </tcp-initiator>
                        <outgoing-message-handler>
                             <request-timeout>5s</request-timeout>
                        </outgoing-message-handler>
                   </initiator-config>
              </remote-invocation-scheme>
    
              <remote-invocation-scheme>
                   <service-name>leaf-site2</service-name>
                   <initiator-config>
                        <tcp-initiator>
                             <remote-addresses>
                                  <socket-address>
                                       <address>localhost</address>
                                       <port>56331</port>
                                  </socket-address>
                             </remote-addresses>
                             <connect-timeout>2s</connect-timeout>
                        </tcp-initiator>
                        <outgoing-message-handler>
                             <request-timeout>5s</request-timeout>
                        </outgoing-message-handler>
                   </initiator-config>
              </remote-invocation-scheme>
    
              <!-- This is for the distributed trade cache that we will be replicating 
                   in ALL sites -->
              <distributed-scheme>
                   <scheme-name>reflex-distribution</scheme-name>
                   <service-name>ReflexDistribution</service-name>
    
                   <!-- serializer block added to default section above -->
    
                   <backing-map-scheme>
                        <read-write-backing-map-scheme>
                             <internal-cache-scheme>
                                  <local-scheme>
                                       <unit-calculator>BINARY</unit-calculator>
                                  </local-scheme>
                             </internal-cache-scheme>
                             <cachestore-scheme>
                                  <class-scheme>
                                       <class-name>com.oracle.coherence.patterns.pushreplication.PublishingCacheStore</class-name>
                                       <init-params>
                                            <init-param>
                                                 <param-type>java.lang.String</param-type>
                                                 <param-value>{cache-name}</param-value>
                                            </init-param>
                                       </init-params>
                                  </class-scheme>
                             </cachestore-scheme>
                        </read-write-backing-map-scheme>
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
         </caching-schemes>
    </cache-config>
    h1. LEAF1

    h3. Proxy
    <?xml version="1.0"?>
    
    <cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
         http://xmlns.oracle.com/coherence/coherence-cache-config/1.1/coherence-cache-config.xsd"
         xmlns:event="class://com.oracle.coherence.patterns.eventdistribution.configuration.EventDistributionNamespaceContentHandler"
         xmlns:element="class://com.oracle.coherence.environment.extensible.namespaces.XmlElementProcessingNamespaceContentHandler"
         xmlns:filter="class://com.oracle.coherence.environment.extensible.namespaces.FilterNamespaceContentHandler"
         element:introduce-cache-config="coherence-messagingpattern-cache-config.xml"
    >
    
         <defaults>
              <serializer>
                   <instance>
                        <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
                        <init-params>
                             <init-param>
                                  <param-type>string</param-type>
                                  <param-value>blotter-pof-config.xml</param-value>
                             </init-param>
                        </init-params>
                   </instance>
              </serializer>
         </defaults>
    
         <caching-scheme-mapping>
              <cache-mapping> <!-- This is for the non-distributed caches -->
                   <cache-name>reflex-*</cache-name>
                   <scheme-name>reflex-default</scheme-name>
              </cache-mapping>
              <cache-mapping> <!-- This is for the event replicated cache -->
                   <cache-name>reflex-blotter-trade-cache</cache-name>
                   <scheme-name>reflex-distribution</scheme-name>
              </cache-mapping>
         </caching-scheme-mapping>
    
         <caching-schemes>
    
              <distributed-scheme>
                   <scheme-name>reflex-default</scheme-name>
                   <service-name>ReflexDefault</service-name>
    
                   <!-- serializer block added to default section above -->
                   <backing-map-scheme>
                        <local-scheme />
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
    
              <distributed-scheme>
                   <scheme-name>reflex-distribution</scheme-name>
                   <service-name>ReflexDistribution</service-name>
    
                   <!-- serializer block added to default section above -->
    
                   <backing-map-scheme>
                        <read-write-backing-map-scheme>
                             <internal-cache-scheme>
                                  <local-scheme>
                                       <unit-calculator>BINARY</unit-calculator>
                                  </local-scheme>
                             </internal-cache-scheme>
                             <cachestore-scheme>
                                  <class-scheme>
                                       <class-name>com.oracle.coherence.patterns.pushreplication.PublishingCacheStore</class-name>
                                       <init-params>
                                            <init-param>
                                                 <param-type>java.lang.String</param-type>
                                                 <param-value>{cache-name}</param-value>
                                            </init-param>
                                       </init-params>
                                  </class-scheme>
                             </cachestore-scheme>
                        </read-write-backing-map-scheme>
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
    
              <proxy-scheme>
                   <service-name>ExtendTcpProxyService</service-name>
                   <thread-count>50</thread-count>
                   <acceptor-config>
                        <tcp-acceptor>
                             <local-address>
                                  <address>localhost</address>
                                  <port>56001</port>
                             </local-address>
                        </tcp-acceptor>
                   </acceptor-config>
                   <autostart>true</autostart>
              </proxy-scheme>
         </caching-schemes>
    </cache-config>
    h3. Storage
    <?xml version="1.0"?>
    
    <cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
         http://xmlns.oracle.com/coherence/coherence-cache-config/1.1/coherence-cache-config.xsd"
         xmlns:event="class://com.oracle.coherence.patterns.eventdistribution.configuration.EventDistributionNamespaceContentHandler"
         xmlns:element="class://com.oracle.coherence.environment.extensible.namespaces.XmlElementProcessingNamespaceContentHandler"
         xmlns:filter="class://com.oracle.coherence.environment.extensible.namespaces.FilterNamespaceContentHandler"
         element:introduce-cache-config="coherence-messagingpattern-cache-config.xml"
    >
    
         <defaults>
              <serializer>
                   <instance>
                        <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
                        <init-params>
                             <init-param>
                                  <param-type>string</param-type>
                                  <param-value>blotter-pof-config.xml</param-value>
                             </init-param>
                        </init-params>
                   </instance>
              </serializer>
         </defaults>
    
         <caching-scheme-mapping>
              <cache-mapping>
                   <cache-name>reflex-*</cache-name>
                   <scheme-name>reflex-default</scheme-name>
              </cache-mapping>
    
              <cache-mapping> <!-- This is for the event replicated cache -->
                   <cache-name>reflex-blotter-trade-cache</cache-name>
                   <scheme-name>reflex-distribution</scheme-name>
    
                   <event:distributor>
                        <event:distributor-name>{cache-name}</event:distributor-name>
                        <event:distributor-external-name>{site-name}-{cluster-name}-{cache-name}
                        </event:distributor-external-name>
    
                        <event:distributor-scheme>
                             <event:coherence-based-distributor-scheme />
                        </event:distributor-scheme>
                        <event:distribution-channels>
                             <event:distribution-channel>
                                  <event:channel-name>HubChannel</event:channel-name>
                                  <event:starting-mode system-property="channel.starting.mode">enabled
                                  </event:starting-mode>
    
                                  <event:channel-scheme>
                                       <event:remote-cluster-channel-scheme>
                                            <event:remote-invocation-service-name>hub-site
                                            </event:remote-invocation-service-name>
                                            <event:remote-channel-scheme>
                                                 <event:local-cache-channel-scheme>
                                                      <event:target-cache-name>{cache-name}
                                                      </event:target-cache-name>
                                                 </event:local-cache-channel-scheme>
                                            </event:remote-channel-scheme>
                                       </event:remote-cluster-channel-scheme>
                                  </event:channel-scheme>
                                  <event:transformer-scheme>
                                       <event:mutating-transformer-scheme>
                                            <event:event-transformer>
                                                 <class-scheme>
                                                      <class-name>com.bnpp.reflex.eventdistribution.transformers.RiskRemovalEventTransformer</class-name>
                                                 </class-scheme>
                                            </event:event-transformer>
                                       </event:mutating-transformer-scheme>
                                  </event:transformer-scheme>
                             </event:distribution-channel>
    
                        </event:distribution-channels>
                   </event:distributor>
              </cache-mapping>
         </caching-scheme-mapping>
    
         <caching-schemes>
              <!-- This is for the standard caches that we will NOT be distributing -->
              <distributed-scheme>
                   <scheme-name>reflex-default</scheme-name>
                   <service-name>ReflexDefault</service-name>
    
                   <!-- serializer block added to default section above -->
    
                   <backing-map-scheme>
                        <local-scheme>
                             <unit-calculator>BINARY</unit-calculator>
                        </local-scheme>
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
    
              <!-- The following scheme is required for each remote-site when using a 
                   RemoteInvocationPublisher -->
              <remote-invocation-scheme>
                   <service-name>hub-site</service-name>
                   <initiator-config>
                        <tcp-initiator>
                             <remote-addresses>
                                  <socket-address>
                                       <address>localhost</address>
                                       <port>56441</port>
                                  </socket-address>
                             </remote-addresses>
                             <connect-timeout>2s</connect-timeout>
                        </tcp-initiator>
                        <outgoing-message-handler>
                             <request-timeout>5s</request-timeout>
                        </outgoing-message-handler>
                   </initiator-config>
              </remote-invocation-scheme>
    
              <!-- This is for the distributed trade cache that we will be replicating 
                   in ALL sites -->
              <distributed-scheme>
                   <scheme-name>reflex-distribution</scheme-name>
                   <service-name>ReflexDistribution</service-name>
    
                   <!-- serializer block added to default section above -->
    
                   <backing-map-scheme>
                        <read-write-backing-map-scheme>
                             <internal-cache-scheme>
                                  <local-scheme>
                                       <unit-calculator>BINARY</unit-calculator>
                                  </local-scheme>
                             </internal-cache-scheme>
                             <cachestore-scheme>
                                  <class-scheme>
                                       <class-name>com.oracle.coherence.patterns.pushreplication.PublishingCacheStore</class-name>
                                       <init-params>
                                            <init-param>
                                                 <param-type>java.lang.String</param-type>
                                                 <param-value>{cache-name}</param-value>
                                            </init-param>
                                       </init-params>
                                  </class-scheme>
                             </cachestore-scheme>
                        </read-write-backing-map-scheme>
                   </backing-map-scheme>
    
                   <autostart>true</autostart>
              </distributed-scheme>
         </caching-schemes>
    </cache-config>
  • 12. Re: Any Event Transformation examples?
    798480 Newbie
    Just as further help to anyone else who is looking to do this or having trouble: I also had to add the following via the tangosol override file (tangosol-coherence-override.xml):
    <coherence xmlns="http://schemas.tangosol.com/coherence">
      <configurable-cache-factory-config>
         <class-name>com.oracle.coherence.environment.extensible.ExtensibleEnvironment</class-name>
         <init-params>
              <init-param>
                   <param-type>java.lang.String</param-type>
                   <param-value system-property="tangosol.coherence.cacheconfig">coherence-cache-config.xml</param-value>
              </init-param>
         </init-params>
      </configurable-cache-factory-config>
    </coherence>
    from:
    http://androidyou.blogspot.com/2011/10/coherence-push-replication.html

    and obviously had to include the incubator POF types in my POF config:
    <include>coherence-pof-config.xml</include>
    <include>coherence-common-pof-config.xml</include>
    <include>coherence-commandpattern-pof-config.xml</include>
    <include>coherence-functorpattern-pof-config.xml</include>
    <include>coherence-messagingpattern-pof-config.xml</include>
    <include>coherence-eventdistributionpattern-pof-config.xml</include>
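
    For anyone unsure where those include lines go, here is a minimal sketch of how they might sit inside a POF config such as the blotter-pof-config.xml referenced by the cache configs above. The user-type entry is purely hypothetical (both the type-id 1001 and the class name com.example.Trade are placeholders, not taken from this thread); pick IDs that do not collide with the ones defined in the included files.

    ```xml
    <pof-config>
      <user-type-list>
        <!-- Coherence and Incubator POF types, as listed above -->
        <include>coherence-pof-config.xml</include>
        <include>coherence-common-pof-config.xml</include>
        <include>coherence-commandpattern-pof-config.xml</include>
        <include>coherence-functorpattern-pof-config.xml</include>
        <include>coherence-messagingpattern-pof-config.xml</include>
        <include>coherence-eventdistributionpattern-pof-config.xml</include>

        <!-- Hypothetical application type: id and class name are placeholders -->
        <user-type>
          <type-id>1001</type-id>
          <class-name>com.example.Trade</class-name>
        </user-type>
      </user-type-list>
    </pof-config>
    ```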
    My classpath references pretty much all the incubator JARs (required for event distribution) as well as my own POF JAR and JAR for the transformer:

    coherence-commandpattern-2.8.4.32329.jar;
    coherence-common-2.2.0.32329.jar;
    coherence-eventdistributionpattern-1.2.0.32329.jar;
    coherence-functorpattern-1.5.4.32329.jar;
    coherence-pushreplicationpattern-4.0.4.32329.jar;
    coherence-messagingpattern-2.8.4.32329.jar;
    geronimo-jms_1.1_spec-1.1.1.jar;
    activemq-core-5.3.1.jar

    from:
    Push Replication - Centralized

    Not sure if there is anything else to add but hope this helps anyone else venturing down the path of event distribution!
  • 13. Re: Any Event Transformation examples?
    Brian Oliver Explorer
    Hi Kunal,

    Great to hear you got it working. Here are a few other answers:

    1. Regarding obj = entry.getValue(); That's way better, as you don't need to worry about the converters. You're deserializing the object anyway, so you may as well use the built-in stuff.

    2. Regarding the logging. The Event Distribution and Push Replication threads by default are "lazy". You rarely will see messages like the ones you've indicated until you start actually distributing/replicating events. Once you do however, especially for the JMS implementation, it polls periodically (based on the batch delay) and thus you see the messages.

    Perhaps we need to make the logging level of those statements a bit finer?

    -- Brian
  • 14. Re: Any Event Transformation examples?
    798480 Newbie
    Hi Brian,

    Sorry for the really late reply on this post, but I am finding those log messages to be too detailed. My logs are growing to GBs, which makes them very hard to use. Is it possible to move these polling messages to the D9 level so that they can be omitted by setting the log level to 8?

    What do you think?

    Thanks,
    Kunal
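
    For reference, a sketch of how the log level could be capped at 8 in the override file. The root element is copied from the override file earlier in this thread, and this assumes the logging-config block can simply be merged alongside the configurable-cache-factory-config block there. It would only suppress the polling messages if they were logged at D9, which is the change being asked about, not something the thread confirms.

    ```xml
    <coherence xmlns="http://schemas.tangosol.com/coherence">
      <logging-config>
        <!-- 8 drops anything logged at level 9 (D9); can also be set with
             -Dtangosol.coherence.log.level=8 on the command line -->
        <severity-level system-property="tangosol.coherence.log.level">8</severity-level>
      </logging-config>
    </coherence>
    ```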