We are looking for some advice or recommendation on CEP architecture.
We need to build a CEP application that conforms to the following:
• HA with no loss of events or duplicate events when failing over to the backup server.
• We have some aggregative rules that needs to see all events.
• Events are XMLs with size of 3KB-50KB. Not all elements are needed for the rules but they are there for other systems that come after the CEP (the customer services).
• The XML elements that the CEP needs are in varying depth in the XML.
Running the EPN on a single thread is not fast enough for the required throughput mainly because network latency to the JMS and the heavy task of parsing of the XML. Because of that we are looking for a solution that will read the messages from the JMS in parallel (multi thread) but will keep the same order of events between the Primary and Secondary CEPs.
One idea that came to our minds is to use Coherence cache in the following way:
• On the CEP inbound use a distributed queue and not topic (at the CEP outbound it is still topic).
• On the CEPs side use a Coherence cache that runs on the CEPs JVMs (since we already have a Coherence cluster for HA).
• Both CEPs read from the queue using multi threading (10 reading threads – total of 20 threads) and putting it to the Coherence cache.
• The Coherence cache is publishing the events to both CEPs on a single thread.
The EPN looks something like this:
JMS adapter (multi threaded) -> replicated cache on both CEPs -> event bean -> HA adapter -> channel -> processor -> ….
Does this sounds sound to you?
Are we over shooting here? Is there a simpler solution for our needs?
Is there a best practice for such requirements?
For handling inbound events you should use a JMS topic (or distributed topic) instead of
a distributed queue. The reason is that in an HA configuration the primary CEP server
and the secondary CEP server(s) have to receive the same stream of input events, which
is not possible with via a queue because it is designed to deliver a message only to one
subscriber. In a topic each message is delivered to all its subscribers.
I don't follow the rationale for placing JMS input events in a Coherence cache and then
extracting them via an event bean. I recommend that in your JMS input adapter you extract
the CEP-relevant information form an XML message, build a CEP event and send it directly to
the channel that feeds the CEP processor, with an EPN as follows:
JMS adapter -> channel -> processor ->....
On the other hand you could consider an alternate approach for handling inbound XML messages:
using the Coherence messaging pattern instead of JMS. The Coherence messaging pattern supports
both topics and queues with similar semantics as JMS. With this approach you would be leveraging
the Coherence cluster in the same JVMs as the CEP servers, as you had intended. In this scenario your
EPN would roughly be as follows:
event bean (as topic input adapter) -> channel -> processor ->....
Just to make it clear:
We do not parse the XML on the event bean after the Coherence. We do it on the JMS adapter on multiple threads in order to utilize all the server resources (CPUs) and then we put it in the replicated cache.
The requirements from our application are:
- There is an aggregative query that needs to "see" all events (this means that we need to pass all events thru a single processor and we cannot partition them to several processors).
- Because this is a HA solution the events on both CEPs (primary and secondary) needs to be at the same order when reaching the HA inbound adapter and the processor.
- A single thread JMS adapter is not reading the messages from the JMS fast enough mainly because it takes time to parse the XML to an event.
- Using a multi-threaded adapter or many single threaded adapters with message selector will create a situation that the order of events on both CEPs will not be the same at the processor inbound.
This is why we needed a mediator so we can read in multiple threads that will parse the XMLs in parallel without concerning on order of messages and on the other hand publish all the messages on a single thread to the processors on both CEPs from this shared mediator (we use a replicated cache that runs on both JVMs).
We use queue instead of topic because if we read the messages from a topic on both CEPs it will be stored twice on the Coherence replicated cache. But if we use a queue, when server 1 read the message and put it in the Coherence replicated cache then server 2 will not read it because it was removed from the queue.
If I understand correctly you are suggesting replacing the JMS adapter with an event bean that will read the messages from the JMS directly?
Are you also suggesting that we will not use a replicated cache but instead a stand alone cache on each server? In this case how do we keep the same order of events on both CEPs (on both caches)?
Thanks for your clarifications, I see now why you need a mediator (shared buffer) that will hold the CEP events that result
from XML parsing in the input JMS adapters, and that you need to parallelize XML parsing via multiple JMS adapters per
server to improve performance.
My updated suggestions are as follows:
- For the mediator, use the Coherence cache cluster as a messaging system, via the Coherence Messaging Pattern library, which supports
both queues and topics. Use a messaging topic to which all the JMS Adapters will publish CEP events after completing parsing of
incoming XML messages. The Coherence messaging pattern provides HA. It distributes the published messages across the servers in the cluster
and assures nothing is lost in the case of server failure:
- Configure each JMS adapter with CLIENT_ACKNOWLEDGE mode so that reception of the message is only acknowledged
after the JMS adapter stores the CEP event in the Coherence cache (publishes it to the messaging topic). The JMS adapter performs
the acknowledgement by invoking the acknowledge() method on the received message object (message.acknowledge()).
This assures messages would not be lost if a server fails after reading messages from the JMS input queue and before
writing them to the intermediate Coherence messaging topic.
- An event bean on each CEP server reads messages from the messaging topic in the Coherence cache. The primary and the secondary
server(s) subscribe and read from the same Coherence messaging topic. The event bean forwards the event to a channel that
feeds the processor. A sketch of the EPN in each CEP server would be:
JMS input adapters -> Coherence-based messaging topic -> event bean -> channel -> processor