Friday, May 1, 2020

Reactive Programming with RxJava

This time I read Reactive Programming with RxJava by Ben Christensen and Tomasz Nurkiewicz. Various notes follow (mainly things that were new to me or that I especially wanted to write down).

Some basics / How RxJava works (Ch. 1)

The contract of an RxJava Observable:

  • Events onNext(), onCompleted() and onError() can never be emitted concurrently.
  • E.g. onNext() can't be invoked on one thread while it is still being executed on another thread.
  • Why? Mainly because concurrency is difficult.
  • See also the full Rx Observable contract.
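The "no concurrent callbacks" rule can be illustrated without RxJava itself. The sketch below is JDK-only and all names in it (SimpleObserver, SerializingObserver, ContractDemo) are hypothetical: a wrapper serializes the callbacks with a plain lock, so even when several threads emit at once, at most one onNext() runs at a time. RxJava 1 ships rx.observers.SerializedObserver for this purpose, but it is implemented differently (without holding a lock during the call), and the full contract also forbids onNext() after onCompleted(), which this sketch does not enforce.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical minimal observer type mirroring RxJava's three callbacks.
interface SimpleObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

// Guards a downstream observer so that its callbacks never run concurrently,
// even when the producer calls them from several threads at once.
final class SerializingObserver<T> implements SimpleObserver<T> {
    private final SimpleObserver<T> delegate;

    SerializingObserver(SimpleObserver<T> delegate) {
        this.delegate = delegate;
    }

    @Override public synchronized void onNext(T value) { delegate.onNext(value); }
    @Override public synchronized void onError(Throwable error) { delegate.onError(error); }
    @Override public synchronized void onCompleted() { delegate.onCompleted(); }
}

final class ContractDemo {
    // Calls onNext() `count` times from a pool of `threads` threads and returns the
    // largest number of onNext() calls that were ever in progress simultaneously.
    static int maxConcurrentOnNext(int threads, int count) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        SimpleObserver<Integer> raw = new SimpleObserver<Integer>() {
            @Override public void onNext(Integer value) {
                maxSeen.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                Thread.yield();                 // widen the window for overlapping calls
                inFlight.decrementAndGet();
            }
            @Override public void onError(Throwable error) { }
            @Override public void onCompleted() { }
        };
        SimpleObserver<Integer> safe = new SerializingObserver<>(raw);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < count; i++) {
            int value = i;
            pool.execute(() -> safe.onNext(value));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxSeen.get();
    }
}
```

Without the wrapper (passing `raw` directly to the pool), the returned value can be greater than 1 on a multi-core machine.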

The Observable type is lazy, meaning it does nothing until it is subscribed to.

  • Subscription, not construction, starts the work.
  • Observables can be reused.

Observable duality with Iterable:

Pull (Iterable)        Push (Observable)
T next()               onNext(T)
throws Exception       onError(Throwable)
returns                onCompleted()
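To make the duality concrete, here is a JDK-only sketch of the same data consumed in both styles. PushObserver and Duality are hypothetical names that only mirror RxJava's callback shape: in pull(), the consumer drives the loop; in push(), the producer drives the loop and maps each row of the table to a callback.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical push-side counterpart of Iterator (same shape as RxJava's Observer).
interface PushObserver<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

final class Duality {
    // Pull: the consumer asks for every element.
    static <T> List<T> pull(Iterable<T> source) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = source.iterator();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    // Push: the producer hands every element to the observer.
    static <T> void push(Iterable<T> source, PushObserver<? super T> observer) {
        Iterator<T> it = source.iterator();
        while (true) {
            T value;
            try {
                if (!it.hasNext()) {
                    break;
                }
                value = it.next();
            } catch (RuntimeException e) {
                observer.onError(e);   // "throws Exception" becomes onError(Throwable)
                return;
            }
            observer.onNext(value);    // "T next()" becomes onNext(T)
        }
        observer.onCompleted();        // "returns" (no more elements) becomes onCompleted()
    }
}
```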

Cardinalities:

               One                   Many
Synchronous    T getData()           Iterable<T> getData()
Asynchronous   Future<T> getData()   Observable<T> getData()

Concurrency vs Parallelism

  • Parallelism is when multiple tasks are simultaneously executed e.g. on different CPUs or machines.
  • Concurrency is the composition and/or interleaving of multiple tasks, which is possible even on a single-core machine.

Reactive Extensions (Ch. 2)

Observable is a push-based stream of events.

Subscribing to notifications from an Observable. Remember the contract: even if events are emitted from many threads, the callbacks are never invoked from more than one thread at a time.

Observable<Foo> foos = ...

foos.subscribe(
    (Foo foo) -> { System.out.println(foo); },   // onNext
    (Throwable t) -> { t.printStackTrace(); },   // onError
    () -> System.out.println("No more")          // onCompleted
);

Example of Observable.create() (RxJava 1 API):

// log(msg) is a small helper (not shown) that prints "<current thread name>: <msg>"
Observable<Integer> ints = Observable.
    create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            log("Create");
            subscriber.onNext(5);
            subscriber.onNext(6);
            subscriber.onCompleted();
            log("Completed");
        }
    });
log("Start");
ints.subscribe(i -> log("Element: " + i));
log("End");

produces

main: Start
main: Create
main: Element: 5
main: Element: 6
main: Completed
main: End

By default, emitting does not begin until we actually subscribe. Every time subscribe() is called, the handler passed to create() is invoked again.

In some cases (e.g. a DB query or a heavy computation) this is not wanted; the cache() operator helps here. NOTE: cache() combined with an infinite stream is a recipe for disaster, because every emitted value is kept in memory.
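The laziness, the re-run on every subscribe() and the effect of cache() can be modelled in plain Java. This is a minimal sketch with hypothetical names (ColdSource, cached(), LazinessDemo), not RxJava's implementation: cached() here records everything and then replays, so it only works for synchronous, finite sources on a single thread, whereas RxJava's cache() also forwards values to early subscribers as they arrive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, stripped-down cold source: it stores *how* to emit, not the values.
final class ColdSource<T> {
    private final Consumer<Consumer<T>> onSubscribe;

    ColdSource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;   // nothing runs here: construction is lazy
    }

    void subscribe(Consumer<T> subscriber) {
        onSubscribe.accept(subscriber);   // the producer runs once per subscribe()
    }

    // Rough analogue of cache(): run the producer at most once and replay the
    // recorded values. Values are kept forever, hence the danger with infinite streams.
    ColdSource<T> cached() {
        List<T> recorded = new ArrayList<>();
        boolean[] started = {false};
        return new ColdSource<>(subscriber -> {
            if (!started[0]) {
                started[0] = true;
                onSubscribe.accept(recorded::add);
            }
            recorded.forEach(subscriber);
        });
    }
}

final class LazinessDemo {
    static int[] run() {
        AtomicInteger invocations = new AtomicInteger();
        ColdSource<Integer> ints = new ColdSource<>(s -> {
            invocations.incrementAndGet();          // stands in for a DB query
            s.accept(5);
            s.accept(6);
        });
        int afterCreate = invocations.get();        // 0: nothing happened yet
        ints.subscribe(i -> { });
        ints.subscribe(i -> { });
        int afterTwoSubscribes = invocations.get(); // 2: producer ran twice
        ColdSource<Integer> cached = ints.cached();
        List<Integer> seen = new ArrayList<>();
        cached.subscribe(seen::add);
        cached.subscribe(seen::add);
        int afterCached = invocations.get();        // 3: one more run, shared by both
        return new int[] {afterCreate, afterTwoSubscribes, afterCached, seen.size()};
    }
}
```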

Operators wrap existing Observables, enhancing them typically by intercepting subscription.
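A minimal sketch of what "wrapping by intercepting subscription" means, using a hypothetical Source interface instead of RxJava's Observable: map() returns a new source that does nothing until subscribed, and then subscribes upstream with a callback that transforms each value before passing it downstream.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical minimal source type: subscribe() hands values to a callback.
interface Source<T> {
    void subscribe(Consumer<? super T> subscriber);

    // A map "operator": subscribing to the result subscribes to `this` (upstream)
    // with a wrapping callback that applies fn before forwarding downstream.
    default <R> Source<R> map(Function<? super T, ? extends R> fn) {
        Source<T> upstream = this;
        return downstream -> upstream.subscribe(value -> downstream.accept(fn.apply(value)));
    }

    static <T> Source<T> just(List<T> values) {
        return subscriber -> values.forEach(subscriber);
    }
}
```

Usage: `Source.just(Arrays.asList(1, 2, 3)).map(i -> "#" + i).subscribe(System.out::println)` prints #1, #2 and #3 on separate lines.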

Hot and cold observables

  • A cold Observable is completely lazy and begins emitting events only when subscribed to.
    • Without observers, nothing happens.
    • Each subscriber receives its own copy of the stream.
  • Hot Observables are independent of consumers.
    • A hot Observable might emit events even without any Subscribers.
    • Typical when we have no control over the source of events, e.g. UI actions.
    • Note that you can't be sure you have received all events from the beginning.
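The practical difference for a late subscriber can be shown in a few lines of plain Java. HotSource and HotColdDemo are hypothetical names for this sketch: the cold producer is re-run per subscriber, while the hot source pushes events when the outside world fires them, so whoever subscribes late misses the earlier ones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical hot source: it pushes events whether or not anyone listens.
final class HotSource<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
    }

    // Called by the outside world (e.g. a UI toolkit), not by subscribers.
    void emit(T event) {
        for (Consumer<T> s : subscribers) {
            s.accept(event);
        }
    }
}

final class HotColdDemo {
    // Cold: every subscriber triggers its own, complete run of the producer.
    static List<String> coldLateSubscriber() {
        Consumer<Consumer<String>> coldProducer = s -> { s.accept("a"); s.accept("b"); };
        coldProducer.accept(s -> { });             // an earlier subscriber gets its own run
        List<String> received = new ArrayList<>();
        coldProducer.accept(received::add);        // a late subscriber still sees a, b
        return received;
    }

    // Hot: a subscriber that arrives late misses earlier events.
    static List<String> hotLateSubscriber() {
        HotSource<String> clicks = new HotSource<>();
        clicks.emit("a");                          // nobody listening: the event is lost
        List<String> received = new ArrayList<>();
        clicks.subscribe(received::add);
        clicks.emit("b");
        return received;
    }
}
```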

Subject & ConnectableObservable

Subject both extends Observable and implements Observer at the same time. Some examples (JavaDocs: RxJava 1 rx.subjects, RxJava 2 io.reactivex.subjects)

  • PublishSubject
  • AsyncSubject
  • BehaviorSubject
  • ReplaySubject

See also ConnectableObservable (RxJava 1, RxJava 2)

Operators and Transformations (Ch. 3)

An operator is a function that takes upstream Observable<T> and returns downstream Observable<R> (T and R might or might not be the same).

Operators are often visualized as marble diagrams; see e.g. the Observable Javadocs (Observable.map() in RxJava 1 / RxJava 2) and the interactive marble diagrams at https://rxmarbles.com/.

There are many operators for various purposes (not going to list them here).

Custom operators

  • For a transformation from one Observable to another, there is the Observable.Transformer interface, which can be passed to Observable.compose(). (The transformer is usually a chain of existing operators.)
  • If the existing operators are not enough, there is Observable.lift().
    • Whereas compose() transforms Observables, lift() transforms Subscribers.
    • Note: Backpressure & the subscription mechanism need to be taken into account.
    • Note: Many built-in operators are themselves implemented with lift(), e.g. in the RxJava 1 version covered by the book, observable.filter(predicate) is implemented as observable.lift(new OperatorFilter<>(predicate)).
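The compose() vs lift() distinction in a JDK-only sketch. MiniObservable, EVENS and ComposeLiftDemo are hypothetical names, and backpressure and unsubscription are ignored: compose() takes a function from a whole stream to a whole stream, while lift() takes a function from the downstream subscriber to the subscriber that is handed upstream. filter() is built on lift() here, in the spirit of the OperatorFilter example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical minimal stream type used to contrast compose() and lift().
final class MiniObservable<T> {
    private final Consumer<Consumer<? super T>> onSubscribe;

    MiniObservable(Consumer<Consumer<? super T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    static <T> MiniObservable<T> from(List<T> values) {
        return new MiniObservable<>(subscriber -> values.forEach(subscriber));
    }

    void subscribe(Consumer<? super T> subscriber) {
        onSubscribe.accept(subscriber);
    }

    // compose(): whole stream in, whole stream out (typically a chain of operators).
    <R> MiniObservable<R> compose(Function<MiniObservable<T>, MiniObservable<R>> transformer) {
        return transformer.apply(this);
    }

    // lift(): turns the downstream subscriber into the subscriber given to upstream.
    <R> MiniObservable<R> lift(Function<Consumer<? super R>, Consumer<? super T>> operator) {
        return new MiniObservable<>(downstream -> onSubscribe.accept(operator.apply(downstream)));
    }

    // filter() expressed through lift(): forward only values matching the predicate.
    MiniObservable<T> filter(Predicate<? super T> predicate) {
        return this.<T>lift(downstream -> value -> {
            if (predicate.test(value)) {
                downstream.accept(value);
            }
        });
    }
}

final class ComposeLiftDemo {
    // A reusable, Transformer-like function: keeps only even numbers.
    static final Function<MiniObservable<Integer>, MiniObservable<Integer>> EVENS =
        upstream -> upstream.filter(i -> i % 2 == 0);

    static List<Integer> run() {
        List<Integer> out = new ArrayList<>();
        MiniObservable.from(Arrays.asList(1, 2, 3, 4, 5, 6))
            .compose(EVENS)
            .filter(i -> i > 2)
            .subscribe(out::add);
        return out;
    }
}
```

ComposeLiftDemo.run() returns [4, 6]. The design point: EVENS only reuses existing operators, so it fits compose(); filter() needs to intercept individual values, which is what lift() is for.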

Applying Reactive Programming to Existing Applications (Ch. 4)

Notes on Multithreading in RxJava

Even though not enforced or suggested by the type system, many Observables are asynchronous from the very beginning, and you should assume that.

  • A blocking subscribe() happens only rarely: when the lambda passed to Observable.create() is not backed by any asynchronous process or stream.
  • However, by default (with create()) everything happens in the client (subscribing) thread.

Scheduler

  • RxJava is concurrency-agnostic: it does not introduce concurrency on its own.
  • Certain operators cannot work properly without concurrency.
  • RxJava has the Scheduler abstraction (similar to ScheduledExecutorService).
  • Schedulers are used together with subscribeOn() and observeOn(), as well as when creating certain types of Observables.
  • Naively, a Scheduler can be thought of as a thread pool and a Worker as a thread inside that pool.

Built-in schedulers:

  • Schedulers.newThread()
    • a new thread each time
    • "hardly ever a good choice"
  • Schedulers.io()
    • similar to newThread(), but threads are recycled and can be reused for future requests
    • for I/O-bound tasks, e.g. waiting for the network or disk
  • Schedulers.computation()
    • for entirely CPU-bound tasks
    • by default, the number of threads running in parallel is limited to the value of availableProcessors()
  • Schedulers.from(Executor executor)
  • Schedulers.immediate()
    • Invokes a task within the client thread in a blocking fashion
  • Schedulers.trampoline()
    • Similar to immediate() but executes a task when all previously scheduled tasks complete.
  • Schedulers.test()
    • For testing purposes
    • Ability to arbitrarily advance the clock

subscribeOn & observeOn

  • What happens if there are two subscribeOn() invocations between the Observable and subscribe()?
    • The subscribeOn() closest to the original Observable wins.
  • Note: In entirely reactive software stacks, subscribeOn() is almost never used, yet all Observables are asynchronous.
    • In general, subscribeOn() is used quite rarely, mostly when retrofitting existing APIs or libraries.
  • In general, flatMap() and merge() are the operators to use for parallelism.

Simple analogies:

  • Observable without any Scheduler works like a single-threaded program with blocking method calls passing data between one another.
  • Observable with a single subscribeOn() is like starting a big task in the background Thread. The program within that Thread is still sequential, but at least it runs in the background.
  • Observable using flatMap() where each internal Observable has subscribeOn() works like java.util.concurrent.ForkJoinPool, where each substream is a fork of execution and flatMap() is a safe join merge.
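The fork/join analogy can be sketched with CompletableFuture instead of RxJava. ForkJoinAnalogy and squareAll() are hypothetical: each input forks its own asynchronous sub-task on a pool (the role of an inner Observable with subscribeOn()), and results are merged into one list in completion order, as flatMap() would interleave them, before the final join.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ForkJoinAnalogy {
    static List<Integer> squareAll(List<Integer> inputs, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Integer> merged = Collections.synchronizedList(new ArrayList<>());
            List<CompletableFuture<Void>> forks = new ArrayList<>();
            for (Integer n : inputs) {
                // fork: one asynchronous sub-task per input; the result is appended
                // as soon as it is ready, so the order reflects completion order
                forks.add(CompletableFuture.supplyAsync(() -> n * n, pool).thenAccept(merged::add));
            }
            // join: wait until every sub-task has been merged
            CompletableFuture.allOf(forks.toArray(new CompletableFuture<?>[0])).join();
            return merged;
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the merge happens in completion order, the output of squareAll(Arrays.asList(1, 2, 3, 4), 4) contains 1, 4, 9 and 16 but not necessarily in that order, just like with flatMap().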

observeOn():

  • Everything below observeOn() is run within the supplied Scheduler.
  • subscribeOn() & observeOn() work well together when you want to physically decouple the producer (Observable.create()) and the consumer (Subscriber).
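A JDK-only analogy for that decoupling (not RxJava code; DecoupleDemo and the thread names are hypothetical): the producer loop runs on one single-threaded executor, which is the subscribeOn() role, and every value is re-posted to a second single-threaded executor, which is the observeOn() role. Because the consumer side is a single thread, its callbacks stay sequential. In RxJava, such executors could be turned into Schedulers with Schedulers.from(executor).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class DecoupleDemo {
    // Returns one "value: producerThread -> consumerThread" line per emitted value.
    static List<String> run() {
        ExecutorService producerPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "producer"));
        ExecutorService consumerPool = Executors.newSingleThreadExecutor(r -> new Thread(r, "consumer"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);

        // observeOn-like hop: hand each value over to the consumer thread.
        Consumer<Integer> consumer = value -> {
            String producedOn = Thread.currentThread().getName();
            consumerPool.execute(() -> {
                log.add(value + ": " + producedOn + " -> " + Thread.currentThread().getName());
                done.countDown();
            });
        };
        // subscribeOn-like hop: the producer (think Observable.create()) runs on producerPool.
        producerPool.execute(() -> {
            for (int i = 1; i <= 3; i++) {
                consumer.accept(i);
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            producerPool.shutdown();
            consumerPool.shutdown();
        }
        return log;
    }
}
```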

Note that many operators use some Scheduler by default, typically Schedulers.computation().

Reactive from Top to Bottom (Ch. 5)

Non-blocking applications tend to provide great performance and throughput for a fraction of the hardware. By limiting the number of threads, we're able to fully utilize CPU without consuming gigabytes of memory.

C10k problem: handling ten thousand concurrent connections on a single server.

The advice for interacting with relational databases is to actually have a dedicated, well-tuned thread pool and isolate the blocking code there.
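A minimal sketch of that isolation in plain Java, assuming a blocking driver such as JDBC. BlockingIsolation, DB_THREADS, blockingQuery() and findById() are hypothetical (the "query" just sleeps): every blocking call is submitted to one bounded, dedicated pool, so it can never tie up the event-loop or computation threads. In RxJava, the same pool could be turned into a Scheduler with Schedulers.from(executor) and used with subscribeOn().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingIsolation {
    // Hypothetical size; in practice it would be tuned, e.g. to the connection pool size.
    private static final int DB_THREADS = 4;

    // The only place where blocking DB work is allowed to run.
    private final ExecutorService dbPool = Executors.newFixedThreadPool(DB_THREADS, r -> {
        Thread t = new Thread(r, "db-worker");
        t.setDaemon(true);
        return t;
    });

    // Stand-in for a blocking JDBC query (hypothetical: sleeps instead of querying).
    private String blockingQuery(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row-" + id + " on " + Thread.currentThread().getName();
    }

    // Non-blocking facade: callers get a future, the blocking happens on dbPool.
    CompletableFuture<String> findById(int id) {
        return CompletableFuture.supplyAsync(() -> blockingQuery(id), dbPool);
    }

    void shutdown() {
        dbPool.shutdown();
    }
}
```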

The chapter also gives a short introduction to CompletableFuture. Semantically, you can treat CompletableFuture like an Observable with the following characteristics:

  • It is hot.
  • It is cached.
  • It emits exactly one element or exception.
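All three characteristics can be observed with the JDK alone; FutureSemantics is a hypothetical name for this sketch. The supplier starts running as soon as supplyAsync() is called (hot), both dependent stages see the same result without re-running the supplier (cached), and a second completion attempt is ignored (exactly one value).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

final class FutureSemantics {
    // Returns {supplier executions, first result, second result, 1 if overwritten else 0}.
    static int[] run() {
        AtomicInteger executions = new AtomicInteger();
        // Hot: the computation starts here, before anyone "subscribes".
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            executions.incrementAndGet();
            return 42;
        });
        // Cached: both stages observe the same single result; the supplier is not re-run.
        int a = future.thenApply(v -> v + 1).join();
        int b = future.thenApply(v -> v * 2).join();
        // Exactly one value: completing an already completed future returns false.
        boolean overwritten = future.complete(0);
        return new int[] {executions.get(), a, b, overwritten ? 1 : 0};
    }
}
```

FutureSemantics.run() yields {1, 43, 84, 0}.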

Flow Control and Backpressure (Ch. 6)

RxJava has two ways of dealing with producers being more active than subscribers:

  • Various flow-control mechanisms such as sampling and batching (built-in operators)
  • Subscribers can propagate their demand and request only as many items as they can process - by using a feedback channel known as backpressure.

Flow control operators

  • sample() & throttleFirst()
  • buffer() (different overloaded versions) - extremely flexible, quite complex.
  • window() (different overloaded versions)
  • debounce()
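As a taste of what buffer() does, here is a hand-rolled model of the simplest overload, buffer(int count). CountBuffer is a hypothetical name and this is not RxJava's implementation: values are grouped into lists of a fixed size, and a final partial batch is flushed when the stream completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical count-based batching, a rough model of buffer(int count).
final class CountBuffer<T> {
    private final int size;
    private final Consumer<List<T>> downstream;
    private List<T> current = new ArrayList<>();

    CountBuffer(int size, Consumer<List<T>> downstream) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
        this.downstream = downstream;
    }

    void onNext(T value) {
        current.add(value);
        if (current.size() == size) {      // batch full: emit it and start a new one
            downstream.accept(current);
            current = new ArrayList<>();
        }
    }

    void onCompleted() {
        if (!current.isEmpty()) {          // flush the last, possibly partial, batch
            downstream.accept(current);
            current = new ArrayList<>();
        }
    }
}
```

Feeding 1..5 into a CountBuffer of size 2 emits [1, 2], [3, 4] and, on completion, [5].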

Backpressure

  • In essence, backpressure is a feedback channel from the consumer to the producer.
  • It is a protocol that allows the consumer to request how much data it can consume at a time.
  • "Consumer" here means terminal subscribers as well as intermediate operators.
  • It switches from a pure push model to a pull-push model.
  • When creating Observables, think about correctly handling the backpressure requests.
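The request(n) feedback channel can be tried out without RxJava, assuming Java 9+, via java.util.concurrent.Flow (the JDK's copy of the Reactive Streams interfaces) and SubmissionPublisher. OneAtATimeSubscriber and BackpressureDemo are hypothetical names: the subscriber asks for exactly one item at a time, and the publisher only delivers what has been requested, buffering (and eventually blocking submit()) otherwise.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Requests one item, processes it, then requests the next one.
final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new CopyOnWriteArrayList<>();
    final CountDownLatch finished = new CountDownLatch(1);
    private Flow.Subscription subscription;

    @Override public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);       // initial demand: a single item
    }

    @Override public void onNext(Integer item) {
        received.add(item);            // "process" the item...
        subscription.request(1);       // ...then signal demand for the next one
    }

    @Override public void onError(Throwable throwable) { finished.countDown(); }

    @Override public void onComplete() { finished.countDown(); }
}

final class BackpressureDemo {
    static List<Integer> run() {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < 5; i++) {
                publisher.submit(i);   // blocks if the subscriber's buffer is full
            }
        }                              // close() sends onComplete after buffered items
        try {
            subscriber.finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }
}
```

RxJava 2's Flowable uses the same request(n) protocol (via org.reactivestreams), so a hand-written source there has to honour demand in the same way.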

Testing and troubleshooting (Ch. 7)

According to the Reactive Manifesto, reactive systems should be responsive, resilient, elastic and message-driven. This chapter focuses on responsiveness and resiliency. An Observable is a container for values or errors.

Case Studies (Ch. 8)

"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable." (Leslie Lamport, 1987)

Managing Failures with Hystrix

UPDATE NOTE: Nowadays Hystrix is in maintenance mode, the most prominent successor being resilience4j.

RxJava has many operators that support writing scalable, reactive and resilient applications

  • Declarative concurrency with Schedulers.
  • Timeouts and various error handling mechanisms.
  • Parallelizing work with flatMap and limiting concurrency at the same time.

Hystrix helps to work with actions that can potentially fail, applying clever logic around such code. Mechanisms include:

  • Bulkhead patterns by cutting off misbehaving actions entirely for a certain time.
    • Bulkheads are large walls across a ship's hull that create watertight compartments.
  • Failing fast by applying timeouts, limiting concurrency, and implementing a so-called circuit breaker:
    • An electrical circuit breaker's responsibility is to interrupt the flow of electricity in order to protect devices from overload and even from catching fire.
  • Batching requests by collapsing small orders into one big order.
    • Note that batching makes no sense under low load.
  • Collecting, publishing, and visualizing performance statistics.
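The fail-fast circuit breaker idea fits into a few lines of plain Java. This is a minimal, single-threaded sketch with a hypothetical CircuitBreaker class, not Hystrix's (or resilience4j's) implementation: after failureThreshold consecutive failures the circuit opens and calls go straight to the fallback without touching the dependency; once openMillis has passed, one trial call is let through (half-open), and its outcome closes or re-opens the circuit. The clock is injectable so the behaviour can be tested.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical minimal circuit breaker; not thread-safe.
final class CircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;     // e.g. System::currentTimeMillis
    private int consecutiveFailures = 0;
    private long openedAt = -1;           // -1 means the circuit is closed

    CircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (openedAt >= 0 && clock.getAsLong() - openedAt < openMillis) {
            return fallback.get();        // open: fail fast, give the dependency time to recover
        }
        try {
            T result = action.get();      // closed, or the half-open trial call
            consecutiveFailures = 0;
            openedAt = -1;                // success closes the circuit
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (consecutiveFailures >= failureThreshold) {
                openedAt = clock.getAsLong();   // (re-)open the circuit
            }
            return fallback.get();
        }
    }

    boolean isOpen() {
        return openedAt >= 0;
    }
}
```

A real breaker would also need thread safety and, as in Hystrix, failure-rate statistics over a time window rather than a simple consecutive-failure counter.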

Hystrix helps with self-healing on two fronts:

  • By turning off broken commands temporarily, it allows downstream dependencies to recover.
  • Once they have recovered, the system returns to normal operation.

Other ways of expressing asynchronous computation

Note that there are also other ways of expressing asynchronous computation, for example:

  • Java 8 CompletableFuture & CompletionStage
  • Parallel Stream (see note below)
  • Flux and Mono from Project Reactor
  • Guava ListenableFuture

Note on parallel Java 8 Streams: all parallel streams share a single, hardcoded ForkJoinPool sized according to the number of CPUs. This generally works fine for CPU-intensive tasks but does not work well for I/O-intensive tasks.

Be aware of operators consuming uncontrolled amounts of memory.

  • distinct() caching all seen events
  • Buffering events with toList() and buffer()
  • Caching with cache() and ReplaySubject
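Why distinct() is on that list becomes clear from a hand-rolled model. DistinctFilter and UntilChangedFilter are hypothetical names loosely modelling distinct() and distinctUntilChanged(): the former must remember every key it has ever seen, so its memory grows with the number of unique values on a long-lived stream, while the latter keeps only the previous value at the price of suppressing only consecutive duplicates.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Like distinct(): remembers every value seen so far (unbounded memory).
final class DistinctFilter<T> {
    private final Set<T> seen = new HashSet<>();

    boolean accept(T value) {
        return seen.add(value);           // false if the value was seen before
    }

    int retained() {
        return seen.size();
    }
}

// Like distinctUntilChanged(): remembers only the previous value (constant memory).
final class UntilChangedFilter<T> {
    private T previous;
    private boolean hasPrevious;

    boolean accept(T value) {
        boolean changed = !hasPrevious || !Objects.equals(previous, value);
        previous = value;
        hasPrevious = true;
        return changed;
    }
}
```

For the events 1, 1, 2, 1, 3, DistinctFilter lets 1, 2, 3 through and retains three values, while UntilChangedFilter lets 1, 2, 1, 3 through.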

Note: Backpressure helps to keep memory usage low by failing fast.

Future Directions (Ch. 9)

  • Reactive Streams API
  • RxJava 2 (discussed at the end of the book)
    • E.g.: Separate io.reactivex.Observable (non-backpressured) and io.reactivex.Flowable (backpressure-enabled)
  • RxJava 3 (released after book)

Further updates after the book: