initial ContinuousQuery entryInserted() events in original insertion order?

snidely_whiplash Member Posts: 576
edited Nov 15, 2018 6:24AM in Coherence Support

I want my initial blast of MapEvents to show up in the original insertion order. The cached objects have a timestamp and a serial ID field, so either of those could be used for ordering. Is there some way to tell a CQC that it should return values in order? It does not appear that you can pass in any kind of comparator. Before starting the CQC I could get the size of the cache, then populate a sorted collection of that size with the initial events. That's inelegant and introduces race conditions around startup.

Answers

  • robvarga
    robvarga Member Posts: 1,708 Gold Badge
    edited Nov 14, 2018 11:40AM

    Hi,

    I would expect that it is not possible.

    First of all, I would expect that you get the events for any single partition as one contiguous block, without events for entries in other partitions interleaved, then the events for another single partition, and so on. That makes it very unlikely that the order of the events you receive is still the order you need.

    As for insertion order within a single partition, that still may or may not be possible. After a partition has moved several times, and has sometimes been promoted from backup, the original insertion order is very unlikely to be retained, but it could potentially still be present, depending on the backing map implementation. If you write your own backing map implementation that iterates the elements in a partition in insertion order, you may get what you want within a single partition.

    Note, though, that the above is just my expectation; it may not be correct.

    Best regards,

    Rob

  • snidely_whiplash
    snidely_whiplash Member Posts: 576
    edited Nov 15, 2018 12:46AM

    I'm stamping the objects serially on insertion, so I don't need to rely on Coherence to maintain the insertion order for me. I just need to be able to get the results ordered by that field's value. If there isn't some way to order those initially returned values, it would be a great feature to have. Even if the CQC would just fire some event indicating that the historical events have completed, that would be great. Then I would know subsequent events were arriving in time order.

    Right now I'm doing it like this. Not pretty. If a cache insert happens during the initial blast of events, it'll cause out-of-order processing.

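    // Number of matching entries, sampled before the CQC is created.
    int initialSize = cache.keySet(currentFilterSet).size();

    // Sorted buffer for the initial events; relies on Execution being Comparable (e.g. by serial ID).
    Set<Execution> initialSet = new TreeSet<>();

    public void entryInserted(MapEvent mapEvent) {
        final Execution ex = (Execution) mapEvent.getNewValue();
        if (initialSize > 0) {
            // we're still getting the initial blast
            initialSize--;
            initialSet.add(ex);
            if (initialSize == 0) {
                // initial blast complete: process the buffered values in sorted order
                for (Execution e : initialSet) {
                    process(e);
                }
                initialSet.clear();
            }
        } else {
            process(ex);
        }
    }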
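
    A TreeSet created without a comparator only sorts by serial ID if Execution implements Comparable on that field. A minimal sketch of the alternative, passing an explicit comparator, is shown here. The Execution class and its getSerialId() accessor are hypothetical stand-ins, since the real class is not shown in the thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the poster's Execution class; the accessor name is an assumption.
final class Execution {
    private final long serialId;

    Execution(long serialId) {
        this.serialId = serialId;
    }

    long getSerialId() {
        return serialId;
    }
}

final class InitialBuffer {
    // Orders buffered values by serial ID, regardless of arrival order.
    // Note: a TreeSet treats two values with the same serial ID as duplicates.
    private final Set<Execution> buffer =
            new TreeSet<>(Comparator.comparingLong(Execution::getSerialId));

    void add(Execution e) {
        buffer.add(e);
    }

    // Returns the buffered serial IDs in ascending order and empties the buffer.
    List<Long> drain() {
        List<Long> ids = new ArrayList<>();
        for (Execution e : buffer) {
            ids.add(e.getSerialId());
        }
        buffer.clear();
        return ids;
    }
}
```

    This only addresses the sort order of the buffer. It does not remove the startup race between sampling the size and receiving the events.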

  • robvarga
    robvarga Member Posts: 1,708 Gold Badge
    edited Nov 15, 2018 6:24AM

    Hi,

    First of all, I am not entirely sure that even subsequent events would arrive in overall time order. The best Coherence should be able to guarantee is time order within a partition. Events from different partitions (owned by different nodes) may get interleaved, as there is no single common time source ordering those events except for the listener node itself.

    Also, forcing an ordering with a comparator would slow things down. In effect it would merge-sort the per-partition-ordered event streams, which means waiting until events from different partitions can be ordered correctly. That could hold up the listener node indefinitely and could blow the listener node with an OOM, because it would have to hold the entire content of the CQC in memory to pre-sort it before it could dispatch the first event of the initial blast.
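
    To make the cost concrete, here is a rough sketch of such a merge in plain Java, with no Coherence API involved. It assumes each partition's serial IDs are already in ascending order and fully available as a list; the class and method names are made up for illustration. With live event streams, a merge like this cannot emit a value until every partition has supplied its next element, which is where the waiting and the buffering come from.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class PartitionMerge {
    // One cursor per partition: the current head value plus the rest of that stream.
    private static final class Cursor {
        long head;
        final Iterator<Long> rest;

        Cursor(long head, Iterator<Long> rest) {
            this.head = head;
            this.rest = rest;
        }
    }

    // Merges per-partition lists, each already ascending, into one ascending list.
    static List<Long> merge(List<List<Long>> partitions) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>((a, b) -> Long.compare(a.head, b.head));
        for (List<Long> p : partitions) {
            Iterator<Long> it = p.iterator();
            if (it.hasNext()) {
                heap.add(new Cursor(it.next(), it));
            }
        }
        List<Long> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            // Emit the smallest head, then advance that partition's cursor.
            Cursor c = heap.poll();
            out.add(c.head);
            if (c.rest.hasNext()) {
                c.head = c.rest.next();
                heap.add(c);
            }
        }
        return out;
    }
}
```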

    As for your example, you probably recognize that one of the problems with your code is this: assuming that cache in your example is the backing cache, events can arrive between the cache.keySet(currentFilterSet).size() call and the creation of the CQC, which would render initialSize incorrect.

    What I would do in your situation is this: immediately after creating the CQC with a listener, do a backingCache.putAll() that puts a dummy marker Execution into every single partition to be returned by the CQC. Then, for every single partition, treat every event that arrives for that partition before its marker Execution as historic.
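
    The listener-side bookkeeping for this could look roughly like the following. This is a sketch in plain Java, not Coherence code: it assumes the partition, the serial ID, and a marker flag can all be derived from each insert event, and the PartitionReplay class and its onInsert method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongConsumer;

final class PartitionReplay {
    // Historic serial IDs buffered per partition until that partition's marker arrives.
    private final Map<Integer, List<Long>> pending = new HashMap<>();
    // Partitions whose marker has already been seen.
    private final Set<Integer> live = new HashSet<>();
    // Receives serial IDs in processing order.
    private final LongConsumer sink;

    PartitionReplay(LongConsumer sink) {
        this.sink = sink;
    }

    // Assumed to be called once per insert event, in arrival order.
    void onInsert(int partition, long serialId, boolean isMarker) {
        if (live.contains(partition)) {
            if (!isMarker) {
                sink.accept(serialId); // live event: pass straight through
            }
            return;
        }
        if (isMarker) {
            // Marker seen: flush this partition's historic events in serial ID order.
            List<Long> buffered = pending.remove(partition);
            if (buffered != null) {
                Collections.sort(buffered);
                for (long id : buffered) {
                    sink.accept(id);
                }
            }
            live.add(partition);
        } else {
            pending.computeIfAbsent(partition, k -> new ArrayList<>()).add(serialId);
        }
    }
}
```

    Memory use is bounded by the largest set of partitions still waiting for a marker rather than by the whole CQC, and ordering is only guaranteed within each partition.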

    That, I believe, is the closest you can get without dangerous side effects like OOM or waiting indefinitely, but it means that you have to relinquish cross-partition ordering and adopt per-partition ordering instead.

    The events may also hold information from which you can infer whether an event is for a mutation before or after the listener registration request for the containing partition, though not across partitions. The change of that from pre to post could be used instead of the event for the marker Execution insertion, but the point is that you still have to track this per partition.

    Then, if you want, you can wait until you have observed all the marker Execution inserts, as at that point you will have received events for all the historic entries (plus some additional non-historic events, which you can deal with). That would, however, reintroduce all the risks of OOM and blocking indefinitely.

    In general, you probably should not rely on timestamping across multiple nodes anyway, since different nodes on different machines may have clocks skewed relative to each other, and therefore ordering by timestamps from local clocks is flawed.
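
    A small made-up illustration of the skew problem, in plain Java: two writes are stamped by the local clocks of two nodes, one running 50 ms fast and one 40 ms slow. The node names, times, and skews are invented for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ClockSkewDemo {
    // A write stamped by a node's local clock; trueTime is the real order, kept only for comparison.
    static final class Write {
        final String node;
        final long trueTime;
        final long localStamp;

        Write(String node, long trueTime, long localStamp) {
            this.node = node;
            this.trueTime = trueTime;
            this.localStamp = localStamp;
        }
    }

    // Stamps a write with the node's local clock, which is off by skewMillis.
    static Write stamp(String node, long trueTime, long skewMillis) {
        return new Write(node, trueTime, trueTime + skewMillis);
    }

    // Returns node names in the order implied by the local timestamps.
    static List<String> orderByLocalStamp(List<Write> writes) {
        List<Write> sorted = new ArrayList<>(writes);
        sorted.sort(Comparator.comparingLong((Write w) -> w.localStamp));
        List<String> nodes = new ArrayList<>();
        for (Write w : sorted) {
            nodes.add(w.node);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Write> writes = new ArrayList<>();
        writes.add(stamp("A", 1000, 50));  // happened first, clock runs fast
        writes.add(stamp("B", 1020, -40)); // happened second, clock runs slow
        System.out.println(orderByLocalStamp(writes));
    }
}
```

    Node A's write really happened first, but its stamp (1050) is later than node B's (980), so sorting by timestamp puts B before A. A serial ID issued from a single source would not have this problem.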

    Best regards,


    Rob