7 Replies Latest reply: Mar 28, 2013 11:20 AM by ggarciao.com

    Passing parameters to an Aggregator

    ggarciao.com
      I'm using an aggregator to process some data in my cluster, but this processing depends on a 'context'. For example, the aggregator needs to know which tenant and which user is running the aggregation.

      To pass this context to the aggregator I'm using a serializable class member on the Aggregator class like this:
      public class MyAggregator<D, R> extends AbstractAggregator {

           /**
            * This is the accumulator / aggregator. It is transient to avoid passing it to another cluster member.
            */
           private transient Map<String, List<R>> aggregators;

           /**
            * Identifier of the tenant. It should be passed to all the cluster members.
            */
           private String tenantId;

           /**
            * Username. It should be passed to all the cluster members.
            */
           private String username;

           // Rest of the aggregator
      }
      In my first version, I was setting the 'context' (tenant/username) using the constructor of the aggregator, but when I ran this aggregator I got a NoSuchMethodException because my class did not have a constructor with no parameters. In my second version I've added the no-args constructor but now I'm getting a NullPointerException because my tenant/username are null.

      So, here are my questions:
      * Why does Coherence need a no-args constructor in the aggregator? I was expecting Coherence to serialize my aggregator (with all its serializable class members) and then pass it to another cluster member. So why does it need to construct a new aggregator? NOTE: if you look at my example code, you will see that the accumulator (the aggregators field) is actually transient to avoid passing it to another cluster member (I got this from the book Oracle Coherence 3.5 by Aleksandar Seovic).

      * If this does not work ... how can I pass parameters to my aggregators? I cannot run the aggregator "anonymously" ... The elements to be aggregated depend on the tenant and the user.

      Thanks in advance,
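
The NullPointerException described above is consistent with how externalizable-style serialization generally behaves: the receiving side creates the object through its public no-args constructor and then restores only the state that the class's own write method put on the wire. The following is a minimal sketch of that behaviour using the JDK's java.io.Externalizable as an analogy. It assumes, without showing it, that Coherence treats AbstractAggregator subclasses in a comparable way; BaseTask, ContextTask and roundTrip are hypothetical names, not Coherence classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;

class ExternalizableDemo {

    /** Hypothetical base class that defines its own wire format. */
    public static class BaseTask implements Externalizable {
        boolean parallel;

        public BaseTask() {
            // Required: deserialization instantiates the class through this constructor.
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeBoolean(parallel);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            parallel = in.readBoolean();
        }
    }

    /** Hypothetical subclass that adds context fields but never writes them. */
    public static class ContextTask extends BaseTask {
        String tenantId;
        String username;

        public ContextTask() {
        }

        public ContextTask(String tenantId, String username) {
            this.tenantId = tenantId;
            this.username = username;
        }
    }

    /** Serializes the object to a byte array and reads it back. */
    static Object roundTrip(Object source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ContextTask original = new ContextTask("acme", "alice");
        original.parallel = true;
        ContextTask copy = (ContextTask) roundTrip(original);
        // The inherited flag survives; the subclass fields were never written.
        System.out.println("parallel=" + copy.parallel);
        System.out.println("tenantId=" + copy.tenantId);
        System.out.println("username=" + copy.username);
    }
}
```

In this sketch the copy keeps the flag written by the base class, while tenantId and username come back as null because nothing ever wrote them.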
        • 1. Re: Passing parameters to an Aggregator
          Jonathan.Knight
          Hi,

          You need to serialize the fields in your Aggregator class. Assuming you are using POF serialization, the easiest way to do this is to use the POF annotations like this...
          @com.tangosol.io.pof.annotation.Portable
          public class MyAggregator<D, R> extends AbstractAggregator {

               /**
                * This is the accumulator / aggregator. It is transient to avoid passing it to another cluster member.
                */
               private transient Map<String, List<R>> aggregators;

               /**
                * Identifier of the tenant. It should be passed to all the cluster members.
                */
               @com.tangosol.io.pof.annotation.PortableProperty(value = 1)
               private String tenantId;

               /**
                * Username. It should be passed to all the cluster members.
                */
               @com.tangosol.io.pof.annotation.PortableProperty(value = 2)
               private String username;

               // Rest of the aggregator
          }
          Then in your POF configuration file something like this...
          <user-type>
              <type-id>1001</type-id>
              <class-name>com.some.package.name.MyAggregator</class-name>
          </user-type>
          Now Coherence will properly serialize your Aggregator including the fields.

          If you don't know about POF or about serialization in Coherence then you should read up on it in the documentation as it is very important.
          http://docs.oracle.com/cd/E24290_01/coh.371/e22837/api_serialize.htm#CACCJHCB

          JK
          • 2. Re: Passing parameters to an Aggregator
            user639604
            Coherence needs the empty constructor for POF deserialization.

            You can just put both constructors in your class and use the one with the two String parameters when you instantiate the object. However, you will need to implement the readExternal and writeExternal methods (assuming you have POF configured) to pass the tenantId and username.

            Something like this.
            import java.io.IOException;
            import java.util.List;
            import java.util.Map;

            import com.tangosol.io.pof.PofReader;
            import com.tangosol.io.pof.PofWriter;
            import com.tangosol.util.aggregator.AbstractAggregator;

            public class MyAggregator<D, R> extends AbstractAggregator
            {

                /**
                 * This is the accumulator / aggregator. It is transient to avoid passing it
                 * to another cluster member.
                 */
                private transient Map<String, List<R>> aggregators;

                /**
                 * Identifier of the tenant. It should be passed to all the cluster members.
                 */
                private String tenantId;

                /**
                 * Username. It should be passed to all the cluster members.
                 */
                private String username;

                public MyAggregator(String tenantId, String username)
                {
                    this.tenantId = tenantId;
                    this.username = username;
                }

                // No-args constructor used when the aggregator is deserialized.
                public MyAggregator()
                {
                }

                @Override
                public void readExternal(
                    PofReader in)
                    throws IOException
                {
                    // Read the superclass properties first, so that properties are
                    // read in the same ascending index order used by writeExternal.
                    super.readExternal(in);
                    this.tenantId = in.readString(1000);
                    this.username = in.readString(1001);
                }

                @Override
                public void writeExternal(
                    PofWriter out)
                    throws IOException
                {
                    super.writeExternal(out);
                    out.writeString(1000, tenantId);
                    out.writeString(1001, username);
                }

                // Rest of the aggregator
            }
            • 3. Re: Passing parameters to an Aggregator
              ggarciao.com
              Thanks for your helpful remarks!

              But I cannot afford to implement POF right now because I need a serializer for each 'user-type'/custom type related to the cache service. And what I'm keeping in the required cache service is a "big" custom data model.

              I strongly agree with you that POF is a MUST HAVE on Coherence, and we will have it.

              But for the moment, we are considering two options:
              * Make standard Java serialization work, even for aggregators
              * Use an open source lib called 'gridkit' that has an AutoPofSerializer (http://code.google.com/p/gridkit/wiki/AutoPofSerializer)

              But regarding the first option I have a question for you: if I've disabled POF in my cluster (no POF system property, no POF default serializer in the cache config, no POF serializer in a cache service):
              * Why is standard serialization not working for aggregators?
              * You said that Coherence asks me for a no-args constructor because POF needs it, but why is my cluster using POF?

              We are considering the second option because activating a POF serializer on the cache service where the aggregator should run requires a serializer for each custom type stored in the cache (we use PofSerializer because we cannot change the data objects). But what if there is a way to activate POF for some objects only (i.e. the aggregator)? Or is there a default POF serializer that I can use for custom objects?
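
On the first option, one plausible explanation is that AbstractAggregator implements Coherence's ExternalizableLite interface, in which case only the state written by writeExternal(DataOutput) travels to the other members, and the no-args constructor is used to rebuild the object before readExternal(DataInput) is called, even with POF disabled. Under that assumption, overriding both methods in the subclass would carry the context. The sketch below shows the pattern with plain JDK streams; BaseAggregator is a hypothetical stand-in for AbstractAggregator, and TenantAggregator and roundTrip are illustrative names only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class LiteStyleDemo {

    /** Hypothetical stand-in for AbstractAggregator and its DataInput/DataOutput methods. */
    static class BaseAggregator {
        boolean parallel;

        public BaseAggregator() {
        }

        public void writeExternal(DataOutput out) throws IOException {
            out.writeBoolean(parallel);
        }

        public void readExternal(DataInput in) throws IOException {
            parallel = in.readBoolean();
        }
    }

    /** Subclass that writes and reads its own context after the superclass state. */
    static class TenantAggregator extends BaseAggregator {
        String tenantId;
        String username;

        public TenantAggregator() {
            // Used by the receiving side before readExternal is called.
        }

        public TenantAggregator(String tenantId, String username) {
            this.tenantId = tenantId;
            this.username = username;
        }

        @Override
        public void writeExternal(DataOutput out) throws IOException {
            super.writeExternal(out);
            out.writeUTF(tenantId);   // writeUTF rejects null, so both values must be set
            out.writeUTF(username);
        }

        @Override
        public void readExternal(DataInput in) throws IOException {
            super.readExternal(in);   // same order as writeExternal
            tenantId = in.readUTF();
            username = in.readUTF();
        }
    }

    /** Mimics the transfer: write to bytes, build an empty instance, read the state back. */
    static TenantAggregator roundTrip(TenantAggregator source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            source.writeExternal(new DataOutputStream(bytes));
            TenantAggregator copy = new TenantAggregator();
            copy.readExternal(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TenantAggregator copy = roundTrip(new TenantAggregator("acme", "alice"));
        System.out.println(copy.tenantId + " / " + copy.username);
    }
}
```

The important part is the symmetry: both methods delegate to the superclass first and then handle the extra fields in the same order.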

              Edited by: ggarciao.com on Mar 20, 2013 7:44 AM
              • 4. Re: Passing parameters to an Aggregator
                alexey.ragozin
                Hi,

                Take a look at com.tangosol.io.pof.SafeConfigurablePofContext.
                It is an extension of the default POF serializer which will fall back to Java serialization if an object does not implement POF.

                PS
                I designed AutoPofSerializer with exactly your case in mind: postponing the writing of serialization logic for a large domain model. Compared to SafeConfigurablePofContext it should offer better space efficiency and performance.

                Regards,
                Alexey
                • 5. Re: Passing parameters to an Aggregator
                  ggarciao.com
                  Alexey,

                  I'm now using the AutoPofSerializer and everything is working!! Thanks a lot for the hint!

                  For information, it is important to stress that the AutoPofSerializer does not serialize Aggregators/Processors because they implement the PortableObject interface (and AutoPofSerializer does not use the ReflectionPofSerializer for PortableObjects).

                  But this was easily solved by putting my aggregator in the pof-config.xml to use the ReflectionPofSerializer like this:
                  <user-type>
                       <type-id>1001</type-id>
                       <class-name>com.exmaple.MyAggregator</class-name>
                       <serializer>
                            <class-name>org.gridkit.coherence.utils.pof.ReflectionPofSerializer</class-name>
                       </serializer>
                  </user-type>
                  NOTE: Remember to set the system variable 'gridkit.auto-pof.max-config-id' to a number bigger than 1000 to allow the AutoPofSerializer to read this configuration.

                  The only thing that is a 'shame' is that the AutoPofSerializer does not support inheritance of classes and due to this, you need to add an entry in the pof-config for each aggregator/processor instead of adding a single entry for the class com.tangosol.util.aggregator.AbstractAggregator (for example).

                  I've checked the code of the AutoPofSerializer and adding this feature does not seem very hard ... what do you think, Alexey? Am I missing something here?

                  Thanks a lot again!

                  Edited by: ggarciao.com on Mar 28, 2013 10:47 AM
                  • 6. Re: Passing parameters to an Aggregator
                    alexey.ragozin
                    Hi Guillermo,

                    Thank you for the feedback.

                    I have a better idea.

                    AutoPofSerializer will use the readExternal / writeExternal methods if
                    - the object is a PortableObject and it overrides the readExternal and writeExternal methods
                    - the object is a PortableObject, it declares no fields, and its superclass is eligible for POF serialization (according to these rules)

                    Otherwise it will fall back to ReflectionPofSerializer.

                    By these rules, your custom aggregators will use ReflectionPofSerializer automatically.
                    At the same time "true" PortableObjects will still use POF serialization.
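
The two rules above can be read as a small recursive check over the class hierarchy. The following is only a sketch of that reading using plain Java reflection, not the actual AutoPofSerializer code: Portable is a hypothetical stand-in for com.tangosol.io.pof.PortableObject, all class and method names are made up for illustration, and treating only non-static fields as "declared fields" is an assumption.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class PofEligibilitySketch {

    /** Hypothetical stand-in for com.tangosol.io.pof.PortableObject. */
    interface Portable {
        void readExternal(Object in);
        void writeExternal(Object out);
    }

    /** Returns true if the type would use its own readExternal/writeExternal methods. */
    static boolean usesOwnMethods(Class<?> type) {
        if (type == null || !Portable.class.isAssignableFrom(type)) {
            return false;
        }
        // Rule 1: the class itself declares both methods.
        if (declaresMethod(type, "readExternal") && declaresMethod(type, "writeExternal")) {
            return true;
        }
        // Rule 2: no fields of its own, and the superclass is eligible by the same rules.
        return !declaresInstanceFields(type) && usesOwnMethods(type.getSuperclass());
    }

    private static boolean declaresMethod(Class<?> type, String name) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(name) && !method.isSynthetic()) {
                return true;
            }
        }
        return false;
    }

    private static boolean declaresInstanceFields(Class<?> type) {
        for (Field field : type.getDeclaredFields()) {
            if (!Modifier.isStatic(field.getModifiers()) && !field.isSynthetic()) {
                return true;
            }
        }
        return false;
    }

    /** Plays the role of a library base class such as AbstractAggregator. */
    static class Base implements Portable {
        boolean parallel;
        @Override public void readExternal(Object in) { }
        @Override public void writeExternal(Object out) { }
    }

    /** No fields, no overrides: inherits eligibility from Base. */
    static class Stateless extends Base { }

    /** Adds fields without overriding: falls back to reflection. */
    static class WithContext extends Base {
        String tenantId;
        String username;
    }

    /** Adds a field and overrides both methods: a "true" portable object. */
    static class HandWritten extends Base {
        String tenantId;
        @Override public void readExternal(Object in) { }
        @Override public void writeExternal(Object out) { }
    }

    public static void main(String[] args) {
        System.out.println("Stateless:   " + usesOwnMethods(Stateless.class));
        System.out.println("WithContext: " + usesOwnMethods(WithContext.class));
        System.out.println("HandWritten: " + usesOwnMethods(HandWritten.class));
    }
}
```

Here WithContext corresponds to an aggregator that only adds context fields, which is the case that ends up with the reflection-based serializer.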

                    Of course these rules could also be wrong in certain cases, but they should be more convenient for typical cases like yours.

                    The new rules are in trunk and you can try them out.

                    Regards,
                    Alexey
                    • 7. Re: Passing parameters to an Aggregator
                      ggarciao.com
                      Excellent! I'll take a look at it ...

                      Keep up the good work

                      Cheers,