3 Replies Latest reply: Mar 28, 2012 4:48 AM by robvarga

    Does CompositeAggregator execute aggregator sequentially

    925966
      Hi,

      I am using CompositeAggregator to sum records per field, across more than 10 fields. My code looks something like the snippet below, but the performance is very poor.
      Does CompositeAggregator execute its aggregators sequentially or in parallel?
      Should I implement a custom aggregator myself? An example of implementing one would be helpful. Thanks.

      Henry

      CompositeAggregator composite = CompositeAggregator.createInstance(
          new InvocableMap.EntryAggregator[]
          {
              new BigDecimalSum("getTradeDayQty"),
              new BigDecimalSum("getActualUavalQty"),
              new BigDecimalSum("getTradeDayTotalCost"),
              new BigDecimalSum("getBuyCurrSetlRiskAmt"),
              new BigDecimalSum("getBuyFutSetlRiskAmt"),
              new BigDecimalSum("getSellCurrSetlRiskAmt"),
              new BigDecimalSum("getSellCurrSetlRiskOffsetAmt"),
              new BigDecimalSum("getSellFutSetlRiskAmt"),
              new BigDecimalSum("getSellFutSetlRiskOffsetAmt"),
              new BigDecimalSum("getBuyOsQty"),
              new BigDecimalSum("getBuyExeQty"),
              new BigDecimalSum("getBuyExeGrossConsdAmt"),
              new BigDecimalSum("getSellOsQty"),
              new BigDecimalSum("getSellExeQty"),
              new BigDecimalSum("getSellExeGrossConsdAmt"),
              new BigDecimalSum("getTradeLimitUsedAmt"),
              new BigDecimalSum("getCrLimitUsedAmt"),
              new BigDecimalSum("getBuyOsAmt"),
              new BigDecimalSum("getSellOsAmt")
          });
        • 1. Re: Does CompositeAggregator execute aggregator sequentially
          robvarga
          Hi Henry,

          I believe aggregators always execute on a single thread; that of course means one thread per storage node involved. Some out-of-the-box aggregators (those subclassing AbstractAggregator) would actually break if they were executed on multiple threads, as they are not thread-safe.

          Also, CompositeAggregator runs its aggregators one after the other, as EntryAggregator does not offer any method that would allow anything else. Again, this is driven on the storage node: the composite aggregator does not send its aggregators over the network one by one. It only runs the parallel parts of the aggregators one after the other, and that constitutes the parallel part of the composite aggregator.
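
          To make the cost of running the aggregators one after the other concrete, here is a rough sketch in plain Java (no Coherence classes involved). It assumes a composite that simply lets each aggregator scan the node's entries by itself; MultiPassSketch, Position and entryVisits are hypothetical names for illustration only, and the real CompositeAggregator internals may differ.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model only: every "aggregator" scans all entries on its own.
final class MultiPassSketch {

    /** Hypothetical value object; getter names borrowed from the original post. */
    static final class Position {
        private final BigDecimal buyOsQty;
        private final BigDecimal sellOsQty;

        Position(String buyOsQty, String sellOsQty) {
            this.buyOsQty = new BigDecimal(buyOsQty);
            this.sellOsQty = new BigDecimal(sellOsQty);
        }

        BigDecimal getBuyOsQty() { return buyOsQty; }
        BigDecimal getSellOsQty() { return sellOsQty; }
    }

    /** The sums plus a count of how often an entry was touched. */
    static final class Result {
        final List<BigDecimal> sums;
        final long entryVisits;

        Result(List<BigDecimal> sums, long entryVisits) {
            this.sums = sums;
            this.entryVisits = entryVisits;
        }
    }

    /** Outer loop over aggregators, inner loop over entries: one full pass per sum. */
    static Result aggregate(List<Position> entries,
                            List<Function<Position, BigDecimal>> extractors) {
        List<BigDecimal> sums = new ArrayList<>();
        long visits = 0;
        for (Function<Position, BigDecimal> extractor : extractors) {
            BigDecimal total = BigDecimal.ZERO;
            for (Position entry : entries) {
                total = total.add(extractor.apply(entry));
                visits++;
            }
            sums.add(total);
        }
        return new Result(sums, visits);
    }

    /** Two entries and two sums, so each entry is visited twice. */
    static Result demo() {
        List<Position> entries = new ArrayList<>();
        entries.add(new Position("10", "1"));
        entries.add(new Position("20", "2"));
        List<Function<Position, BigDecimal>> extractors = new ArrayList<>();
        extractors.add(Position::getBuyOsQty);
        extractors.add(Position::getSellOsQty);
        return aggregate(entries, extractors);
    }

    public static void main(String[] args) {
        Result r = demo();
        System.out.println(r.sums + " after " + r.entryVisits + " entry visits");
    }
}
```

          In this model the number of entry visits is the number of entries times the number of aggregators, so 19 sums as in the original post would mean 19 passes over each node's data.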

          If you want to write a composite aggregator that iterates over the entries only once, you need an additional method that can incrementally aggregate a single entry, and you have to do the iteration outside of the composed aggregators. AbstractAggregator is a good step in that direction, but unfortunately its relevant methods (init, process/processEntry, finalizeResult) are protected.

          So first of all, read the documentation (preferably the book, as it is much more informative on that class) and understand how those three methods work. Then subclass AbstractAggregator so that those methods (init, processEntry, finalizeResult) are public, or at least accessible in the package of the subclass. Now you can write your own composite aggregator that requires the aggregators passed in to extend that subclass of yours; it then has access to their single-entry methods and can delegate each entry to them one by one. The easiest approach is to make this composite aggregator an AbstractAggregator subclass itself, so that you don't have to reproduce AbstractAggregator's contract for calling those methods (but you would want to delegate processEntry() rather than process()).

          Alternatively, instead of subclassing for the aggregators, you could use reflection to make the protected methods accessible. That would also let you use the out-of-the-box aggregators, most of which subclass AbstractAggregator.
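
          The single-pass idea can be sketched roughly as follows, again in plain Java rather than against the Coherence API. StepAggregator, SumStep, aggregateOnce and Position are hypothetical names; StepAggregator only mimics the init / processEntry / finalizeResult shape described above and is an assumption, not the real AbstractAggregator signatures.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical model of a single-pass composite; none of these types are Coherence classes.
final class SinglePassSketch {

    /** Stand-in for an aggregator whose single-entry methods are accessible. */
    interface StepAggregator<T> {
        void init();                 // reset the interim state
        void processEntry(T entry);  // fold one entry into the interim state
        Object finalizeResult();     // produce this aggregator's result
    }

    /** Sums one BigDecimal property; the interim total is a plain member field. */
    static final class SumStep<T> implements StepAggregator<T> {
        private final Function<T, BigDecimal> extractor;
        private BigDecimal total = BigDecimal.ZERO;

        SumStep(Function<T, BigDecimal> extractor) { this.extractor = extractor; }

        @Override public void init() { total = BigDecimal.ZERO; }

        @Override public void processEntry(T entry) {
            BigDecimal value = extractor.apply(entry);
            if (value != null) { total = total.add(value); }
        }

        @Override public Object finalizeResult() { return total; }
    }

    /** Iterates the entries once and hands each entry to every step in turn. */
    static <T> List<Object> aggregateOnce(Iterable<T> entries, List<StepAggregator<T>> steps) {
        for (StepAggregator<T> step : steps) { step.init(); }
        for (T entry : entries) {
            for (StepAggregator<T> step : steps) { step.processEntry(entry); }
        }
        List<Object> results = new ArrayList<>();
        for (StepAggregator<T> step : steps) { results.add(step.finalizeResult()); }
        return results;
    }

    /** Hypothetical value object; getter names borrowed from the original post. */
    static final class Position {
        private final BigDecimal buyOsQty;
        private final BigDecimal sellOsQty;

        Position(String buyOsQty, String sellOsQty) {
            this.buyOsQty = new BigDecimal(buyOsQty);
            this.sellOsQty = new BigDecimal(sellOsQty);
        }

        BigDecimal getBuyOsQty() { return buyOsQty; }
        BigDecimal getSellOsQty() { return sellOsQty; }
    }

    static List<Object> demo() {
        List<Position> entries = new ArrayList<>();
        entries.add(new Position("10", "1"));
        entries.add(new Position("20", "2"));
        List<StepAggregator<Position>> steps = new ArrayList<>();
        steps.add(new SumStep<Position>(Position::getBuyOsQty));
        steps.add(new SumStep<Position>(Position::getSellOsQty));
        return aggregateOnce(entries, steps);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [30, 3]
    }
}
```

          Here the iteration lives in aggregateOnce, outside the individual sums, so each entry is touched once no matter how many sums are configured. A real implementation would still have to take care of serialization and of combining the per-node partial results, which this sketch leaves out.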

          Best regards,

          Robert

          Edited by: robvarga on Mar 27, 2012 12:06 PM
          • 2. Re: Does CompositeAggregator execute aggregator sequentially
            925966
            Hi Robert,

            Thanks for the response. So do you mean that on one storage node, one thread runs the aggregators one by one, sequentially? I.e., BigDecimalSum("getTradeDayQty") is calculated first, then BigDecimalSum("getActualUavalQty"), and so on?

            Henry
            • 3. Re: Does CompositeAggregator execute aggregator sequentially
              robvarga
              Hi Henry,

              In short:

              Yes, on each targeted node one thread runs the CompositeAggregator on that node's share of the data. Each of the targeted storage nodes executes the aggregator independently of the others, possibly in parallel with other storage nodes (if there are no unfinished earlier requests).

              In a bit more detail:

              The CompositeAggregator itself is what executes the individual aggregators one after the other. All the storage node sees is a single aggregator (the CompositeAggregator), so even if it wanted to execute something in parallel, it would not know what.

              Also, EntryAggregator does not require an aggregator to be thread-safe, and indeed most AbstractAggregator subclasses are not, because they store interim values in member variables of the aggregator. Therefore Coherence also cannot spread aggregator execution across multiple threads per node, with each thread processing different partitions, because to do that safely it would have to deserialize the aggregator multiple times.
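
              As a rough illustration of why separate copies would be needed, the sketch below uses a stateful sum that keeps its interim value in a member field, gives every partition its own copy, and only combines the partial results at the end. This is plain Java with hypothetical names (PerThreadCopySketch, RunningSum, sumPartitions); creating a fresh RunningSum stands in for deserializing the aggregator again, and nothing here is what Coherence actually does.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model only: one private aggregator copy per partition, merged at the end.
final class PerThreadCopySketch {

    /** Stateful and deliberately not thread-safe: the interim value is a plain field. */
    static final class RunningSum {
        private BigDecimal total = BigDecimal.ZERO;

        void process(BigDecimal value) { total = total.add(value); }

        BigDecimal result() { return total; }
    }

    /** Sums each partition on a worker thread with its own copy, then adds the partial sums. */
    static BigDecimal sumPartitions(List<List<BigDecimal>> partitions) {
        int threads = Math.max(1, Math.min(4, partitions.size()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<BigDecimal>> partials = new ArrayList<>();
            for (List<BigDecimal> partition : partitions) {
                partials.add(pool.submit(() -> {
                    RunningSum own = new RunningSum(); // fresh copy, never shared between threads
                    for (BigDecimal value : partition) { own.process(value); }
                    return own.result();
                }));
            }
            BigDecimal total = BigDecimal.ZERO;
            for (Future<BigDecimal> partial : partials) {
                total = total.add(partial.get());
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("partition sum failed", e);
        } finally {
            pool.shutdown();
        }
    }

    static BigDecimal demo() {
        List<List<BigDecimal>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList(new BigDecimal("1"), new BigDecimal("2")));
        partitions.add(Arrays.asList(new BigDecimal("3")));
        return sumPartitions(partitions);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 6
    }
}
```

              Sharing a single RunningSum between the worker threads instead would let concurrent process() calls overwrite each other's updates to total, which is the kind of breakage described above.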


              Best regards,

              Robert