Towards a Timely, Well-Balanced, Event-Driven Architecture Blog

Version 2


    In most Java applications, system performance and throughput are adversely impacted if a receiver consumes events at a slower rate than that which they can be dispatched.

    For instance, in a financial system for monitoring trading orders, diverse event streams (customer orders, order executions, market data updates, etc.) might all contribute in the creation of near-realtime views. If a particular event occurs, it might takex milliseconds for the analytics to recompute the view. But if a sequence of events occurs in intervals smaller than x, the analytics won't be able to put up with the load. Somehow, we need to provide the application to be well-conditioned to load preventing resources from being committed when demand exceeds capacity.

    If we could assume that any type of event is consumed as fast as it can be produced, there would be no need to separate "event submission" from "event handling." In fact, it is the inability to recognize this need that leads to bottlenecks that become only visible when the system is tested under a heavy load.

    On the other hand, it is often possible to design receivers in which dealing with one event or 100 events takes roughly the same time or, to be more accurate, in which the code does not scale linearly with the number of events. This is because whether it is one event or 100 events, the same number of calls to the database, the filesystem, or to a CPU-intensive mathematical library might still be required. And it is these operations in which the most time must be spent.

    In this article, we discuss a simple framework that forms the basis for a powerful and flexible solution to decouple event production and consumption while supporting a pluggable "event-dispatching" policy.

    Dispatchers, Receivers, and Events

    Let's start by defining our main players: the dispatcher, the receiver, and the events.

    The Dispatcher interface provides the means for any user to send events.

    /** * An object that dispatches submitted events. This interface * provides a way of decoupling event submission from the * mechanics of how and when each event will be handled **/ public interface Dispatcher<T> { public void push(T event) throws NullPointerException; } 
    The Receiver interface provides the means to handle message notifications.
    /** * Defines an object which listens for generic events. **/ public interface Receiver<T> { /** * Invoked by the target dispatcher when flushing events. * * @param events An array of events (size at least one) * currently pushed into the dispatcher. The elements of * the array are guaranteed to be in the order in which * they were pushed into the dispatcher (insertion-order). **/ public void received(T[] events); } 

    Within the context of this article, events can be any object: aString, an Integer, a user-defined class, etc.

    Attentive readers may argue that these interfaces do not capture the problem in its entirety. In particular, theDispatcher interface could provide a few other signatures of the push method to allow atomic "push multiple" functionality as well as lifecycle functionality. For instance, termination could be managed via a few shutdown methods as in the java.util.concurrent.ExecutorServiceinterface. In the interest of simplicity, we omitted this aspect.

    Direct Dispatching

    Dispatcher implementations are responsible for transformingpush calls into well-regulated calls to the receiver. Their sole role is to act as an intermediary between the source (or sources) of events and the event handler.

    In the simplest case, a dispatcher can send the submitted message immediately in the caller's thread.

    /** * A Dispatcher implementation which sends any submitted message * immediately to the Receiver within the same thread. * @param <T> */ public class DirectDispatcher<T> implements Dispatcher<T> { private final Receiver<T> receiver; public DirectDispatcher(Receiver<T> receiver) throws NullPointerException { if (receiver == null) throw new NullPointerException(); this.receiver = receiver; } public void push(T object) throws NullPointerException { if (t == null) throw new NullPointerException(); T[] array =(T[]) new Object[1]; array[0] = object; this.receiver.received(array); } } 

    DirectDispatcher effectively is just an adapter for the interfaces Dispatcher and Receiver. It wires up the dispatcher/receiver abstraction we just decided to disjoin. Direct dispatching relies upon receivers handling the events as fast as they are produced.

    A more sophisticated implementation could batch events and wait until n calls have been made before dispatching them to the receiver:

    public class DirectBatchDispatcher<T> implements Dispatcher<T> { private final Receiver<T> receiver; private final AtomicInteger counter = new AtomicInteger(); private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<T>(); private final int batchSize; public DirectBatchDispatcher(Receiver<T> receiver, int size) throws IllegalArgumentException, NullPointerException { if (receiver == null) throw new NullPointerException(); if (batchSize < 1) throw new IllegalArgumentException(); this.receiver = receiver; this.batchSize = batchSize; } public void push(T event) throws NullPointerException { if (event == null) throw new NullPointerException(); queue.offer(event); int i = counter.incrementAndGet(); if (i>0 && i % batchSize == 0) { T[] array = (T[]) new Object[batchSize]; for (int j=0;j<batchSize;j++) { array[j] = queue.poll(); } receiver.received(array); } } } 

    There are several limitations with theDirectBatchDispatcher:

    • Event dispatching still occurs in the caller's thread.
    • The choice of n (size of the batches) implies some a priori knowledge of the number of incoming events and their distribution over time.

    In its first cut, it is hard to think of many use cases that would benefit from the DirectBatchDispatcher. The next implementation fixes its limitations.

    Asynchronous Dispatching

    The implementation we want to propose needs to translate calls from dispatchers into timely, well-regulated calls to the receiver, so that if the dispatcher's push method is called several times in quick succession, it translates into just one call to the receiver. We call this implementation theTimelyDispatcher.

    public class TimelyDispatcher<T> implements Dispatcher<T> { private final LinkedBlockingQueue<T> lbq = new LinkedBlockingQueue<T>(); private final Runnable runnable; private final AtomicLong timeLastAction = new AtomicLong(); private TimelyDispatcher(final Receiver<T> receiver, final long checkPeriod, final long maxPeriod, final TimeUnit timeUnit) { final long checkPeriodNanos = timeUnit.toNanos(checkPeriod); final long maxPeriodNanos = timeUnit.toNanos(maxPeriod); runnable = new Runnable() { private long timeFirstTake; public void run() { timeLastAction.set(System.nanoTime()); timeFirstTake = System.nanoTime(); ArrayList<T> list = new ArrayList<T>(); while (true) { try { T t = (list.isEmpty()) ? lbq.take() : lbq.poll(checkPeriod, timeUnit); long now = System.nanoTime(); if (list.isEmpty()) { timeFirstTake = now; } if (t!=null) { list.add(t); } if ((now - timeLastAction.get()) > checkPeriodNanos || (now - timeFirstTake) > maxPeriodNanos) { timeLastAction.set(now); T[] array = list.toArray((T[]) new Object[0]); receiver.received(array); list.clear(); } } catch (InterruptedException e) { // For sake of simplicity the code // does not deal with interruptions. throw new RuntimeException(e); } } } }; } // the safe way to start a thread is outside a constructor. private void startThread(ThreadFactory threadFactory) { Thread t = threadFactory.newThread(runnable); t.start(); } public void push(T t) throws NullPointerException { if (t == null) throw new NullPointerException(); timeLastAction.set(System.nanoTime()); lbq.offer(t); } /** * Constructs a new ConcurrentDispatcher. * * @param <T> the type of messages/events to push. * @param receiver the receiver implementation which will handle the events. * @param checkPeriod * @param maxPeriod * @param timeUnit the unit in which checkPeriod and maxPeriod are expressed (e.g.: milliseconds) * @param threadFactory the ThreadFactory to create the thread responsible of wiring up dispatcher * and receiver. * @return **/ public static <T> TimelyDispatcher<T> newInstance(Receiver<T> receiver, long checkPeriod, long maxPeriod, TimeUnit timeUnit, ThreadFactory threadFactory) throws NullPointerException, IllegalArgumentException { if (receiver == null || timeUnit == null || threadFactory == null) throw new NullPointerException(); if (checkPeriod <= 0 || maxPeriod <= 0 || checkPeriod >= maxPeriod) throw new IllegalArgumentException(); TimelyDispatcher<T> c = new TimelyDispatcher<T>(receiver, checkPeriod, maxPeriod, timeUnit); c.startThread(threadFactory); return c; } } 


    TimelyDispatcher is an immutable class. At creation time, the user must specify a non-null Receiverimplementation to wire up, and a checkPeriod and amaxPeriod. The checkPeriod defines the time interval in which events must occur before they are dispatched to the receiver. The maxPeriod defines the maximum time interval for which, in the presence of events, calls to the receiver can be delayed. Both checkPeriod andmaxPeriod are defined via the sameTimeUnit argument (e.g., milliseconds). Finally, since our implementation must create a new thread to perform asynchronous calls, it is given a ThreadFactory.

    How the Algorithm Works

    An embedded thread is responsible for monitoring the activity of the push method. When the method is invoked, we register the time of the call and update a thread-safe queue. We use a LinkedBlockingQueue so that as items are added to it, the embedded thread can take them off and store them into a private (thread-confined) list. This list's lifecycle is add-add-add-... , snapshot (toArray), and empty.

    The embedded thread encapsulates the logic that determines when to invoke the receiver's received method by checking on the current nano time against the values stored in the atomictimeLastAction and thread-privatetimeLastNotification variables.

    In Action

    Figures 1 and 2 show how calls to the Dispatcherare collected into smaller numbers of segmented calls to theReceiver. The elapsed time is captured on the X axis and each call is represented by a small multicolored dot.

    Asynchronous event dispatching with one source

    Figure 1. Asynchronous event dispatching with one source

    Figure 2 further enhances the problem by having two sources of events.

    Asynchronous event dispatching with two sources
    Figure 2. Asynchronous event dispatching with two sources

    Considerations and Improvements

    The TimelyDispatcher is not necessarily the bestDispatcher implementation; it is just one of many. It should be possible to write many other interesting implementations. For instance, we could write another that uses two queues, so that when we write on one queue the other queue can be drained in parallel. A volatile Boolean flag could be toggled by the embedded thread, determining which queue is for writing and which queue is for reading and emptying. If it were safe to assume that thepush method is always invoked by the same thread, we would also be able to replace the two thread-safe queues with any non-thread-safe ordered collection (e.g., ArrayList) and verify the performance gain (if any). Furthermore, if a common use case in our application is to handle bursts of events from multiple threads at the same time, it could be possible to useThreadLocal to direct each thread to its own queue in an attempt to reduce contention to the minimum.

    While it is possible to enhance or modify theTimelyDispatcher for specific needs, we should always remind ourselves that writing (and testing) concurrent software is still very hard and that it is very easy to get carried away with potentially more sophisticated yet unnecessary implementations. Therefore, before attempting to write a better concurrent dispatcher, we should be absolutely sure that the available one does not meet the needed criteria for performance and scalability.

    Asynchronous Dispatching Versus the Producer/Consumer Pattern

    It is important not to confuse the applicability of the asynchronous dispatching technique with the Producer/Consumer pattern. In the Producer/Consumer pattern, events represent self-contained units of work (tasks) that can be tackled by any consumer. In its simplest forms, scalability is achieved by adding more consumers. The developer's task is therefore to identify highly homogeneous, parallelizable activities. On the other hand, in the dispatcher/receiver framework, events represent the trigger for generally expensive activities to be performed. The time to perform those activities cannot be reduced and the separation between event production and consumption is there to prevent requests from executing more activities than the JVM would be able to sustain. In a way, the two approaches are orthogonal to each other, and in some circumstances it is also possible to combine them together--for example, the same class could act as receiver and producer at the same time.


    With Java SE 5 and 6, developers are given a powerful API that significantly simplifies the tasks of writing concurrent applications. At the same time, multi-core machines are becoming the norm; consequently, leveraging their full power requires a deep understanding of concurrent programming. In this article, we presented the problem of balancing the flow of events between dispatchers and receivers that is intrinsically linked with concurrency. We proposed a solution and used thejava.util.concurrent API (AtomicLong,LinkedBlockingQueue, etc.) to implement it.



    The author would like to thank Brian Goetz for his suggestions and assistance during the writing of this article. His guidance was particularly beneficial in reducing unnecessary complexity in the sample code.