Skip to content

Commit

Permalink
Add more functions to observables
Browse files Browse the repository at this point in the history
  • Loading branch information
maniator committed May 3, 2017
1 parent 8fde66c commit c84b7b6
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,18 @@ export * from './utilities/makeHot';
// add on operators
export * from './operators/map';
export * from './operators/filter';
export * from './operators/delay';
export * from './operators/do';
export * from './operators/debounceTime';
export * from './operators/take';
export * from './operators/first';
export * from './operators/toPromise';
export * from './operators/flatMap';
export * from './operators/switchMap';

// add on observable types
export * from './observables/fromEvent';
export * from './observables/fromPromise';
export * from './observables/range';
export * from './observables/interval';
export * from './observables/of';
13 changes: 13 additions & 0 deletions src/observables/of.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Observable } from '../Observable';

export const of = function (...list) {
return new Observable((observer) => {
Array.from(list).forEach((value) => {
observer.next(value);
});

observer.complete();
});
};

Observable.of = of;
21 changes: 21 additions & 0 deletions src/operators/delay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { Observable } from '../Observable';

export const delay = function (source$, time) {
return Observable.create((observer) => {
let subscription;
const timerId = setTimeout(() => {
subscription = source$.subscribe(observer.next, observer.error, observer.complete);
}, time);

return () => {
clearTimeout(timerId);
if (subscription) {
subscription.unsubscribe();
}
};
});
};

Observable.prototype.delay = function (time) {
return delay(this, time);
};
51 changes: 51 additions & 0 deletions src/operators/flatMap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { Observable } from '../Observable';
import { isObservable } from '../utilities';
import '../observables/of';
import './map';

export const flatMap = function (source$, mapCallback) {
return new Observable((observer) => {
let subscription = { isComplete: false };
const nextSubscriptionList = [];
const onComplete = () => {
const nextComplete = nextSubscriptionList.reduce(
(curr, sub) => curr && sub.isComplete,
true
);

if (subscription.complete && nextComplete) {
observer.complete();
}
};

subscription = source$
.map(mapCallback)
.map((nextValue) => {
let nextValue$ = nextValue;

if (!isObservable(nextValue$)) {
nextValue$ = Observable.of(nextValue$);
}

return nextValue$
})
.subscribe((nextValue$) => {
const nextSubscription = nextValue$.subscribe(
observer.next,
observer.error,
onComplete
);

nextSubscriptionList.push(nextSubscription);
}, observer.error, onComplete);

return () => {
nextSubscriptionList.forEach((nextSub) => nextSub.unsubscribe());
subscription.unsubscribe();
};
});
};

Observable.prototype.flatMap = function (mapCallback) {
return flatMap(this, mapCallback);
};
44 changes: 44 additions & 0 deletions src/operators/switchMap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { Observable } from '../Observable';
import { isObservable } from '../utilities';
import '../observables/of';
import './map';

export const switchMap = function (source$, mapCallback) {
return new Observable((observer) => {
let currentSubscription;
let nextSubscription;

currentSubscription = source$
.map(mapCallback)
.map((nextValue) => {
let nextValue$ = nextValue;

if (!isObservable(nextValue$)) {
nextValue$ = Observable.of(nextValue$);
}

return nextValue$
})
.subscribe((nextValue$) => {
// unsubscribe to the previous subscription if a new value comes in
if (nextSubscription) {
nextSubscription.unsubscribe();
}

nextSubscription = nextValue$.subscribe((value) => {
observer.next(value);
}, observer.error, observer.complete);
}, observer.error, observer.complete);

return () => {
if (nextSubscription) {
nextSubscription.unsubscribe();
}
currentSubscription.unsubscribe();
};
});
};

Observable.prototype.switchMap = function (mapCallback) {
return switchMap(this, mapCallback);
};
2 changes: 2 additions & 0 deletions src/utilities.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
export const noop = () => null;

export const isPromise = (p) => p !== null && typeof p === 'object' && typeof p.then === 'function';

export const isObservable = (o$) => o$ !== null && typeof o$ === 'object' && typeof o$.subscribe === 'function';

0 comments on commit c84b7b6

Please sign in to comment.