Reactive programming is becoming ubiquitous when it comes to building any real-time system in recent times. Software designs are shifting from the conventional servers and containers to be event-driven, responsive, scalable and handle failures gracefully. The problem with the traditional approach of creating multithreaded, synchronized and shared applications was that the end products were tightly coupled. Resources can be blocked depending on which thread has acquired the lock. Reactive Programming enables better utilization of resources by using event handlers and asynchronous operations with non-blocking backpressure. Java 9 introduced Flow API to incorporate Reactive Streams. This proves to be a stepping stone to enable programmers to use this paradigm effectively.
So, What Exactly Are Reactive Streams?
While handling live data streams, blocking operations performed in multithreaded environments often hinder the performance of the said systems. Handling runtime errors in such systems are usually cumbersome. Over the years, software designers have found that it would be great if a producer can give information about next item or about an error that has been encountered along with the current item. Sending error events from a consumer also will be great to create a more efficient, fault-tolerant system. Thus, the concept of reactive streams was born.
Reactive streams offer a better alternative by supporting asynchronous stream processing where the receiver or subscriber in the publish-subscribe system does not need to buffer data. Thus, such systems provide non-blocking backpressure.
Flow API
Java 9 introduced Flow API as a part of implementation in accordance with Java Enhancement Proposal(JEP-266). JEP 266 aims at adding a couple of enhancements to the concurrency packages.
Before Java 9, developers used to depend on third-party implementations such as RxJava or Akka-Streams for creating a publish-subscribe or flow-controlled application for asynchronous stream processing. Implementations for Reactive Streams abide by the clauses stated in Reactive Manifesto.
In a publish-subscribe environment, a set of publishers produce items. Subscribers can subscribe to the publishers and consume the items published by them. One thing to note about reactive streams is that they can be used in both synchronous and asynchronous environments.
Reactive Streams in Java 9 consists of four main nested interfaces that come along with the implementation of java.util.concurrent.Flow class:
- Publisher
- Subscriber
- Subscription
- Processor
Let us learn more about these new interfaces.
Publisher
A publisher publishes items or related messages which shall be received by Subscribers. The publisher will publish according to demands received from its Subscribers. A given subscriber invokes Publisher.subscribe() to start receiving items published by the publisher. This is the only method defined in the Publisher interface.
Subscriber
A subscriber receives the items from a publisher to which it has subscribed. There are four methods that come with this interface:
- onSubscribe(Flow.Subscription subscription)
For a given Subscription, this is the first method invoked before invoking any other subscriber methods. Usually, an implementation class calls subscription.request to receive items - onNext(T item)
After processing the data inside current item, a subscriber can get the next item that follows it by invoking the onNext method. - onError(Throwable T)
In publish-subscribe environment, there can be instances where either the subscriber or the publisher encounters a fatal error. In such a scenario, it will be desirable not to invoke further subscribe methods. The onError method can be used in such cases. - onComplete()
This method is invoked when the system understands that for a live subscription, no additional method invocations will be occurring in the near future.
Subscription
A subscription defines the message control that links a subscriber to a given publisher. Subscription contains two methods:
- request(long n)
Sometimes, a given subscriber will have a number of unfulfilled requests. When the system invokes subscription.request(), the subscriber receives upto n additional onNext() invocations. - cancel()This method is used to make the Subscriber stop receiving anymore messages.
Processor
Sometimes, a given subscriber will have to publish items for its own set of subscribers. In such cases, Processor will be used. A Processor can be a subscriber and a publisher at the same time. Thus, the interface inherits from both of these nested interfaces.
A Sample Flow Implementation
In this section, we will see how to create a flow implementation. At first, decide on what type of data should be processed in the publisher and subscriber. The example shows how to use publisher, subscriber, subscription and processor effectively in a multi threaded environment to process atomic variables. There are two dedicated subscriptions for the publisher and processor respectively. Even though a single subscriber subscribes to these, the behaviour of the flow process is determined by the corresponding subscription implementation. The program uses Thread.sleep() to delay the execution in order to to demonstrate how the Flow APIs actually work.
Subscription for the publisher:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
public class PubSubscriptionImpl implements Subscription { private final ExecutorService executor; private Subscriber subscriber; private final AtomicInteger itemValue; private AtomicBoolean isCanceled; private final CompletableFuture<Void> terminated; public PubSubscriptionImpl(Subscriber subscriber, ExecutorService executor, CompletableFuture<Void> terminated) { this.subscriber = subscriber; this.executor = executor; this.terminated = terminated; itemValue = new AtomicInteger(); isCanceled = new AtomicBoolean(false); } @Override public void request(long n) { System.out.println("PubSubscriptionImpl :: request recieved to process : " + n + " items..."); if (isCanceled.get()) return; if (n < 0) executor.execute(() -> subscriber.onError(new IllegalArgumentException())); else publishItems(n); } @Override public void cancel() { isCanceled.set(true); shutdown(); } private void publishItems(long n) { for (int i = 0; i < n; i++) { executor.execute(() -> { int nextVal = itemValue.incrementAndGet(); System.out.println("PubSubscriptionImpl :: publish item: " + nextVal); subscriber.onNext(nextVal); }); } try { Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Encountered exception : " + e.getMessage()); } } private void shutdown() { System.out.println("PubSubscriptionImpl :: Shut down executor..."); executor.shutdown(); newSingleThreadExecutor().submit(() -> { System.out.println("PubSubscriptionImpl :: Shutdown complete."); terminated.complete(null); }); } } |
Subscription for the Processor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
public class ProcSubscriptionImpl implements Subscription { private final ExecutorService executor; private Subscriber subscriber; private AtomicBoolean isCanceled; private ConcurrentLinkedQueue<Integer> resources; private final CompletableFuture<Void> terminated; public ProcSubscriptionImpl(Subscriber subscriber, ExecutorService executor, ConcurrentLinkedQueue<Integer> resources, CompletableFuture<Void> terminated) { this.executor = executor; this.subscriber = subscriber; this.resources = resources; this.terminated = terminated; isCanceled = new AtomicBoolean(false); } @Override public void request(long n) { if (isCanceled.get()) return; if (n < 0) executor.execute(() -> subscriber.onError(new IllegalArgumentException())); else if (resources.size() > 0) publishItems(n); else if (resources.size() == 0) { subscriber.onComplete(); } } private void publishItems(long n) { int remainItems = resources.size(); if ((remainItems == n) || (remainItems > n)) { for (int i = 0; i < n; i++) { executor.execute(() -> { subscriber.onNext(resources.poll()); }); } System.out.println("ProcSubscriptionImpl :: Remaining " + (resources.size() - n) + " items to be published to Subscriber!"); } else if ((remainItems > 0) && (remainItems < n)) { for (int i = 0; i < remainItems; i++) { executor.execute(() -> { subscriber.onNext(resources.poll()); }); } subscriber.onComplete(); } else { System.out.println(" ProcSubscriptionImpl :: Processor contains no item!"); } } @Override public void cancel() { isCanceled.set(true); shutdown(); } private void shutdown() { System.out.println("ProcSubscriptionImpl :: Shut down executor..."); executor.shutdown(); newSingleThreadExecutor().submit(() -> { System.out.println("ProcSubscriptionImpl :: Shutdown complete."); terminated.complete(null); }); } } |
Implementation of a subscriber:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
public class SimpleSubscriber implements Subscriber<Integer> { private long numRequest = 0; private Subscription subscription; private long count; public void setNumRequest(long n) { this.numRequest = n; count = numRequest; } @Override public void onSubscribe(Subscription subscription) { System.out.println("SimpleSubcriber :: Subscribed"); this.subscription = subscription; requestItems(numRequest); } private void requestItems(long numRequest) { System.out.println("SimpleSubcriber :: Requested " + numRequest + " items"); subscription.request(numRequest); } @Override public void onNext(Integer item) { if (item != null) { System.out.println("SimpleSubcriber :: next item is : " + item); synchronized (this) { count--; if (count == 0) { System.out.println("SimpleSubcriber :: Cancelling subscription..."); subscription.cancel(); } } } else { System.out.println("SimpleSubcriber :: The item is null."); } try { // adding delay of 1 sec Thread.sleep(1000); } catch (InterruptedException e) { System.out.println("Encountered interrupted exception: " + e.getMessage()); } } @Override public void onComplete() { System.out.println("SimpleSubcriber :: onComplete(): No more item to be processed"); } @Override public void onError(Throwable t) { System.out.println("SimpleSubcriber :: Encountered error: " + t.getMessage()); } } |
Implementation of processor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
public class ProcessorSample implements Processor<Integer, String> { private Subscription procSubscription; private final ExecutorService executor = Executors.newFixedThreadPool(4); private ProcSubscriptionImpl subscription; private long numRequest; private ConcurrentLinkedQueue<Integer> resources; private final CompletableFuture<Void> terminated = new CompletableFuture<>(); public ProcessorSample() { numRequest = 0; resources = new ConcurrentLinkedQueue<Integer>(); } public void setNumRequest(long n) { this.numRequest = n; } @Override public void subscribe(Subscriber subscriber) { subscription = new ProcSubscriptionImpl(subscriber, executor, resources, terminated); subscriber.onSubscribe(subscription); } @Override public void onSubscribe(Subscription subscription) { System.out.println("ProcessorSample :: Subscribed..."); procSubscription = subscription; startProcessing(); } private void startProcessing() { System.out.println("ProcessorSample :: Started processing " + numRequest + " items"); procSubscription.request(numRequest); } @Override public void onNext(Integer item) { if (null == item) throw new NullPointerException(); resources.add(item); System.out.println("ProcessorSample :: processing item: " + item ); } @Override public void onComplete() { System.out.println("ProcessorSample :: Complete!"); } @Override public void onError(Throwable t) { System.out.println("ProcessorSample :: Encountered error : " + t.getMessage()); } } |
Implementation of Publisher:
1 2 3 4 5 6 7 8 9 10 11 12 |
public class PublisherSample implements Publisher<Integer> { private final ExecutorService executor = Executors.newFixedThreadPool(4); private PubSubscriptionImpl subscription; private final CompletableFuture<Void> terminated = new CompletableFuture<>(); @Override public void subscribe(Subscriber subscriber) { subscription = new PubSubscriptionImpl(subscriber, executor, terminated); subscriber.onSubscribe(subscription); } } |
The main entry point is shown in the code below:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
public class PubSubDemo { public static void main(String[] args) throws InterruptedException { PublisherSample publisher = new PublisherSample(); SimpleSubscriber subscriber = new SimpleSubscriber(); ProcessorSample processor = new ProcessorSample(); // subscriber asks for 4 items at start and processor processes 10 items subscriber.setNumRequest(4); processor.setNumRequest(10); publisher.subscribe(processor); processor.subscribe(subscriber); } } |
Output of the above code will be as follows:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
ProcessorSample :: Subscribed... ProcessorSample :: Started processing 10 items PubSubscriptionImpl :: request recieved to process : 10 items... PubSubscriptionImpl :: publish item: 1 PubSubscriptionImpl :: publish item: 2 PubSubscriptionImpl :: publish item: 3 ProcessorSample :: processing item: 3 ProcessorSample :: processing item: 2 ProcessorSample :: processing item: 1 PubSubscriptionImpl :: publish item: 5 PubSubscriptionImpl :: publish item: 4 ProcessorSample :: processing item: 5 ProcessorSample :: processing item: 4 PubSubscriptionImpl :: publish item: 6 PubSubscriptionImpl :: publish item: 8 PubSubscriptionImpl :: publish item: 7 ProcessorSample :: processing item: 8 PubSubscriptionImpl :: publish item: 9 ProcessorSample :: processing item: 6 ProcessorSample :: processing item: 9 PubSubscriptionImpl :: publish item: 10 ProcessorSample :: processing item: 10 ProcessorSample :: processing item: 7 SimpleSubcriber :: Subscribed SimpleSubcriber :: Requested 4 items ProcSubscriptionImpl :: Remaining 6 items to be published to Subscriber! SimpleSubcriber :: next item is : 3 SimpleSubcriber :: next item is : 2 SimpleSubcriber :: next item is : 1 SimpleSubcriber :: next item is : 5 SimpleSubcriber :: Cancelling subscription... ProcSubscriptionImpl :: Shut down executor... ProcSubscriptionImpl :: Shutdown complete. |
To Sum Up
With the advancement of Information Technology and Computing there rose the need for systems that react to situations. Consequently, it was necessary not to limit Java only to Object-Oriented Programming Paradigm. Many front-end technologies embraced reactive paradigm. Third party libraries like RxJs, RxJava, etc came into existence because of the demand for incorporation of this paradigm. Although Java 9 comes with an implementation for Reactive Streams, what it provides is still minimal. But the Java developer community hopes to get enhancements for the API in the near future.