Discussions
Categories
- 385.5K All Categories
- 5.1K Data
- 2.5K Big Data Appliance
- 2.5K Data Science
- 453.4K Databases
- 223.2K General Database Discussions
- 3.8K Java and JavaScript in the Database
- 47 Multilingual Engine
- 606 MySQL Community Space
- 486 NoSQL Database
- 7.9K Oracle Database Express Edition (XE)
- 3.2K ORDS, SODA & JSON in the Database
- 584 SQLcl
- 4K SQL Developer Data Modeler
- 188K SQL & PL/SQL
- 21.5K SQL Developer
- 45 Data Integration
- 45 GoldenGate
- 298.4K Development
- 4 Application Development
- 20 Developer Projects
- 166 Programming Languages
- 295K Development Tools
- 150 DevOps
- 3.1K QA/Testing
- 646.7K Java
- 37 Java Learning Subscription
- 37.1K Database Connectivity
- 201 Java Community Process
- 108 Java 25
- 22.2K Java APIs
- 138.3K Java Development Tools
- 165.4K Java EE (Java Enterprise Edition)
- 22 Java Essentials
- 176 Java 8 Questions
- 86K Java Programming
- 82 Java Puzzle Ball
- 65.1K New To Java
- 1.7K Training / Learning / Certification
- 13.8K Java HotSpot Virtual Machine
- 94.3K Java SE
- 13.8K Java Security
- 208 Java User Groups
- 25 JavaScript - Nashorn
- Programs
- 666 LiveLabs
- 41 Workshops
- 10.3K Software
- 6.7K Berkeley DB Family
- 3.6K JHeadstart
- 6K Other Languages
- 2.3K Chinese
- 207 Deutsche Oracle Community
- 1.1K Español
- 1.9K Japanese
- 474 Portuguese
Reactive Programming with JDK 9 Flow API

Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. This article by Rahul Srivastava presents an example using the JDK 9 Flow API.
By Rahul Srivastava
What is Reactive Programming ?
Reactive programming is about processing an asynchronous stream of data items, where applications react to the data items as they occur. A stream of data is essentially a sequence of data items occurring over time. This model is more memory efficient because the data is processed as streams, as compared to iterating over the in-memory data.
In the Reactive Programming model, there is a Publisher and a Subscriber. The Publisher publishes a stream of data, to which the Subscriber is asynchronously subscribed.
The model also provides a mechanism to introduce higher order functions to operate on the stream by means of Processors. Processors transform the data stream without the need for changing the Publisher or the Subscriber. The Processor (or a chain of Processors) sit between the Publisher and the Subscriber to transform one stream of data to another. The Publisher and the Subscriber are independent of the transformation that happen to the stream of data.
Why Reactive Programming ?
- Simpler code, making it more readable.
- Abstracts away from boiler plate code to focus on business logic.
- Abstracts away from low-level threading, synchronization, and concurrency issues.
- Stream processing implies memory efficient
- The model can be applied almost everywhere to solve almost any kind of problem.
JDK 9 Flow API
The Flow APIs in JDK 9 correspond to the Reactive Streams Specification, which is a defacto standard. The Reactive Streams Specification is one of the initiatives to standardize Reactive Programming. Several implementations already support the Reactive Streams Specification.
The Flow API (and the Reactive Streams API), in some ways, is a combination of ideas from Iterator and Observer patterns. The Iterator is a pull model, where the application pulls items from the source. The Observer is a push model, where the items from the source are pushed to the application. Using the Flow API, the application initially requests for N items, and then the publisher pushes at most N items to the Subscriber. So its a mix of Pull and Push programming models.
The Flow API Interfaces (At a glance)
@FunctionalInterface public static interface Flow.Publisher<T> { public void subscribe(Flow.Subscriber<? super T> subscriber);} public static interface Flow.Subscriber<T> { public void onSubscribe(Flow.Subscription subscription); public void onNext(T item) ; public void onError(Throwable throwable) ; public void onComplete() ;} public static interface Flow.Subscription { public void request(long n); public void cancel() ;} public static interface Flow.Processor<T,R> extends Flow.Subscriber<T>, Flow.Publisher<R> {}
The Subscriber
The Subscriber subscribes to the Publisher for the callbacks. Data items are not pushed to the Subscriber unless requested, but multiple items may be requested. Subscriber method invocations for a given Subscription are strictly ordered. The application can react to the following callbacks, which are available on the subscriber.
Callback | Description |
onSubscribe | Method invoked prior to invoking any other Subscriber methods for the given Subscription. |
onNext | Method invoked with a Subscription's next item. |
onError | Method invoked upon an unrecoverable error encountered by a Publisher or Subscription, after which no other Subscriber methods are invoked by the Subscription.
|
onComplete | Method invoked when it is known that no additional Subscriber method invocations will occur for a Subscription that is not already terminated by error, after which no other Subscriber methods are invoked by the Subscription.
|
Sample Subscriber
import java.util.concurrent.Flow.*;...public class MySubscriber<T> implements Subscriber<T> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); //a value ofLong.MAX_VALUE
may be considered as effectively unbounded } @Override public void onNext(T item) { System.out.println("Got : " + item); subscription.request(1); //a value ofLong.MAX_VALUE
may be considered as effectively unbounded } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { System.out.println("Done"); }}
The Publisher
The publisher publishes the stream of data items to the registered subscribers. It publishes items to the subscriber asynchronously, normally using an Executor. Publishers ensure that Subscriber method invocations for each subscription are strictly ordered.
Example publishing a stream of data items to Subscribers using JDK's SubmissionPublisher
import java.util.concurrent.SubmissionPublisher;... //Create Publisher SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //Register Subscriber MySubscriber<String> subscriber = new MySubscriber<>(); publisher.subscribe(subscriber); //Publish items System.out.println("Publishing Items..."); String[] items = {"1", "x", "2", "x", "3", "x"}; Arrays.asList(items).stream().forEach(i -> publisher.submit(i)); publisher.close();
The Subscription
Links a Flow.Publisher and Flow.Subscriber. Subscribers receive items only when requested, and may cancel at any time, via the Subscription.
Method | Description |
request | Adds the given number of n items to the current unfulfilled demand for this subscription. |
cancel | Causes the Subscriber to (eventually) stop receiving messages. |
The Processor
A component that acts as both a Subscriber and Publisher. The processor sits between the Publisher and Subscriber, and transforms one stream to another. There could be one or more processor chained together, and the result of the final processor in the chain, is processed by the Subscriber. The JDK does not provide any concrete Processors so it is left upto the individual to write whatever processor one requires.
Sample Processor to transform String to Integer
import java.util.concurrent.Flow.*;import java.util.concurrent.SubmissionPublisher;...public class MyTransformProcessor<T,R> extends SubmissionPublisher<R> implements Processor<T, R> { private Function function; private Subscription subscription; public MyTransformProcessor(Function<? super T, ? extends R> function) { super(); this.function = function; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; subscription.request(1); } @Override public void onNext(T item) { submit((R) function.apply(item)); subscription.request(1); } @Override public void onError(Throwable t) { t.printStackTrace(); } @Override public void onComplete() { close(); }}
Sample code to transform data stream using processor
import java.util.concurrent.SubmissionPublisher;... //Create Publisher SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //Create Processor and Subscriber MyFilterProcessor<String, String> filterProcessor = new MyFilterProcessor<>(s -> s.equals("x")); MyTransformProcessor<String, Integer> transformProcessor = new MyTransformProcessor<>(s -> Integer.parseInt(s)); MySubscriber<Integer> subscriber = new MySubscriber<>(); //Chain Processor and Subscriber publisher.subscribe(filterProcessor); filterProcessor.subscribe(transformProcessor); transformProcessor.subscribe(subscriber); System.out.println("Publishing Items..."); String[] items = {"1", "x", "2", "x", "3", "x"}; Arrays.asList(items).stream().forEach(i -> publisher.submit(i)); publisher.close();
Back pressure
Back pressure is built when Publishers are producing at a much faster rate than the rate at which the data items are consumed by the Subscribers. The size of the buffer where the unprocessed items are being buffered might be restricted. The Flow API does not provide any APIs to signal or deal with back pressure as such, but there could be various strategies one could implement by oneself to deal with back pressure. See how RxJava deals with back pressure.
Summary
Adding Reactive Programming API to JDK 9 is a good start. Many other products have also started to offer Reactive Progamming API to access their functionality. Though the Flow API allows programmers to start writing reactive programs, the eco system still has to evolve.
For example, a reactive program may still end up accessing DB using traditional APIs, maybe because not all DBs support API for Reactive Programming. – i.e. the APIs a reactive program may depend on, might not support reactive programming model yet.
References
- Download JDK 9
- JDK 9 Flow API javadoc
- Reactive Streams Specification
- Reactive Streams API javadoc
- The Reactive Manifesto
About the Author
Rahul Srivastava is an ex-committer for the Xerces2-J project at Apache. He is currently Principal Member of the Technical Staff for the Application Server development team at Oracle.
Comments
-
Very nice and brief article but I do have a qq. I watched this https://www.youtube.com/watch?v=Cj4foJzPF80 and here they say JDK 9 has the reactive streams specification but it doesn't provide an implementation for the same. But in your article, I see SubmissionPublisher which is an implementation of the Publisher is there in JDK 9. Can you please correct me?
-
Very nice and brief article but I do have a qq. I watched this https://www.youtube.com/watch?v=Cj4foJzPF80 and here they say JDK 9 has the reactive streams specification but it doesn't provide an implementation for the same. But in your article, I see SubmissionPublisher which is an implementation of the Publisher is there in JDK 9. Can you please correct me?
@ramswaroop Yes, the YT video is incorrect in saying that probably because they spoke a little too soon (during Nov 2016)