Reactive programming is a programming technique for asynchronous applications that lets you structure your code based on “reaction” to data input changes instead of an imperative programming style where you have to poll or block and wait for changes to happen.
If you’re not 100% familiar with ReactiveX (RxJava being its implementation for the JVM), perhaps you know Java Streams, a similar concept introduced in Java 8.
In our last post we covered RxJava 2 for Kotlin and Java programming and gave a basic overview of its key building blocks: observables, subscriptions, operators and schedulers.
Today we’re going to talk about something more complicated. If you do not know what RxJava is, we recommend you start reading our previous post first! We use it quite a lot in our Android development.
In this piece we’re going to discuss three distinct ideas:
- How we can work with data flows,
- What kind of observables we have and
- How we can transform an imperative function into a reactive one.
Types of Observables
In the RxJava 2 framework we have five different types of objects that can be observed:
- Observable
- Flowable
- Single
- Maybe
- Completable
The names are a bit confusing. Surely they’re all observable, right? They all respond to the observer pattern. Well, the first category on the list is so-called because it was the original type of ‘observable’. The four other classes were all created later.
We can group this quintet into two clusters. The first cluster includes Observable and Flowable; the second encompasses Single, Maybe and Completable. Why do we make this distinction? Well, Observable and Flowable objects are what’s known as endless observables. That means that they can provide infinite flows of data. On the other hand, Single, Maybe and Completable are meant to send just one element (or no element at all) and then complete.
Here is a quick summary of what each one does. We’ll dive into the details of each in a minute:
- Observable: emits a stream of elements (endlessly)
- Flowable: emit a stream of elements (endlessly, with backpressure)
- Single: emits exactly one element
- Maybe: emits zero or one elements
- Completable: emits no data at all, only a “complete” event or an error (a plain success/failure)
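To make the summary above concrete, here is a minimal sketch that creates one source of each type and subscribes to it. It assumes RxJava 2.x (the `io.reactivex` package) is on the classpath; the class name `FiveTypesDemo` and the log messages are made up for illustration.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class FiveTypesDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Observable: zero or more items, no backpressure.
        Observable.just(1, 2, 3).subscribe(i -> log.add("Observable " + i));

        // Flowable: zero or more items, with backpressure support.
        Flowable.range(1, 3).subscribe(i -> log.add("Flowable " + i));

        // Single: exactly one item, or an error.
        Single.just("one").subscribe(s -> log.add("Single " + s));

        // Maybe: one item, no item, or an error. This one is empty.
        Maybe.<String>empty().subscribe(
                s -> log.add("Maybe " + s),
                e -> log.add("Maybe error"),
                () -> log.add("Maybe empty"));

        // Completable: no item, only completion or an error.
        Completable.complete().subscribe(() -> log.add("Completable done"));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

All five sources here are synchronous, so the log is filled in the order the subscriptions are written.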
So let’s try to talk about each one. Before we do so, though, a little note: given that we talked about observables in the last post, we’d like to introduce Subjects and Processors too. Hope that’s cool with you.
Basically subjects perform the role of observables and subscribers at the same time. They can emit values and also listen out for them. On the other hand processors are exactly the same as subjects but they come with what’s known as backpressure management (we’ll talk about that later).
Observable and Subjects
Observables are the most basic object we can observe, as we discussed in the previous post. But, when you combine both observables and observers, it gets more complicated.
By default the Subject class is abstract (which means it doesn’t provide an implementation) but the framework provides several default implementations that can be super-useful.
PublishSubject is the simplest subject. It just takes items from one observable and diverts them to another, like a kind of intermediary. But it doesn’t cache any event, so notifications about past elements aren’t forwarded to each new observer.
This is the marble diagram of a PublishSubject:
As you can see, once the PublishSubject emits an error, all the subscribers are notified and won’t receive anything more.
A PublishSubject is useful, for instance, for relaying hardware events like scroll positions, mouse events, clicks, etc. You can subscribe several observers to them, and each observer only hears about events emitted after it subscribed.
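The following is a small sketch of that behaviour, assuming RxJava 2.x on the classpath. The `clicks` subject and the event names are hypothetical stand-ins for real UI events.

```java
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class PublishSubjectDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        PublishSubject<String> clicks = PublishSubject.create();

        clicks.subscribe(e -> log.add("first got " + e));
        clicks.onNext("click 1");

        // The second observer subscribes late, so it never sees "click 1".
        clicks.subscribe(e -> log.add("second got " + e));
        clicks.onNext("click 2");
        clicks.onComplete();

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```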
ReplaySubject replays events to current and late observers, and it can be created in several ways:
- create(). This creates an unbounded subject that replays everything.
- createWithSize(int). This creates a bounded subject that retains only the indicated number of items.
- createWithTime(long, TimeUnit, Scheduler). This creates a time-bound subject that retains only the items emitted within the specified time window.
- createWithTimeAndSize(long, TimeUnit, Scheduler, int). This combines the time and size bounds.
This is how the marble diagram of an unbounded ReplaySubject would look:
As you can see, each new subscriber gets all the elements emitted by the parent. This Subject is useful, for instance, if we want to push the same data to several views that may be created at different moments, while ensuring they all end up with exactly the same content.
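As a rough illustration, the sketch below compares an unbounded ReplaySubject with one bounded to two items. It assumes RxJava 2.x; the class and variable names are invented for the example.

```java
import io.reactivex.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class ReplaySubjectDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ReplaySubject<Integer> all = ReplaySubject.create();
        ReplaySubject<Integer> lastTwo = ReplaySubject.createWithSize(2);

        // Emit before anyone has subscribed.
        for (int i = 1; i <= 4; i++) {
            all.onNext(i);
            lastTwo.onNext(i);
        }

        // Late subscribers: the unbounded subject replays everything,
        // the bounded one only the two most recent items.
        all.subscribe(i -> log.add("all " + i));
        lastTwo.subscribe(i -> log.add("lastTwo " + i));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```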
BehaviorSubject, often used in Android’s Presenters/ViewModels, is quite similar to the PublishSubject, but it caches the most recent value emitted. When we have a configuration change (e.g. a screen rotation) we usually lose the subscription and have to resubscribe. Thanks to the BehaviorSubject, we’ll have the most recent data without needing to fetch it again.
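Here is a minimal sketch of that resubscription scenario, assuming RxJava 2.x. The `state` subject and its values are hypothetical, and disposing the first subscription merely simulates a view being destroyed.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class BehaviorSubjectDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        BehaviorSubject<String> state = BehaviorSubject.createDefault("loading");

        // A new subscriber immediately receives the current value.
        Disposable first = state.subscribe(s -> log.add("first got " + s));
        state.onNext("user loaded");

        // Simulate the view going away, e.g. on a screen rotation.
        first.dispose();

        // The recreated view resubscribes and gets the cached value straight away.
        state.subscribe(s -> log.add("second got " + s));

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```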
AsyncSubject caches the last event emitted and sends it to the observers only when an onComplete event is emitted. This subject can be used when we don’t care about the data stream, only the last object.
AsyncSubject requires completion before sending the last object to the observers. That means it is not suited to endless data flows: if the source never completes, the observers never receive anything.
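A short sketch, assuming RxJava 2.x, shows that nothing reaches the observer until onComplete is called. The class name and log messages are illustrative only.

```java
import io.reactivex.subjects.AsyncSubject;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class AsyncSubjectDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        AsyncSubject<Integer> subject = AsyncSubject.create();

        subject.subscribe(
                i -> log.add("got " + i),
                e -> log.add("error"),
                () -> log.add("completed"));

        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        // Nothing has been delivered to the observer so far.
        log.add("emitted 1, 2, 3");

        // Only now does the observer receive the last value, then completion.
        subject.onComplete();

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```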
Here is the marble diagram of this subject:
Flowables and Processors
Flowables are like observables but they support backpressure. To better understand this concept I’d recommend the Wiki page on Backpressure in the official documentation.
Basically a backpressure strategy indicates what to do with emitted items if they can’t be processed as fast as they are received. We can imagine, for instance, a flowable that sends gyroscope data with a really fast frequency and we need to apply a strong computation algorithm over each emitted item.
If the time spent on the algorithm is considerably higher than the time between each item’s emission, then the backpressure strategy is applied. (An Observable has no backpressure support at all: unprocessed items simply pile up in memory, which can end in an OutOfMemoryError.)
So now that we know what a backpressure strategy is, I’ll list the available options.
Drop is one of the simplest strategies. We are just indicating that we’ll ignore all the streamed items that can’t be processed until downstream can accept more of them. It’s well worth taking a look at the official documentation examples about how and when we can use it.
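The sketch below shows the drop strategy deterministically, assuming RxJava 2.x. A `PublishProcessor` plays the fast producer, and RxJava’s own `TestSubscriber` (obtained via `test(long)`) stands in for a slow consumer because it lets us control exactly how many items are requested. The class name is invented for the example.

```java
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class DropStrategyDemo {

    static List<Integer> run() {
        PublishProcessor<Integer> source = PublishProcessor.create();

        // The consumer only asks for two items up front.
        TestSubscriber<Integer> slow = source.onBackpressureDrop().test(2);

        // 1 and 2 are delivered; 3, 4 and 5 arrive with no demand and are dropped.
        for (int i = 1; i <= 5; i++) {
            source.onNext(i);
        }

        // The consumer is ready for one more item, so 6 gets through.
        slow.request(1);
        source.onNext(6);

        return new ArrayList<>(slow.values());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```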
Buffer creates a bounded or unbounded buffer that holds the emitted items that couldn’t be processed by the downstream. Notice that a bounded buffer can lead to a backpressure exception if we overflow its bounds. Check this section if you want to learn more about how to create a buffer strategy.
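As a hedged sketch of both variants, assuming RxJava 2.x: the unbounded buffer keeps everything until the consumer asks for it, while a buffer bounded to two items signals an error on the third unrequested item. The `TestSubscriber` returned by `test(0)` plays a consumer that requests nothing at first; the class and method names are made up for the example.

```java
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class BufferStrategyDemo {

    // Unbounded buffer: items wait until the consumer requests them.
    static List<Integer> unboundedBuffer() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        TestSubscriber<Integer> slow = source.onBackpressureBuffer().test(0);

        for (int i = 1; i <= 5; i++) {
            source.onNext(i);
        }
        slow.request(5); // the buffered items are delivered now

        return new ArrayList<>(slow.values());
    }

    // Bounded buffer of capacity 2: the third unrequested item overflows it.
    static int boundedBufferErrors() {
        PublishProcessor<Integer> source = PublishProcessor.create();
        TestSubscriber<Integer> slow = source.onBackpressureBuffer(2).test(0);

        for (int i = 1; i <= 3; i++) {
            source.onNext(i);
        }

        return slow.errorCount(); // number of onError signals received
    }

    public static void main(String[] args) {
        System.out.println("unbounded delivered: " + unboundedBuffer());
        System.out.println("bounded error count: " + boundedBufferErrors());
    }
}
```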
Latest is the last strategy. It’s similar to the drop strategy but it keeps the last emitted item. Unlike drop, it ensures that at least one element is retained. As with the other strategies, it’s worth taking a look at the official documentation on Latest to see examples of how and when to use this strategy.
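Here is a minimal sketch of the latest strategy, assuming RxJava 2.x. As a stand-in for a slow consumer it uses RxJava’s `TestSubscriber`, created with `test(1)` so that only one item is requested up front. The class name is illustrative.

```java
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Illustrative class name; not part of RxJava.
final class LatestStrategyDemo {

    static List<Integer> run() {
        PublishProcessor<Integer> source = PublishProcessor.create();

        // The consumer only asks for one item up front.
        TestSubscriber<Integer> slow = source.onBackpressureLatest().test(1);

        source.onNext(1); // delivered immediately
        source.onNext(2); // no demand: kept as the latest item
        source.onNext(3); // replaces 2
        source.onNext(4); // replaces 3

        // When the consumer is ready again it receives only the most recent item.
        slow.request(1);

        return new ArrayList<>(slow.values());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```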
And last but not least, now that we’ve talked about flowables and their backpressure strategies, we have to mention processors. They play the same role for flowables that subjects play for observables but, in addition, they support backpressure.
Completable, Single and Maybe
Now it’s the turn of the one-shot observable sources. We are going to introduce Single, Maybe and Completable.
A Single is an observable that emits exactly one item and then completes. The Single guarantees that one item will be sent (or an error), so it’s super-useful when we want to rule out empty outputs.
A Single subscription has two callbacks. They are mutually exclusive, so only one of them is called at the end:
- onError will be called whenever an error is thrown at some point of the stream.
- onSuccess will be called if we get a response from the stream, in this case information about the user. We use a Single because we know for certain that a user has information to show.
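The two callbacks can be sketched as follows, assuming RxJava 2.x. The `loadUser` function and its "user-N" result are hypothetical stand-ins for a real user request; they are not taken from the original example.

```java
import io.reactivex.Single;

// Illustrative class name; not part of RxJava.
final class SingleDemo {

    // Hypothetical lookup: succeeds for positive ids, fails otherwise.
    static Single<String> loadUser(int id) {
        return Single.fromCallable(() -> {
            if (id <= 0) {
                throw new IllegalArgumentException("invalid id");
            }
            return "user-" + id;
        });
    }

    // Subscribes and reports which of the two callbacks fired.
    static String describe(int id) {
        String[] result = new String[1];
        loadUser(id).subscribe(
                user -> result[0] = "onSuccess: " + user,
                error -> result[0] = "onError: " + error.getMessage());
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(describe(7));
        System.out.println(describe(0));
    }
}
```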
Maybe works in a similar way to Single, but with a particular property: it can complete without emitting a value. This is useful when we have optional emissions, for example when we want to get the logged-in user but we are not signed in yet.
A Maybe subscription has three callbacks. They are mutually exclusive, so only one of them is called at the end:
- onError will be called whenever an error is thrown at some point of the stream.
- onSuccess will be called if we get a response from the stream.
- onComplete will be called if we have an empty output.
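A minimal sketch of the three callbacks, assuming RxJava 2.x. The `loggedUser` function is hypothetical: it models the "logged-in user" case by returning an empty Maybe when no session user is available.

```java
import io.reactivex.Maybe;

// Illustrative class name; not part of RxJava.
final class MaybeDemo {

    // Hypothetical lookup: empty when nobody is signed in.
    static Maybe<String> loggedUser(String sessionUser) {
        return sessionUser == null
                ? Maybe.<String>empty()
                : Maybe.just(sessionUser);
    }

    // Subscribes and reports which of the three callbacks fired.
    static String describe(String sessionUser) {
        String[] result = new String[1];
        loggedUser(sessionUser).subscribe(
                user -> result[0] = "onSuccess: " + user,
                error -> result[0] = "onError: " + error.getMessage(),
                () -> result[0] = "onComplete: nobody signed in");
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(describe("alice"));
        System.out.println(describe(null));
    }
}
```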
The Completable is the last of the trio and it just broadcasts a complete event. This is useful when we need to carry out actions that don’t require a specific output (like when we log in or send data, or when we simply need an OK/KO). In general we can think of a Completable as a replacement for a Runnable, which just executes code and then ends (whereas the Completable sends a notification at the end-point).
A Completable subscription has two callbacks. They are mutually exclusive, so only one of them is called at the end. It’s worth knowing:
- onError will be called whenever an error is thrown at some point of the stream or if, in this case, we couldn’t perform the login for some reason.
- onComplete will be called if everything went right. In this case we don’t have any output, just a code block that will be invoked afterwards.
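The login case can be sketched like this, assuming RxJava 2.x. The `login` function and its hard-coded password check are purely hypothetical; a real implementation would call a backend.

```java
import io.reactivex.Completable;

// Illustrative class name; not part of RxJava.
final class CompletableDemo {

    // Hypothetical login: completes on the expected password, fails otherwise.
    static Completable login(String password) {
        return Completable.fromAction(() -> {
            if (!"secret".equals(password)) {
                throw new IllegalStateException("login failed");
            }
        });
    }

    // Subscribes and reports which of the two callbacks fired.
    static String describe(String password) {
        String[] result = new String[1];
        login(password).subscribe(
                () -> result[0] = "onComplete",
                error -> result[0] = "onError: " + error.getMessage());
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(describe("secret"));
        System.out.println(describe("wrong"));
    }
}
```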
OK, that’s all for now. We hope you found this article useful and will come back to read the rest of the series. We’ll be delving into reactive programming in ever more detail over the next few weeks, so there’ll be some useful advice for everyone.
Before you go, just a quick note about Bugfender, the remote logging tool we’ve built (which also includes crash reporting and in-app feedback, not that we like to brag). Bugfender works with Reactive extensions, so you can use it to track the logs of apps built using RxJava2. We think it’s a really useful piece of technology.
For more info go to https://bugfender.com/.