Getting Started With Rx.js: A Gentle Introduction

| Comments

The first talk about Reactive Extensions I listened to was in 2010 at a .NET conference in Sweden. Back then it looked like an awesome way to write ultra-responsive (in the fast-to-respond sense) WPF applications. I watched in awe as the speaker demonstrated how to magically orchestrate a throng of diverse asynchronous operations into one beautiful, declarative and concise set of instructions.

I watched it then, admired it and never saw it again, never used it myself, not even once. Yep… that happens sometimes :)

Fast-forward to today and it feels like Rx.js is exploding (EXPLODIIIING!!!), with frameworks like cycle.js that let you build applications right on top of Rx.js or Angular2 that has chosen Rx.js as its core async pattern.

Rx.js Logo

But what is Rx.js, why is it so popular and ever so useful?

Let’s find out

JavaScript and Async

JavaScript and async go hand in hand. As a single-threaded runtime every expensive operation in JavaScript is handled asynchronously either via callbacks, or most recently through promises.

The richer the user experience an application requires, the more advanced an application becomes and the more you need to skillfully orchestrate diverse asynchronous operations: handling DOM events, mixing them with asynchronous requests to an API, more DOM events, more requests, etc

Traditional async patterns like callbacks don’t scale well in this type of scenario because they muddle the flow of the application, they are not easily composable and they are not very good at handling errors.

Promises improve inmensely on callbacks because they are composable, they define a very uniform API and a clear way to handle errors. However, they can only express one single task that will be completed some time in the future.

Here is where Observables and Rx.js come in. Rx.js generalizes that concept of a promise into an asynchronous sequence of data through time and gives you a ton of operators that help you manipulate that sequence in whichever way you want.

An Example Before You Go Crazy With All This Bla Bla Bla

You can find the source code and a live sample of this example on jsFiddle

Let’s imagine a program where whenever you click on a button you want to pick a random player for a game of virtual darts and display it on the screen… Behold! iDart!!.

If you reflect about it, you can think of the clicks of a button as a asynchronous sequence that happens over the lifetime of a program. With that in mind, solving this problem would consist in converting this sequence of clicks into a request to an API that would in turn become a UI element displaying a player.

Rx.js gives you the tools to reason about programs in this special way, and let’s you write programs that operate strictly on these asynchronous data sequences.

Imagine that we have a very simple HTML template to describe the UI of our very simple app:

1
2
3
4
5
<h1>Players Ready!!</h1>
<section class="players">
    <!-- your players here -->
</section>
<button class="load-more">load more</button>

And then define how data flows in our application. We start with our load-more button:

1
let btnLoadMore = document.querySelector('.load-more');

And we create an asynchronous sequence of data that I will call observable from now on (because it’s much shorter and I don’t get paid by the word):

1
2
3
4
let requests$ = Rx.Observable
  .fromEvent(btnLoadMore, 'click')
  .map( _ => 'https://api.github.com/users')
  .startWith('https://api.github.com/users')

The Rx.Observable.fromEvent(btnLoadMore, 'click') creates an observable of clicks that represents that idea of clicks over the lifetime of a program. Now that we have a tangible way to represent that, we can take advantage of it to create a requests$ observable that is a sequence of urls that will only happen whenever a user clicks on the button. Finally, the startsWith operator pushes a url at the beginning of the sequence to ensure that we have a starting request even when the user hasn’t yet clicked on the button.

A great way to represent observables is through marble diagrams which display pieces of data made available over time like this:

1
2
3
4
5
6
7
8
9
// this arrow represents time passing by...
// ... and never coming back...
// --------------->

// the output of each of these observables goes as follows:

fromEvent -----c---c------c--->    c (click)
map       -----u---u------u--->    u (url)
startWith u----u---u------u--->    u (url)

So we have created an observable of requests (or perhaps more exactly an observable of urls that represent the intention of performing a request). The next step is to transform these requests into actual responses:

1
2
3
let responses$ = requests$
  .flatMap(url => Rx.Observable.fromPromise(fetch(url)))
  .flatMap(r => Rx.Observable.fromPromise(r.json()))

In order to do that we use flatMap which does a couple of things in this example. First it does a map operation and therefore transforms a bit of data from the original observable (a url) into another observable that represents the asynchronous task of doing a request to a remote server. Second, it flattens the resulting observable so that instead of having an observable of observables we get an observable of responses which is easier to work with.

You can see the difference between map and flatMap more clearly using Marble diagrams:

1
2
3
4
5
6
requests$          -------u-------------u------->   u (url)
requests$.map      ----------------------------->   R (response)
                          |             |
                          ----R--->     |
                                        ----R--->
