9 Replies Latest reply: Nov 11, 2012 6:58 AM by robvarga RSS

    Best practices for mass invocation

    AntonZ
      Hi.

      I have some custom EntryProcessor that updates entries based on the passed parameter: MyCustomEP(object UpdateTo).

      What's the fastest way to execute it against 1 million objects in my cache?

      Currently I am executing it with something like this (.NET in this case):

      Parallel.ForEach<object>(AllObjects,
          delegate(object CurrentObject)
          {
              result = MyCache.Invoke(CurrentObject.ID, new MyCustomEP(somevalue));
          });

      I can get this to run, but it is still very slow when used against a large number of objects in the cache.

      Am I missing something, or are there better ways to run pairs of cache keys and entry processors?
        • 1. Re: Best practices for mass invocation
          robvarga
          AntonZ wrote:
          Hi.

          I have some custom EntryProcessor that updates entries based on the passed parameter: MyCustomEP(object UpdateTo).

           What's the fastest way to execute it against 1 million objects in my cache?

           Currently I am executing it with something like this (.NET in this case):

           Parallel.ForEach<object>(AllObjects,
               delegate(object CurrentObject)
               {
                   result = MyCache.Invoke(CurrentObject.ID, new MyCustomEP(somevalue));
               });

           I can get this to run, but it is still very slow when used against a large number of objects in the cache.

           Am I missing something, or are there better ways to run pairs of cache keys and entry processors?
           Why don't you use the InvocableMap.invokeAll(Collection, EntryProcessor) method? That lets you send the same entry processor to all the specified keys. Sending it to 1 million keys at once may not be a good idea; it is probably better to split the keys into smaller chunks yourself, e.g. 1k, 10k, 50k or 100k keys at a time, but you should measure what happens nonetheless.
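           To illustrate the chunking idea, here is a minimal plain-Java sketch. The class and method names are mine, and the actual invokeAll call is shown only as a comment, since it needs a live Coherence NamedCache and your MyCustomEP:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class ChunkedInvoke {
    // Split a large key collection into fixed-size chunks; each chunk
    // would then be passed to InvocableMap.invokeAll(chunk, processor).
    public static <K> List<List<K>> chunk(Collection<K> keys, int chunkSize) {
        List<List<K>> chunks = new ArrayList<>();
        List<K> current = new ArrayList<>(chunkSize);
        for (K key : keys) {
            current.add(key);
            if (current.size() == chunkSize) {
                chunks.add(current);
                current = new ArrayList<>(chunkSize);
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // last, possibly partial, chunk
        }
        return chunks;
    }

    // Hypothetical usage against a Coherence cache:
    // for (List<Object> batch : chunk(allKeys, 10_000)) {
    //     cache.invokeAll(batch, new MyCustomEP(someValue));
    // }
}
```

           As Robert says, the right chunk size is something to measure rather than guess.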

          Best regards,

          Robert
          • 2. Re: Best practices for mass invocation
            AntonZ
             Each invocation has a different parameter passed to the Entry Processor.

             So say I have a very volatile field on my object (the object in turn is relatively big, with, let's say, 500 fields on it), for instance a stock price field, which changes, for example, a few times a second (other parts of the object are updated as well, with much lower frequency, but let's say all fields get various updates every 5 minutes). How would you suggest approaching such an update?

             Currently I am essentially doing it through a custom Entry Processor, where cache.Invoke(ID, MyEntryProcessor(80)) updates the object with id=ID, setting the stock price to 80. (Since I have a few checks to do before updating in my real use case, I do it through an Entry Processor instead of a PofUpdater or the like.) It works fine for one update, but for thousands and more it gets too slow.
            • 3. Re: Best practices for mass invocation
              Jonathan.Knight
              Hi,

               Assuming you are doing all the updates from a single client, you are going to be limited by how long it takes an individual entry processor to do one update; it comes down to simple queuing theory. If the updates (i.e. price ticks) come in faster than the entry processor can apply a single update, then you are going to fall behind.

               There are various ways to parallelize the processing, but they will have to take into account that you presumably need the updates to be applied in order, so you cannot, for example, just use a thread pool to process the price ticks.

              JK
              • 4. Re: Best practices for mass invocation
                AntonZ
                 Jonathan, so what kind of suggestions are there?

                 So, again: I have just received 1 million stock prices and want to get them onto my objects in the cache as fast as possible, in no particular order. How do you suggest doing the update in minimal time?

                 Split the process so there are multiple connections to the cluster? Any suggestions for saving time while still keeping it all in one process?

                 Currently the code below works (safe concurrent updates), but it is very slow:

                 Parallel.ForEach<object>(AllObjects,
                     delegate(object CurrentObject)
                     {
                         result = MyCache.Invoke(CurrentObject.ID, new MyCustomEP(somevalue));
                     });
                • 5. Re: Best practices for mass invocation
                  user639604
                   How about going with Robert's suggestion, InvocableMap.invokeAll(Collection, EntryProcessor), where the EntryProcessor has a Map of <CurrentObject.ID, value>?

                   Then during the execution of the EntryProcessor you know which value to use based on the key.
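                   A rough Java sketch of that idea. The Entry interface here is a minimal local stand-in for Coherence's InvocableMap.Entry, just to show the per-key lookup; a real processor would extend AbstractProcessor and be POF-serializable, and would update a single field rather than replace the whole value:

```java
import java.util.Map;

public class MultiValueProcessor {
    // Minimal stand-in for Coherence's InvocableMap.Entry.
    interface Entry {
        Object getKey();
        Object getValue();
        void setValue(Object value);
    }

    // cacheKey -> parameter (e.g. the new stock price for that object)
    private final Map<Object, Object> valuesByKey;

    public MultiValueProcessor(Map<Object, Object> valuesByKey) {
        this.valuesByKey = valuesByKey;
    }

    // On the server side, pick the parameter for this particular entry
    // by looking it up under the entry's own key.
    public Object process(Entry entry) {
        Object newValue = valuesByKey.get(entry.getKey());
        if (newValue != null) {
            entry.setValue(newValue);
        }
        return newValue;
    }
}
```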
                  • 6. Re: Best practices for mass invocation
                    robvarga
                    AntonZ wrote:
                     Each invocation has a different parameter passed to the Entry Processor.

                     So say I have a very volatile field on my object (the object in turn is relatively big, with, let's say, 500 fields on it), for instance a stock price field, which changes, for example, a few times a second (other parts of the object are updated as well, with much lower frequency, but let's say all fields get various updates every 5 minutes). How would you suggest approaching such an update?

                     Currently I am essentially doing it through a custom Entry Processor, where cache.Invoke(ID, MyEntryProcessor(80)) updates the object with id=ID, setting the stock price to 80. (Since I have a few checks to do before updating in my real use case, I do it through an Entry Processor instead of a PofUpdater or the like.) It works fine for one update, but for thousands and more it gets too slow.
                    In this case, as user639604 suggested, divide the keys by partition id (that is going to be a bit tricky with .NET), and each processor should contain a map of cache keys to parameters. On the server side you can get the appropriate parameter for the entry by getting the value from the map keyed by the cache key obtained from the entry. All keys in the map should map into the same partition.

                     The reason to separate different partitions into different processors is so that you do not duplicate data across the requests the proxy node sends to the storage nodes; rather, each processor instance goes to a single storage node.

                     To be able to separate partitions from each other, you would either need to reimplement in .NET the key partitioning strategy Coherence uses, or implement your own key partitioning strategy in both .NET and Java and configure the service to use your implementation. You would need to confirm this with Oracle: the current DefaultKeyPartitioningStrategy simply takes a hash of the binary representation of the cache key modulo the partition count, but I am not 100% certain which hash algorithm it uses. I have my guess, but I am not sure. Actually, I am not sure what methods the .NET side exposes; if there is a hashCode()-like method on the Binary object on the .NET side, it may give you the same hash code that is used for calculating the partition id.
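                     The grouping step itself is straightforward once you have some key-to-partition function. A sketch (names are mine; the partition function is deliberately left as a parameter because, as noted above, the exact hash algorithm has to be confirmed for your Coherence version):

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

public class PartitionGrouper {
    // Group keys by partition id, given a key -> partition function.
    // Each resulting group could then be sent as one invokeAll call,
    // so every processor instance lands on a single storage node.
    public static <K> Map<Integer, List<K>> groupByPartition(
            Collection<K> keys, ToIntFunction<K> partitionOf) {
        Map<Integer, List<K>> groups = new HashMap<>();
        for (K key : keys) {
            groups.computeIfAbsent(partitionOf.applyAsInt(key),
                                   id -> new ArrayList<>()).add(key);
        }
        return groups;
    }
}
```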

                    Best regards,

                    Robert
                    • 7. Re: Best practices for mass invocation
                      AntonZ
                      So, how in the end would it look like?

                       foreach (var KeyCollection in PartitionCollection)
                           MyCache.InvokeAll(null, new MyEntryProcessor(KeyCollection, SomeValueToUpdate));

                      Something of above?

                      Do you have a sample for .Net or Java for something like you're describing?

                      Thanks, Rob.
                      • 8. Re: Best practices for mass invocation
                        robvarga
                        AntonZ wrote:
                        So, how in the end would it look like?

                         foreach (var KeyCollection in PartitionCollection)
                             MyCache.InvokeAll(null, new MyEntryProcessor(KeyCollection, SomeValueToUpdate));

                        Something of above?

                        Do you have a sample for .Net or Java for something like you're describing?

                        Thanks, Rob.
                        I can't give you a .NET sample, I never used .NET.

                         For Java it should be something like the following (it is more complex than on the server side, since on the client you can't just get the key converter from the service, as you don't have a reference to the partitioned service). You would have to verify it, as I never tried it:
                         int partitionCount = ... ; // get the partition count from the cluster side
                         Serializer pofContext = ... ; // get the POF context from somewhere
                         Binary binary = ExternalizableHelper.toBinary(cacheKey, pofContext);
                         int hash = binary.hashCode();
                         // plain hash % partitionCount could be negative; treat the hash as unsigned
                         int partitionId = (int) ((hash & 0xFFFFFFFFL) % partitionCount);
                         You can separate the cache keys into groups, each group containing only keys with the same partition id, and submit one processor per group carrying a map of the ids and the corresponding property values to set.

                         I am not sure whether a similar thing would work in .NET, as I have no idea whether a hashCode method is provided in the .NET client and, if it is, whether it is implemented identically.

                        Best regards,

                        Robert
                        • 9. Re: Best practices for mass invocation
                          robvarga
                          robvarga wrote:
                          AntonZ wrote:
                          So, how in the end would it look like?

                           foreach (var KeyCollection in PartitionCollection)
                               MyCache.InvokeAll(null, new MyEntryProcessor(KeyCollection, SomeValueToUpdate));

                          Something of above?

                          Do you have a sample for .Net or Java for something like you're describing?

                          Thanks, Rob.
                          I can't give you a .NET sample, I never used .NET.

                           For Java it should be something like the following (it is more complex than on the server side, since on the client you can't just get the key converter from the service, as you don't have a reference to the partitioned service). You would have to verify it, as I never tried it:
                           int partitionCount = ... ; // get the partition count from the cluster side
                           Serializer pofContext = ... ; // get the POF context from somewhere
                           Binary binary = ExternalizableHelper.toBinary(cacheKey, pofContext);
                           int hash = binary.hashCode();
                           // plain hash % partitionCount could be negative; treat the hash as unsigned
                           int partitionId = (int) ((hash & 0xFFFFFFFFL) % partitionCount);
                           You can separate the cache keys into groups, each group containing only keys with the same partition id, and submit one processor per group carrying a map of the ids and the corresponding property values to set.

                           I am not sure whether a similar thing would work in .NET, as I have no idea whether a hashCode method is provided in the .NET client and, if it is, whether it is implemented identically.

                          Best regards,

                          Robert
                          Hm,

                           actually, I just noticed that there is a Binary.calculateNaturalPartition(int partitionCount) method which gives you the partition id calculated in the default way, and it exists in .NET too... I assume in C++ as well.

                           So you can use just this and not bother with calculating the hash yourself:
                          int partitionCount = ... ; // get the partition count from the cluster side with an Invocable
                          Serializer pofContext = ... ; // get the POF context from your service
                          Binary binary = ExternalizableHelper.toBinary(cacheKey, pofContext);
                          int partitionId = binary.calculateNaturalPartition(partitionCount);
                           This still comes with the usual assumptions: that you don't have a custom key partitioning strategy, and that cacheKey is actually the associated key if you have defined key affinity.
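                           For what it's worth, here is one plausible plain-Java emulation of an unsigned-modulo partition calculation, which never produces a negative id. The class and method names are mine, and whether this matches Binary.calculateNaturalPartition exactly should be verified against your Coherence version:

```java
public class NaturalPartition {
    // Treat the 32-bit hash as unsigned before taking the modulus,
    // so the result is always in [0, partitionCount).
    public static int partitionOf(int hash, int partitionCount) {
        return (int) ((hash & 0xFFFFFFFFL) % partitionCount);
    }
}
```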

                          Best regards,

                          Robert

                          Edited by: robvarga on Nov 11, 2012 12:57 PM