Concurrency on the JVM: Beyond Thread.start()

Version 3

    by Tomasz Nurkiewicz

     

    Several techniques for running programs concurrently and tools for asynchronous programming

     

    Contrary to popular belief, Moore's law still holds. However, because of physical limits, hardware manufacturers are no longer increasing the clock frequency of individual cores. Instead, we see more cores and more CPUs, even on mobile devices.

     

    Java is not behind with respect to this trend. In this article, we will briefly look at a wide range of parallelization options available in Java and other Java Virtual Machine (JVM) languages. We will explore, through examples, new features of Java SE 8, such as CompletableFuture and parallel streams. In addition, we will explore the actor model in Scala and software transactional memory and agents in Clojure. Finally, we'll explore the GPars library for Groovy. We will also quickly browse the tools that are available these days for asynchronous programming.

     

    The article roughly progresses from the simplest, low-level solutions to the most comprehensive and challenging ones. The libraries and frameworks are by no means similar to or interchangeable with each other. Also keep in mind that no library will ever solve every concurrency problem; all of them have their shortcomings.

     

    ExecutorService

     

    ExecutorService was introduced in Java SE 5.0, one of the most groundbreaking Java updates when it comes to concurrency. Although this handy interface is more than a decade old, many developers still run into issues when using it. Listing 1 shows a quick example:

     

    ExecutorService pool = new ThreadPoolExecutor(10, 10,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1_000));

    Callable<Integer> calculateTask = () -> //...
    Callable<String> loadTask = () -> //...

    Future<Integer> intFuture = pool.submit(calculateTask);
    Future<String> strFuture = pool.submit(loadTask);

     

    Listing 1. ExecutorService example

     

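    Notice how I created ThreadPoolExecutor from scratch, rather than using the Executors.newFixedThreadPool(10) factory method. This more verbose expression allows me to replace the default unbounded LinkedBlockingQueue with a bounded ArrayBlockingQueue. An unbounded queue can grow without limit when tasks are submitted faster than they are processed, eventually exhausting memory.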

     

    This is just one of many traps you can come across with ExecutorService. Others include incorrect exception handling, self-deadlock, and improper shutdown. Still, ExecutorService should be your first choice when it comes to running multiple independent tasks concurrently.
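
    To make two of these traps concrete, here is a minimal sketch of exception handling and shutdown around a bounded pool. The class name, the pool and queue sizes, and the -1 fallback value are assumptions chosen for illustration only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedPoolDemo {

    /** Runs one task on a small bounded pool and always shuts the pool down. */
    static int runAndShutDown(Callable<Integer> task) {
        ExecutorService pool = new ThreadPoolExecutor(2, 2,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10));   // bounded queue: at most 10 waiting tasks
        try {
            return pool.submit(task).get();      // blocks until the task completes
        } catch (ExecutionException e) {
            // An exception thrown inside the task surfaces only here, wrapped.
            System.err.println("Task failed: " + e.getCause());
            return -1;                           // hypothetical fallback value
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt flag
            return -1;
        } finally {
            pool.shutdown();                     // otherwise idle worker threads keep the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndShutDown(() -> 6 * 7));
        System.out.println(runAndShutDown(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

    If nobody ever calls get() on the returned Future, the exception from the second task would go unnoticed, which is the incorrect exception handling trap mentioned above.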

     

    Parallel Streams

     

    Parallel streams in Java SE 8 are almost as exciting as lambda expressions. Simply change stream() to parallelStream() and you have multithreaded filtering over a large collection, as shown in Listing 2:

     

    veryLargeList.parallelStream()
            .filter(s -> s.startsWith("Lorem"))
            .findAny();

     

    Listing 2. Parallel streams example

     

    The code in Listing 2 is extremely clever: it splits veryLargeList into multiple chunks and starts filtering them independently and concurrently. Due to findAny(), if a matching element is found in any of the chunks, computation is eagerly interrupted and the result is returned as soon as it's available.

     

    Parallel streams allow seamless parallelization without much effort, but they come with their own set of drawbacks. Notice how I emphasized that the list is very large. For small collections, the cost of context switching and coordination between threads will most likely outweigh the benefits of multithreading; the size at which parallelism starts to pay off depends entirely on your problem. Also, there is no official way of supplying your own underlying thread pool, so you have little control over the concurrency level.
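
    The following sketch runs the same filter sequentially and in parallel so that the two variants can be compared or benchmarked. The class name, the generated sample data, and the "Lorem" prefix rule are assumptions made up for this example.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ParallelFilterDemo {

    /** Generates sample strings; every 1000th one starts with "Lorem". */
    static List<String> sampleData(int size) {
        return IntStream.range(0, size)
                .mapToObj(i -> (i % 1000 == 0 ? "Lorem " : "ipsum ") + i)
                .collect(Collectors.toList());
    }

    /** Counts matching elements, sequentially or in parallel. */
    static long countMatches(List<String> items, boolean parallel) {
        return (parallel ? items.parallelStream() : items.stream())
                .filter(s -> s.startsWith("Lorem"))
                .count();
    }

    public static void main(String[] args) {
        List<String> items = sampleData(1_000_000);
        // Parallel streams run on the shared common pool.
        System.out.println("Common pool parallelism: "
                + ForkJoinPool.commonPool().getParallelism());
        System.out.println(countMatches(items, false));
        System.out.println(countMatches(items, true));
    }
}
```

    Both calls return the same count; only a benchmark on your own data can tell whether the parallel variant is actually faster.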

     

    CompletableFuture

     

    The Future<T> type was introduced together with ExecutorService in Java SE 5.0. However, a bare Future can only be polled or waited on with a blocking get(), which makes it of little use if we want to write efficient, nonblocking, and scalable code. scala.concurrent.Future and even JavaScript proved that a more comprehensive API is needed. Java developers had to fall back on Guava with its ListenableFuture until the CompletableFuture API arrived in Java SE 8.

     

    CompletableFuture implements the Future interface and adds a few dozen useful methods. Most of them are nonblocking and event-driven, meaning you can simply register callbacks that run when the Future completes. Moreover, most of these methods return a CompletableFuture, so you can chain and combine Futures together in a nonblocking fashion.
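
    As a small illustration of registering callbacks, the sketch below chains two transformations onto an asynchronous computation. The class name and the values are made up; only supplyAsync(), thenApply(), thenAccept(), and join() come from the JDK.

```java
import java.util.concurrent.CompletableFuture;

final class CallbackDemo {

    /** Builds a small nonblocking chain: compute, transform, then format. */
    static CompletableFuture<String> describeAnswer() {
        return CompletableFuture
                .supplyAsync(() -> 6 * 7)          // starts in the background
                .thenApply(n -> n + 1)             // callback: runs once the value is ready
                .thenApply(n -> "answer=" + n);    // each step returns a new CompletableFuture
    }

    public static void main(String[] args) {
        describeAnswer()
                .thenAccept(System.out::println)   // registers a consumer; does not block
                .join();                           // block only at the edge so the demo can finish
    }
}
```

    Running main prints answer=43. None of the thenApply() or thenAccept() calls wait for the result; they only describe what should happen once it arrives.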

     

    Let's take the simple example in Listing 3. We have a bunch of methods that load a user, track the user's device, build recommendations, and guess at the best recommendation:

     

    User loadUser(UUID id)  //...

    GeoLocation trackDevice()  //...

    Recommendations buildRecommendations(User user, GeoLocation geo)  //...

    CompletableFuture<String> guessBest(Recommendations recommendations)  //...

     

    Listing 3. CompletableFuture example

     

    We would like to parallelize as much as possible without complex locks, semaphores, and latches, thereby orchestrating background operations. Take a moment to study the code in Listing 4:

     

    CompletableFuture<User> userFuture =
            CompletableFuture.supplyAsync(() -> loadUser(uuid), executorService);

    CompletableFuture<GeoLocation> geoFuture =
            CompletableFuture.supplyAsync(this::trackDevice);

    CompletableFuture<Recommendations> recommendFuture =
            userFuture.thenCombine(geoFuture,
                    (User user, GeoLocation geo) ->
                            buildRecommendations(user, geo));

    CompletableFuture<String> bestFuture =
            recommendFuture.thenCompose(
                    (Recommendations r) -> guessBest(r));

     

    Listing 4. Example of declaratively building a pipeline of asynchronous computations with CompletableFuture

     

    The code in Listing 4 spawns a background task that loads a user by uuid and runs it within executorService. CompletableFuture.supplyAsync() is analogous to ExecutorService.submit(); however, it returns a CompletableFuture rather than a plain Future.

     

    Obviously, calling supplyAsync() alone doesn't block; it just starts the background computation. geoFuture is created in a similar way, but this time we don't specify the thread pool explicitly and, instead, we rely on the JVM-wide default pool, ForkJoinPool.commonPool() (which is not a good idea most of the time).

     

    The pleasure begins here. We have two Futures running concurrently. We can now combine them and run another operation when both are done. Notice how thenCombine delivers User and GeoLocation to the lambda expression. Pay attention to types: the lambda returns a Recommendations object but the whole thenCombine expression yields CompletableFuture<Recommendations>. This suggests that thenCombine is nonblocking as well, which happens to be the case. In other words, we took two asynchronous tasks and declaratively said that when both of them are done, we'll run a third task and have a background computation for that third task.

     

    Last, but not least, we take CompletableFuture<Recommendations> and compose it with the guessBest() method that already returns CompletableFuture<String>. The JDK will chain these two Futures and immediately return another Future that finishes when the chain of asynchronous operations is done.

     

    CompletableFuture allows us to declaratively build pipelines of asynchronous computations with forks (concurrent tasks) and joins (waiting for the results of multiple forks). CompletableFuture is the JDK's answer to reactive programming. Obviously, using Futures all the way down will look and feel more complex (especially during debugging), but in return we get more-robust code.
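
    For readers who want to run such a pipeline, here is a self-contained sketch of one fork and one join. The stub methods use plain strings instead of the User, GeoLocation, and Recommendations types, and their return values are invented for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class PipelineDemo {

    // Hypothetical stubs standing in for the slow operations of Listing 3.
    static String loadUser(int id) { return "user-" + id; }
    static String trackDevice() { return "Warsaw"; }
    static String buildRecommendations(String user, String geo) { return user + "@" + geo; }
    static CompletableFuture<String> guessBest(String recommendations) {
        return CompletableFuture.completedFuture("best(" + recommendations + ")");
    }

    /** Forks two tasks, joins them, and composes the result with a third step. */
    static String run() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> userFuture =
                    CompletableFuture.supplyAsync(() -> loadUser(42), executor);
            CompletableFuture<String> geoFuture =
                    CompletableFuture.supplyAsync(PipelineDemo::trackDevice, executor);
            return userFuture
                    .thenCombine(geoFuture, PipelineDemo::buildRecommendations) // join of two forks
                    .thenCompose(PipelineDemo::guessBest)                       // flattens the nested future
                    .join();                                                    // wait only at the very end
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Both supplyAsync() calls receive an explicit executor here, which avoids relying on the default pool.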

     

    ForkJoinPool

     

    ForkJoinPool, introduced in Java SE 7, is an extension to the ExecutorService abstraction that provides a better API for tasks that are dependent on each other. While ExecutorService excels when individual tasks are independent (dependencies can even lead to deadlock), ForkJoinPool works great for divide-and-conquer algorithms.

     

    If you have a big task that can be split into multiple pieces that work independently (the fork stage) and later need to merge the results back (the join stage), ForkJoinPool is heavily optimized for that. Moreover, you can even split already divided tasks, forming a deep execution tree. Think about a recursive file system search with each directory being analyzed concurrently and the results being propagated up. Or, consider a parallel sort algorithm that sorts the two halves of a list individually and then merges the results during the join phase.

     

    Let's see an example of a recursive file tree traversal. Please keep in mind that there are better tools for that, for example, Files.walkFileTree(). Listing 5 is an example just for educational purposes.

     

    final ForkJoinPool fp = new ForkJoinPool(4);
    final List<Path> matching = fp.invoke(
            new RecursiveSearch(Paths.get("/home/user"))
    );

    //-----------------------------------

    class RecursiveSearch extends RecursiveTask<List<Path>> {

        private final Path basePath;

        RecursiveSearch(Path basePath) {
            this.basePath = basePath;
        }

        @Override
        protected List<Path> compute() {
            try (DirectoryStream<Path> children = Files.newDirectoryStream(basePath, Files::isDirectory)) {
                // Fork one subtask per child directory
                List<ForkJoinTask<List<Path>>> subTasks = StreamSupport
                        .stream(children.spliterator(), false)
                        .map(child -> new RecursiveSearch(child).fork())
                        .collect(toList());
                // Join the subtasks and flatten their results
                List<Path> descendants = subTasks.stream()
                        .flatMap(task -> task.join().stream())
                        .collect(toList());
                return ImmutableList.<Path>builder()
                        .addAll(descendants)
                        .add(basePath)
                        .build();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

     

    Listing 5. Example recursive file tree traversal

     

    We first submit a single task to ForkJoinPool, asking it to scan the /home/user directory. This task spawns subtasks (fork()) and waits for all of them to finish (join()). Each subtask returns a list of visited directories. A naive implementation using ExecutorService with a bounded thread pool could deadlock, because higher-level tasks would occupy all the threads while blocked waiting for child tasks that can never be scheduled. However, ForkJoinPool employs a technique called work stealing, the details of which are beyond the scope of this article. It is enough to say that when a parent task is blocked inside join(), its thread, rather than waiting idle, can "steal" other pending tasks, using the thread pool more effectively.
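
    The same fork and join pattern applies to any divide-and-conquer problem. Below is a minimal sketch that sums an array by splitting it in half recursively. The class name and the THRESHOLD value are assumptions; a sensible threshold has to be found by benchmarking.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class ParallelSum extends RecursiveTask<Long> {

    private static final int THRESHOLD = 1_000;  // hypothetical cutoff for going sequential
    private final int[] data;
    private final int from;
    private final int to;

    ParallelSum(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {            // small enough: sum directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += data[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        ParallelSum left = new ParallelSum(data, from, mid);
        ParallelSum right = new ParallelSum(data, mid, to);
        left.fork();                             // schedule the left half asynchronously
        long rightSum = right.compute();         // compute the right half in this thread
        return left.join() + rightSum;           // join: wait for the forked half
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ParallelSum(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[1_000_000];
        Arrays.fill(data, 1);
        System.out.println(sum(data));
    }
}
```

    Forking only one half and computing the other in the current thread keeps that thread busy instead of parking it immediately in join().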

     

    RxJava

     

    RxJava is an extremely powerful open source library that extends the Future API to multiple values. The main abstraction is Observable<T>—an asynchronous, nonblocking stream of events of type T. It turns out that many problems can be effectively modeled as Observables: for example, incoming messages from a broker, GUI events, and file system changes. Using APIs encapsulating such systems tends to be cumbersome, typically leading to the so-called callback hell.

     

    RxJava's declarative API allows you to combine and compose streams of events almost without worrying about the underlying threading issues. Imagine we have an API that notifies us about every Twitter status update. Without RxJava, such an API would be either blocking or fed with callbacks. Listing 6 shows what using such a hypothetical library could look like:

     

    Observable<Tweet> tweets = allTweets();

    Observable<List<String>> batches = tweets
            .filter(tweet -> tweet.hasHashTag("#java"))
            .sample(100, TimeUnit.MILLISECONDS)
            .map(Tweet::contents)
            .buffer(10);

     

    Listing 6. Example API for notifying us of Twitter updates

     

    This code looks almost as if we were dealing with ordinary collections. First, we create an instance of Observable<Tweet> that abstracts the stream of tweets. Then we keep only the tweets with the #java hashtag (filter), take a sample every 100 milliseconds (the last tweet in that time window), transform each event into its contents (map) and, finally, batch tweets in groups of ten (buffer).

     

    You must understand that every such transformation (known as an operator in RxJava) is fully asynchronous. RxJava doesn't wait for an event but lazily applies the operator once events arrive. Moreover, the code in Listing 6 will not trigger any operator until someone actually subscribes to the stream, that is, until there is someone who wants to consume.

     

    Also keep in mind that Observable is immutable; every operator returns a new instance, without affecting the original (upstream) Observable it was called on. Last, but not least, Observables are type-safe. For example, if you remove the last buffer(10) operator, the expression type would be Observable<String>.
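
    Laziness and immutability can be illustrated without RxJava itself. The ColdStream class below is a hypothetical, heavily simplified stand-in written for this example: it is synchronous, has no error handling, and is not RxJava's API. It only shows that operators return new instances and that nothing runs until subscribe() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical, synchronous stand-in for an Observable; not RxJava's API. */
final class ColdStream<T> {

    // Invoked once per subscriber; pushes every item into the given sink.
    private final Consumer<Consumer<T>> source;

    private ColdStream(Consumer<Consumer<T>> source) {
        this.source = source;
    }

    static <T> ColdStream<T> from(List<T> items) {
        return new ColdStream<T>(sink -> items.forEach(sink));
    }

    /** Returns a new stream; this instance is left untouched. */
    ColdStream<T> filter(Predicate<T> keep) {
        return new ColdStream<T>(sink -> source.accept(item -> {
            if (keep.test(item)) {
                sink.accept(item);
            }
        }));
    }

    /** Returns a new stream of a possibly different element type. */
    <R> ColdStream<R> map(Function<T, R> f) {
        return new ColdStream<R>(sink -> source.accept(item -> sink.accept(f.apply(item))));
    }

    /** Only now are the operators actually applied. */
    void subscribe(Consumer<T> subscriber) {
        source.accept(subscriber);
    }

    public static void main(String[] args) {
        ColdStream<String> tweets =
                ColdStream.from(Arrays.asList("#java rocks", "lunch", "#java 8"));
        ColdStream<Integer> lengths = tweets
                .filter(t -> t.contains("#java"))
                .map(String::length);             // nothing has been executed yet
        List<Integer> seen = new ArrayList<>();
        lengths.subscribe(seen::add);             // triggers filter and map
        System.out.println(seen);
    }
}
```

    Running main prints [11, 7]. The tweets instance can still be subscribed to on its own and would deliver all three strings, because filter() and map() never modified it.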

     

    GPars

     

    GPars is a Swiss Army knife of concurrency libraries. It implements several popular idioms in concurrent programming under one open source umbrella in the popular and dynamic language Groovy. Agents, actors, and software transactional memory are available, but because these will be covered later, let's focus on yet another provided concurrency idiom: dataflow concurrency.

     

    This model uses dataflow variables, which are similar to Java Futures. You can write (store, bind) to a dataflow variable once and read it multiple times. And, importantly, threads (tasks) trying to read from a yet unbound variable will wait. This means changes in dataflow variables propagate downstream deterministically.

     

    In Listing 7, you'll find three tasks executing concurrently. Loading a customer and tracking her address happen concurrently, but the first task, which prepares the recommendation, has to wait for these two to complete. The moment we explicitly require a value (for example, customerVar.val), we must wait until something (another task) binds that variable (customerVar << loadById(42)).

     

    final DataflowVariable<Customer> customerVar = new DataflowVariable<Customer>()
    final DataflowVariable<Address> addressVar = new DataflowVariable<Address>()
    final DataflowVariable<Recommendation> recommendationVar = new DataflowVariable<Recommendation>()

    task {
        Customer customer = customerVar.val
        Address address = addressVar.val
        recommendationVar << prepareRecommendation(customer, address)
    }

    task {
        customerVar << loadById(42)
    }

    task {
        addressVar << track()
    }

    println recommendationVar.val

     

    Listing 7. Example of three tasks executing concurrently

     

    Tasks (task {...}) surrounding snippets of code are so-called green threads, and they do not necessarily correspond to native JVM threads. It's again worth noting that GPars implements many other idioms, but we will explore them in dedicated implementations.
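
    Because dataflow variables resemble Java Futures, the same idea can be approximated in plain Java. The sketch below is an analogue and not GPars: it uses CompletableFuture as a write-once variable, where complete() binds the value and join() waits for it. The string values are stand-ins for the Customer, Address, and Recommendation types.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class DataflowSketch {

    static String run() {
        CompletableFuture<String> customerVar = new CompletableFuture<>();
        CompletableFuture<String> addressVar = new CompletableFuture<>();
        CompletableFuture<String> recommendationVar = new CompletableFuture<>();

        // One thread per task, so the waiting reader cannot starve the writers.
        ExecutorService tasks = Executors.newFixedThreadPool(3);
        try {
            // Reader task: join() waits until both variables are bound.
            tasks.execute(() -> recommendationVar.complete(
                    customerVar.join() + " @ " + addressVar.join()));
            // Writer tasks: complete() binds a value; later calls have no effect.
            tasks.execute(() -> customerVar.complete("customer-42"));
            tasks.execute(() -> addressVar.complete("Main Street"));

            return recommendationVar.join();
        } finally {
            tasks.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    The result is the same regardless of the order in which the three tasks are scheduled, which is the deterministic propagation described above.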

     

    Agents

     

    Agents encapsulate a single immutable value (state) and allow atomic read/write access. Thus, they are very similar to the Atomic* variables known in Java. However, agents are asynchronous by design. The only way to modify an agent's state is by asynchronously sending a mutating function.

     

    Conceptually, an agent consists of a state variable and a thread-safe queue of pending state change functions. When an arbitrary thread sends a function to an agent, the function is queued and later applied to the agent's current state, and its result becomes the new state. Agents are thread-safe; all modifications are sequenced and applied in order, one after another.

     

    The synthetic example in Listing 8 first creates an agent with an empty list inside. Then it sends five functions, each prepending one of the integers from 0 to 4 to the current state. However, because (send ...) applies the prepending asynchronously (it only schedules the modification by placing it in a queue), we have no guarantee of what will be printed in the last line. It can be an empty list if no modification has been performed yet, it can be 4 3 2 1 0 if all of them have, or it can be any intermediate state, depending on thread scheduling.

     

    (def ag (agent ()))

    (dotimes [i 5]
      (send ag #(conj % i)))

    (-> ag deref print)

     

    Listing 8. Agent example

     

    The main benefit of agents is that they are thread-safe, and thus easy to understand. There are no race conditions, and modifications are always applied sequentially.
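
    To show the mechanics of a state variable plus a queue of functions, here is a rough Java sketch of an agent. It is an illustration under simplifying assumptions (one dedicated thread per agent, no error handling) and not how Clojure implements agents; the Agent class and its method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

final class Agent<T> {

    private volatile T state;
    // A single thread drains the queue, so functions are applied one at a time, in order.
    private final ExecutorService queue = Executors.newSingleThreadExecutor();

    Agent(T initial) {
        this.state = initial;
    }

    /** Schedules fn to be applied later and returns immediately. */
    void send(UnaryOperator<T> fn) {
        queue.execute(() -> state = fn.apply(state));
    }

    /** Nonblocking read of whatever state has been applied so far. */
    T deref() {
        return state;
    }

    /** Stops accepting functions and waits for the queued ones to be applied. */
    boolean shutdownAndAwait(long seconds) {
        queue.shutdown();
        try {
            return queue.awaitTermination(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Mirrors Listing 8, but waits for all modifications before reading. */
    static List<Integer> demo() {
        Agent<List<Integer>> ag = new Agent<>(Collections.<Integer>emptyList());
        for (int i = 0; i < 5; i++) {
            final int n = i;
            ag.send(list -> {
                List<Integer> copy = new ArrayList<>();
                copy.add(n);                      // prepend, like conj on a Clojure list
                copy.addAll(list);
                return Collections.unmodifiableList(copy);
            });
        }
        ag.shutdownAndAwait(5);
        return ag.deref();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Running main prints [4, 3, 2, 1, 0]. Calling deref() before shutdownAndAwait() could return any intermediate list, just as in Listing 8.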

     

    Actors

     

    Akka is the best-known JVM implementation of actors. An actor is an independent object that can be interacted with only by exchanging messages. Every actor can have an internal state and has a mailbox, which is a queue of pending messages sent to it but not yet handled. In that regard, actors share much similarity with agents. However, agents assume that messages are functions to be applied on the current state, whereas in Akka, we are free to send any type of message and interpret it in any way. Both agents and actors handle one message at a time, so you don't have to synchronize state inside an actor.

     

    As an exercise, let's implement agents using Akka (see Listing 9). Please keep in mind that Akka has its own agents implementation inspired by Clojure, which is not covered in this article.

     

    class AgentActor[T](initial: T) extends Actor {

      private[this] var state = initial

      override def receive = {
        case Apply(fun: (T => T)) =>
          state = fun(state)
        case Deref =>
          sender() ! state
      }
    }

     

    Listing 9. Example of implementing agents using Akka

     

    The AgentActor understands two types of messages: Apply, which mutates the current actor's state, and Deref, which sends the state back to the sender, whoever that is. The only way to interact with this actor is by sending messages to it, as shown in Listing 10:

     

    val system = ActorSystem("Main")
    val agentActor = system.actorOf(Props(new AgentActor("")))

    agentActor ! Apply((s: String) => s + "a")
    agentActor ! Apply((s: String) => s + "b")
    agentActor ! Apply((s: String) => s + "c")
    agentActor ! Apply((s: String) => s.toUpperCase())

    implicit val timeout: Timeout = Timeout(1, TimeUnit.SECONDS)
    agentActor ? Deref foreach println

     

    Listing 10. Example of sending messages to an actor

     

    The code in Listing 10 creates an instance of AgentActor with an initial state of an empty String. Later, we mutate that string by appending a, b, and c, and finally by converting it to uppercase. These messages are handled asynchronously, one by one. Additionally, it is guaranteed that if the same sender sends multiple messages to another actor, they will be handled in the same order. That's why sending Deref will always result in printing ABC.

     

    By the way, the construct agentActor ? Deref is called an ask, and it means "send a message to an actor and return a Future that will complete when said actor responds." In our case, the response is sent inside the Deref handler (sender() ! state).
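
    The mailbox and the ask pattern can also be sketched in plain Java. The class below is not Akka and skips almost everything a real actor system provides (supervision, addressing, typed replies); the StringActor name, the Command enum, and the tell()/ask() method names are assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class StringActor {

    /** Hypothetical command message; plain strings are treated as text to append. */
    enum Command { TO_UPPER }

    // The mailbox: a single thread handles queued messages one at a time, in FIFO order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private String state = "";                    // touched only by the mailbox thread

    /** Fire-and-forget send, like the ! operator in Listing 10. */
    void tell(Object message) {
        mailbox.execute(() -> receive(message));
    }

    /** Returns a future that completes after all earlier messages were handled. */
    CompletableFuture<String> ask() {
        return CompletableFuture.supplyAsync(() -> state, mailbox);
    }

    private void receive(Object message) {
        if (message instanceof String) {
            state = state + message;
        } else if (message == Command.TO_UPPER) {
            state = state.toUpperCase();
        }
        // Unknown messages are silently dropped in this sketch.
    }

    void stop() {
        mailbox.shutdown();
    }

    static String demo() {
        StringActor actor = new StringActor();
        try {
            actor.tell("a");
            actor.tell("b");
            actor.tell("c");
            actor.tell(Command.TO_UPPER);
            return actor.ask().join();
        } finally {
            actor.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    Running main prints ABC, because the request made by ask() is queued behind the four earlier messages in the same mailbox.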

     

    STM

     

    Software transactional memory (STM), as found in Clojure, is probably one of the most comprehensive concurrency models available on the JVM. STM brings the concept of atomic transactions to memory. When multiple variables are modified within a transaction, these changes are either applied fully or not at all (atomicity), and they are not visible to other threads until the transaction is committed (isolation).

     

    There are other interesting properties, for example, a variable read multiple times is guaranteed to have the same value throughout the transaction, even if changes to it were committed in the meantime (repeatable reads, as known from relational databases).

     

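    Under the hood, STM uses optimistic locking with a compare-and-set approach. During a commit, all values are updated, but if some of them were changed (committed) by another transaction in the meantime, the whole transaction is restarted. Because the body of a transaction can therefore run more than once, all mutations of transactional variables should be free of side effects. This is similar to the function passed to AtomicInteger.updateAndGet(), which may also be reapplied when an update attempt fails because of contention.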
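
    The retry behavior is easiest to see on a single variable. The sketch below writes out by hand a compare-and-set loop in the spirit of updateAndGet(); the class and method names are made up for this example, and it covers only one variable rather than a multi-variable transaction.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntUnaryOperator;

final class OptimisticUpdate {

    /** Applies fn optimistically and retries whenever another thread got there first. */
    static int update(AtomicInteger ref, IntUnaryOperator fn) {
        while (true) {
            int seen = ref.get();
            int next = fn.applyAsInt(seen);       // may run several times under contention
            if (ref.compareAndSet(seen, next)) {
                return next;                      // nobody changed the value: commit succeeded
            }
            // Conflict detected: loop and recompute from the fresh value.
        }
    }

    /** Starts several threads that all increment the same counter. */
    static int incrementConcurrently(int threads, int perThread) {
        AtomicInteger counter = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    update(counter, x -> x + 1);
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(incrementConcurrently(4, 10_000));
    }
}
```

    No increment is lost even though no lock is taken. If the function passed to update() printed a line or sent an e-mail, that side effect could happen more often than the number of successful updates.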

     

    The textbook example of STM is a bank with accounts and money transfers. The main goal is to always keep the state of the database consistent and make sure money does not disappear. Obviously, in the real world we would use a persistent database, but for educational purposes let's consider the simple list of accounts shown in Listing 11:

     

    (def accounts [
                (ref {:name "Alice", :age 20, :balance 100})
                (ref {:name "Bob", :age 23, :balance 200})
                (ref {:name "Eve", :age 21, :balance 50})
                (ref {:name "Jane", :age 24, :balance 0})
                ])

     

    Listing 11. Example of a list of bank accounts

     

    Notice that each and every account is wrapped in (ref ...). This means that access to the internal data structure must be guarded by the software transaction.

     

    Let's see how an atomic transfer can be performed. Remember that money should be debited from one account and credited to the other atomically. Either both operations or neither operation must be performed. And, finally, intermediate changes to accounts should not be visible until everything is performed. This requires transactions in relational databases or some complex locks in memory. Traditionally, you would use fine-grained locks, one for each account, risking deadlocks, or you would use a coarse-grained lock covering all accounts, degrading performance. Listing 12 shows how STM deals with that dilemma:

     

    (defn change-balance-by [amount account]
      (update-in account [:balance] #(+ % amount)))

    (defn transfer-cash [from to amount]
      (dosync
        (alter from (partial change-balance-by (- amount)))
        (alter to (partial change-balance-by amount))))

     

    Listing 12. Example of an atomic transfer with STM

     

    The transfer-cash function takes two accounts and an amount to transfer. The crucial part is dosync: it wraps the software transaction. Inside the transaction, we alter two refs (two transactional resources) by applying a function to the current ref state. When the transaction is about to be committed, STM makes sure the altered refs were not modified in the meantime.

     

    If we are unlucky and a conflict is detected, the transaction is retried by applying the same functions again to the changed refs. At the same time, other transactions are free to read all accounts or even modify other accounts without blocking. So we have the benefits of fine-grained locking without explicit locks. Also, because Clojure's STM implementation is optimistic and assumes that conflicts and retries are rare, under normal circumstances it can be very effective. However, under high contention or when refs are too small or too big, STM might become a performance bottleneck.
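
    Java has no built-in STM, but the all-or-nothing property of the transfer can be approximated with a different, much coarser technique: keeping every balance in one immutable snapshot behind a single AtomicReference. The sketch below is that swapped-in technique, not STM; unlike refs, any two concurrent transfers conflict with each other here, even when they touch different accounts. The Bank class and its methods are invented for this example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class Bank {

    // One immutable snapshot of all balances; replaced atomically as a whole.
    private final AtomicReference<Map<String, Integer>> balances;

    Bank(Map<String, Integer> initial) {
        Map<String, Integer> copy = new HashMap<>(initial);
        this.balances = new AtomicReference<>(Collections.unmodifiableMap(copy));
    }

    /** Debits one account and credits the other in a single atomic step. */
    void transfer(String from, String to, int amount) {
        // The function must be free of side effects: it may be reapplied on conflict.
        balances.updateAndGet(old -> {
            Map<String, Integer> next = new HashMap<>(old);
            next.merge(from, -amount, Integer::sum);
            next.merge(to, amount, Integer::sum);
            return Collections.unmodifiableMap(next);
        });
    }

    int balance(String name) {
        return balances.get().get(name);
    }

    /** Computed from one snapshot, so a half-finished transfer is never observed. */
    int total() {
        return balances.get().values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> initial = new HashMap<>();
        initial.put("Alice", 100);
        initial.put("Bob", 200);
        Bank bank = new Bank(initial);
        bank.transfer("Alice", "Bob", 30);
        System.out.println(bank.balance("Alice") + " " + bank.balance("Bob") + " " + bank.total());
    }
}
```

    Running main prints 70 230 300. The total stays constant because readers only ever see a snapshot taken before or after a transfer, never in between.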

     

    Conclusion

     

    We explored several techniques for running our programs concurrently. Some were rather low level, while others are still under active research. However, the main point I want you to learn is that there is no silver bullet. None of these idioms is universal or works well for all problems. Choosing the wrong abstraction or library will not only substantially decrease readability and maintainability, but also very likely degrade performance. So, before diving into a particular technology just because it looks cool, benchmark it, and proceed only if you get a significant throughput improvement.

     

    About the Author

     

    Tomasz Nurkiewicz has spent half of his life programming, the last eight years using Java, and works in the financial sector. He is passionate about alternative JVM languages, often disappointed with the quality of software written these days (so often by himself!), and hates long methods and hidden side effects. He believes that computers were invented so developers can automate boring and repetitive tasks. Currently he is writing an RxJava book for O'Reilly.

     
