From c84b7b6980983aa9f061b6cffe5b98fd6cf4cad4 Mon Sep 17 00:00:00 2001 From: Neal Lubin Date: Wed, 3 May 2017 13:59:46 -0400 Subject: [PATCH] Add more functions to observables --- src/index.js | 4 +++ src/observables/of.js | 13 ++++++++++ src/operators/delay.js | 21 ++++++++++++++++ src/operators/flatMap.js | 51 ++++++++++++++++++++++++++++++++++++++ src/operators/switchMap.js | 44 ++++++++++++++++++++++++++++++++ src/utilities.js | 2 ++ 6 files changed, 135 insertions(+) create mode 100644 src/observables/of.js create mode 100644 src/operators/delay.js create mode 100644 src/operators/flatMap.js create mode 100644 src/operators/switchMap.js diff --git a/src/index.js b/src/index.js index b0f854d..59e6c07 100644 --- a/src/index.js +++ b/src/index.js @@ -7,14 +7,18 @@ export * from './utilities/makeHot'; // add on operators export * from './operators/map'; export * from './operators/filter'; +export * from './operators/delay'; export * from './operators/do'; export * from './operators/debounceTime'; export * from './operators/take'; export * from './operators/first'; export * from './operators/toPromise'; +export * from './operators/flatMap'; +export * from './operators/switchMap'; // add on observable types export * from './observables/fromEvent'; export * from './observables/fromPromise'; export * from './observables/range'; export * from './observables/interval'; +export * from './observables/of'; diff --git a/src/observables/of.js b/src/observables/of.js new file mode 100644 index 0000000..1aa4799 --- /dev/null +++ b/src/observables/of.js @@ -0,0 +1,13 @@ +import { Observable } from '../Observable'; + +export const of = function (...list) { + return new Observable((observer) => { + Array.from(list).forEach((value) => { + observer.next(value); + }); + + observer.complete(); + }); +}; + +Observable.of = of; diff --git a/src/operators/delay.js b/src/operators/delay.js new file mode 100644 index 0000000..bf036cf --- /dev/null +++ b/src/operators/delay.js @@ -0,0 +1,21 @@ +import { Observable } from '../Observable'; + +export const delay = function (source$, time) { + return Observable.create((observer) => { + let subscription; + const timerId = setTimeout(() => { + subscription = source$.subscribe(observer.next, observer.error, observer.complete); + }, time); + + return () => { + clearTimeout(timerId); + if (subscription) { + subscription.unsubscribe(); + } + }; + }); +}; + +Observable.prototype.delay = function (time) { + return delay(this, time); +}; diff --git a/src/operators/flatMap.js b/src/operators/flatMap.js new file mode 100644 index 0000000..cafb8e8 --- /dev/null +++ b/src/operators/flatMap.js @@ -0,0 +1,51 @@ +import { Observable } from '../Observable'; +import { isObservable } from '../utilities'; +import '../observables/of'; +import './map'; + +export const flatMap = function (source$, mapCallback) { + return new Observable((observer) => { + let subscription = { isComplete: false }; + const nextSubscriptionList = []; + const onComplete = () => { + const nextComplete = nextSubscriptionList.reduce( + (curr, sub) => curr && sub.isComplete, + true + ); + + if (subscription.complete && nextComplete) { + observer.complete(); + } + }; + + subscription = source$ + .map(mapCallback) + .map((nextValue) => { + let nextValue$ = nextValue; + + if (!isObservable(nextValue$)) { + nextValue$ = Observable.of(nextValue$); + } + + return nextValue$ + }) + .subscribe((nextValue$) => { + const nextSubscription = nextValue$.subscribe( + observer.next, + observer.error, + onComplete + ); + + nextSubscriptionList.push(nextSubscription); + }, observer.error, onComplete); + + return () => { + nextSubscriptionList.forEach((nextSub) => nextSub.unsubscribe()); + subscription.unsubscribe(); + }; + }); +}; + +Observable.prototype.flatMap = function (mapCallback) { + return flatMap(this, mapCallback); +}; diff --git a/src/operators/switchMap.js b/src/operators/switchMap.js new file mode 100644 index 0000000..ff4d6a8 --- /dev/null +++ b/src/operators/switchMap.js @@ -0,0 +1,44 @@ +import { Observable } from '../Observable'; +import { isObservable } from '../utilities'; +import '../observables/of'; +import './map'; + +export const switchMap = function (source$, mapCallback) { + return new Observable((observer) => { + let currentSubscription; + let nextSubscription; + + currentSubscription = source$ + .map(mapCallback) + .map((nextValue) => { + let nextValue$ = nextValue; + + if (!isObservable(nextValue$)) { + nextValue$ = Observable.of(nextValue$); + } + + return nextValue$ + }) + .subscribe((nextValue$) => { + // unsubscribe to the previous subscription if a new value comes in + if (nextSubscription) { + nextSubscription.unsubscribe(); + } + + nextSubscription = nextValue$.subscribe((value) => { + observer.next(value); + }, observer.error, observer.complete); + }, observer.error, observer.complete); + + return () => { + if (nextSubscription) { + nextSubscription.unsubscribe(); + } + currentSubscription.unsubscribe(); + }; + }); +}; + +Observable.prototype.switchMap = function (mapCallback) { + return switchMap(this, mapCallback); +}; diff --git a/src/utilities.js b/src/utilities.js index 8d6450e..4f2fd2e 100644 --- a/src/utilities.js +++ b/src/utilities.js @@ -1,3 +1,5 @@ export const noop = () => null; export const isPromise = (p) => p !== null && typeof p === 'object' && typeof p.then === 'function'; + +export const isObservable = (o$) => o$ !== null && typeof o$ === 'object' && typeof o$.subscribe === 'function';