Reactive Programming with JDK 9 Flow API


    Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. This article by Rahul Srivastava presents an example using the JDK 9 Flow API.


     

    By Rahul Srivastava

     

    What is Reactive Programming?

     

    Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. A stream of data is essentially a sequence of data items occurring over time. This model can be more memory efficient, because items are processed as they arrive instead of first being collected in memory and then iterated over.

     

    In the reactive programming model, there is a Publisher and a Subscriber. The Publisher publishes a stream of data, and the Subscriber subscribes to it and receives the data items asynchronously.

     

    The model also provides a mechanism, Processors, for applying higher-order functions to the stream. Processors transform the data stream without any change to the Publisher or the Subscriber. A Processor (or a chain of Processors) sits between the Publisher and the Subscriber and transforms one stream of data into another. The Publisher and the Subscriber are independent of the transformations applied to the stream.

     


     

     

    Why Reactive Programming?

    • Simpler code, making it more readable.
    • Abstracts away boilerplate code, so that developers can focus on business logic.
    • Abstracts away low-level threading, synchronization, and concurrency issues.
    • Stream processing can be memory efficient.
    • The model can be applied to a wide range of problems.

     

     

    JDK 9 Flow API

     

    The Flow API (java.util.concurrent.Flow) in JDK 9 corresponds to the Reactive Streams Specification, which is a de facto standard. The Reactive Streams Specification is one of the initiatives to standardize reactive programming. Several implementations, such as RxJava, Project Reactor, and Akka Streams, already support the Reactive Streams Specification.

     

    The Flow API (and the Reactive Streams API) is, in some ways, a combination of ideas from the Iterator and Observer patterns. The Iterator is a pull model, where the application pulls items from the source. The Observer is a push model, where the items from the source are pushed to the application. Using the Flow API, the application (the Subscriber) initially requests N items, and the Publisher then pushes at most N items to the Subscriber. So it is a mix of the pull and push programming models.
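
    This pull-push mix can be sketched with the JDK's SubmissionPublisher as the source. In the sketch, the hypothetical BatchSubscriber pulls by calling request(batch). The publisher pushes items through onNext, and the subscriber asks for the next batch only when the current one is used up. The overDelivered flag is an illustrative check that no item arrives without outstanding demand.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BatchDemo {

    /** Hypothetical subscriber: requests items in batches and records any item that arrives without demand. */
    static final class BatchSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        volatile boolean overDelivered;            // set if onNext arrives with no outstanding demand
        private final int batch;
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private long outstanding;                   // requested but not yet delivered

        BatchSubscriber(int batch) { this.batch = batch; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            outstanding = batch;
            s.request(batch);                       // pull: ask for at most `batch` items
        }

        @Override public void onNext(T item) {      // push: the publisher delivers one item
            if (outstanding-- <= 0) overDelivered = true;
            received.add(item);
            if (outstanding == 0) {                 // batch used up: pull the next batch
                outstanding = batch;
                subscription.request(batch);
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    static <T> BatchSubscriber<T> run(List<T> items, int batch) {
        BatchSubscriber<T> subscriber = new BatchSubscriber<>(batch);
        SubmissionPublisher<T> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        items.forEach(publisher::submit);
        publisher.close();                          // onComplete follows the submitted items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS); // delivery is asynchronous
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber;
    }

    public static void main(String[] args) {
        BatchSubscriber<Integer> s = run(List.of(1, 2, 3, 4, 5, 6, 7), 3);
        System.out.println(s.received + " overDelivered=" + s.overDelivered);
    }
}
```

    With a batch size of 3 and seven items, the subscriber issues three request calls. It still receives every item in order.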

     


     

     

    The Flow API Interfaces (At a glance)

     

    @FunctionalInterface
    public static interface Flow.Publisher<T> {
        public void subscribe(Flow.Subscriber<? super T> subscriber);
    }
    
    public static interface Flow.Subscriber<T> {
        public void onSubscribe(Flow.Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }
    
    public static interface Flow.Subscription {
        public void request(long n);
        public void cancel();
    }
    
    public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }
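
    Because Flow.Publisher is annotated @FunctionalInterface, a publisher can be written as a lambda. The sketch that follows is minimal and single-threaded, and the just and trace helpers are hypothetical names. A production Publisher would also have to meet the thread-safety and other rules of the Reactive Streams specification. Like many single-value publishers, this one delivers the item synchronously inside request().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class JustDemo {

    /** Hypothetical helper: a lambda Publisher that emits one value on the first request, then completes. */
    static <T> Flow.Publisher<T> just(T value) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done; // single-threaded sketch: no synchronization

            @Override public void request(long n) {
                if (done) return;                 // already emitted, failed, or cancelled
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                    return;
                }
                subscriber.onNext(value);         // delivered synchronously inside request()
                subscriber.onComplete();
            }

            @Override public void cancel() { done = true; }
        });
    }

    /** Hypothetical helper: subscribes, requests one item, and returns the signals seen, in order. */
    static List<String> trace(Flow.Publisher<?> publisher) {
        List<String> signals = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Object>() {
            @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); s.request(1); }
            @Override public void onNext(Object item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError"); }
            @Override public void onComplete() { signals.add("onComplete"); }
        });
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(trace(just("hello")));
    }
}
```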
    

     

     

    The Subscriber

     

    The Subscriber subscribes to the Publisher for the callbacks. Data items are not pushed to the Subscriber unless requested, but multiple items may be requested. Subscriber method invocations for a given Subscription are strictly ordered. The application can react to the following callbacks, which are available on the subscriber.

     

     

    • onSubscribe: Method invoked prior to invoking any other Subscriber methods for the given Subscription.
    • onNext: Method invoked with a Subscription's next item.
    • onError: Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription. If a Publisher encounters an error that does not allow items to be issued to a Subscriber, that Subscriber receives onError, and then receives no further messages.
    • onComplete: Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription. When it is known that no further messages will be issued to it, a subscriber receives onComplete.
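
    The callback order described above can be observed by recording each signal that a subscriber receives from a SubmissionPublisher. The SignalOrderDemo class and its trace method are illustrative only. The CountDownLatch is needed because delivery is asynchronous, so the calling thread must wait for the terminal signal.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class SignalOrderDemo {

    /** Publishes the items, then returns the signals the subscriber saw, in order. */
    static List<String> trace(List<String> items) {
        List<String> signals = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                signals.add("onSubscribe");           // always the first signal
                s.request(Long.MAX_VALUE);            // effectively unbounded demand
            }
            @Override public void onNext(String item) { signals.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable t) { signals.add("onError"); terminated.countDown(); }
            @Override public void onComplete() { signals.add("onComplete"); terminated.countDown(); }
        };

        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(subscriber);
        items.forEach(publisher::submit);
        publisher.close();                            // onComplete is the terminal signal
        try {
            terminated.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return signals;
    }

    public static void main(String[] args) {
        System.out.println(trace(List.of("a", "b")));
    }
}
```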

     

     

    Sample Subscriber

     

    import java.util.concurrent.Flow.*;
    ...
    
    public class MySubscriber<T> implements Subscriber<T> {
      private Subscription subscription;
    
      @Override
      public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); //request one item; a value of Long.MAX_VALUE may be considered effectively unbounded
      }
    
      @Override
      public void onNext(T item) {
        System.out.println("Got : " + item);
        subscription.request(1); //request the next item
      }
    
      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }
    
      @Override
      public void onComplete() {
        System.out.println("Done");
      }
    }
    

     

     

    The Publisher

     

    The publisher publishes the stream of data items to the registered subscribers. It publishes items to the subscriber asynchronously, normally using an Executor. Publishers ensure that Subscriber method invocations for each subscription are strictly ordered.
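
    To see the Executor at work, the following sketch passes one explicitly to SubmissionPublisher and records the thread on which onNext runs. The two-argument constructor takes the executor and the maximum buffer capacity. The thread name flow-worker and the ExecutorDemo class are assumptions made for the illustration.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class ExecutorDemo {

    /** Returns the names of the threads on which onNext was invoked. */
    static Set<String> deliveryThreads(List<String> items) {
        // Assumed for the example: a single, named worker thread for delivery
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "flow-worker"));
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(executor, Flow.defaultBufferSize())) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { threads.add(Thread.currentThread().getName()); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            items.forEach(publisher::submit);
        } // try-with-resources calls close(), which leads to onComplete
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();                       // let the non-daemon worker thread exit
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("main thread: " + Thread.currentThread().getName());
        System.out.println("onNext ran on: " + deliveryThreads(List.of("1", "2", "3")));
    }
}
```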

     

    Example: publishing a stream of data items to Subscribers using the JDK's SubmissionPublisher

     

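    import java.util.Arrays;
    import java.util.concurrent.SubmissionPublisher;
    ...
        //Create Publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    
        //Register Subscriber
        MySubscriber<String> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);
    
        //Publish items
        System.out.println("Publishing Items...");
        String[] items = {"1", "x", "2", "x", "3", "x"};
        Arrays.asList(items).forEach(publisher::submit);
    
        //Close the publisher: subscribers receive onComplete after the items already submitted.
        //Delivery is asynchronous (ForkJoinPool.commonPool() by default), so a short main()
        //method may exit before all the items have been printed unless it waits.
        publisher.close();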
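
    Because delivery is asynchronous, a short main method can exit before the subscriber has printed anything. One way to wait is SubmissionPublisher.consume, assuming a simple per-item Consumer is enough. It subscribes the consumer and returns a CompletableFuture that completes when the publisher signals onComplete. ConsumeDemo and publishAndWait are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

final class ConsumeDemo {

    /** Publishes the items and blocks until the consumer has seen all of them. */
    static List<String> publishAndWait(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes a Consumer; the future completes on onComplete (or exceptionally on error)
        CompletableFuture<Void> completion = publisher.consume(received::add);

        items.forEach(publisher::submit);
        publisher.close();    // onComplete once the submitted items are delivered
        completion.join();    // wait for delivery to finish
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndWait(List.of("1", "x", "2", "x", "3", "x")));
    }
}
```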
    

     

     

    The Subscription

     

    A Subscription links a Flow.Publisher and a Flow.Subscriber. Through the Subscription, Subscribers receive items only when they request them, and may cancel at any time.

     

     

    • request: Adds the given number n of items to the current unfulfilled demand for this subscription. If n is less than or equal to zero, the Subscriber receives an onError signal with an IllegalArgumentException.
    • cancel: Causes the Subscriber to (eventually) stop receiving messages.
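
    To show what a Subscription does on the publishing side, here is a hypothetical, single-threaded RangePublisher that emits the integers 1 to count. Its Subscription tracks the unfulfilled demand added by request(n) and emits no more items than requested. It also stops after cancel() and guards against re-entrant request calls from onNext, so the call stack does not grow with every item. This is a sketch, not a thread-safe implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical single-threaded Publisher of the integers 1..count. */
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new RangeSubscription(subscriber, count));
    }

    private static final class RangeSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super Integer> subscriber;
        private final int count;
        private int next = 1;        // next value to emit
        private long demand;         // unfulfilled demand
        private boolean emitting;    // true while the emit loop is running
        private boolean terminated;  // cancelled, completed, or failed

        RangeSubscription(Flow.Subscriber<? super Integer> subscriber, int count) {
            this.subscriber = subscriber;
            this.count = count;
        }

        @Override
        public void request(long n) {
            if (terminated) return;
            if (n <= 0) {                                // invalid demand is signalled via onError
                terminated = true;
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            demand += n;
            if (demand < 0) demand = Long.MAX_VALUE;     // overflow: treat as unbounded
            if (emitting) return;                        // re-entrant call: the running loop uses the new demand
            emitting = true;
            while (demand > 0 && next <= count && !terminated) {
                demand--;
                subscriber.onNext(next++);               // push at most the requested number of items
            }
            emitting = false;
            if (next > count && !terminated) {
                terminated = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() { terminated = true; }      // no further signals after this
    }

    /** Takes up to n items, requesting one at a time, then cancels. */
    static List<Integer> firstN(Flow.Publisher<Integer> publisher, int n) {
        List<Integer> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(Integer item) {
                received.add(item);
                if (received.size() < n) subscription.request(1); else subscription.cancel();
            }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(firstN(new RangePublisher(10), 3));
    }
}
```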

     

     

    The Processor

     

    A Processor is a component that acts as both a Subscriber and a Publisher. It sits between the Publisher and the Subscriber and transforms one stream into another. One or more processors can be chained together, and the output of the final processor in the chain is processed by the Subscriber. The JDK does not provide any concrete Processors, so it is up to the developer to write whichever processors are required.

     

    Sample Processor to transform String to Integer

     

    import java.util.concurrent.Flow.*;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;
    ...
    
    public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {
    
      private final Function<? super T, ? extends R> function;
      private Subscription subscription;
    
      public MyTransformProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
      }
    
      @Override
      public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
      }
    
      @Override
      public void onNext(T item) {
        submit(function.apply(item)); //transform the item and publish it downstream
        subscription.request(1);
      }
    
      @Override
      public void onError(Throwable t) {
        closeExceptionally(t); //propagate the error to downstream subscribers
      }
    
      @Override
      public void onComplete() {
        close(); //complete downstream subscribers
      }
    }
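
    The chaining sample in the next section uses a MyFilterProcessor, which this article does not define. One possible implementation is sketched here. It assumes, from the predicate s -> s.equals("x") and the Integer.parseInt step after it, that the predicate selects the items to drop. The bound T extends R keeps the two type parameters used in MyFilterProcessor<String, String> while forwarding items without a cast. The filter helper and the main method are illustrative only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow.Processor;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

/** Hypothetical filter: drops every item for which `dropIf` returns true and forwards the rest. */
class MyFilterProcessor<T extends R, R> extends SubmissionPublisher<R> implements Processor<T, R> {

    private final Predicate<? super T> dropIf;
    private Subscription subscription;

    MyFilterProcessor(Predicate<? super T> dropIf) {
        this.dropIf = dropIf;
    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(T item) {
        if (!dropIf.test(item)) {
            submit(item);             // T extends R, so no cast is needed
        }
        subscription.request(1);      // request the next item even if this one was dropped
    }

    @Override
    public void onError(Throwable t) {
        closeExceptionally(t);        // propagate the error downstream
    }

    @Override
    public void onComplete() {
        close();                      // complete downstream subscribers
    }

    /** Illustrative helper: runs the items through the filter and collects what comes out. */
    static List<String> filter(List<String> items) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
        List<String> received = new CopyOnWriteArrayList<>();

        publisher.subscribe(filterProcessor);
        CompletableFuture<Void> completion = filterProcessor.consume(received::add);

        items.forEach(publisher::submit);
        publisher.close();            // completes the filter, which then completes the consumer
        completion.join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(filter(List.of("1", "x", "2", "x", "3", "x")));
    }
}
```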
    

     

     

    Sample code to transform a data stream using a chain of processors

     

    import java.util.Arrays;
    import java.util.concurrent.SubmissionPublisher;
    ...
    
        //Create Publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    
        //Create Processors and Subscriber
        //The filter is meant to drop the "x" items, so that only numeric strings reach Integer.parseInt
        MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
    
        MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(Integer::parseInt);
    
        MySubscriber<Integer> subscriber = new MySubscriber<>();
    
        //Chain Processors and Subscriber
        publisher.subscribe(filterProcessor);
        filterProcessor.subscribe(transformProcessor);
        transformProcessor.subscribe(subscriber);
    
        System.out.println("Publishing Items...");
        String[] items = {"1", "x", "2", "x", "3", "x"};
        Arrays.asList(items).forEach(publisher::submit);
        publisher.close(); //delivery is asynchronous; a short main() may need to wait before exiting
    

     

     

    Back pressure

    Back pressure builds up when Publishers produce items at a much faster rate than Subscribers can consume them, and the buffer that holds the unprocessed items may be bounded. The Flow API signals demand through Subscription.request(n), so a well-behaved Publisher never sends more items than have been requested. It does not, however, prescribe what a Publisher should do with items it cannot deliver yet (block, buffer, or drop them). That strategy is left to the implementation. For example, SubmissionPublisher.submit blocks while a subscriber's buffer is full, whereas SubmissionPublisher.offer can drop items instead. See how RxJava deals with back pressure.
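
    As one example of a drop strategy, this sketch uses SubmissionPublisher.offer with a small buffer and a subscriber that never requests any items. The buffer fills up, and the drop handler is invoked for the remaining items. Returning false from the handler means "do not retry". The class name and the numbers are arbitrary choices for the illustration. The exact number of accepted items depends on how SubmissionPublisher sizes its buffer.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

final class DropOnOverflowDemo {

    /** Offers `items` integers to a stalled subscriber through a small buffer; returns how many were dropped. */
    static int countDrops(int items, int bufferCapacity) {
        AtomicInteger dropped = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), bufferCapacity)) {
            // A stalled subscriber: it never calls request(), so nothing is ever taken from its buffer
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < items; i++) {
                // offer() does not block: when the subscriber's buffer is full, the handler is called
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;     // false = do not retry; the item is dropped
                });
            }
        }
        return dropped.get();
    }

    public static void main(String[] args) {
        System.out.println("dropped " + countDrops(100, 4) + " of 100 items");
    }
}
```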

     

     

    Summary

    Adding a Reactive Programming API to JDK 9 is a good start. Many other products have also started to offer Reactive Programming APIs to access their functionality. Though the Flow API allows programmers to start writing reactive programs, the ecosystem still has to evolve.

     

    For example, a reactive program may still end up accessing a database through traditional APIs, because not all databases offer an API for reactive programming. In other words, the APIs that a reactive program depends on might not support the reactive programming model yet.

     

     


     

     

    About the Author

    Rahul Srivastava is an ex-committer for the Xerces2-J project at Apache. He is currently Principal Member of the Technical Staff for the Application Server development team at Oracle.