CompletableFuture for Asynchronous Programming in Java 8


    by José Paumard

     

    New, elegant ways to process data asynchronously

     

    Java SE 8 brought so many new things to the Java platform that some of them have probably been left in the shadows. Not all applications are using the java.util.concurrent package, even though the primitives provided in this package are extremely useful for writing correct concurrent code.

     

    This package received several notable additions in Java 8. The ones we discuss in this article are the CompletionStage interface and its implementing class, CompletableFuture. Along with the Future interface, they provide useful patterns for building asynchronous systems.

     

    Problem Statement

     

    Let's start with the following piece of code. This is not meant to be Java code, but merely metalanguage code. We do not care about which API provides the methods written here nor about the classes used.

     

    queryEngine.select("select user from User user")
                .forEach(user -> System.out.println(user));
    

     

    We have here a query engine that launches a Java Persistence Query Language (JPQL) type of request on a database. Once we have the result of this query, we would like to simply print the result. The querying of the database might be slow, so we would like to execute this code in a separate thread and trigger the printing of the result when it is available. Once we have launched this task, we do not really want to take care of it anymore.

     

    What tools do we have in Java 7 to create an API that would implement this code? We can wrap the task to be executed in a Callable, and submit this object to an ExecutorService. This pattern was introduced in Java 5 and is now well known.

     

    Callable<String> task = () -> "select user from User";
    Future<String> future = executorService.submit(task);
    

     

    The only way to get the result from a future object is to call its get() method in the thread that submitted the task. This method is blocking, so this call will block the thread until the result is available to print.
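
    To make the blocking behavior concrete, here is a minimal sketch. The class name BlockingGetDemo, the method fetchBlocking(), and the hard-coded string standing in for a query result are assumptions made for illustration only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class BlockingGetDemo {

    // Submits a task, then waits for its result in the calling thread.
    static String fetchBlocking() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            // Stand-in for a slow database query (hypothetical result).
            Callable<String> task = () -> "result of the query";
            Future<String> future = executorService.submit(task);
            // get() blocks the calling thread until the task has completed.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            // get() forces the caller to deal with checked exceptions.
            throw new IllegalStateException(e.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchBlocking());
    }
}
```

    Nothing else can happen in the calling thread between the call to get() and the arrival of the result, which is the limitation the rest of this article addresses.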

     

    This is exactly where the CompletionStage comes to the rescue.

     

    First Chaining Pattern

     

    Let's rewrite the submission of the task using one of the CompletionStage patterns.

     

    This pattern:

     

    executor.submit(() -> dbEngine.query("select user from User"));
    

     

    Becomes the following:

     

    CompletableFuture<List<User>> completableFuture =
         CompletableFuture.supplyAsync(
             () -> dbEngine.query("select user from User"), executor);
    

     

    Instead of passing a Callable to the submit() method of an ExecutorService, we pass a Supplier to the static supplyAsync() method of CompletableFuture. This method can also take an Executor as a second parameter, giving the client a choice of the pool of threads that is going to execute the Supplier.

     

    It returns an instance of CompletableFuture, a new class from Java 8. On this object, we can call the following:

     

    completableFuture.thenAccept(System.out::println);
    

     

    The consumer passed to the thenAccept() method will be called automatically—without any client intervention—when the result is available. No more thread blocking as in the previous case.
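
    The pattern can be tried without a database. The following sketch assumes a hypothetical list of user names in place of the query result; ThenAcceptDemo and collectUsers() are illustrative names. The call to join() is there only so that the demo can return what the consumer received.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class ThenAcceptDemo {

    // Runs a supplier asynchronously and hands its result to a consumer.
    static List<String> collectUsers() {
        List<String> received = new CopyOnWriteArrayList<>();

        // Stand-in for the database query (hypothetical data).
        CompletableFuture<List<String>> completableFuture =
            CompletableFuture.supplyAsync(() -> Arrays.asList("alice", "bob"));

        // The consumer is called once the list is available.
        CompletableFuture<Void> done = completableFuture.thenAccept(received::addAll);

        // Wait for the consumer to finish, only for the sake of the demo.
        done.join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collectUsers());
    }
}
```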

     

    What Is a CompletionStage?

     

    In a nutshell, a CompletionStage is a model that carries a task. We will see in the following sections that a task can be an instance of Runnable, Consumer, or Function. The task is an element of a chain. CompletionStage elements are linked together in different ways along the chain. An "upstream" element is a CompletionStage that is executed before the element we are considering. Consequently, a "downstream" element is a CompletionStage that is executed after the element we are considering.

     

    The execution of a CompletionStage is triggered upon the completion of one or more upstream CompletionStages. Those CompletionStages might return values, and these values can be fed to this CompletionStage. The completion of this CompletionStage can also produce a result and trigger other downstream CompletionStages.

     

    So a CompletionStage is an element of a chain.

     

    The CompletionStage interface has an implementation called CompletableFuture. Note that CompletableFuture is also an implementation of the Future interface. CompletionStage does not extend Future.

     

    A task has a state:

     

    • It might be running.
    • It might be completed normally and might have produced a result.
    • It might be completed exceptionally and might have produced an exception.

     

    More Methods from Future

     

    Future defines five methods in three categories:

     

    • cancel(), which is meant to cancel a running task
    • isCancelled() and isDone() to check whether the task has been cancelled or has finished
    • get(), which comes in two flavors, the second one taking a timeout

     

    CompletableFuture adds six new Future-like methods.

     

    The first two methods are join() and getNow(value). The first, join(), blocks until the CompletableFuture is completed, just like the old get() method does. The main difference is that the join() method does not throw a checked exception, leading to simpler patterns. The getNow(value) is similar. It returns immediately, with the value provided in case this CompletableFuture has not been completed yet. Note that this call does not force the CompletableFuture to complete.
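
    A short sketch of these two methods follows. The class and method names are illustrative; the CompletableFuture created with new is deliberately never completed, so that getNow() has to fall back on the provided value.

```java
import java.util.concurrent.CompletableFuture;

final class JoinGetNowDemo {

    // join() returns the result without declaring a checked exception.
    static String readCompleted() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "ready");
        return cf.join();
    }

    // getNow() returns the fallback when the future is not completed yet.
    static String readPending() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        return pending.getNow("not yet");
    }

    // Calling getNow() does not complete the future.
    static boolean isDoneAfterGetNow() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        pending.getNow("not yet");
        return pending.isDone();
    }

    public static void main(String[] args) {
        System.out.println(readCompleted());
        System.out.println(readPending());
        System.out.println(isDoneAfterGetNow());
    }
}
```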

     

    The four remaining methods force the future to complete, either with a value or exceptionally. Two of them can even override the value produced by this future if it is already completed.

     

    • The complete(value) method completes the CompletableFuture if it has not been completed, and it sets its value to the passed value. If the CompletableFuture has already completed, its value is not changed. If you need to change this value, the method to call is obtrudeValue(value). This method does change the value of the CompletableFuture, even if it has already completed. It should be used with care and only in error recovery situations.
    • Another pair of methods works the same way, but they force the CompletableFuture to complete exceptionally: completeExceptionally(throwable) and obtrudeException(throwable). The first one has an effect only if the CompletableFuture has not completed yet, in which case subsequent calls to get() and join() will throw; the second one forces the CompletableFuture to change its state even if it has already completed.
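
    The difference between completing and obtruding can be checked with a few lines of code. This is a minimal sketch with illustrative class and method names; in the Java 8 API the obtruding methods are named obtrudeValue() and obtrudeException().

```java
import java.util.concurrent.CompletableFuture;

final class CompleteObtrudeDemo {

    // A second call to complete() does not change the value.
    static String completeTwice() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.complete("first");
        cf.complete("second"); // ignored: cf is already completed
        return cf.join();
    }

    // obtrudeValue() replaces the value even after completion.
    static String obtrudeAfterComplete() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.complete("first");
        cf.obtrudeValue("forced");
        return cf.join();
    }

    // completeExceptionally() returns true when it caused the completion.
    static boolean failPending() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        boolean transitioned =
            cf.completeExceptionally(new IllegalStateException("boom"));
        return transitioned && cf.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(completeTwice());
        System.out.println(obtrudeAfterComplete());
        System.out.println(failPending());
    }
}
```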

     

    How to Create a CompletableFuture

     

    There are several patterns for creating CompletableFutures.

     

    Creating a Completed CompletableFuture

     

    The first pattern presented here creates a CompletableFuture that is already completed. It might seem odd to create such a Future, but it can be very useful in a testing environment.

     

    CompletableFuture<String> cf =
         CompletableFuture.completedFuture("I'm done!");
    cf.isDone(); // returns true
    cf.join();   // returns "I'm done!"
    

     

    Creating a CompletableFuture from a Task

     

    Beyond this first pattern, a CompletableFuture can be built on two kinds of tasks: a Runnable, which does not take any argument and does not return anything, and a Supplier, which also takes no argument and which produces an object. In both cases, it is possible to pass an Executor to set the pool of threads that will execute this task.

     

    There are two patterns for each task:

     

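    CompletableFuture<Void> cf1 = CompletableFuture.runAsync(runnable);
    CompletableFuture<Void> cf2 = CompletableFuture.runAsync(runnable, executor);
    CompletableFuture<T> cf3 = CompletableFuture.supplyAsync(supplier);
    CompletableFuture<T> cf4 = CompletableFuture.supplyAsync(supplier, executor);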
    

     

    If no Executor is supplied, the tasks will be executed in the common fork/join pool, the same pool that is used for the parallel execution of streams. In the following example, we supply one:

     

    Runnable runnable = () -> {
        System.out.println("Executing in " +
            Thread.currentThread().getName());
    };

    ExecutorService executor = Executors.newSingleThreadExecutor();

    CompletableFuture<Void> cf =
        CompletableFuture.runAsync(runnable, executor);
    cf.thenRun(() -> System.out.println("I'm done"));

    executor.shutdown();
    

     

    The result of the execution of the code above is the following:

     

    Executing in pool-1-thread-1
    I'm done
    

     

    In this case, the Runnable is executed in the thread pool executor we created.

     

    Building a CompletableFuture Chain

     

    As we saw in the introduction of this article, a CompletableFuture is an element of a chain. We saw in the previous section how to create the first element of such a chain from a task (a Runnable or a Supplier). Now let's see how we can chain other tasks to this one. In fact, we already saw a first hint of this in our previous example.

     

    Tasks for an Element of the Chain

     

    This first task is modeled by a Runnable or a Supplier, two functional interfaces (you could say functions), which do not take any argument and which might or might not return something.

     

    The second element of the chain and the further ones could take the result of the previous element, if there is one. So we need different functions on which to build those elements. Let's try to understand the ones we need.

     

    The previous element of the chain might or might not produce a result. So our functions should take one object or no object. This element might or might not produce a result. So our functions should return one object or no object. That makes four possible cases. Among those four possible functions, the one that does not take any result and produces a value should be discarded, because it is not an element of a chain; it is the starting point of a chain, which we already saw in the previous section.

     

    Table 1 shows the result, with the name of the functions used in the CompletableFuture API.

     

    Table 1. Four possible functions

     

    Takes a Parameter?        Returns void    Returns R
    Takes T                   Consumer<T>     Function<T, R>
    Does not take anything    Runnable        Not an element of a chain

     

    Types of Chaining

     

    Now that we have a good idea of the tasks the API supports, let's examine what chaining means. We assumed so far that chaining is about triggering a task on the result of another task, passing the result of the first one as a parameter to the second one. This is the basic one-to-one chaining.

     

    We can also compose the elements instead of chaining them. This makes sense only for tasks that take the result of the previous task and provide an object wrapped in another CompletableFuture. This is once more a one-to-one relation (not chaining, because this is composition).
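
    In the CompletableFuture API, composition is done with the thenCompose() method. The sketch below assumes a hypothetical asynchronous lookup, findName(), that itself returns a CompletableFuture; the class name and the values are illustrative.

```java
import java.util.concurrent.CompletableFuture;

final class ComposeDemo {

    // Hypothetical asynchronous lookup: it already returns a CompletableFuture.
    static CompletableFuture<String> findName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    static String composed() {
        CompletableFuture<Integer> idFuture =
            CompletableFuture.supplyAsync(() -> 42);

        // thenCompose() flattens the result: we get a CompletableFuture<String>,
        // not a CompletableFuture<CompletableFuture<String>>.
        CompletableFuture<String> nameFuture =
            idFuture.thenCompose(ComposeDemo::findName);

        return nameFuture.join();
    }

    public static void main(String[] args) {
        System.out.println(composed());
    }
}
```

    Had we used thenApply() with the same function, the result would have been a future wrapped in another future, which is rarely what we want.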

     

    But we can also build a tree-like structure, where a task is triggered on two upstream tasks instead of one. We can imagine a combination of the two provided results, or a situation in which the current element is triggered by whichever of the two upstream elements completes first and provides a result. Both cases make sense, and we will see examples for them.

     

    Choosing an ExecutorService

     

    Finally, we want to be able to decide which ExecutorService (that is, which pool of threads) is going to execute our tasks. There are many cases in which we want to have a say on this:

     

    • One of our tasks might be updating a graphical user interface. In that case, we want it to be run in the human-machine interface (HMI) thread. This is the case in Swing, JavaFX, and Android.
    • We might have I/O tasks or computation tasks that need to be executed in specialized pools of threads.
    • We might have visibility issues in our variables that need a further task to be executed in the same thread as the first one.
    • We might want to execute this task asynchronously, in the default fork/join pool.

     

    In all these cases, we might have to pass an ExecutorService as a parameter.

     

    Lots of Methods to Implement

     

    That makes a lot of methods in the CompletableFuture class! Three types of tasks, times four types of chaining and composition, times three ways of specifying which ExecutorService we want this task to be run. We have 36 methods to chain tasks. This is probably one of the elements that make this class complex: the high number of available methods.

     

    Seeing them one by one would be extremely tedious, so let's have a look at a selection of them.

     

    Selection of Patterns

     

    Here are descriptions of some of the available patterns.

     

    Some One-to-One Patterns

     

    In this case, from the first CompletableFuture, we create a CompletableFuture that will execute its task when the first one is completed.

     

    CompletableFuture<String> cf1 =
    CompletableFuture.supplyAsync(() -> "Hello world");
    CompletableFuture<String> cf2 =
    cf1.thenApply(s -> s + " from the Future!");
    

     

    There are three "then-apply" methods. All of them take a function as a parameter, which takes the result of the upstream element of the chain, and produces a new object from it.

     

    We can add one step to our chain. This time, our call takes a Consumer<String> as a parameter and does not produce any result.

     

    CompletableFuture<Void> cf3 =
    cf2.thenAccept(System.out::println);
    

     

    Let's add one last step to this chain. This last call takes a Runnable as a parameter—a function that does not take any parameter—and produces no result.

     

    CompletableFuture<Void> cf4 =
    cf3.thenRun(() -> System.out.println("Done processing this chain"));
    

     

    The names of these methods follow a clear rule: then, followed by the name of the method of the functional interface we pass as a parameter (run for Runnable, accept for Consumer, and apply for Function). These methods do not submit their tasks to an executor: a task typically runs in the thread that completed the upstream task, or in the calling thread if the upstream task had already completed when the method was called.

     

    These methods can also take a further suffix: Async. An Async method executes its task in the default fork/join pool, unless it takes an Executor, in which case the task is executed by this Executor.

     

    We could have written cf4 in this way:

     

    CompletableFuture<Void> cf4 =
    cf3.thenRunAsync(() -> ...);
    

     

    In this case, the provided Runnable would have been executed in the common fork/join pool.
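
    When an Executor is passed to an Async method, the task runs in a thread of that Executor. The following sketch makes this visible by giving the thread a recognizable name; the class name, the method name, and the thread name "demo-pool-thread" are assumptions made for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncSuffixDemo {

    // Returns the name of the thread that ran the Async step.
    static String threadUsedByAsyncStep() {
        // A single-thread pool whose only thread carries a recognizable name.
        ExecutorService executor = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "demo-pool-thread"));
        try {
            CompletableFuture<String> cf =
                CompletableFuture.supplyAsync(() -> "Hello world")
                    // This step is submitted to the provided executor.
                    .thenApplyAsync(s -> Thread.currentThread().getName(), executor);
            return cf.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsedByAsyncStep());
    }
}
```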

     

    Some Two-to-One Combining Patterns

     

    Combining patterns are patterns in which the task we write takes the results of two upstream tasks. Two functions can be used in this case: BiFunction and BiConsumer. It is also possible to execute a Runnable in these patterns. Table 2 shows the three base methods.

     

    Table 2. Three base methods for two-to-one combining patterns

     

     

    Method                                              Description

    <U, R> CompletionStage<R>                           Combines the results of this and other
      thenCombine(CompletionStage<U> other,             into one, using a BiFunction
                  BiFunction<T, U, R> action)

    <U> CompletionStage<Void>                           Consumes the results of this and other,
      thenAcceptBoth(CompletionStage<U> other,          using a BiConsumer
                     BiConsumer<T, U> action)

    CompletionStage<Void>                               Triggers the execution of a Runnable on
      runAfterBoth(CompletionStage<?> other,            the completion of this and other
                   Runnable action)

     

    These methods can also take an Async suffix, which has the same semantics as for the set of methods of the previous section.
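
    Here is a minimal sketch of the three combining methods. The class name, the method names, and the values are illustrative; join() is called only to wait for the outcome in the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CombineDemo {

    // thenCombine(): a BiFunction merges the two results into one.
    static String combine() {
        CompletableFuture<String> greeting = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<Integer> version = CompletableFuture.supplyAsync(() -> 8);
        return greeting.thenCombine(version, (g, v) -> g + " Java " + v).join();
    }

    // thenAcceptBoth(): a BiConsumer consumes the two results.
    static int acceptBoth() {
        AtomicInteger sum = new AtomicInteger();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        a.thenAcceptBoth(b, (x, y) -> sum.set(x + y)).join();
        return sum.get();
    }

    // runAfterBoth(): a Runnable runs once both futures are completed.
    static boolean runAfterBoth() {
        AtomicBoolean ran = new AtomicBoolean();
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "two");
        a.runAfterBoth(b, () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combine());
        System.out.println(acceptBoth());
        System.out.println(runAfterBoth());
    }
}
```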

     

    Some Two-to-One Selecting Patterns

     

    This last category of patterns also contains two-to-one patterns. But this time, instead of executing the downstream element once the two upstream elements are completed, the downstream element is executed as soon as one of the two upstream elements is completed. This might prove very useful when we want to resolve a domain name, for instance. Instead of querying only one domain name server, we might find it more efficient to query a group of domain name servers. We do not expect to have different results from the different servers, so we do not need more answers than the first we get. All the other queries can be safely canceled.

     

    This time, the patterns are built on one result from the upstream element, because we do not need more. These methods have the either key word in their names. The combined elements should produce the same types of result, because only one of them will be selected.

     

    Table 3. Three base methods for two-to-one selecting patterns

     

     

    Method                                              Description

    <R> CompletionStage<R>                              Selects the first available result from this
      applyToEither(CompletionStage<T> other,           and other, and applies the function to it
                    Function<T, R> function)

    CompletionStage<Void>                               Selects the first available result from this
      acceptEither(CompletionStage<T> other,            and other, and passes it to the consumer
                   Consumer<T> consumer)

    CompletionStage<Void>                               Runs the provided action after the first result
      runAfterEither(CompletionStage<?> other,          from this and other has been made available
                     Runnable action)

     

    These methods can also take an Async suffix, which has the same semantics as for the set of methods from the previous section.
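
    The domain name scenario can be sketched as follows. To keep the outcome predictable, the two "servers" are simulated: one future is already completed and the other one is never completed. The class name, the method name, and the address are assumptions made for the demo.

```java
import java.util.concurrent.CompletableFuture;

final class EitherDemo {

    static String firstAnswer() {
        // Stand-ins for two name servers: one has answered, the other never does.
        CompletableFuture<String> fastServer =
            CompletableFuture.completedFuture("192.0.2.1");
        CompletableFuture<String> slowServer = new CompletableFuture<>();

        // The function is applied to whichever result is available first.
        return slowServer
            .applyToEither(fastServer, address -> "resolved to " + address)
            .join();
    }

    public static void main(String[] args) {
        System.out.println(firstAnswer());
    }
}
```

    Note that the slower future is not canceled automatically; if canceling the remaining queries matters, it has to be done explicitly.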

     

    Examples

     

    Let's look at a couple of examples.

     

    Testing a Long-Running Call in Jersey

     

    Let's consider the following code, extracted from the Jersey documentation.

     

    @Path("/resource")
    public class AsyncResource {

        @Inject
        private Executor executor;

        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {

            executor.execute(() -> {
                String result = longOperation();
                asyncResponse.resume(result);
            });
        }
    }
    

     

    This code is a basic REST service, which is calling an expensive operation. The classical way to deal with this is to call this long operation in another thread asynchronously. This method does not explicitly generate a response; it is the Jersey implementation that does that.

     

    The problem we face here is the following: how can we unit-test this method? Testing the longOperation() call in itself is not an issue: we can unit-test this method separately. What we need to test here is that the result object is correctly passed to the resume() method of the asyncResponse object. This can easily be done with a mock object framework, such as Mockito, for instance. The problems we are facing are the following:

     

    • The executor.execute() call is executed in the "main" thread.
    • But the asyncResponse.resume() call is executed asynchronously in another thread, at a time in the future that we do not know.

     

    What we need in our test is some kind of a callback that is going to be called once the asyncResponse.resume() is called, so that we can test our mocks. The test of our mocks looks like the following:

     

    Mockito.verify(mockAsyncResponse).resume(result);
    

     

    We need to run this simple code:

     

    • Once the resume() method has been called
    • If possible, in the same thread as the one that executed the resume() call; that way, we are sure that we will not have any concurrent issues (especially visibility) in our mocks

     

    This is where the CompletionStage framework comes to the rescue! Instead of passing the Runnable to the executor.execute() method, we create a CompletionStage with it.

     

    This pattern:

     

    executor.execute(() -> {
        String result = longOperation();
        asyncResponse.resume(result);
    });
    

     

    Becomes the following:

     

    CompletableFuture<Void> completableFuture =
    CompletableFuture.runAsync(() -> {
               String result = longOperation();
               asyncResponse.resume(result);
           }, executor);
    

     

    And because a CompletionStage can trigger other tasks, we can add this code to our test:

     

    completableFuture
        .thenRun(() -> {
            Mockito.verify(mockAsyncResponse).resume(result);
        });
    

     

    This code does exactly what we need:

     

    • It is triggered by the completion of the Runnable of the previous CompletionStage.
    • It is executed in the same thread, provided that the previous CompletionStage has not already completed when thenRun() is called.

     

    To implement this solution, we need to create a second public method in our Jersey class, which returns this CompletableFuture. We cannot simply change the return type of the existing method: if we modify the return type of a Jersey method, Jersey will try to build a response with this return type, converting it to XML or JSON. This will not work very well with a CompletableFuture.

     

    The complete testing pattern is thus the following:

     

    1. Create and train our mocks:

     

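    // Mockito cannot mock String, so a plain value stands in for the result.
    String result = "result";
    AsyncResponse response = Mockito.mock(AsyncResponse.class);
    // longOperation() belongs to the resource, so asyncResource must be a Mockito spy.
    Runnable train = () -> {
        Mockito.doReturn(result).when(asyncResource).longOperation();
    };
    Runnable verify = () -> Mockito.verify(response).resume(result);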
    

     

    2. Create the call and verify task:

     

    Runnable callAndVerify = () -> {
        asyncResource.executeAsync(response).thenRun(verify);
    };
    

     

    3. Then create the task to be tested:

     

    ExecutorService executor = Executors.newSingleThreadExecutor();
    AsyncResource asyncResource = Mockito.spy(new AsyncResource());
    asyncResource.setExecutorService(executor);
    CompletableFuture
        .runAsync(train, executor)
        .thenRun(callAndVerify);
    

     

    Because this is a unit test, we might want to make it fail if the response is not seen after a given amount of time. We can do that with the timed version of the get() method from Future, which CompletableFuture implements.
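
    The timed get() throws a TimeoutException when the result does not arrive in time, which a test can turn into a failure. The sketch below uses a CompletableFuture that is never completed to trigger the timeout; the class name, the method name, and the 100 ms delay are arbitrary choices for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimeoutDemo {

    // Returns true if waiting for the result timed out.
    static boolean timesOut() {
        // Nobody completes this future, so the wait below cannot succeed.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        try {
            neverCompleted.get(100, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            // In a unit test, this is where we would fail the test.
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut());
    }
}
```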

     

    Analyzing the Links of a Web Page Asynchronously

     

    Let's write some asynchronous code that will automatically analyze the links of a web page and display them in a Swing panel.

     

    We want to do the following:

     

    1. Read the content of the web page.
    2. Then get the links from this page.
    3. Then display them in a Swing panel.

     

    Remember that modifying a Swing component should be done from the Swing event dispatch thread, and we certainly do not want to run long tasks in this thread.

     

    The complete pattern is simple:

     

    CompletableFuture.supplyAsync(
        () -> readPage("http://whatever.com/")
    )
    .thenApply(page -> linkParser.getLinks(page))
    .thenAcceptAsync(
        links -> displayPanel.display(links),
        executor
    );
    

     

    The first step is to create a Supplier that is executed asynchronously. It returns the content of the web page as a String (for instance).

     

    Then the second step gets this page and passes it to the link parser. It is a function that returns a List<String> (for instance). These two first tasks are executed in the same thread. We could change that in case we have a first pool of threads dedicated to I/O operations and a second one for CPU operations.

     

    Then, the last step just takes the list of links and displays it. Now this task accesses a Swing component, so it should be executed in the Swing thread. So we pass the right executor as a parameter to do that.

     

    The nice thing is this: the Executor interface is a functional interface. We can implement it with a lambda:

     

    Executor executor = runnable -> SwingUtilities.invokeLater(runnable);
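
    The same idea works with any thread, which makes it easy to try outside of Swing. In the sketch below, a lambda-based Executor starts a thread named "display-thread" as a stand-in for the Swing thread; this name, the class name, and the method name are assumptions made for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

final class LambdaExecutorDemo {

    // Returns the name of the thread that ran the last step of the chain.
    static String displayThread() {
        // Executor has a single abstract method, execute(Runnable),
        // so a lambda is enough to implement it.
        Executor executor =
            runnable -> new Thread(runnable, "display-thread").start();

        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture.supplyAsync(() -> "links")
            .thenAcceptAsync(
                links -> seen.set(Thread.currentThread().getName()),
                executor)
            .join();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(displayThread());
    }
}
```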
    

     

    We can leverage the method-reference syntax to write the final version of this pattern:

     

    CompletableFuture.supplyAsync(
        () -> readPage("http://whatever.com/")
    )
    .thenApply(linkParser::getLinks)
    .thenAcceptAsync(
        displayPanel::display,
        SwingUtilities::invokeLater
    );
    

     

    CompletableFutures along with lambdas and method references allow for the writing of very elegant patterns.

     

    Exception Handling

     

    The CompletionStage API also exposes exception handling patterns. Let's see that in an example.

     

    Suppose we have the processing chain shown in Figure 1.

     

    Figure 1. Processing chain

     

    All these CompletableFutures are chained together using the patterns we have seen in the previous sections.

     

    Suppose now that CF21 raises an exception. If nothing has been written to handle this exception, all the downstream CompletableFutures are in error. This means two things:

     

    • The call to isCompletedExceptionally() returns true on the CF21, CF31, and CF41 CompletableFutures.
    • The call to get() on these objects throws an ExecutionException, whose cause is the root exception raised by CF21.
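
    Both points can be observed on a two-element chain. In this sketch, the variable names follow the figure, while the class name, the method name, and the exception message are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class PropagationDemo {

    // Reports how a failure in cf21 is seen from the downstream cf31.
    static String failureSeenDownstream() {
        CompletableFuture<String> cf21 = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("CF21 failed");
        });
        // This function is never called, because cf21 fails.
        CompletableFuture<String> cf31 = cf21.thenApply(String::toUpperCase);

        try {
            cf31.get();
            return "no failure";
        } catch (ExecutionException e) {
            // The cause is the exception originally thrown in cf21.
            return cf31.isCompletedExceptionally() + ": " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failureSeenDownstream());
    }
}
```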

     

    We can handle exceptions in CompletableFutures chains using the pattern shown in Figure 2.

     

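    // fallbackValue is a placeholder for the value to transmit in case of failure
    cf30 = cf21.exceptionally(throwable -> fallbackValue);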
    

     

    Figure 2. Pattern to handle exceptions

     

    This pattern creates a CompletableFuture that has the following properties:

     

    • If CF21 completes normally, then CF30 returns the same value as CF21, transparently.
    • If CF21 raises an exception, then CF30 is able to catch it, and can transmit a normal value to CF31.

     

    There are several methods to do that, with different ways of accepting the exception.

     

    The exceptionally(Function<Throwable, T> function) call is the simplest one. It returns a CompletionStage that will complete normally if the upstream CompletionStage also completes normally. The result returned is the same as the result of the upstream CompletionStage. On the other hand, if this upstream CompletionStage raises an exception, this exception is passed to the provided function. The returned CompletionStage then completes normally, returning the result of the provided function. In Java 8, there is no asynchronous version of this method.
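
    The two behaviors can be compared side by side. In this minimal sketch, the fail flag, the class name, and the string values are assumptions made for the demo; the variable names follow the figures.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallyDemo {

    // Builds cf21 -> cf30 -> cf31 and returns the result seen by cf31.
    static String recover(boolean fail) {
        CompletableFuture<String> cf21 = CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("CF21 failed");
            }
            return "value from CF21";
        });

        // cf30 is transparent on success and provides a fallback on failure.
        CompletableFuture<String> cf30 =
            cf21.exceptionally(throwable -> "fallback value");

        // cf31 always receives a normal value.
        CompletableFuture<String> cf31 = cf30.thenApply(s -> s + "!");
        return cf31.join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```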

     

    The handle(BiFunction<T, Throwable, R> bifunction) call has similar semantics, except that the provided bifunction is called in both cases. It returns a CompletionStage that completes normally with the result of this bifunction. The bifunction is called with a null exception if the upstream CompletionStage completes normally, or with a null result if it completes exceptionally. In both cases, the returned CompletionStage completes normally. This method has two sister methods called handleAsync(). These two methods work the same way, but asynchronously, in another executor. This executor can be provided as a parameter. If it isn't, then the common fork/join pool is used.
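
    Here is a short sketch of handle() covering both outcomes. The helper failed(), the class name, and the messages are illustrative. The failing future is completed directly with completeExceptionally(), so the bifunction receives that exception as is.

```java
import java.util.concurrent.CompletableFuture;

final class HandleDemo {

    // Hypothetical helper: a future that has already failed.
    static CompletableFuture<Integer> failed(String message) {
        CompletableFuture<Integer> cf = new CompletableFuture<>();
        cf.completeExceptionally(new IllegalStateException(message));
        return cf;
    }

    // Turns either outcome of the upstream future into a normal String result.
    static String describe(CompletableFuture<Integer> upstream) {
        return upstream.handle((value, throwable) ->
            throwable == null
                ? "ok: " + value
                : "failed: " + throwable.getMessage()
        ).join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture(21)));
        System.out.println(describe(failed("boom")));
    }
}
```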

     

    The third method that can handle exceptions is whenComplete(BiConsumer<T, Throwable> biconsumer). Whereas handle() returns a CompletionStage that completes normally, the CompletionStage returned by whenComplete() does not. It follows the behavior of the CompletionStage it is built on. So if the upstream CompletionStage completes exceptionally, the CompletionStage returned by whenComplete() also completes exceptionally. The provided biconsumer is called with the returned value of the upstream CompletionStage and its exception. As in the handle() case, one of these two objects is null. The biconsumer is just there to consume those values; it does not return anything. So it is merely a callback that does not interfere in the processing pipeline of CompletionStages. As for the handle() method, this method also has two sister methods called whenCompleteAsync(). These two methods work asynchronously, either in the common fork/join pool or in the provided executor.
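
    The callback nature of whenComplete() shows in the following sketch: the biconsumer observes the failure, and the returned CompletableFuture is still in error afterward. The class name, the method name, and the log format are assumptions made for the demo.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class WhenCompleteDemo {

    static String observe() {
        List<String> log = new CopyOnWriteArrayList<>();

        // An upstream future that has already failed.
        CompletableFuture<String> upstream = new CompletableFuture<>();
        upstream.completeExceptionally(new IllegalStateException("boom"));

        // The biconsumer only observes the outcome; it cannot replace it.
        CompletableFuture<String> observed =
            upstream.whenComplete((value, throwable) ->
                log.add("value=" + value + ", failure=" + throwable.getMessage()));

        // The returned future is in error, just like the upstream one.
        return log.get(0) + ", stillFailed=" + observed.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```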

     

    Conclusion

     

    The CompletionStage interface and CompletableFuture class bring new ways of processing data asynchronously. This API is quite complex, mainly due to the number of methods exposed by the new interface and the new class, but that makes this API very rich with lots of opportunities for finely tuning asynchronous data processing pipelines to perfectly suit the needs of your applications.

     

    This API is built on lambda expressions, leading to very clean and very elegant patterns. It gives fine control over which thread should execute each task. It also allows the chaining and composition of tasks in a very rich way, and it has a very clean way of handling exceptions.

     


    About the Author

     

    José Paumard is an assistant professor at the Institut Galilée (Université Paris 13), and has a PhD in applied mathematics from the ENS de Cachan. He has been teaching about Java technologies at the university since 1998. He has also worked as an independent consultant for twenty years and is a well-known expert Java/Java EE/software craftsman and trainer. Paumard gives talks at conferences, including JavaOne and Devoxx. He also writes technical articles for various media including Java Magazine and Oracle Technology Network. Passionate about education, he publishes massive open online courses (MOOC) for several companies, for example, for Oracle Virtual Technology Summit, PluralSight, Microsoft Virtual Academy, and Voxxed. He also is a member of the Java community in Paris, has been one of the lead members of the Paris JUG for six years, and is cofounder of Devoxx France. Follow him @JosePaumard.

     
