What is RxJS?

Overview

To better understand what RxJS is, we should first look at the Reactive Extensions API. Its website describes it as “An API for asynchronous programming with observable streams”. Don’t worry if this explanation doesn’t make sense to you yet. You can think of a stream as any data flowing into your application:

RxJS – An API for asynchronous programming with observable streams

The API is designed to help you manage this data mostly asynchronously, but you should be aware that there are some synchronous functions as well.

It is built on the foundations of the observer pattern and works with separate values of data that are produced over time. The API is implemented for different programming languages, for instance Rx.NET for C#, RxCpp for C++, RxJava for Java, and so on. In this article, we will focus on RxJS, which is the JavaScript implementation of the Reactive Extensions API.

One reason for the popularity of RxJS is probably that it provides a single API for working with data coming from different sources. When these sources produce data over time, we can think of them as streams of data flowing into our application. A stream can be just about anything, for instance:

  • A server response from an HTTP request
  • A DOM event – these are a great example of a stream, because they occur repeatedly in every web application
  • A function response with one or multiple values
  • and so on…

Any data coming into your application, no matter what its source is, can be treated as a stream. The problem with such different types of sources is that plain JavaScript processes them differently. For instance, to process a server response you can use a callback function, while to process a DOM event you register an event handler. RxJS solves this problem by providing a single API for working with all these types of data.

It works with the observable data type, which offers a clearly defined interface for processing the data flowing into your application. The name observable comes from the observer design pattern, which is the foundation of RxJS.

The observer pattern

Before starting with RxJS, we should be familiar with the observer pattern, the software design pattern that the RxJS library is built on. In this pattern, we have an object called the subject, which maintains a list of dependents called observers.

The observer pattern

When the subject produces a value, it is sent to all of the observers, and each of them can react to it independently. If we think of our social network profile as a subject and we create a post, all of our followers, whom we can treat as observers, will receive this post and can react to it on their own. For example, John shares a video, an image, and a simple text post on his social network account, and he does this consistently over time:

The observer pattern – social network example

Since the subject (John) maintains a list of observers (his followers), each of them will be notified about the uploaded video. Then they will be notified about the image and, finally, about the simple text post. Each of John’s observers can react to each of the produced values independently of the other observers.
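The social network example can be sketched in a few lines of plain TypeScript. This is only an illustration of the pattern, not RxJS code: the `Profile` class, the `Post` type, and the follower names are hypothetical and exist just for this example.

```typescript
// A hypothetical post type for the social network example.
type Post = { kind: "video" | "image" | "text"; content: string };

// An observer is anything that can react to a new post.
type Follower = (post: Post) => void;

// The subject keeps a list of observers and notifies each of them.
class Profile {
  private followers: Follower[] = [];

  follow(follower: Follower): void {
    this.followers.push(follower);
  }

  publish(post: Post): void {
    // Every observer receives the same value and reacts on its own.
    for (const follower of this.followers) {
      follower(post);
    }
  }
}

const john = new Profile();
const reactions: string[] = [];

// Two observers that react differently to the same values.
john.follow((post) => reactions.push(`Anna likes the ${post.kind}`));
john.follow((post) => reactions.push(`Ben shares the ${post.kind}`));

// The subject produces three values over time.
john.publish({ kind: "video", content: "holiday.mp4" });
john.publish({ kind: "image", content: "sunset.png" });
john.publish({ kind: "text", content: "Hello!" });

console.log(reactions); // six reactions: two followers times three posts
```

Each call to `publish()` reaches every follower, so the subject never needs to know what an individual observer does with the value.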

And this is, in short, the observer pattern. As we can see in this example, we have a single subject producing values for multiple observers. In RxJS this is called multicasting, and it is not so common. The more common scenario for working with data is covered in the next section.

The observer pattern in RxJS

As I mentioned before, the library works with the observable data type. An observable is just an object that produces some data and has a function called subscribe(). Every part of our application that wants to receive this data passes an object as a parameter to this function. This object is the observer, and we can have as many observers as we want. An observer implements a simple interface with just 3 functions: next(), error(), and complete(). The observable calls these functions depending on the current state of the data. If a new value is produced, it calls next(); if there is an error, it calls error(); and when there are no more values to produce, it finally calls complete().

For instance, suppose we have numbers coming from an HTTP response:

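TypeScript
import { Observable } from 'rxjs';

// this.getNumbers() is assumed to be a method of the surrounding class
// that returns an observable of numbers.
const numbers: Observable<number> = this.getNumbers();

numbers.subscribe({
  next: value => console.log(value),   // called for every produced value
  error: err => console.error(err),    // called if something goes wrong
  complete: () => console.log("Done receiving numbers")
});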

The observable in this example is numbers, and the observer is the object passed to the subscribe function.

Once the observer is subscribed to the observable, the first value can be pushed to the observer by calling its next() function. When the second value is ready, the observable calls next() again. When the final value comes, it calls next() and then complete(). In your application, you write the logic inside these functions, depending on how you want to process those values.
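To make the order of these calls visible, here is a minimal sketch that does not use the library at all. `SimpleObserver`, `numbersFrom()`, and `recordCalls()` are hypothetical stand-ins written for this example, not types or functions exported by RxJS. In this sketch a non-finite number is treated as an error, and after error() no further calls are made.

```typescript
// Simplified stand-in for the observer interface; not the RxJS type.
interface SimpleObserver<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}

// A hypothetical observable: subscribe() pushes the values to the observer.
function numbersFrom(values: number[]) {
  return {
    subscribe(observer: SimpleObserver<number>): void {
      for (const value of values) {
        if (!Number.isFinite(value)) {
          observer.error(new Error(`Invalid number: ${value}`));
          return; // after an error, neither next() nor complete() is called
        }
        observer.next(value); // one call per produced value
      }
      observer.complete(); // no more values to produce
    },
  };
}

// Subscribes an observer that records which of its functions were called.
function recordCalls(values: number[]): string[] {
  const calls: string[] = [];
  numbersFrom(values).subscribe({
    next: (value) => calls.push(`next(${value})`),
    error: (err) => calls.push(`error(${(err as Error).message})`),
    complete: () => calls.push("complete()"),
  });
  return calls;
}

const happyPath = recordCalls([1, 2, 3]);
const failing = recordCalls([1, NaN, 3]);

console.log(happyPath); // [ 'next(1)', 'next(2)', 'next(3)', 'complete()' ]
console.log(failing);   // [ 'next(1)', 'error(Invalid number: NaN)' ]
```

The second run shows why error() and complete() are separate functions: a stream ends either successfully or with a failure, never both.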

Benefits and examples with RxJS

The benefit of RxJS lies in its single API for processing all kinds of data. Let’s look at what the code looks like without RxJS:

Example with plain JavaScript

In plain JavaScript you can use either callbacks or promises to process data returned asynchronously to your application. If you are using callbacks, your code will look like this:

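TypeScript
// Outer callback: runs when the API call returns
this.api.get('/studentsGrades', (request: any, response: any) => {
  // Inner callback: runs when processing finishes
  processResult('grades.txt', (error: any, result: any) => {
    writeToFile("filePath", result);
  });
});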

In this code, we call an API endpoint, then a callback processes the returned data, and then another callback writes it to a file. It works, but it quickly becomes hard to read, especially if there is more nested logic.

Example with promise

If a promise is used, it looks like this:

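TypeScript
const gradesPromise: Promise<Grades> = getGrades();

gradesPromise
  .then(grades => writeToFile("filePath", grades))  // runs once, with the single resolved value
  .catch(err => logError(err))                      // runs if the promise chain is rejected
  .finally(() => console.log("Grades received!"));  // runs in both cases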

The promise has a more readable interface. We call an API endpoint, which will return a promise, and chain the 3 functions then(), catch(), and finally(). It is easier to read than nested callbacks, but this is only good for a single value. When the promise is resolved, and the value is passed to the functions, we are done, and we are not going to receive more values.

Example with async and await

Note that you may also see the async/await syntax:

JavaScript
async function getGradesAsync() {
  let grades = await getGradesFromServer();
  return grades;
}

It looks nice and simple but, under the hood, it works with promises, so we have the same single-value limitation.

Conclusion

RxJS gives us a clear API for processing asynchronous logic. We call a function that returns an observable. Then we call the subscribe function of this observable and pass an observer to it as a parameter. This observer implements an interface with 3 functions: next(), error(), and complete(). When the observable produces a value, it calls the next() function of the observer; when there is an error, it calls error(); and when it is done, it invokes complete(). The most important difference from a promise is that the observable can deliver multiple values over time. Each time a new value is produced, it calls the observer’s next() function again. This gives us a clean API for working with various stream types.
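The last two points, one interface for different sources and several values over time, can be sketched without the library. Everything below is a hypothetical stand-in written for this article, not the RxJS implementation: `Stream`, `fromValues()`, `fromClicks()`, and `FakeButton` are invented names, and for brevity the observer is reduced to its next() function.

```typescript
// Simplified stand-in for an observable; not the RxJS implementation.
type Listener<T> = (value: T) => void;

interface Stream<T> {
  subscribe(next: Listener<T>): void;
}

// Source 1: values that are already available, such as an array.
function fromValues<T>(values: T[]): Stream<T> {
  return {
    subscribe(next) {
      values.forEach((value) => next(value));
    },
  };
}

// Source 2: a hypothetical callback-based source, such as a button
// that reports clicks through registered handlers.
class FakeButton {
  private handlers: Listener<string>[] = [];

  onClick(handler: Listener<string>): void {
    this.handlers.push(handler);
  }

  click(label: string): void {
    this.handlers.forEach((handler) => handler(label));
  }
}

function fromClicks(button: FakeButton): Stream<string> {
  return {
    subscribe(next) {
      button.onClick(next);
    },
  };
}

const received: string[] = [];
const button = new FakeButton();

// Both sources are consumed through the same subscribe() call.
fromValues([90, 75]).subscribe((grade) => received.push(`grade ${grade}`));
fromClicks(button).subscribe((label) => received.push(`click ${label}`));

// The click stream keeps delivering values after subscription.
button.click("save");
button.click("save");

console.log(received); // [ 'grade 90', 'grade 75', 'click save', 'click save' ]
```

Unlike a promise, the click stream calls the same handler for every new value, which is the behavior the observable provides through next().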