4 Replies Latest reply on Aug 21, 2013 3:35 PM by jcpsantos

    BinaryDeltaCompressor usage

    jcpsantos

      Hi,

       

      I'm thinking on using Coherence BinaryDeltaCompressor to calculate deltas in each entry update, and then apply this deltas in a secondary copy of the entry.

      I was wondering if anyone has used this Coherence feature, there is not much documentation about it, and since it is a low level class of Coherence, I dont even know if we are suposed to use it.

       

      Also, in our use case, each entry is fairly large, and most of the updates will be targeting some specific fields of the entry, which means that we would also like to "coalesce/merge" several deltas, and then take a "final delta" and apply it to the copy of the entry. I have tested this with BinaryDeltaCompressor and it seems that if we apply a delta on top of a delta, and apply the result to the entry copy, it will end up in the correct state (two deltas applied), for instance:

      - update entry from state X to state Y -> take delta 1;

      - update entry from state Y to state Z -> take delta 2;

      - merge deltas (apply delta 2 on delta 1) -> get delta final;

      - apply delta final on entry copy on state X, get state Z;

       

      Can anyone confirm this behavior?

       

      Also, we have tried using PofDeltaCompressor but I think it is having some problems reading the pof stream, and it is constantly falling back to plain BinaryDeltaCompressor, we are getting the following exception:

       

      2013-08-17 14:06:44.305/5.854 Oracle Coherence GE 3.7.1.9 <Error> (thread=DistributedCache, member=1): (Reverting to default binary delta algorithm.)

       

      2013-08-17 14:06:44.305/5.854 Oracle Coherence GE 3.7.1.9 <Error> (thread=DistributedCache, member=1): java.io.EOFException

      at com.tangosol.io.AbstractByteArrayReadBuffer$ByteArrayBufferInput.readPackedInt(AbstractByteArrayReadBuffer.java:430)

      at com.tangosol.io.pof.PofDeltaCompressor.diffSparseArray(PofDeltaCompressor.java:509)

      at com.tangosol.io.pof.PofDeltaCompressor.diffUserType(PofDeltaCompressor.java:366)

      at com.tangosol.io.pof.PofDeltaCompressor.diffUniformValue(PofDeltaCompressor.java:149)

      at com.tangosol.io.pof.PofDeltaCompressor.diffValue(PofDeltaCompressor.java:121)

      at com.tangosol.io.pof.PofDeltaCompressor.diffSparseArray(PofDeltaCompressor.java:523)

      at com.tangosol.io.pof.PofDeltaCompressor.diffUserType(PofDeltaCompressor.java:366)

      at com.tangosol.io.pof.PofDeltaCompressor.diffUniformValue(PofDeltaCompressor.java:149)

      at com.tangosol.io.pof.PofDeltaCompressor.diffValue(PofDeltaCompressor.java:121)

      at com.tangosol.io.pof.PofDeltaCompressor.diffSparseArray(PofDeltaCompressor.java:523)

      at com.tangosol.io.pof.PofDeltaCompressor.diffUserType(PofDeltaCompressor.java:366)

      at com.tangosol.io.pof.PofDeltaCompressor.diffUniformValue(PofDeltaCompressor.java:149)

      at com.tangosol.io.pof.PofDeltaCompressor.diffValue(PofDeltaCompressor.java:121)

      at com.tangosol.io.pof.PofDeltaCompressor.createDelta(PofDeltaCompressor.java:52)

      at com.tangosol.io.BinaryDeltaCompressor.extractDelta(BinaryDeltaCompressor.java:83)

      at delta.DeltaTester$DeltaExtractEntryProc.process(DeltaTester.java:211)

       

      thanks

        • 1. Re: BinaryDeltaCompressor usage
          Harvey Raja-Oracle

          Hi, I am surprised that applying a delta to a delta works, and would ask you to make sure your assertions / success criteria is correctly formed. This functionality is used internally in a number of contexts; delta backups, decoration only updates to the backup, and reconstructing a binary from the PofValue (object representations of POF properties). I did a talk on this a while ago but the slides appear to still be available if you are looking for literature: http://www.ukoug.org/what-we-offer/library/pof-secrets-and-tips-harvey-raja-oracle/pof-art.pdf Let me know if your assertions are valid and I'll take a closer look. Also, i'll include the examples I refer to in the slides as a post on this thread. Thanks, Harvey

          • 2. Re: BinaryDeltaCompressor usage
            Harvey Raja-Oracle
            package com.tangosol.io.pof;
            
            
            import com.tangosol.io.BinaryDeltaCompressor;
            import com.tangosol.io.ByteArrayWriteBuffer;
            import com.tangosol.io.ReadBuffer;
            import com.tangosol.io.ReadBuffer.BufferInput;
            import com.tangosol.io.Serializer;
            import com.tangosol.io.WriteBuffer;
            import com.tangosol.io.WriteBuffer.BufferOutput;
            import com.tangosol.io.pof.PofConstants;
            import com.tangosol.io.pof.PofHelper;
            import com.tangosol.io.pof.PofReader;
            import com.tangosol.io.pof.PofWriter;
            import com.tangosol.io.pof.PortableObject;
            import com.tangosol.io.pof.reflect.PofNavigationException;
            import com.tangosol.util.Base;
            import com.tangosol.util.Binary;
            import com.tangosol.util.BinaryEntry;
            import com.tangosol.util.ExternalizableHelper;
            import com.tangosol.util.extractor.AbstractUpdater;
            
            
            import java.io.IOException;
            import java.util.Collection;
            import java.util.Map;
            
            
            /**
             * PofCollectionUpdater is a {@link com.tangosol.util.ValueUpdater ValueUpdater}
             * implementation allowing an element within a collection to be added without
             * necessitating the deserialization of the collection in its entirety.
             *
             * @author hr  2012.05.31
             */
            public class PofCollectionUpdater
                    extends AbstractUpdater
                    implements PortableObject
                {
                // ----- constructors ---------------------------------------------------
            
            
                /**
                 * Construct a PofCollectionUpdater; used exclusively for serialization.
                 */
                public PofCollectionUpdater()
                    {
                    }
            
            
                /**
                 * Construct a PofCollectionUpdater with the provided path that should
                 * refer to a collection in any binary this updater operates against.
                 *
                 * @param anPath  path to the collection to update
                 */
                public PofCollectionUpdater(int...anPath)
                    {
                    m_anPath = anPath;
                    }
            
            
                // ----- AbstractUpdater methods ----------------------------------------
            
            
                /**
                 * {@inheritDoc}
                 */
                public void updateEntry(Map.Entry entry, Object oValue)
                    {
                    BinaryEntry binEntry    = (BinaryEntry) entry;
                    Serializer  serializer  = binEntry.getSerializer();
                    Binary      binValue    = binEntry.getBinaryValue();
                    Collection  collAppend  = (Collection) oValue;
                    ReadBuffer  buffValue   = ExternalizableHelper.getUndecorated((ReadBuffer) binValue);
                    BufferInput buffOld     = buffValue.getBufferInput();
                    int         nAppendSize = 0;
                    int[]       anPath      = m_anPath;
            
            
                    try
                        {
                        for (Object oElement : collAppend)
                            {
                            nAppendSize += sizeof(oElement, serializer);
                            }
            
            
                        buffOld.readUnsignedByte(); // skip FMT_EXT
            
            
                        for (int i = 0; i < anPath.length; ++i)
                            {
                            int nTypeId    = buffOld.readPackedInt(); // pof type id
                            int nVersionId = nTypeId > 0 ? buffOld.readPackedInt() : 0; // object version id
                            for (int j = 0; j < anPath[i]; ++j)
                                {
                                buffOld.readPackedInt(); // attribute index
                                PofHelper.skipValue(buffOld);
                                }
                            }
                        buffOld.readPackedInt(); // attribute index
                        int of      = buffOld.getOffset();
                        int nTypeId = buffOld.readPackedInt(); // pof type id
            
            
                        if (nTypeId != PofConstants.T_COLLECTION)
                            {
                            throw new PofNavigationException("Pof Collection expected");
                            }
            
            
                        int cItems  = buffOld.readPackedInt();
                        int ofItems = buffOld.getOffset();
            
            
                        for (int i = 0; i < cItems; ++i)
                            {
                            PofHelper.skipValue(buffOld);
                            }
            
            
                        int        ofEnd  = buffOld.getOffset();
                        byte[]     abOrig = binValue.toByteArray(ofItems, ofEnd - ofItems);
                        ReadBuffer buf    = new PofBinaryDeltaCompressor().encode(0, of, ofEnd - of + nAppendSize,
                            cItems + collAppend.size(), abOrig, ofEnd, binValue.length(), serializer, collAppend);
            
            
                        ReadBuffer applied = new BinaryDeltaCompressor().applyDelta(binValue, buf);
                        binEntry.updateBinaryValue(applied.toBinary());
                        }
                    catch (IOException e)
                        {
                        throw Base.ensureRuntimeException(e);
                        }
                    }
            
            
                // ----- PortableObject interface ---------------------------------------
            
            
                /**
                 * {@inheritDoc}
                 */
                public void readExternal(PofReader in) throws IOException
                    {
                    m_anPath = in.readIntArray(0);
                    }
            
            
                /**
                 * {@inheritDoc}
                 */
                public void writeExternal(PofWriter out) throws IOException
                    {
                    out.writeIntArray(0, m_anPath);
                    }
            
            
                // ----- internal helpers -----------------------------------------------
            
            
                /**
                 * Determine the serialized size of an object.
                 *
                 * @param oValue      the object that should be measured
                 * @param serializer  the serializer to use to serialize the object
                 *
                 * @return the number of bytes required by the object
                 *
                 * @throws IOException
                 */
                protected static int sizeof(Object oValue, Serializer serializer) throws IOException
                    {
                    WriteBuffer buff    = new ByteArrayWriteBuffer(128);
                    BufferOutput buffOut = buff.getBufferOutput();
                    serializer.serialize(buffOut, oValue);
                    return buffOut.getOffset();
                    }
            
            
                // ----- inner class: PofBinaryDeltaCompressor --------------------------
            
            
                /**
                 * A DeltaCompressor that creates a delta that can be applied to a collection.
                 */
                protected class PofBinaryDeltaCompressor
                        extends BinaryDeltaCompressor
                    {
            
            
                    /**
                     * Create a delta with the provided parameters.
                     *
                     * @param ofHead
                     * @param ofHeadEnd
                     * @param cBytes
                     * @param cItems
                     * @param abOrig
                     * @param ofTail
                     * @param ofTailEnd
                     * @param serializer
                     * @param collAppend
                     * @return
                     * @throws IOException
                     */
                    public ReadBuffer encode(int ofHead, int ofHeadEnd, int cBytes, int cItems,
                                             byte[] abOrig, int ofTail, int ofTailEnd,
                                             Serializer serializer, Collection collAppend)
                            throws IOException
                        {
                        WriteBuffer bufDelta = new ByteArrayWriteBuffer(20 + cBytes);
                        WriteBuffer.BufferOutput out = bufDelta.getBufferOutput();
            
            
                        out.write(FMT_BINDIFF);
            
            
                        // extract 0 - collection
                        out.write(OP_EXTRACT);
                        out.writePackedInt(ofHead);
                        out.writePackedInt(ofHeadEnd);
            
            
                        // write collection
                        out.write(OP_APPEND);
                        out.writePackedInt(cBytes);
                        out.writePackedInt(PofConstants.T_COLLECTION);
                        out.writePackedInt(cItems);
                        out.write(abOrig);
            
            
                        for (Object oElement : collAppend)
                            {
                            serializer.serialize(out, oElement);
                            }
            
            
                        // write remaining
                        if (ofTail < ofTailEnd)
                            {
                            out.write(OP_EXTRACT);
                            out.writePackedInt(ofTail);
                            out.writePackedInt(ofTailEnd - ofTail);
                            }
            
            
                        out.write(OP_TERM);
            
            
                        return bufDelta.getReadBuffer();
                        }
                    }
            
            
                private int[] m_anPath;
                }
            
            • 3. Re: BinaryDeltaCompressor usage
              Harvey Raja-Oracle
              public class PofCollectionUpdaterTest
                  {
                  @Test
                  public void testCollectionAppend()
                      {
                      Collection<String>     coll       = Arrays.asList("alpha", "beta", "gamma");
                      Collection<String>     collAppend = Arrays.asList("delta", "epsilon", "zeta");
                      final SimplePofContext ctx        = new SimplePofContext();
                      CollectionContainer    value      = new CollectionContainer(coll, 0);
              
              
                      ctx.registerUserType(1000, CollectionContainer.class, new PofAnnotationSerializer(1000, CollectionContainer.class));
              
              
                      Binary binVal   = ExternalizableHelper.toBinary(value, ctx);
                      BinaryEntry binEntry = Mockito.mock(BinaryEntry.class);
              
              
                      when(binEntry.getBinaryValue()).thenReturn(binVal);
                      when(binEntry.getSerializer()).thenReturn(ctx);
                      doAnswer(new Answer<Void>()
                          {
                          @Override
                          public Void answer(InvocationOnMock invocationOnMock) throws Throwable
                              {
                              Binary              binValue = (Binary) invocationOnMock.getArguments()[0];
                              CollectionContainer collNew  = (CollectionContainer) ExternalizableHelper.fromBinary(binValue, ctx);
              
              
                              assertThat(collNew, notNullValue());
                              assertThat(collNew.m_collValue.size(), is(6));
                              return null;
                              }
                          }
                      ).when(binEntry).updateBinaryValue(any(Binary.class));
              
              
                      new PofCollectionUpdater(1).updateEntry(binEntry, collAppend);
                      }
              
              
              
              
              
              
                  @Portable
                  public static class CollectionContainer
                      {
              
              
                      public CollectionContainer()
                          {
                          }
              
              
                      public CollectionContainer(Collection collValue, int nIdentity)
                          {
                          this.m_collValue = collValue;
                          this.m_nIdentity = nIdentity;
                          }
              
              
                      @PortableProperty(1)
                      protected Collection m_collValue;
                      @PortableProperty(0)
                      protected int        m_nIdentity;
                      }
                  }
              
              • 4. Re: BinaryDeltaCompressor usage
                jcpsantos

                You're right, it is not possible to apply a delta over a delta.

                In my initial test cases it was working, I guess I did only non conflicting updates and somehow the delta compressor was able to increment the previous delta with the new changes, but in another simple test case where I updated twice the same value, it did throw an out of bounds exception:


                ... sizes in bytes, first delta calculated here

                entity size: 156

                delta size: 22

                finalDelta size: 22

                ... new update and second delta calculated here and applied on previous delta

                Exception in thread "main" java.lang.IndexOutOfBoundsException: ofBegin=14, ofEnd=156, Binary.length=22

                 

                So, I guess there is no way of "coalescing" two or more deltas, right?

                 

                Another question regarding POF, we are trying to implement some kind of lazy de/serialization of our entities.

                We know how to parse and navigate the pof stream using pofupdaters and extractors, but with PofSerializers we already get the PofReader and PofWriter.

                In there a way to get the "ComplexPofValue" of the entity being de/serialized?

                 

                Thanks.