Reactive Streams In Java 9

Reactive programming has become increasingly common in real-time systems. Software design is shifting from conventional server- and container-centric architectures toward systems that are event-driven, responsive, scalable and able to handle failures gracefully. The problem with the traditional approach of building multithreaded applications around synchronized, shared state is that the resulting components are tightly coupled, and a thread can be blocked for as long as another thread holds the lock it needs. Reactive programming makes better use of resources by relying on event handlers and asynchronous operations with non-blocking backpressure. Java 9 introduced the Flow API to bring Reactive Streams into the JDK, a stepping stone that lets programmers use this paradigm effectively.

So, What Exactly Are Reactive Streams?

When handling live data streams, blocking operations in multithreaded environments often hinder performance, and handling runtime errors in such systems is usually cumbersome. Over the years, software designers found it useful for a producer to be able to signal, along with the current item, whether a next item is available or an error has been encountered. It is equally useful for a consumer to be able to send signals back to the producer, which makes for a more efficient, fault-tolerant system. Thus, the concept of reactive streams was born.

Reactive streams offer a better alternative by supporting asynchronous stream processing in which the receiver, or subscriber, in a publish-subscribe system tells the publisher how many items it is ready to handle and therefore does not need to buffer an unbounded amount of data. This is what is meant by non-blocking backpressure.

Flow API

Java 9 introduced the Flow API as part of JDK Enhancement Proposal 266 (JEP 266), which adds a handful of enhancements to the concurrency packages.

Before Java 9, developers depended on third-party implementations such as RxJava or Akka Streams to build publish-subscribe or flow-controlled applications for asynchronous stream processing. Implementations of Reactive Streams follow the rules of the Reactive Streams specification, which builds on the ideas of the Reactive Manifesto.

In a publish-subscribe environment, a set of publishers produce items. Subscribers can subscribe to the publishers and consume the items published by them. One thing to note about reactive streams is that they can be used in both synchronous and asynchronous environments.

Reactive Streams support in Java 9 consists of four nested interfaces declared in the java.util.concurrent.Flow class:

  • Publisher
  • Subscriber
  • Subscription
  • Processor

Let us learn more about these new interfaces.

Publisher

A publisher produces items or related messages that are received by its subscribers, and it publishes according to the demand those subscribers signal. A subscriber is registered by passing it to Publisher.subscribe(), after which it can start receiving items. This is the only method defined in the Publisher interface.
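
As a minimal sketch of this contract, the hypothetical OneShotPublisher below (a name invented for this example, not part of the JDK) implements subscribe() by handing the subscriber a subscription and emitting a single item only once demand has been signalled. Everything runs synchronously on the caller's thread to keep the example short; a real publisher would also have to deal with concurrency and the remaining rules of the specification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class OneShotDemo {
    // Hypothetical publisher that emits one String and then completes.
    static class OneShotPublisher implements Flow.Publisher<String> {
        private final String value;

        OneShotPublisher(String value) {
            this.value = value;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done; // true after emission, error or cancellation

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    done = true;
                    if (n <= 0) {
                        subscriber.onError(new IllegalArgumentException("n must be positive"));
                        return;
                    }
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    // Subscribes a recording subscriber and returns the signals it saw, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new OneShotPublisher("hello").subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                events.add("onSubscribe");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                events.add("onNext:" + item);
            }

            @Override
            public void onError(Throwable throwable) {
                events.add("onError");
            }

            @Override
            public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [onSubscribe, onNext:hello, onComplete]
    }
}
```

Nothing is emitted inside subscribe() itself: the item is only delivered from request(), which is what lets the subscriber control the pace.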

Subscriber

A subscriber receives the items from a publisher to which it has subscribed. There are four methods that come with this interface:

  • onSubscribe(Flow.Subscription subscription)
    This is the first method invoked for a given Subscription, before any other Subscriber method. An implementation usually calls subscription.request() here to start receiving items.
  • onNext(T item)
    The publisher invokes this method to deliver the next item. After processing it, the subscriber can ask for more items by calling subscription.request().
  • onError(Throwable throwable)
    In a publish-subscribe environment, either the subscriber or the publisher can encounter an unrecoverable error. onError is invoked in that case, and no further Subscriber methods are invoked on the Subscription afterwards.
  • onComplete()
    This method is invoked when it is known that no further Subscriber method invocations will occur for a Subscription that has not already been terminated by an error.
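
The four callbacks can be sketched with a subscriber that simply records every signal. LoggingSubscriber is a hypothetical class written for this illustration; SubmissionPublisher is the Publisher implementation that ships with the JDK. The subscriber asks for one item at a time, and a latch lets the calling thread wait for the asynchronous delivery to finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class LoggingSubscriberDemo {
    // Hypothetical subscriber that records every signal it receives.
    static class LoggingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            events.add("subscribed");
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(Integer item) {
            events.add("item " + item);
            subscription.request(1); // signal demand for one more item
        }

        @Override
        public void onError(Throwable throwable) {
            events.add("error " + throwable.getMessage());
            finished.countDown();
        }

        @Override
        public void onComplete() {
            events.add("complete");
            finished.countDown();
        }
    }

    static List<String> run() {
        LoggingSubscriber subscriber = new LoggingSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i); // buffered until the subscriber requests it
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            if (!subscriber.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [subscribed, item 1, item 2, item 3, complete]
    }
}
```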

Subscription

A subscription defines the message control that links a subscriber to a given publisher. Subscription contains two methods:

  • request(long n)
    Adds n items to the subscriber's current unfulfilled demand. After subscription.request(n) is called, the subscriber may receive up to n additional onNext() invocations, or fewer if the stream terminates first.
  • cancel()
    Causes the subscriber to (eventually) stop receiving messages.
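
To make the demand accounting concrete, here is a rough single-threaded sketch. ListSubscription is a hypothetical class invented for this example: it serves items from a fixed list, adds each request(n) to the outstanding demand, and stops sending signals once cancel() has been called. It is not thread-safe and is only meant to illustrate the two methods.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class DemandDemo {
    // Hypothetical single-threaded subscription over a fixed list of items.
    static class ListSubscription<T> implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Iterator<T> items;
        private long demand;        // unfulfilled demand accumulated by request(n)
        private boolean emitting;   // guards against re-entrant request() calls
        private boolean terminated; // set on cancel, error or completion

        ListSubscription(Flow.Subscriber<? super T> subscriber, List<T> source) {
            this.subscriber = subscriber;
            this.items = source.iterator();
        }

        @Override
        public void request(long n) {
            if (terminated) {
                return;
            }
            if (n <= 0) {
                terminated = true;
                subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                return;
            }
            demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
            if (emitting) {
                return; // the loop already running below picks up the new demand
            }
            emitting = true;
            while (!terminated && demand > 0 && items.hasNext()) {
                demand--;
                subscriber.onNext(items.next());
            }
            emitting = false;
            if (!terminated && !items.hasNext()) {
                terminated = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            terminated = true; // no further signals are sent after this
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        Flow.Subscriber<String> recorder = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) { }
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable throwable) { received.add("error"); }
            @Override public void onComplete() { received.add("complete"); }
        };
        ListSubscription<String> subscription =
                new ListSubscription<>(recorder, List.of("a", "b", "c", "d"));
        recorder.onSubscribe(subscription);
        subscription.request(2); // delivers a and b
        subscription.request(1); // delivers c
        subscription.cancel();
        subscription.request(5); // ignored: the subscription was cancelled
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [a, b, c]
    }
}
```

The item "d" is never delivered and onComplete() is never called, because the subscription was cancelled before the list was exhausted.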

Processor

Sometimes a subscriber has to publish items to its own set of subscribers; a Processor is used in such cases. A Processor acts as a subscriber and a publisher at the same time, so the interface extends both Subscriber and Publisher.
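
One common way to sketch a Processor is to extend the JDK's SubmissionPublisher for the publishing side and implement the Subscriber callbacks by hand. MappingProcessor below is a hypothetical class written for this illustration: it applies a function to each incoming item and republishes the result. The consume() helper inherited from SubmissionPublisher is used to collect the output and to wait for completion.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

class ProcessorDemo {
    // Hypothetical processor: subscribes to T items and republishes them as R.
    static class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription upstream;

        MappingProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item)); // publish the transformed item downstream
            upstream.request(1);        // then ask upstream for one more
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // forward the error downstream
        }

        @Override
        public void onComplete() {
            close(); // forward completion downstream
        }
    }

    static List<String> run() {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        MappingProcessor<Integer, String> processor =
                new MappingProcessor<Integer, String>(i -> "item-" + i);
        // Attach the downstream consumer before any item is published.
        CompletableFuture<Void> done = processor.consume(received::add);
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            source.submit(1);
            source.submit(2);
            source.submit(3);
        } // closing the source eventually completes the processor as well
        done.join(); // wait until the downstream consumer has seen onComplete
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [item-1, item-2, item-3]
    }
}
```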

A Sample Flow Implementation

In this section, we will see how to create a flow implementation. First, decide what type of data the publisher and subscriber should process. The example shows how to use a publisher, subscriber, subscription and processor in a multithreaded environment to process atomic variables. There are two dedicated subscriptions, one for the publisher and one for the processor. Even though a single subscriber subscribes to both, the behaviour of the flow is determined by the corresponding subscription implementation. The program uses Thread.sleep() to delay execution in order to demonstrate how the Flow API actually works.

Subscription for the publisher:

Subscription for the Processor:

Implementation of a subscriber:

Implementation of processor:

Implementation of Publisher:

The main entry point is shown in the code below:

Output of the above code will be as follows:

To Sum Up

With the advancement of information technology and computing, the need arose for systems that react to events. Consequently, Java could not remain limited to the object-oriented programming paradigm. Many front-end technologies embraced the reactive paradigm, and third-party libraries such as RxJS and RxJava came into existence because of the demand for it. Although Java 9 ships with support for Reactive Streams, what it provides is still minimal, and the Java developer community hopes to see enhancements to the API in the near future.
