- Observables are lazy Push collections of multiple values.
- Observables are like functions with zero arguments, but generalize those to allow multiple values.
- Subscribing to an Observable is analogous to calling a Function.
-
Creating Observables
-
Rx.Observable.create
operator is an alias for theObservable
constructor, and it takes one argument: Thesubscribe
function (Note: here, thesubscribe
function doesn't indicate Observable prototypesubscribe
function. It simply indicates the function which thecreate
operator takes as an argument.)var observable = Rx.Observable.create(function subscribe(observer) { var id = setInterval(() => { observer.next('hi') }, 1000); });
-
Observables can be created with
create
, but usually we use the so-called creation operators, likeof
,from
,interval
, etc.
-
-
Subscribing to Observables
- The
subscribe
function calls are not shared among multiple Observers of the same Observable. - When calling
observable.subscribe
with an Observer, thesubscribe
function is run for that given Observer.
- The
-
Executing the Observable
-
The body of the
subscribe
function represents an "Observable execution", a lazy computation that only happens for each Observer that subscribes. -
There are three types of values an Observable Execution can deliver:
- "Next" notification: sends a value such as a Number, a String, an Object, etc.
- "Error" notification: sends a JavaScript Error or exception.
- "Complete" notification: does not send a value.
-
In an Observable Execution, zero to infinite
Next
notifications may be delivered. If either anError
orComplete
notification is delivered, then nothing else can be delivered afterwards.var observable = Rx.Observable.create(function subscribe(observer) { try { observer.next(1); observer.next(2); observer.next(3); observer.complete(); } catch (err) { observer.error(err); // delivers an error if it caught one } });
-
-
Disposing/Unsubscribing Observables
-
When
observable.subscribe
is called, it returns an object: theSubscription
. -
With
subscription.unsubscribe()
, you can cancel the ongoing execution:var observable = Rx.Observable.from([10, 20, 30]); var subscription = observable.subscribe(x => console.log(x)); // Later subscription.unsubscribe();
-
Each Observable must define how to dispose resources of that execution when we create the Observable using
create()
. You can do that by returning a customunsubscribe
function from withinfunction subscribe()
.var observable = Rx.Observable.create(function subscribe(observer) { // Keep track of the interval resource var intervalID = setInterval(() => { observer.next('hi'); }, 1000); // Provide a way of canceling and disposing the interval resource return function unsubscribe() { clearInterval(intervalID); }; });
-