1 Reply Latest reply: Jun 24, 2013 11:01 AM by Unmesh RSS

    Help with CQL - Ignoring events


      Hi all,


      I am trying to put together a proof of concept using OEP and have some difficulties with CQL.  Here is what I would like to implement:


      Given a stream of events for different IDs

           a) Partition stream based on ID

           b) Act on first event for each ID as soon as it happens

           c) Ignore subsequent events for the same ID within 3 seconds period


      For example:

      Time (sec)Input EventOutput
      0+{id = 1}+{id = 1}
      1+{id = 1}, +{id = 3}+{id = 3}
      2+{id = 3}
      3+{id = 3}, +{id = 2}+{id = 2}
      4+{id = 1},+{id = 3}+{id = 1},+{id = 3}
      5+{id = 2}

      Thanks in advance for any ideas/thoughts/suggestions,



        • 1. Re: Help with CQL - Ignoring events

          Here's an approach that should solve your requirement. It makes use of a Java class with static methods.

          The java class will maintain a hashmap where the lastoutput timestamp for each of the id values is stored.

          Whenever we receive a new event, we call the getter in that StateClass to get the lastOutput timestamp for that id and if the difference between current timestamp and that value is >= 3 then we output that event. For the first event, there wouldn't be any 'lastOutput' so we check that condition by prev(A.id) is null.

          Whenever we output an event, in the MEASURES clause, we update the 'lastOutput timestamp' for that id by calling the setter.


          The query would look like :

                       select T.id  as id, T.tm as tm from helloworldInputChannel


                               partition by id


                                   trialPackage.StateClass.setLastOutput(A.id, A.ELEMENT_TIME) as id,

                                   A.ELEMENT_TIME as tm



                                 A as ((prev(A.id) is null) or ((A.ELEMENT_TIME - trialPackage.StateClass.getLastOutput(A.id)) >= 3))

                             ) as T


          Note that I have tried this with your example data on an application timestamped channel. If you try with system timestamped channel (where timestamp of the event would be System.nanoTime()) then you would have to check that the difference in A.ELEMENT_TIME  and trialPackage.StateClass.getLastOutput(A.id) is >= 3*10^9 (nanoseconds equivalent of 3 seconds).




          package trialPackage;


          import java.util.HashMap;

          import java.util.Map;


          public class StateClass {


              static Map<java.lang.Integer, java.lang.Long> lastOutputs;


              public static long getLastOutput(int id)


                    if(lastOutputs != null)




                         return lastOutputs.get(id);



                   return 0;



              public static int setLastOutput(int id, long lastOutput)


                  if(lastOutputs == null)

                     lastOutputs = new HashMap<java.lang.Integer, java.lang.Long>();

                  lastOutputs.put(id, lastOutput);


                  return id;





          Hope this helps!