barbarian meets coding

WebDev, UX & a Pinch of Fantasy

Rx.js - Reactive Extensions Wiki

This article is part of my personal wiki where I write personal notes while I am learning new technologies. You are welcome to use it for your own learning!

An Introduction to Reactive Extensions

Rx.js let’s you program applications using reactive programming, a special flavor of programming where you operate on event streams, which are sequences of events happening over time. It is useful because it simplifies programming asynchronous applications greatly.

A Quick Intro to Event Streams

You can get an idea of what an event stream is if you compare it with a JavaScript array. (Check example of JsFiddle).

While a JavaScript array contains a collection of items that are static in time:

1
2
3
4
5
6
7
8
var numbers = [1,2,3,4,5,6]

var result = numbers
   .filter(n => n > 3)
   .reduce((a,x) => a+x, 0)

console.log(numbers) // => 1,2,3,4,5,6
console.log(result)  // => 15

An stream can push these same collection of items over time:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// A stream
var numbers$ = Rx.Observable
  .interval(500)
  .take(6)
  .map(i => numbers[i]);

var result$ = numbers$
  .filter(n => n > 3)
  .reduce((a,x) => a+x, 0);

numbers$.subscribe(n => console.log(n));
// => 1
// => 2
// => 3... etc
result$.subscribe(n => console.log(n));
// =. 15

Using RxJs With The DOM

RxJS makes it very easy to turn DOM events into streams by using the fromEvent method. For instance, we can create a click counter like this:

1
2
3
4
5
6
7
8
9
let button = document.querySelector('.increase-counter');
let text = document.querySelector('.counter');
var clicks$ = Rx.Observable
  .fromEvent(button, 'click');

clicks$.subscribe(e => {
  console.log('increase count');
  text.innerText = parseInt(text.innerHTML)+1;
});

We can take advantage of the numerous operators provided by RxJs to create very compact programs without side-effects that would otherwise require a lot of code. For instance, here’s a double click detector:

1
2
3
4
5
6
7
8
9
10
11
12
let button = document.querySelector('.double-click')
let text = document.querySelector('.message')

let clicks$ = Rx.Observable
  .fromEvent(button, 'click')

let doubleClicks$ = clicks$
  .buffer( _ => clicks$.debounce(300))
  .map(clicksWithin300ms => clicksWithin300ms.length)
  .filter(clicksWithin300ms => clicksWithin300ms === 2)

doubleClicks$.subscribe(_ => text.innerText = "double Click!!");

Using RxJS with Asynchronous Requests

A common pattern in handling asynchronous requests is the Promise. A promise is very similar to an event stream that only pushes a single event in the stream when the promise is either resolved or rejected. RxJS offers a very simple way to convert a promise into an event stream by using the fromPromise operator. The following example shows how to use RxJS to make an async request to Github’s API using the new fetch API:

1
2
3
4
5
6
7
8
9
10
let usersUrl = 'https://api.github.com/users'
let requests$ = Rx.Observable.just(usersUrl)

let responses$ = requests$
  .flatMap(url => Rx.Observable.fromPromise(fetch(url)))
  .flatMap(r => Rx.Observable.fromPromise(r.json()))

responses$.subscribe(users => {
   console.log(users[0].login); // => mojombo
})

The example above uses flatMap instead of map. Why is that? The reason is that if we were to just using map we would get a stream of streams (or a metastream). In order to have a single stream, which is much easier structure to work with, we collapse or flatten the metastream into a single stream so that all values emitted by any separate stream are pushed into this flattened stream.

References

Comments