This time I read Reactive Programming with RxJava by Ben Christensen and Tomasz Nurkiewicz. Various notes follow (mainly new things, or things I specifically wanted to write down).
Some basics / How RxJava works (Ch. 1)
The contract of an RxJava `Observable`:
- Events `onNext()`, `onCompleted()` and `onError()` can never be emitted concurrently.
  - E.g. `onNext()` can't be invoked on one thread while it is still being executed on another thread.
  - Why? Mainly because concurrency is difficult.
- See also the full Rx Observable contract.
The `Observable` type is lazy, meaning it does nothing until it is subscribed to.
- Subscription, not construction, starts the work.
- Observables can be reused.
`Observable` duality with `Iterable`:

| Pull (`Iterable`) | Push (`Observable`) |
|---|---|
| `T next()` | `onNext(T)` |
| `throws Exception` | `onError(Throwable)` |
| `returns` | `onCompleted()` |
Cardinalities:

| | One | Many |
|---|---|---|
| Synchronous | `T getData()` | `Iterable<T>` |
| Asynchronous | `Future<T>` | `Observable<T>` |
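To make the pull/push duality concrete, here is a plain-JDK sketch (no RxJava involved) that consumes the same data once by pulling from an `Iterator` and once by pushing into an observer. The `PushObserver` interface and the `Duality` class are hypothetical names chosen for illustration; the three callbacks merely mirror `onNext`, `onError` and `onCompleted`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical push-side interface mirroring the Observer callbacks (not RxJava's type).
interface PushObserver<T> {
    void onNext(T item);
    void onError(Throwable t);
    void onCompleted();
}

final class Duality {
    // Pull: the consumer drives the iteration by calling hasNext()/next().
    static <T> List<T> pullAll(Iterable<T> source) {
        List<T> out = new ArrayList<>();
        Iterator<T> it = source.iterator();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out; // returning marks the end of the sequence
    }

    // Push: the producer drives delivery by invoking the observer's callbacks.
    static <T> void pushAll(Iterable<T> source, PushObserver<? super T> observer) {
        try {
            for (T item : source) {
                observer.onNext(item);
            }
        } catch (RuntimeException e) {
            observer.onError(e); // a thrown exception becomes onError
            return;
        }
        observer.onCompleted(); // normal termination becomes onCompleted
    }

    // Records the push callbacks as strings so they can be inspected.
    static <T> List<String> pushAllToLog(Iterable<T> source) {
        List<String> log = new ArrayList<>();
        pushAll(source, new PushObserver<T>() {
            public void onNext(T item) { log.add("next:" + item); }
            public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
            public void onCompleted() { log.add("completed"); }
        });
        return log;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3);
        System.out.println(pullAll(data));      // [1, 2, 3]
        System.out.println(pushAllToLog(data)); // [next:1, next:2, next:3, completed]
    }
}
```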
Concurrency vs Parallelism
- Parallelism is when multiple tasks are simultaneously executed e.g. on different CPUs or machines.
- Concurrency is the composition and/or interleaving of multiple tasks, which is possible even on a single-core machine.
Reactive Extensions (Ch. 2)
An `Observable` is a push-based stream of events.
Subscribing to notifications from an `Observable`. Remember the contract: even if events are emitted from many threads, callbacks will not be invoked from more than one thread at a time.

```java
Observable<Foo> foos = ...; // some source of Foo events
foos.subscribe(
        (Foo foo) -> { System.out.println(foo); },  // onNext
        (Throwable t) -> { t.printStackTrace(); },  // onError
        () -> System.out.println("No more")         // onCompleted
);
```
Example of `Observable.create()` (RxJava 1 API):

```java
// log(msg) is a small helper that prints "<current thread name>: <msg>"
Observable<Integer> ints = Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                log("Create");
                subscriber.onNext(5);
                subscriber.onNext(6);
                subscriber.onCompleted();
                log("Completed");
            }
        });
log("Start");
ints.subscribe(i -> log("Element: " + i));
log("End");
```
produces:

```
main: Start
main: Create
main: Element: 5
main: Element: 6
main: Completed
main: End
```
By default, emitting does not begin until we actually subscribe. Every time `subscribe()` is called, the handler is invoked again.
In some cases (e.g. a DB query or a heavy computation) this might not be wanted; use the `cache()` operator instead. NOTE: `cache()` combined with an infinite stream is a recipe for disaster, because it keeps every emitted event in memory.
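The laziness and the effect of caching can be sketched without RxJava. `LazySource` and `CacheDemo` below are hypothetical names: the source runs its producer anew on every subscription, while `cached()` is only a rough analogue of `cache()` that runs the producer at most once and replays the stored result.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical minimal cold source (not RxJava): the producer runs anew on every subscribe().
final class LazySource<T> {
    private final Supplier<List<T>> producer;

    LazySource(Supplier<List<T>> producer) {
        this.producer = producer; // construction alone does no work
    }

    void subscribe(Consumer<? super T> onNext) {
        producer.get().forEach(onNext); // work starts here, once per subscription
    }

    // Rough analogue of cache(): run the producer at most once and replay the stored result.
    // Real cache() keeps every emitted item in memory, so an infinite stream grows without bound.
    LazySource<T> cached() {
        Supplier<List<T>> memoizing = new Supplier<>() {
            private List<T> result;

            @Override
            public synchronized List<T> get() {
                if (result == null) {
                    result = producer.get();
                }
                return result;
            }
        };
        return new LazySource<>(memoizing);
    }
}

final class CacheDemo {
    // Counts how often the simulated expensive query runs for a given number of subscribers.
    static int runQueries(boolean useCache, int subscribers) {
        AtomicInteger queries = new AtomicInteger();
        LazySource<String> rows = new LazySource<>(() -> {
            queries.incrementAndGet(); // stands in for a DB query or heavy computation
            return List.of("row1", "row2");
        });
        LazySource<String> source = useCache ? rows.cached() : rows;
        for (int i = 0; i < subscribers; i++) {
            source.subscribe(row -> { });
        }
        return queries.get();
    }

    public static void main(String[] args) {
        System.out.println(runQueries(false, 3)); // 3: every subscriber repeats the work
        System.out.println(runQueries(true, 3));  // 1: the cached result is shared
        System.out.println(runQueries(true, 0));  // 0: nothing happens without subscribers
    }
}
```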
Operators wrap existing `Observable`s, typically enhancing them by intercepting subscription.
Hot and cold observables
- A cold `Observable` is completely lazy and begins emitting events only when subscribed to.
  - Without observers, nothing happens.
  - Each subscriber receives its own copy of the stream.
- A hot `Observable` is independent of consumers.
  - It might emit events even without subscribers.
  - Typical when we have no control over the source of events, e.g. UI actions.
  - Note that you can't be sure whether you have received all events from the beginning.
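A plain-JDK sketch of the difference, under simplifying assumptions (single thread, no errors or completion): the cold variant replays the whole sequence to every subscriber, while the hypothetical `HotSource` pushes each event only to whoever is subscribed at that moment, so a late subscriber misses earlier events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class HotColdDemo {
    // Cold: each subscription replays the whole sequence from the beginning.
    static void subscribeCold(List<Integer> source, Consumer<Integer> subscriber) {
        source.forEach(subscriber);
    }

    // Hot (hypothetical): pushes events whenever they happen, to whoever is subscribed right now.
    static final class HotSource<T> {
        private final List<Consumer<? super T>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<? super T> subscriber) {
            subscribers.add(subscriber);
        }

        void emit(T event) {
            for (Consumer<? super T> s : subscribers) {
                s.accept(event);
            }
            // with no subscribers the event is simply lost
        }
    }

    static List<List<Integer>> demo() {
        HotSource<Integer> clicks = new HotSource<>();
        List<Integer> early = new ArrayList<>();
        List<Integer> late = new ArrayList<>();
        clicks.emit(0);               // nobody listening yet: dropped
        clicks.subscribe(early::add);
        clicks.emit(1);
        clicks.subscribe(late::add);
        clicks.emit(2);               // the late subscriber only sees events from now on

        List<Integer> coldA = new ArrayList<>();
        List<Integer> coldB = new ArrayList<>();
        List<Integer> cold = List.of(1, 2, 3);
        subscribeCold(cold, coldA::add);
        subscribeCold(cold, coldB::add);
        return List.of(early, late, coldA, coldB);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[1, 2], [2], [1, 2, 3], [1, 2, 3]]
    }
}
```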
`Subject` & `ConnectableObservable`
A `Subject` both extends `Observable` and implements `Observer` at the same time. Some examples (Javadocs: RxJava 1 `rx.subjects`, RxJava 2 `io.reactivex.subjects`):
- PublishSubject
- AsyncSubject
- BehaviorSubject
- ReplaySubject
See also ConnectableObservable (RxJava 1, RxJava 2)
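The "both an observer and an observable" idea, and how the Subject flavours differ for a late subscriber, can be sketched in plain Java. `MiniSubject` and its `Mode` enum are hypothetical and single-threaded; they only approximate what `PublishSubject`, `BehaviorSubject` (without a default value) and `ReplaySubject` replay, and they leave out `AsyncSubject`, errors and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded sketch (not RxJava's implementation) of what
// different Subject flavours replay to a late subscriber.
final class MiniSubject<T> implements Consumer<T> {
    enum Mode { PUBLISH, BEHAVIOR, REPLAY }

    private final Mode mode;
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<? super T>> subscribers = new ArrayList<>();

    MiniSubject(Mode mode) {
        this.mode = mode;
    }

    // Observer side: values are pushed into the subject...
    @Override
    public void accept(T value) {
        if (mode == Mode.BEHAVIOR) {
            history.clear();    // BEHAVIOR remembers only the latest value
        }
        if (mode != Mode.PUBLISH) {
            history.add(value); // REPLAY accumulates every value
        }
        for (Consumer<? super T> s : subscribers) {
            s.accept(value);    // ...and forwarded to the current subscribers
        }
    }

    // Observable side: a new subscriber first receives whatever this mode remembers.
    void subscribe(Consumer<? super T> subscriber) {
        history.forEach(subscriber);
        subscribers.add(subscriber);
    }

    static List<Integer> seenByLateSubscriber(Mode mode) {
        MiniSubject<Integer> subject = new MiniSubject<>(mode);
        subject.accept(1);
        subject.accept(2);
        List<Integer> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        subject.accept(3);
        return seen;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + seenByLateSubscriber(mode));
        }
        // PUBLISH -> [3], BEHAVIOR -> [2, 3], REPLAY -> [1, 2, 3]
    }
}
```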
Operators and Transformations (Ch. 3)
An operator is a function that takes an upstream `Observable<T>` and returns a downstream `Observable<R>` (`T` and `R` might or might not be the same).
Operators are often visualized as marble diagrams; see e.g. the `Observable` Javadocs (e.g. `Observable.map()` in RxJava 1 / RxJava 2). See also the interactive marble diagrams at https://rxmarbles.com/.
There are many operators for various purposes (not going to list them here).
Custom operators
- In case of a transformation from one `Observable` to another `Observable`, there is an interface `Observable.Transformer` that can be given to `Observable.compose()`. (This would usually be a series of operators.)
- In case existing operators are not enough, there is `Observable.lift()`.
  - Whereas `compose()` transforms `Observable`s, `lift()` transforms `Subscriber`s.
  - Note: Backpressure & the subscription mechanism need to be taken into account.
  - Note: Built-in operators are usually implemented on top of `lift()`; e.g. in the RxJava 1 version covered by the book, `observable.filter(predicate)` is implemented as `observable.lift(new OperatorFilter<>(predicate))`.
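The difference between `compose()` and `lift()` can be sketched with a tiny, hypothetical push source in plain Java. `MiniSource` and `LiftOperators` are made-up names, not RxJava's API, and the model ignores backpressure, errors and unsubscription. The operator shape loosely follows RxJava 1's `Observable.Operator<R, T>`, a function from the downstream `Subscriber<? super R>` to an upstream `Subscriber<? super T>`, whereas a transformer maps a whole source to another source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical push source, just enough to contrast compose() and lift(); not RxJava's API,
// and it ignores backpressure, errors and unsubscription entirely.
interface MiniSource<T> {
    void subscribe(Consumer<? super T> subscriber);

    static <T> MiniSource<T> of(List<T> items) {
        return subscriber -> items.forEach(subscriber);
    }

    // lift(): the operator turns the downstream subscriber into an upstream subscriber.
    default <R> MiniSource<R> lift(Function<Consumer<? super R>, Consumer<? super T>> operator) {
        return downstream -> this.subscribe(operator.apply(downstream));
    }

    // compose(): the transformer turns a whole source into another source.
    default <R> MiniSource<R> compose(Function<MiniSource<T>, MiniSource<R>> transformer) {
        return transformer.apply(this);
    }
}

final class LiftOperators {
    // filter as a lift() operator: wraps the downstream subscriber with a predicate check.
    static <T> Function<Consumer<? super T>, Consumer<? super T>> filter(Predicate<? super T> predicate) {
        return downstream -> item -> {
            if (predicate.test(item)) {
                downstream.accept(item);
            }
        };
    }

    // map as a lift() operator: converts each item before passing it downstream.
    static <T, R> Function<Consumer<? super R>, Consumer<? super T>> map(Function<? super T, ? extends R> mapper) {
        return downstream -> item -> downstream.accept(mapper.apply(item));
    }
}

final class ComposeVsLift {
    static List<String> run() {
        // A reusable chain of lift()-based operators, packaged as one compose() transformer.
        Function<MiniSource<Integer>, MiniSource<String>> evensAsText = source -> source
                .lift(LiftOperators.<Integer>filter(i -> i % 2 == 0))
                .lift(LiftOperators.<Integer, String>map(i -> "#" + i));

        List<String> out = new ArrayList<>();
        MiniSource.of(List.of(1, 2, 3, 4)).compose(evensAsText).subscribe(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [#2, #4]
    }
}
```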
Applying Reactive Programming to Existing Applications (Ch. 4)
Notes on Multithreading in RxJava
Even though it is neither enforced nor suggested by the type system, many `Observable`s are asynchronous from the very beginning, and you should assume that.
- A blocking `subscribe()` happens very rarely: only when the lambda within `Observable.create()` is not backed by any asynchronous process or stream.
- However, by default (with `create()`) everything happens in the client thread.
Scheduler
- RxJava is concurrency-agnostic: it does not introduce concurrency on its own.
- Certain operators cannot work properly without concurrency.
- RxJava has `Scheduler` (similar to `ScheduledExecutorService`).
  - It is used together with `subscribeOn()` and `observeOn()`, as well as when creating certain types of `Observable`s.
- Naively, a `Scheduler` can be thought of as a thread pool and a `Worker` as a thread inside that pool.
Built-in schedulers:
- `Schedulers.newThread()`
  - a new thread each time
  - "hardly ever a good choice"
- `Schedulers.io()`
  - similar to `newThread()`, but already started threads are recycled and can possibly handle future requests
  - for I/O-bound tasks, e.g. waiting for the network or disk
- `Schedulers.computation()`
  - for entirely CPU-bound tasks
  - by default, the number of threads executing in parallel is limited by the value of `availableProcessors()`
- `Schedulers.from(Executor executor)`
  - wraps an arbitrary `Executor` as a `Scheduler`
- `Schedulers.immediate()` (RxJava 1 only)
  - invokes a task within the client thread in a blocking fashion
- `Schedulers.trampoline()`
  - similar to `immediate()`, but the task is executed only after all previously scheduled tasks complete
- `Schedulers.test()` (RxJava 1 only)
  - for testing purposes
  - ability to arbitrarily advance the clock
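The ordering difference between `immediate()` and `trampoline()` shows up when a task schedules another task. The following single-threaded sketch uses hypothetical `MiniScheduler`, `Immediate` and `Trampoline` types (not RxJava's classes): the immediate variant runs the nested task right away inside the outer one, while the trampoline variant queues it and runs it once the outer task has finished.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded sketches of immediate() vs trampoline() ordering.
final class ImmediateVsTrampoline {
    interface MiniScheduler {
        void schedule(Runnable task);
    }

    // immediate-like: run the task right now, nested inside the caller.
    static final class Immediate implements MiniScheduler {
        @Override
        public void schedule(Runnable task) {
            task.run();
        }
    }

    // trampoline-like: queue the task; if nothing is running yet, drain the queue in FIFO order.
    static final class Trampoline implements MiniScheduler {
        private final Deque<Runnable> queue = new ArrayDeque<>();
        private boolean draining;

        @Override
        public void schedule(Runnable task) {
            queue.addLast(task);
            if (draining) {
                return; // the outer drain loop runs it after the current task finishes
            }
            draining = true;
            try {
                while (!queue.isEmpty()) {
                    queue.pollFirst().run();
                }
            } finally {
                draining = false;
            }
        }
    }

    // Schedules an outer task that itself schedules an inner task, and records the order.
    static List<String> run(MiniScheduler scheduler) {
        List<String> log = new ArrayList<>();
        scheduler.schedule(() -> {
            log.add("outer start");
            scheduler.schedule(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(new Immediate()));  // [outer start, inner, outer end]
        System.out.println(run(new Trampoline())); // [outer start, outer end, inner]
    }
}
```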
subscribeOn & observeOn
- What happens if there are two `subscribeOn()` invocations between the `Observable` and `subscribe()`? The `subscribeOn()` closest to the original `Observable` wins.
- Note: In entirely reactive software stacks, `subscribeOn()` is almost never used, yet all `Observable`s are asynchronous.
- In general, `subscribeOn()` is used quite rarely, mostly when retrofitting existing APIs or libraries.
- In general, `flatMap()` and `merge()` are the operators for parallelism.
Simple analogies:
- An `Observable` without any `Scheduler` works like a single-threaded program with blocking method calls passing data between one another.
- An `Observable` with a single `subscribeOn()` is like starting a big task in a background `Thread`. The program within that `Thread` is still sequential, but at least it runs in the background.
- An `Observable` using `flatMap()` where each internal `Observable` has `subscribeOn()` works like `java.util.concurrent.ForkJoinPool`, where each substream is a fork of execution and `flatMap()` is a safe join/merge.
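The fork/join analogy can be illustrated with JDK tools only. In this hypothetical sketch, each element forks an asynchronous sub-task on a pool (the counterpart of an inner `Observable` with `subscribeOn()`), and the results are joined afterwards. One difference is noted in the comments: `flatMap()` emits results in completion order, whereas this sketch collects them in input order.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// JDK-only analogy, not RxJava: each element forks an asynchronous sub-task on a pool
// (like an inner Observable with subscribeOn()), and the results are joined afterwards.
final class ForkJoinAnalogy {
    static List<Integer> squareAll(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> forks = inputs.stream()
                    .map(i -> CompletableFuture.supplyAsync(() -> i * i, pool)) // fork
                    .collect(Collectors.toList());
            // join: unlike flatMap(), which emits results in completion order,
            // this collects them back in input order.
            return forks.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareAll(List.of(1, 2, 3, 4))); // [1, 4, 9, 16]
    }
}
```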
`observeOn()`:
- Everything below `observeOn()` is run within the supplied `Scheduler`.
- `subscribeOn()` & `observeOn()` work well together when you want to physically decouple the producer (`Observable.create()`) and the consumer (`Subscriber`).
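As a rough JDK-only analogy (not RxJava itself), `CompletableFuture` can show the same producer/consumer decoupling: `supplyAsync(..., producerPool)` plays the role of `subscribeOn()` and `thenApplyAsync(..., consumerPool)` the role of `observeOn()`. The pool and thread names are hypothetical; the sketch returns the names of the threads on which each side ran.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// JDK-only analogy: supplyAsync(.., producerPool) ~ subscribeOn(),
// thenApplyAsync(.., consumerPool) ~ observeOn(). Not RxJava's implementation.
final class ProducerConsumerDecoupling {
    static ThreadFactory named(String prefix) {
        return runnable -> new Thread(runnable, prefix + "-thread");
    }

    // Returns the names of the threads on which the producer and the consumer ran.
    static List<String> run() {
        ExecutorService producerPool = Executors.newSingleThreadExecutor(named("producer"));
        ExecutorService consumerPool = Executors.newSingleThreadExecutor(named("consumer"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), producerPool)
                    .thenApplyAsync(producerThread ->
                            List.of(producerThread, Thread.currentThread().getName()), consumerPool)
                    .join();
        } finally {
            producerPool.shutdown();
            consumerPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // [producer-thread, consumer-thread]
    }
}
```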
Note that many operators use some `Scheduler` by default, typically `Schedulers.computation()` (e.g. time-based operators such as `delay()` and `interval()`).
Reactive from Top to Bottom (Ch. 5)
Non-blocking applications tend to provide great performance and throughput for a fraction of the hardware. By limiting the number of threads, we're able to fully utilize CPU without consuming gigabytes of memory.
C10k problem:
- area of research and optimization that tried to achieve 10,000 concurrent connections on a single commodity server
- Whole text available at: Beating the C10k Problem
- Source code for the examples can be found at https://github.com/nurkiewicz/rxjava-book-examples
The advice for interacting with relational databases is to actually have a dedicated, well-tuned thread pool and isolate the blocking code there.
Short introduction to `CompletableFuture`. Semantically, you can treat a `CompletableFuture` like an `Observable` with the following characteristics:
- It is hot.
- It is cached.
- It emits exactly one element or exception.
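The three characteristics can be checked with the JDK alone. In this sketch (class and method names are hypothetical), the supplier starts running before any callback is attached (hot), two callbacks see the same single result without re-running the supplier (cached), and a second `complete()` call is ignored (exactly one value).

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class FutureSemantics {
    // Returns {runs before any callback, runs after two callbacks, first callback, second callback}.
    static int[] hotAndCached() {
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            runs.incrementAndGet();
            started.countDown();
            return 42;
        });
        awaitQuietly(started); // hot: the work started although nobody asked for the result yet
        int runsBefore = runs.get();
        int first = future.thenApply(x -> x + 1).join();  // cached: both callbacks see
        int second = future.thenApply(x -> x * 2).join(); // the same single result
        return new int[] {runsBefore, runs.get(), first, second};
    }

    // Exactly one value: once completed, later complete() calls are ignored.
    static String completeTwice() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("first");
        future.complete("second"); // returns false, the value stays unchanged
        return future.join();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(hotAndCached())); // [1, 1, 43, 84]
        System.out.println(completeTwice());                 // first
    }
}
```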
Flow Control and Backpressure (Ch. 6)
RxJava has two ways of dealing with producers being more active than subscribers:
- Various flow-control mechanisms such as sampling and batching (built-in operators)
- Subscribers can propagate their demand and request only as many items as they can process - by using a feedback channel known as backpressure.
Flow control operators
- `sample()` & `throttleFirst()`
- `buffer()` (different overloaded versions): extremely flexible, quite complex
- `window()` (different overloaded versions)
- `debounce()`
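To get a feel for what some of these operators do, here is a virtual-time sketch over a finished list of timestamped events. The semantics are simplified for illustration (RxJava's operators work on live streams with schedulers), and `FlowControl`, `Event` and the sample timestamps are hypothetical: `throttleFirst` keeps an event and drops everything in the following window, `debounce` keeps an event only if no newer one arrives within the timeout (the last event is emitted on completion), and `buffer(count)` groups events into batches.

```java
import java.util.ArrayList;
import java.util.List;

// Virtual-time sketches of flow-control operators over a finished list of timestamped events.
// Simplified semantics for illustration only; not RxJava's implementation.
final class FlowControl {
    static final class Event {
        final long timeMs;
        final String value;

        Event(long timeMs, String value) {
            this.timeMs = timeMs;
            this.value = value;
        }
    }

    // throttleFirst: emit an event, then drop everything for windowMs after it.
    static List<String> throttleFirst(List<Event> events, long windowMs) {
        List<String> out = new ArrayList<>();
        Long lastEmitted = null;
        for (Event e : events) {
            if (lastEmitted == null || e.timeMs - lastEmitted >= windowMs) {
                out.add(e.value);
                lastEmitted = e.timeMs;
            }
        }
        return out;
    }

    // debounce: emit an event only if no newer event follows within timeoutMs;
    // the final event is emitted when the stream completes.
    static List<String> debounce(List<Event> events, long timeoutMs) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            boolean last = i == events.size() - 1;
            if (last || events.get(i + 1).timeMs - events.get(i).timeMs >= timeoutMs) {
                out.add(events.get(i).value);
            }
        }
        return out;
    }

    // buffer(count): group consecutive events into lists of at most `count` values.
    static List<List<String>> buffer(List<Event> events, int count) {
        List<List<String>> out = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (Event e : events) {
            current.add(e.value);
            if (current.size() == count) {
                out.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            out.add(current); // flush the partial last batch on completion
        }
        return out;
    }

    static List<Event> sampleEvents() {
        return List.of(new Event(0, "a"), new Event(50, "b"), new Event(120, "c"),
                new Event(400, "d"), new Event(430, "e"));
    }

    public static void main(String[] args) {
        List<Event> events = sampleEvents();
        System.out.println(throttleFirst(events, 100)); // [a, c, d]
        System.out.println(debounce(events, 100));      // [c, e]
        System.out.println(buffer(events, 2));          // [[a, b], [c, d], [e]]
    }
}
```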
Backpressure
- In essence, backpressure is a feedback channel from the consumer to the producer.
  - a protocol that allows the consumer to request how much data it can consume at a time
  - "consumer" here means terminal subscribers as well as intermediate operators
- Switches from a push model to a pull-push model.
- When creating `Observable`s, think about correctly handling backpressure requests.
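The request-based protocol can be tried out with the JDK 9+ Reactive Streams interfaces in `java.util.concurrent.Flow` rather than RxJava. In this sketch (class names are hypothetical), the subscriber signals demand with `request(1)` and asks for the next item only after processing the previous one; `SubmissionPublisher.submit()` blocks when the subscriber's buffer is full.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// JDK 9+ Reactive Streams interfaces (java.util.concurrent.Flow), not RxJava:
// the subscriber signals its demand with request(n), here one item at a time.
final class BackpressureDemo {
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // processed one item, ask for the next
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run(int items) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= items; i++) {
                publisher.submit(i); // blocks while this subscriber's buffer is full
            }
        } // close() signals onComplete after the buffered items
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // [1, 2, 3, 4, 5]
    }
}
```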
Testing and troubleshooting (Ch. 7)
According to the Reactive Manifesto, reactive systems should be responsive, resilient, elastic and message-driven. This chapter focuses on responsiveness and resiliency. An `Observable` is a container for values or errors.
Case Studies (Ch. 8)
A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable
- Leslie Lamport, 1987
Managing Failures with Hystrix
UPDATE NOTE: Nowadays Hystrix is in maintenance mode, the most prominent successor being resilience4j.
RxJava has many operators that support writing scalable, reactive and resilient applications:
- Declarative concurrency with `Scheduler`s.
- Timeouts and various error handling mechanisms.
- Parallelizing work with `flatMap()` while limiting concurrency at the same time.
Hystrix helps to work with actions that can potentially fail, applying clever logic around such code. Mechanisms include:
- Bulkhead patterns, by cutting off misbehaving actions entirely for a certain time.
  - Bulkheads are large walls across a ship's hull that create watertight compartments.
- Failing fast by applying timeouts, limiting concurrency, and implementing a so-called circuit breaker.
  - A circuit breaker's responsibility is to interrupt the flow of electricity in order to protect various devices from overload and even from catching fire.
- Batching requests by collapsing small orders into one big order.
  - Note that batching makes no sense under low load.
- Collecting, publishing, and visualizing performance statistics.
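The circuit-breaker idea can be sketched in a few lines of plain Java. `SimpleCircuitBreaker` is a hypothetical, single-threaded class, not the API of Hystrix or resilience4j: after a configurable number of consecutive failures it opens and returns the fallback immediately, and once the cool-down has passed it lets a trial call through. The clock is injected so that the behaviour can be tested without waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical minimal circuit breaker (not Hystrix's or resilience4j's API), single-threaded:
// after `failureThreshold` consecutive failures it opens and fails fast until
// `openMillis` have passed, then lets one trial call through.
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock; // injectable time source, e.g. System::currentTimeMillis
    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long openedAt;

    SimpleCircuitBreaker(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (state == State.OPEN) {
            if (clock.getAsLong() - openedAt < openMillis) {
                return fallback.get(); // fail fast: do not touch the broken dependency
            }
            state = State.HALF_OPEN;   // cool-down over: let one trial call through
        }
        try {
            T result = action.get();
            state = State.CLOSED;      // success closes the circuit again
            consecutiveFailures = 0;
            return result;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            if (state == State.HALF_OPEN || consecutiveFailures >= failureThreshold) {
                state = State.OPEN;    // trip the breaker
                openedAt = clock.getAsLong();
            }
            return fallback.get();
        }
    }

    State state() {
        return state;
    }

    static List<String> scenario() {
        long[] now = {0};
        SimpleCircuitBreaker breaker = new SimpleCircuitBreaker(2, 1_000, () -> now[0]);
        Supplier<String> failing = () -> { throw new IllegalStateException("dependency down"); };
        Supplier<String> healthy = () -> "ok";
        Supplier<String> fallback = () -> "fallback";
        List<String> log = new ArrayList<>();
        breaker.call(failing, fallback);          // 1st failure
        breaker.call(failing, fallback);          // 2nd failure: the circuit opens
        log.add(breaker.state().name());
        log.add(breaker.call(healthy, fallback)); // still open: fails fast
        now[0] = 1_500;                           // cool-down has elapsed
        log.add(breaker.call(healthy, fallback)); // trial call succeeds
        log.add(breaker.state().name());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(scenario()); // [OPEN, fallback, ok, CLOSED]
    }
}
```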
Hystrix helps with self-healing on two fronts:
- By turning off broken commands temporarily, it allows downstream dependencies to recover.
- After recovery, the system returns to normal operation.
Other ways of expressing asynchronous computation
Note that there are also other ways of expressing asynchronous computation, for example:
- Java 8 `CompletableFuture` & `CompletionStage`
- Parallel `Stream`s (see note below)
- `Flux` and `Mono` from Project Reactor
- Guava `ListenableFuture`
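Note on parallel Java 8 `Stream`s: all parallel streams share the same common `ForkJoinPool`, sized according to the number of CPUs. This generally works fine for CPU-intensive tasks but does not work well for I/O-intensive tasks, because blocked worker threads are unavailable to every other parallel stream in the application.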
Be aware of operators consuming uncontrolled amounts of memory:
- `distinct()` caching all seen events
- Buffering events with `toList()` and `buffer()`
- Caching with `cache()` and `ReplaySubject`
Note: Backpressure helps to keep memory usage low by failing fast.
Future Directions (Ch. 9)
- Reactive Streams API
- RxJava 2 (discussed at the end of the book)
  - E.g. separate `io.reactivex.Observable` (non-backpressured) and `io.reactivex.Flowable` (backpressure-enabled)
- RxJava 3 (released after the book)
Further updates after the book:
- Project Reactor has gained momentum, being the foundation of reactive Spring (Spring WebFlux).
- Hystrix is nowadays only in maintenance mode. The most prominent successor is resilience4j.
Links
- RxJava wiki
- Reactive Manifesto
- A Decision Tree of Observable Operators
- RxKotlin - "a lightweight library that adds convenient extension functions to RxJava"