Reactive Programming with JDK 9 Flow API — oracle-tech

Reactive Programming with JDK 9 Flow API

Bob Rhubart-Oracle Posts: 692
edited September 2016 in Java APIs Discussions

Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. This article by Rahul Srivastava presents an example using the JDK 9 Flow API.


By Rahul Srivastava

What is Reactive Programming?

Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. A stream of data is essentially a sequence of data items occurring over time. This model can be more memory efficient than iterating over an in-memory collection, because items are processed as they arrive instead of being held in memory all at once.

In the Reactive Programming model, there is a Publisher and a Subscriber. The Publisher publishes a stream of data, to which the Subscriber is asynchronously subscribed.

The model also provides a mechanism to introduce higher-order functions that operate on the stream, by means of Processors. Processors transform the data stream without requiring any change to the Publisher or the Subscriber. The Processor (or a chain of Processors) sits between the Publisher and the Subscriber and transforms one stream of data into another. The Publisher and the Subscriber are independent of the transformations that happen to the stream of data.

push.png

Why Reactive Programming?

  • Simpler code, making it more readable.
  • Abstracts away boilerplate code so that the focus stays on business logic.
  • Abstracts away low-level threading, synchronization, and concurrency issues.
  • Stream processing can be memory efficient, because items are handled as they arrive.
  • The model can be applied to a wide range of problems.

JDK 9 Flow API

The Flow API in JDK 9 corresponds to the Reactive Streams Specification, which is a de facto standard. The Reactive Streams Specification is one of the initiatives to standardize Reactive Programming. Several implementations already support the Reactive Streams Specification.

The Flow API (and the Reactive Streams API) is, in some ways, a combination of ideas from the Iterator and Observer patterns. The Iterator is a pull model, where the application pulls items from the source. The Observer is a push model, where the items from the source are pushed to the application. Using the Flow API, the Subscriber initially requests N items, and the Publisher then pushes at most N items to the Subscriber. So it is a mix of the pull and push programming models.
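
The "request N, receive at most N" behaviour can be tried out with the JDK's SubmissionPublisher. The following is a minimal sketch, not code from the article: the class name RequestNDemo and the helper receiveAtMost are made up for illustration, and the five-second timeout is an arbitrary safety net.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the name and the helper method are illustrative only.
class RequestNDemo {

    /** Publishes `published` items to a subscriber that requests only `requested` (> 0) of them. */
    static List<Integer> receiveAtMost(int requested, int published) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch gotAll = new CountDownLatch(requested);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(requested);   // pull side: signal demand for N items, once
                }
                @Override public void onNext(Integer item) {
                    received.add(item);     // push side: the publisher delivers up to N items
                    gotAll.countDown();
                }
                @Override public void onError(Throwable t) { t.printStackTrace(); }
                @Override public void onComplete() { }
            });

            for (int i = 1; i <= published; i++) {
                publisher.submit(i);        // buffered; only requested items are delivered
            }
            gotAll.await(5, TimeUnit.SECONDS);   // wait for the requested items to arrive
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receiveAtMost(2, 5));
    }
}
```

Because the subscriber never calls request again, the remaining items stay in the publisher's buffer and are never passed to onNext.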

pull-push.png

The Flow API Interfaces (At a glance)

    @FunctionalInterface
    public static interface Flow.Publisher<T> {
        public void subscribe(Flow.Subscriber<? super T> subscriber);
    }

    public static interface Flow.Subscriber<T> {
        public void onSubscribe(Flow.Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }

    public static interface Flow.Subscription {
        public void request(long n);
        public void cancel();
    }

    public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {
    }

The Subscriber

The Subscriber subscribes to the Publisher for the callbacks. Data items are not pushed to the Subscriber unless requested, but multiple items may be requested. Subscriber method invocations for a given Subscription are strictly ordered. The application can react to the following callbacks, which are available on the subscriber.

Callback: Description

  • onSubscribe: Method invoked prior to invoking any other Subscriber methods for the given Subscription.
  • onNext: Method invoked with a Subscription's next item.
  • onError: Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription. If a Publisher encounters an error that does not allow items to be issued to a Subscriber, that Subscriber receives onError, and then receives no further messages.
  • onComplete: Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription. When it is known that no further messages will be issued to it, a subscriber receives onComplete.

Sample Subscriber

    import java.util.concurrent.Flow.*;
    ...
    public class MySubscriber<T> implements Subscriber<T> {
      private Subscription subscription;

      @Override
      public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // a value of Long.MAX_VALUE may be considered as effectively unbounded
      }

      @Override
      public void onNext(T item) {
        System.out.println("Got : " + item);
        subscription.request(1); // a value of Long.MAX_VALUE may be considered as effectively unbounded
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
      }

      @Override
      public void onComplete() {
        System.out.println("Done");
      }
    }

The Publisher

The publisher publishes the stream of data items to the registered subscribers. It publishes items to the subscriber asynchronously, normally using an Executor. Publishers ensure that Subscriber method invocations for each subscription are strictly ordered.

Example publishing a stream of data items to Subscribers using JDK's SubmissionPublisher

    import java.util.Arrays;
    import java.util.concurrent.SubmissionPublisher;
    ...
        // Create Publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Register Subscriber
        MySubscriber<String> subscriber = new MySubscriber<>();
        publisher.subscribe(subscriber);

        // Publish items
        System.out.println("Publishing Items...");
        String[] items = {"1", "x", "2", "x", "3", "x"};
        Arrays.asList(items).stream().forEach(i -> publisher.submit(i));

        // Signal onComplete to the subscribers once the buffered items are delivered
        publisher.close();
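
Since items are delivered asynchronously, a short main method can return before the subscriber has handled every item. One way to wait for delivery is SubmissionPublisher's consume method, which returns a CompletableFuture. The sketch below is an illustration under that assumption; the class AwaitDeliveryDemo and the helper publishAndCollect are hypothetical names, not part of the article.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class; only SubmissionPublisher and its methods come from the JDK.
class AwaitDeliveryDemo {

    /** Publishes the given items and returns them in the order the consumer saw them. */
    static List<String> publishAndCollect(String... items) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // consume() subscribes a simple consumer that requests every item and returns
        // a future that completes once the publisher signals onComplete.
        CompletableFuture<Void> done = publisher.consume(seen::add);

        for (String item : items) {
            publisher.submit(item);
        }
        publisher.close();   // onComplete follows the items that are still buffered
        done.join();         // block until the consumer has processed everything
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect("1", "x", "2", "x", "3", "x"));
    }
}
```

A hand-written Subscriber can achieve the same effect by counting down a CountDownLatch in onComplete and having the caller await it.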

The Subscription

A Subscription links a Flow.Publisher and a Flow.Subscriber. Subscribers receive items only when requested, and may cancel at any time, via the Subscription.

Method: Description

  • request: Adds the given number n of items to the current unfulfilled demand for this subscription.
  • cancel: Causes the Subscriber to (eventually) stop receiving messages.
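
To illustrate request and cancel together, here is a minimal sketch of a subscriber that takes a fixed number of items and then cancels. TakeSubscriber and CancelDemo are hypothetical names introduced for this example, and the sketch assumes the limit is at least 1 and that at least that many items are published.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber, written for illustration: takes `limit` items, then cancels.
class TakeSubscriber<T> implements Flow.Subscriber<T> {
    final List<T> received = new CopyOnWriteArrayList<>();
    final CountDownLatch cancelled = new CountDownLatch(1);
    private final int limit;
    private Flow.Subscription subscription;

    TakeSubscriber(int limit) { this.limit = limit; }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);             // unfulfilled demand becomes 1
    }

    @Override public void onNext(T item) {
        received.add(item);
        if (received.size() >= limit) {
            subscription.cancel();           // no further items wanted
            cancelled.countDown();
        } else {
            subscription.request(1);         // ask for exactly one more item
        }
    }

    @Override public void onError(Throwable t) { t.printStackTrace(); }
    @Override public void onComplete() { }
}

class CancelDemo {
    /** Publishes 1..published and returns what a TakeSubscriber with the given limit received. */
    static List<Integer> takeFirst(int limit, int published) {
        TakeSubscriber<Integer> subscriber = new TakeSubscriber<>(limit);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= published; i++) {
                publisher.submit(i);
            }
            subscriber.cancelled.await(5, TimeUnit.SECONDS);  // wait until the subscriber cancels
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(takeFirst(3, 10));
    }
}
```

Because this subscriber requests one item at a time and stops requesting before it cancels, there is no outstanding demand at the moment of cancellation, so no item arrives after the limit.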

The Processor

A Processor is a component that acts as both a Subscriber and a Publisher. The Processor sits between the Publisher and the Subscriber, and transforms one stream into another. One or more Processors can be chained together, and the result of the final Processor in the chain is processed by the Subscriber. The JDK does not provide any concrete Processors, so it is left up to the individual to write whatever Processor one requires.

Sample Processor to transform String to Integer

    import java.util.concurrent.Flow.*;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;
    ...
    public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> {
      // Typed (instead of raw) Function, so no unchecked cast is needed in onNext
      private final Function<? super T, ? extends R> function;
      private Subscription subscription;

      public MyTransformProcessor(Function<? super T, ? extends R> function) {
        super();
        this.function = function;
      }

      @Override
      public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
      }

      @Override
      public void onNext(T item) {
        // Transform the incoming item and publish the result downstream
        submit(function.apply(item));
        subscription.request(1);
      }

      @Override
      public void onError(Throwable t) {
        t.printStackTrace();
        // Pass the error on so that downstream subscribers are not left waiting
        closeExceptionally(t);
      }

      @Override
      public void onComplete() {
        close();
      }
    }

Sample code to transform data stream using processor

    import java.util.Arrays;
    import java.util.concurrent.SubmissionPublisher;
    ...
        // Create Publisher
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        // Create Processors and Subscriber
        // Note: MyFilterProcessor is not listed in this article. For this chain to work it
        // must drop the items that match the predicate, because "x" cannot be parsed as an Integer.
        MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x"));
        MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s));
        MySubscriber<Integer> subscriber = new MySubscriber<>();

        // Chain Processors and Subscriber
        publisher.subscribe(filterProcessor);
        filterProcessor.subscribe(transformProcessor);
        transformProcessor.subscribe(subscriber);

        // Publish items
        System.out.println("Publishing Items...");
        String[] items = {"1", "x", "2", "x", "3", "x"};
        Arrays.asList(items).stream().forEach(i -> publisher.submit(i));
        publisher.close();
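
The chaining sample uses a MyFilterProcessor that the article does not list. The following is one possible sketch of it, written under a loudly stated assumption: items that match the predicate are dropped and all other items are forwarded. That reading is inferred from the sample data (the "x" items could not be parsed by the transform step), not confirmed by the article. FilterDemo and dropMatching are hypothetical helpers used only to exercise the processor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Predicate;

// ASSUMED implementation: the article never shows MyFilterProcessor. This version
// DROPS items that match the predicate and forwards everything else unchanged.
class MyFilterProcessor<T extends R, R> extends SubmissionPublisher<R>
        implements Flow.Processor<T, R> {

    private final Predicate<? super T> dropIf;
    private Flow.Subscription subscription;

    MyFilterProcessor(Predicate<? super T> dropIf) {
        this.dropIf = dropIf;
    }

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override public void onNext(T item) {
        if (!dropIf.test(item)) {
            submit(item);                 // forward items that do not match
        }
        subscription.request(1);          // ask upstream for the next item either way
    }

    @Override public void onError(Throwable t) { closeExceptionally(t); }
    @Override public void onComplete() { close(); }
}

// Hypothetical helper that wires publisher -> filter -> collecting consumer.
class FilterDemo {
    static List<String> dropMatching(Predicate<String> dropIf, String... items) {
        List<String> kept = Collections.synchronizedList(new ArrayList<>());
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MyFilterProcessor<String, String> filter = new MyFilterProcessor<>(dropIf);

        CompletableFuture<Void> done = filter.consume(kept::add);  // downstream consumer
        publisher.subscribe(filter);                               // upstream link

        for (String item : items) {
            publisher.submit(item);
        }
        publisher.close();   // onComplete travels through the filter to the consumer
        done.join();
        return kept;
    }

    public static void main(String[] args) {
        System.out.println(dropMatching(s -> s.equals("x"), "1", "x", "2", "x", "3", "x"));
    }
}
```

If the intended behaviour were instead to keep the matching items, the condition in onNext would simply be inverted, and the predicate in the chaining sample would have to change accordingly.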

Back pressure

Back pressure builds up when Publishers produce items at a much faster rate than the Subscribers consume them, and the buffer that holds the unprocessed items may be bounded. In the Flow API, the basic demand signal is Subscription.request(n): a Subscriber is never sent more items than it has requested. Beyond that, the API does not prescribe what a Publisher should do with the items it cannot yet deliver (buffer, block, or drop them), so there are various strategies one could implement oneself to deal with back pressure. See how RxJava deals with back pressure.
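
As one illustration of such a strategy, SubmissionPublisher offers a non-blocking offer method with a drop handler, alongside the submit method used earlier. The sketch below assumes a deliberately tiny buffer and a subscriber that never requests anything, so that the buffer fills up quickly; BackPressureDemo and countDrops are hypothetical names, and "drop the newest item" is just one policy among many.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical demo class showing a "drop when full" policy; not from the article.
class BackPressureDemo {

    /** Offers `total` items to a subscriber that never signals demand; returns the drop count. */
    static int countDrops(int total, int maxBufferCapacity) {
        int dropped = 0;
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxBufferCapacity)) {

            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { /* request nothing */ }
                @Override public void onNext(Integer item) { }
                @Override public void onError(Throwable t) { t.printStackTrace(); }
                @Override public void onComplete() { }
            });

            for (int i = 0; i < total; i++) {
                // offer() does not block: when the subscriber's buffer is full, the
                // onDrop handler runs; returning false means "drop it, do not retry".
                int result = publisher.offer(i, (subscriber, item) -> false);
                if (result < 0) {
                    dropped += -result;   // a negative result is the negated number of drops
                }
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        System.out.println("Dropped: " + countDrops(1000, 4));
    }
}
```

The exact number of drops depends on how the implementation sizes its buffer, so the sketch only reports the count. By contrast, submit blocks the publishing thread while a subscriber's buffer is full, which slows the producer down instead of dropping items.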

Summary

Adding a Reactive Programming API to JDK 9 is a good start. Many other products have also started to offer Reactive Programming APIs to access their functionality. Though the Flow API allows programmers to start writing reactive programs, the ecosystem still has to evolve.

For example, a reactive program may still end up accessing a database through traditional APIs, because not all databases offer an API for Reactive Programming. In other words, the APIs that a reactive program depends on might not support the reactive programming model yet.

About the Author

Rahul Srivastava is an ex-committer for the Xerces2-J project at Apache. He is currently Principal Member of the Technical Staff for the Application Server development team at Oracle.

Comments

  • Very nice and brief article but I do have a qq. I watched this https://www.youtube.com/watch?v=Cj4foJzPF80 and here they say JDK 9 has the reactive streams specification but it doesn't provide an implementation for the same. But in your article, I see SubmissionPublisher which is an implementation of the Publisher is there in JDK 9. Can you please correct me?

    user13432748
  • @ramswaroop Yes, the YT video is incorrect in saying that, probably because they spoke a little too soon (during Nov 2016).