requests$.flatmap  -----------R-------------R--->   R (response)

Where the arrows coming from the original stream (sequence of data over time) in requests$.map represent new streams. As a result we get some sort of stream of streams also known as metastream (that could be the coolest word you’ll learn this week) and therefore the need to flatten it.

Another interesting thing to note from the last example above is that I am using the new fetch API to make requests and then deserialize the body of the request into JSON. Since both of those APIs return a promise, we need to convert them to observables using the fromPromise operator. And I’ll repeat the example so you have a chance to see it again because repetition is great for learning:

1
2
3
let responses$ = requests$
  .flatMap(url => Rx.Observable.fromPromise(fetch(url)))
  .flatMap(r => Rx.Observable.fromPromise(r.json()))

Ok so now that we got our responses with a collection of players coming right from GitHub we just pick one at random doing the following transformation:

1
2
3
4
5
6
7
let randomPlayer$ = responses$
  .map(pickRandomItem)

function pickRandomItem(arr){
  var randomIndex = Math.floor(Math.random() * arr.length)
  return arr[randomIndex]
}

Finally we subscribe to the randomPlayer$ observable so that whenever we get a new random player emitted we append it to the DOM. This subscribe method is particularly interesting because the observable is dormant until someone subscribes to it. Before this line of code nothing has happened yet, no requests have been done to the GitHub API to retrieve our first player. It is when you subscribe to an observable that it comes to life and starts performing its magic for you.

1
randomPlayer$.subscribe(appendPlayerToDOM)

The appendPlayerToDOM is just a function that does some DOM manipulation using vanilla JS:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function appendPlayerToDOM(u){
  var t = toTemplate(u)
  var users = document.querySelector('.players')
  users.appendChild(t)
}

function toTemplate(user){
  var a = document.createElement("a")
  a.textContent = user.login
  a.href = user.html_url
  var img = document.createElement("img")
  img.src = user.avatar_url
  var div = document.createElement("div")
  div.appendChild(img)
  div.appendChild(a)
  return div;
}

Since I did the example following a very step by step approach it may seem like a lot of code, but the reality (and the coolest) thing is that we can see the complete code for the application at a glance and instantly understand what’s going on:

1
2
3
4
5
6
7
8
9
10
11
let requests$ = Rx.Observable
  .fromEvent(btnLoadMore, 'click')
  .map( _ => 'https://api.github.com/users')
  .startWith('https://api.github.com/users')

let responses$ = requests$
  .flatMap(url => Rx.Observable.fromPromise(fetch(url)))
  .flatMap(r => Rx.Observable.fromPromise(r.json()))

let randomPlayer$ = responses$
  .map(pickRandomItem)

Or:

1
2
3
4
5
6
7
let randomPlayer$ = Rx.Observable
  .fromEvent(btnLoadMore, 'click')
  .map( _ => 'https://api.github.com/users')
  .startWith('https://api.github.com/users')
  .flatMap(url => Rx.Observable.fromPromise(fetch(url)))
  .flatMap(r => Rx.Observable.fromPromise(r.json()))
  .map(pickRandomItem)

Or:

1
2
3
4
5
6
1. Grab all the clicks of the 'load more' button
2. Transform them into user urls
3. Start with a user url even before the user has clicked on the button
4. Transform them into requests
5. Transform the response into a collection of users
6. Pick a user randomly

I don’t know if you share my opinion but I think it is awesome to be able to see this little application as a continous flow of data in different forms and shapes until it gets to the result that we want.

A Better Async Pattern: Observables and Asynchronous Data Streams

As you have seen from the previous example, Observables make operating on diverse asynchronous operations very natural and straightforward. They are super composable and lend themselves naturally to working with sequences of asynchronous data.

You can see an Observable as a combination of two classic programming design patterns: the Observer and the Iterator patterns.

The Observer pattern lets a number of observers get notified when something changes in a subject that they are observing. And this is essentially what is happening when we subscribe to an Observable and get notified when a new piece of data is emitted by the Observable. The subscribe method is indeed helping us create an observer with a very simple API (in the previous example just one single method).

Since Observables also have this dimension of a collection of data or sequence then we get to the Iterator pattern. This pattern lets us abstract how we iterate over a specific collection by wrapping this iteration inside an object and providing a uniform API to get the next element by calling a next method and whether we are finished iterating through the hasNext method. With Observables, instead of the client iterating over a sequence and pulling values, the client gets pushed values as they appear through time, and can redefine how these sequences are iterated by using different Rx.js operators and transforming observables. In fact, if you look at the API of an Rx.js observer you’ll see that it exposes an onNext and onCompleted method which map conceptually to next and hasNext from the iterator pattern.

