3 Replies Latest reply: Nov 19, 2012 5:23 AM by robvarga

Aggregation using InvocationService

846348 Newbie
Hi All,

I am trying to implement aggregation using the InvocationService (an Invocable agent): I invoke an agent on the proxy nodes, which invokes another agent on all storage nodes; that agent runs the aggregation logic and returns the result to the proxy node.

I have implemented run() on the storage nodes like this:

public void run() {
    NamedCache cache = CacheFactory.getCache(cacheName);
    DistributedCacheService service =
        (DistributedCacheService) cache.getCacheService();
    int cPartitions = service.getPartitionCount();
    PartitionSet partsProcessed = new PartitionSet(cPartitions);
    PartitionSet partsMember = service.getOwnedPartitions(getLocalMember());

    Filter partitionedFilter = new PartitionedFilter(applicationFilter, partsMember);

    Set setEntries = cache.entrySet(partitionedFilter);
    Object result = aggregate(setEntries);
    setResult(result);
}

public Object aggregate(Set entries) {
    // ... aggregation logic ...
}

public Member getLocalMember() {
    return CacheFactory.getCluster().getLocalMember();
}

What I want to know is: what are the pros and cons of this implementation?

1) Is the Set returned by cache.entrySet() mutable or immutable in this scenario?
2) Is it safe to obtain the entries this way?
3) What happens if some other process (an EntryProcessor) modifies the entries that I have obtained in the Invocable agent?
4) What happens if partitions move or are lost?


AFAIK the Coherence EntryAggregator.aggregate(Set entries) method gets a copy of the entries, so if I run 20 aggregations in parallel it is going to create 20 different copies of the same entries (I may be wrong). But I have noticed the storage nodes' heap goes up very fast when I run multiple aggregations in parallel, and with Coherence aggregation there is no control over the amount/size of the result returned. This can lead to the proxy nodes running out of memory.


Is it safe to implement aggregation using Invocable agents?

Is there a better approach to implementing aggregations?

Please suggest any improvements I can make to the above implementation.

Regards
Srinivas
  • 1. Re: Aggregation using InvocationService
    robvarga Oracle ACE
    user6427215 wrote:
    [... original post quoted in full above ...]
    What I want to know is what are the pros and cons in this implementation ?
    One problem with that implementation is that it does not deal with the fact that partitions can move.

    The invocable on the proxy node should figure out which partitions are where, and it should send that partition set along with the invocable sent to each storage node. The invocable on the storage node should send an entry-set request with at least a PartitionedFilter for the requested partition set (possibly intersected with the partition set owned by the local member), because partitions may move, and with this code they can be aggregated any number of times (0, 1 or more!) without this code ever noticing. Once the partitions are aggregated, you have to send back which partitions you actually aggregated, so that the invocable on the proxy can resend requests for partitions which moved away from the node. Also, the invocable on the storage node must be aware that some partitions may not be owned by anyone at the moment; in that case it would have to wait (repeating short waits) for the partition to turn up somewhere.
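    The retry protocol described above can be sketched in plain Java (a toy model, not Coherence code: a BitSet stands in for Coherence's PartitionSet, and the Dispatcher callback stands in for one invocation round-trip to a storage member; all names here are illustrative):

```java
import java.util.BitSet;

/**
 * Toy model of the proxy-side retry loop: keep re-requesting partitions
 * until every partition has been reported as aggregated exactly once.
 */
public class PartitionRetryLoop {

    /** Models one invocation round-trip; returns the partitions actually aggregated. */
    public interface Dispatcher {
        BitSet dispatch(BitSet requested);
    }

    public static BitSet aggregateAll(int partitionCount, Dispatcher dispatcher) {
        BitSet remaining = new BitSet(partitionCount);
        remaining.set(0, partitionCount);
        BitSet done = new BitSet(partitionCount);
        while (!remaining.isEmpty()) {
            // The member may process fewer partitions than requested if some moved away.
            BitSet processed = dispatcher.dispatch((BitSet) remaining.clone());
            processed.and(remaining);    // ignore anything we did not ask for
            done.or(processed);
            remaining.andNot(processed); // only the leftovers are re-requested
        }
        return done;
    }
}
```

    In real code the dispatch step would consult the partition ownership again before each retry, since the leftover partitions have, by definition, moved.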


    Another problem is that this approach, because it uses entrySet (which returns deserialized data), cannot take advantage of POF extraction or of indexes for the EntryAggregator.aggregate() step (it can still leverage indexes within the filter if you send the filter along).

    Also, any code which tries to cast the Entry objects to BinaryEntry (which some out-of-the-box aggregators may do) would blow up with a ClassCastException.
    1) Is the Set returned by cache.entrySet() mutable or immutable in this scenario?
    Most likely mutable, but the mutation does not get sent back to the cache.
    2) Is it safe to obtain the entries this way?
    Depends on what you mean by safe. They cannot be cast to BinaryEntry, while entries normally passed to an aggregator can.
    3) What happens if some other process (an EntryProcessor) modifies the entries that I have obtained in the Invocable agent?
    Nothing else sees these entries, they are created when deserializing the result of the entrySet() call.
    4) What happens if partitions move or are lost?
    If a partition is lost, you cannot really do much about it (this code would not even notice it).

    If they move, as said above, they can be aggregated twice or not aggregated at all. See above for more details.


    AFAIK the coherence Aggregator.aggregate(Set entries) method gets a copy of the entries
    It gets an entry with the same binary references and some additional stuff internally. It does not get the backing map entry instance.
    so if I run 20 aggregations in parallel it is going to create 20 different copies of the same entries (I may be wrong).
    Correct.
    But I have noticed the storage nodes' heap goes up very fast when I run multiple aggregations in parallel, and with Coherence aggregation there is no control over the amount/size of the result returned.
    This is not exactly true. With a real aggregation your code is in control of the returned result, as it is the one which instantiates it. The result then gets serialized and deserialized.

    On the other hand, you can do quite a bit of cheating by leveraging the fact that the entry-aggregator and the filter instance are not serialized when sent to the local node: you can smuggle in a reference, and notice its absence when the instance was serialized after all, which allows you to do nothing for partitions which do not reside on the caller node (because they moved between getOwnedPartitions and the sending of your aggregate/entrySet call). This way you can return data from the filter/aggregator to the caller code without serializing/deserializing it, if it is on the local node, by putting it into a smuggled-in collection reference, for example. Just don't smuggle out objects which Coherence also holds on to, as that can lead to a memory leak or data corruption if you also hold on to them and possibly mutate them.

    Just be aware that the data in the backing map, whose keys are passed to you in applyIndex, is not protected from mutation while you are in the applyIndex method, so you should not examine the backing values within applyIndex.
    this will lead to proxy nodes going out of memory.
    If you return too much data to it from the storage nodes, it can. You should size your proxies appropriately, and also think about the amount of data you return from the storage nodes, and how many concurrent "aggregations" you allow at the same time.

    On the other hand, the footprint of this pattern on the proxy node is not so much different from the footprint of the normal aggregations. The difference is the footprint on the storage nodes, which would be much worse due to entrySet used instead of aggregate.
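    The point about limiting how many concurrent "aggregations" you allow can be sketched with a plain-Java throttle (illustrative names, not a Coherence API; the Supplier stands in for the actual aggregation call):

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Caps how many aggregations run at once on a proxy/storage node. */
public class AggregationThrottle {
    private final Semaphore permits;

    public AggregationThrottle(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    public <T> T run(Supplier<T> aggregation) {
        permits.acquireUninterruptibly(); // block until a slot is free
        try {
            return aggregation.get();     // perform the (possibly expensive) aggregation
        } finally {
            permits.release();            // always free the slot
        }
    }
}
```

    Bounding concurrency this way puts a ceiling on how many result sets can be alive on the heap at the same time, which is usually cheaper than sizing the heap for the worst case.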


    Is it safe to implement aggregation using Invocable agents?
    Yes, with the above-mentioned safeguards. Whether it is worth doing is another question. Why don't you just do a partition-by-partition aggregation with a PartitionedFilter from the proxy node?
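    That suggestion can be modelled in plain Java (a toy sketch: in real code each batch call would be something like cache.aggregate(new PartitionedFilter(filter, parts), aggregator), which bounds how much data is in flight per call; the Function callback and all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/**
 * Toy model of partition-by-partition aggregation driven from the proxy:
 * walk the partition ids in small batches, aggregate each batch in one
 * call, and merge the partial results client-side.
 */
public class PartitionedAggregation {

    public static long sumByPartitionBatches(
            int partitionCount,
            int batchSize,
            Function<List<Integer>, Long> aggregatePartitions) {
        long total = 0;
        List<Integer> batch = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            batch.add(p);
            if (batch.size() == batchSize || p == partitionCount - 1) {
                total += aggregatePartitions.apply(batch); // one aggregate() call per batch
                batch = new ArrayList<>();
            }
        }
        return total;
    }
}
```

    The batch size trades off round-trips against per-call memory footprint on both the proxy and the storage nodes.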


    is there any better approach to implement Aggregations ?
    Out-of-the-box aggregations? :)


    The only reason I see for doing aggregation via the InvocationService is when you want more parallelization than the thread count on the partitioned service provides. But that would need to take this a bit further, as it would need to do direct backing-map access: the above solution still goes through the distributed cache service worker threads, so it suffers the same thread-count constraint (and actually there would be only a single thread executing the aggregation per node). And since it would do direct backing-map access, you would need to build a solution which can indicate that a partition you are aggregating actually moved while you were aggregating it, so that you can discard your results for that partition and re-aggregate it (or indicate that you did not process it, so it can be re-requested). Also, such out-of-service "aggregation" would not be able to take advantage of indexes, as it would not have the necessary synchronization for them.

    Best regards,

    Robert

    Edited by: robvarga on Nov 16, 2012 6:39 PM
  • 2. Re: Aggregation using InvocationService
    Jonathan.Knight Expert
    robvarga wrote:
    AFAIK the coherence Aggregator.aggregate(Set entries) method gets a copy of the entries
    It gets an entry with the same binary references and some additional stuff internally. It does not get the backing map entry instance.
    so , if I run 20 Aggregations in parallel its going to create 20 different copies of the same entries ( I may be wrong ).
    Correct.
    I think this might be confusing - I certainly could read it one of two ways. I think what Rob is saying is that you get 20 instances of the entries - that is, 20 sets of instances of the Map.Entry implementation - but these entries all use the same keys and values, so Coherence is not creating new copies of the underlying cache data to pass to each aggregator.

    As Rob alludes to, when you run an aggregator your memory usage can go up for a number of reasons: for example, your filters may cause the entries to be deserialized (especially if they are reflection filters/extractors), or the aggregator may cause the entries to be deserialized, and then you also have the results of the aggregator to build.

    Also, I would agree that I can't see why you would want to implement your own method of running aggregators. Coherence under the covers does a lot of work to make sure the aggregator runs accurately - for example, dealing with partition moves, as Rob has already mentioned. There are other things to be aware of too: for example, you have already noticed that heap usage goes up the more aggregators you run in parallel, so obviously you would want to do something to limit this. As you understand more about how Coherence works and how it deals with failover, you realise that there are a lot of edge cases that your own aggregator framework would have to deal with. It is easy to build something that works for a happy, stable cluster but will go wrong in a number of ways when there is a problem.

    JK
  • 3. Re: Aggregation using InvocationService
    robvarga Oracle ACE
    Jonathan.Knight wrote:
    robvarga wrote:
    [... earlier exchange quoted above ...]
    I think this might be confusing - I certainly could read it one of two ways. I think what Rob is saying is that you get 20 instances of the entries - that is, 20 sets of instances of the Map.Entry implementation - but these entries all use the same keys and values, so Coherence is not creating new copies of the underlying cache data to pass to each aggregator.
    Yep, it was a bit confusingly worded. What I meant was:

    In the backing map you have some backing map Entry objects which reference the internal (Binary) version of the key and the cached value.

    The entry objects you get in the EntryAggregator.aggregate() method of the parallel aggregator are NOT these Entry objects. What you do get in that collection are instances of a class implementing the BinaryEntry interface (exactly which class is not part of the API and may change over time), which the original backing map Entry usually does not implement, and these instances reference the same internal (Binary) versions of the key and the value.
    So the binary key and value objects in the received BinaryEntry instance are the same instances which are held in the backing map, but the BinaryEntry instances are created just for your aggregate method call, they are not shared with any other thread and they are discarded after your aggregate request has finished processing. So the BinaryEntry instances you get in the collection are practically shallow copies (of a differing type) of the backing map entry objects.
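    That sharing can be illustrated with plain Java (no Coherence classes; the byte arrays stand in for the Binary key and value held in the backing map):

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Demonstrates shallow copying: new entry wrappers, shared key/value references. */
public class ShallowEntryDemo {
    public static void main(String[] args) {
        byte[] binaryKey = {1, 2, 3};   // stands in for the Binary key
        byte[] binaryValue = {4, 5, 6}; // stands in for the Binary value

        // Two "aggregation calls" each get their own wrapper instance...
        Map.Entry<byte[], byte[]> call1 = new SimpleImmutableEntry<>(binaryKey, binaryValue);
        Map.Entry<byte[], byte[]> call2 = new SimpleImmutableEntry<>(binaryKey, binaryValue);

        // ...the wrappers are distinct objects, but the payloads are shared.
        System.out.println(call1 != call2);                       // true
        System.out.println(call1.getValue() == call2.getValue()); // true
    }
}
```

    So running 20 parallel aggregations creates 20 sets of lightweight wrappers, not 20 copies of the cached binary data; the heap growth comes from deserialization and from building results, not from copying binaries.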

    Best regards,

    Robert