Reactive Programming? Why I hear so much about it in relation to Rx.js?

Reactive Programming consists on programming working with asynchronous data streams. A stream is a sequence of data that is made available over time. Rx.js makes it very easy to do reactive programming because it gives you the Observable type that is a representation of an async data stream and a lot of operators and utilities to work on them.

Observables vs Streams

If you get caught by the Rx.js police you’ll need to know the difference between an Observable and a Stream. You can see an Observable like a lazy stream. An observable becomes a stream when it gets activated by someone subscribing to it.

A Super Simple Intro to Observables: Like an Array But Different

You can find and experiment with this example at jsFiddle

Let’s drop down a notch to a very simple example of Rx.js that may not have a lot of practical utility but will help you in your path to understanding Observables.

Imagine that you have an array of numbers in JavaScript:

1
var numbers = [1,2,3,4,5,6]

You can just log the array and verify that right now, at this point in time it has all the items you have declared it with:

1
console.log(numbers) // => 1,2,3,4,5,6

And you can take advantage of the Array.prototype methods to add all the numbers that are greater than 3:

1
2
3
var result = numbers
   .filter(n => n > 3)
   .reduce((a,x) => a+x, 0)

And again, you can log the result:

1
console.log(result)  // => 15

We can take this array and automagically transform it into an observable using Rx.js:

1
2
var numbers$ = Rx.Observable
  .range(1,6);

or, alternatively, reusing the original array:

1
2
3
4
5
// A stream
var numbers$ = Rx.Observable
  .interval(500)
  .take(6)
  .map(i => numbers[i]);

So that, instead of having an array that has all values at a specific point in time, now we have an asynchronous data sequence where items within that array are pushed or emitted through time. In this particular case, every 500 ms we will get a new number. If we subscribe to this observable we can verify how values are emitted:

1
2
3
4
numbers$.subscribe(n => console.log(n));
// => 1
// => 2
// => 3... etc

Since this sequence also represents a collection like an array, it makes sense to use array-like methods to operate on the sequence and therefore we can also use filter and reduce:

1
2
3
4
5
6
var result$ = numbers$
  .filter(n => n > 3)
  .reduce((a,x) => a+x, 0);

result$.subscribe(n => console.log(n));
// => 15

And so we can see that an observable is just like an array, but instead of representing a collection of items in space at a specific point in time, an observable represents an collection of items made availale over a period of time. Hope that makes it clearer!! :)

The Power of a One Single Abstraction

A super interesting thing that comes out of using Rx.js is that you can represent any application in terms of inputs and outputs, in terms of asynchronous flows of data that get transformed during the lifetime of a program.

Rx.js comes with a lot of operators that let you convert almost anything into observables, any primitive data type, DOM events, promises, etc… and you can even wrap anything inside an observable yourself (roll it on your own if you will). In this context then, where you can have everything becoming an observable, taking into account that observables are so easily composed with each other and the fact that Rx.js comes with a ton of operators that help you do amazing things with observables, all of the sudden you have a world of endless possibilities at your disposition.

Side-Effects of Using Rx.js

Other great side-effects of using Rx.js in your application are readability and less bug-prone code.

Rx.js declarative nature results in a more terse code base that is very readable and where you can follow the flow of data through a program step by step (just like you could appreciate in the examples above).

Additionally, transforming your application in a flow of data limits the number of side-effects, highlights the side-effects that do exist as they clearly affect something outside of the streams, which ends up making your programs less error-prone and helping you reason about what’s going on in your code.

Boom!

Want to Learn More?

Would you like to learn more about Observables, Rx.js and Reactive Programming? Then take a look at any of these resources:

Concluding

So! This article grew a little bit larger than I had envisioned in the beginning! Haha! Hope you have enjoyed it though.

Today you learned about Rx.js and reactive programming. You saw the limitations of callbacks and promises as async patterns and discovered observables as a super composable async paradigm. Observables let you represent asynchronous data sequences over time, transform them to your heart’s content into things that are useful for your program and result in super declarative and readable source code. You also saw the similarities of an observable with the observer and iterator patterns and to an array whose items are spread through time. We finally reflected about the power of using a single abstraction to represent the flows of data in a program and how using Rx.js limits the amount of side-effects required in your code.

There’s some more things to cover like, some of the underpinings of Rx.js with a closer look at the observable and observer types, tons of operators, practical recipes, error handling, rx.js and angular2, so keep posted! :)

Comments