Skip to content

Latest commit

 

History

History
1431 lines (1090 loc) · 43.1 KB

day_008.md

File metadata and controls

1431 lines (1090 loc) · 43.1 KB

day 8

RxJs

官方英文文档

中文文档

需要vpn 油管视频 RxJs学习视频


基本概念

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

Observable (可观察对象)

Observables 是多个值的惰性推送集合。它填补了下面表格中的空白:

单个值 多个值
拉取 Function Iterator
推送 Promise Observable

Observer (观察者)

什么是观察者? - 观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。

Subscription (订阅)

什么是 Subscription ? - Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();

Subscription 还可以合在一起,这样一个 Subscription 调用 unsubscribe() 方法,可能会有多个 Subscription 取消订阅 。

var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
    // subscription 和 childSubscription 都会取消订阅
    subscription.unsubscribe();
}, 1000);

结果

second: 0
first: 0
second: 1
first: 1
second: 2

Subscriptions 还有一个 remove(otherSubscription) 方法,用来撤销一个已添加的子 Subscription 。

Subject (主体)

  • 什么是 Subject? - RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。
Subject 像是 Observable,但是可以多播给多个观察者。Subject 还像是 EventEmitters,维护着多个监听器的注册表。
  • 每个 Subject 都是 Observable 。 - 对于 Subject,你可以提供一个观察者并使用 subscribe 方法,就可以开始正常接收值。从观察者的角度而言,它无法判断 Observable 执行是来自普通的 Observable 还是 Subject 。 在 Subject 的内部,subscribe 不会调用发送值的新执行。它只是将给定的观察者注册到观察者列表中,类似于其他库或语言中的 addListener 的工作方式。

  • 每个 Subject 都是观察者。 - Subject 是一个有如下方法的对象: next(v)、error(e) 和 complete() 。要给 Subject 提供新值,只要调用 next(theValue) ,它会将值多播给已注册监听该 Subject 的观察者们。

在下面的示例中,我们为 Subject 添加了两个观察者,然后给 Subject 提供一些值:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

//结果
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

因为 Subject 是观察者,这也就在意味着你可以把 Subject 作为参数传给任何 Observable 的 subscribe 方法,如下面的示例所展示的:

var subject = new Rx.Subject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅
//结果
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

使用上面的方法,我们基本上只是通过 Subject 将单播的 Observable 执行转换为多播的。这也说明了 Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式。

还有一些特殊类型的 Subject:BehaviorSubject、ReplaySubject 和 AsyncSubject。

多播的 Observables

“多播 Observable” 通过 Subject 来发送通知,这个 Subject 可能有多个订阅者,然而普通的 “单播 Observable” 只发送通知给单个观察者。

多播 Observable 在底层是通过使用 Subject 使得多个观察者可以看见同一个 Observable 执行。

在底层,这就是 multicast 操作符的工作原理:观察者订阅一个基础的 Subject,然后 Subject 订阅源 Observable 。下面的示例与前面使用 observable.subscribe(subject) 的示例类似:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

// 在底层使用了 `subject.subscribe({...})`:
multicasted.subscribe({
    next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

// 在底层使用了 `source.subscribe(subject)`:
multicasted.connect();

multicast 操作符返回一个 Observable,它看起来和普通的 Observable 没什么区别,但当订阅时就像是 Subject 。multicast 返回的是 ConnectableObservable,它只是一个有 connect() 方法的 Observable 。

connect() 方法十分重要,它决定了何时启动共享的 Observable 执行。因为 connect() 方法在底层执行了 source.subscribe(subject),所以它返回的是 Subscription,你可以取消订阅以取消共享的 Observable 执行。

引用计数

手动调用 connect() 并处理 Subscription 通常太笨重。通常,当第一个观察者到达时我们想要自动地连接,而当最后一个观察者取消订阅时我们想要自动地取消共享执行。

如果不想显式调用 connect(),我们可以使用 ConnectableObservable 的 refCount() 方法(引用计数),这个方法返回 Observable,这个 Observable 会追踪有多少个订阅者。当订阅者的数量从0变成1,它会调用 connect() 以开启共享的执行。当订阅者数量从1变成0时,它会完全取消订阅,停止进一步的执行。

    refCount 的作用是,当有第一个订阅者时,多播 Observable 会自动地启动执行,而当最后一个订阅者离开时,多播 Observable 会自动地停止执行。
var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

// 这里其实调用了 `connect()`,
// 因为 `refCounted` 有了第一个订阅者
console.log('observerA subscribed');
subscription1 = refCounted.subscribe({
    next: (v) => console.log('observerA: ' + v)
});

setTimeout(() => {
    console.log('observerB subscribed');
    subscription2 = refCounted.subscribe({
        next: (v) => console.log('observerB: ' + v)
    });
}, 600);

setTimeout(() => {
    console.log('observerA unsubscribed');
    subscription1.unsubscribe();
}, 1200);

// 这里共享的 Observable 执行会停止,
// 因为此后 `refCounted` 将不再有订阅者
setTimeout(() => {
    console.log('observerB unsubscribed');
    subscription2.unsubscribe();
}, 2000);

// 执行结果:
//
// observerA subscribed
// observerA: 0
// observerB subscribed
// observerA: 1
// observerB: 1
// observerA unsubscribed
// observerB: 2
// observerB unsubscribed
// refCount() 只存在于 ConnectableObservable,它返回的是 Observable,而不是另一个 ConnectableObservable 。

BehaviorSubject

Subject 的其中一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值。并且当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”。

BehaviorSubjects 适合用来表示“随时间推移的值”。举例来说,生日的流是一个 Subject,但年龄的流应该是一个 BehaviorSubject 。

在下面的示例中,BehaviorSubject 使用值0进行初始化,当第一个观察者订阅时会得到0。第二个观察者订阅时会得到值2,尽管它是在值2发送之后订阅的。

var subject = new Rx.BehaviorSubject(0); // 0是初始值

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

subject.next(3);
// 输出:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3

ReplaySubject

ReplaySubject 类似于 BehaviorSubject,它可以发送旧值给新的订阅者,但它还可以记录 Observable 执行的一部分。

ReplaySubject 记录 Observable 执行中的多个值并将其回放给新的订阅者。

当创建 ReplaySubject 时,你可以指定回放多少个值:

var subject = new Rx.ReplaySubject(3); // 为新的订阅者缓冲3个值

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
// With output:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5

除了缓冲数量,你还可以指定 window time (以毫秒为单位)来确定多久之前的值可以记录。在下面的示例中,我们使用了较大的缓存数量100,但 window time 参数只设置了500毫秒。

var subject = new Rx.ReplaySubject(100, 500 /* windowTime */);

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});

var i = 1;
setInterval(() => subject.next(i++), 200);

setTimeout(() => {
    subject.subscribe({
        next: (v) => console.log('observerB: ' + v)
    });
}, 1000);
// 从下面的输出可以看出,第二个观察者得到的值是3、4、5,这三个值是订阅发生前的500毫秒内发生的:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// ...

AsyncSubject

AsyncSubject 是另一个 Subject 变体,只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

var subject = new Rx.AsyncSubject();

subject.subscribe({
    next: (v) => console.log('observerA: ' + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
    next: (v) => console.log('observerB: ' + v)
});

subject.next(5);
subject.complete();
// 输出:
// observerA: 5
// observerB: 5

Operators (操作符)

操作符是 Observable 类型上的方法,比如 .map(...)、.filter(...)、.merge(...),等等。当操作符被调用时,它们不会改变已经存在的 Observable 实例。相反,它们返回一个新的 Observable ,它的 subscription 逻辑基于第一个 Observable 。

操作符是函数,它基于当前的 Observable 创建一个新的 Observable。这是一个无副作用的操作:前面的 Observable 保持不变。

操作符本质上是一个纯函数 (pure function),它接收一个 Observable 作为输入,并生成一个新的 Observable 作为输出。订阅输出 Observable 同样会订阅输入 Observable 。

实例操作符 vs. 静态操作符

  • 什么是实例操作符? - 通常提到操作符时,我们指的是实例操作符,它是 Observable 实例上的方法。
  • 什么是静态操作符? - 除了实例操作符,还有静态操作符,它们是直接附加到 Observable 类上的。
静态操作符是附加到 Observalbe 类上的纯函数,通常用来从头开始创建 Observalbe 。

创建

bindCallback
给它一个签名为f(x, callback)的函数f,返回一个函数g, 调用'g(x)'的时候会返回一个 Observable。
bindNodeCallback
就像是 bindCallback, 但是回调函数必须形如 callback(error, result)这样
combineLatest
组合多个 Observables 来创建一个 Observable ,该 Observable 的值根据每个输入 Observable 的最新值计算得出的。
concat #
连接多个输入 Observable,顺序的发出它们的值,一个 Observable 接一个 Observable。

concat通过一次订阅一个将多个 Observables 连接起来,并将值合并到输出 Observable 中。 你可以传递一个输入 Observable 数组,或者直接把它们当做参数传递。 传递一个空数组会 导致输出 Observable 立马触发完成状态。

create #

创建自定义的 Observable ,它可以做任何你想做的事情

defer

创建一个 Observable,当被订阅的时候,调用 Observable 工厂为每个观察者创建新的 Observable。

惰性创建 Observable, 也就是说, 当且仅当它被订阅的时候才创建。
empty

创建一个什么数据都不发出并且立马完成的 Observable。

仅仅发出 complete 通知,其他什么也不做。
from #

从一个数组、类数组对象、Promise、迭代器对象或者类 Observable 对象创建一个 Observable.

几乎可以把任何东西都能转化为Observable.

将各种其他对象和数据类型转化为 Observables。 from将 Promise、类数组对象、迭代器对象转化为 Observable ,该 Observable 发出 promise、数组或者迭代器的成员。 字符串,在这种上下文里,会被当做字符数组。 类 Observable 对象(包括ES2015里的 Observable 方法)也可以通过此操作符进行转换。

fromEvent #

创建一个 Observable,该 Observable 发出来自给定事件对象的指定类型事件。

// 创建一个来自于 DOM 事件,或者 Node 的 EventEmitter 事件或者其他事件的 Observable。

// 发出 DOM document 上的点击事件。
var clicks = Rx.Observable.fromEvent(document, 'click');
clicks.subscribe(x => console.log(x));
// 结果:
// 每次点击 document 时,都会在控制台上输出 MouseEvent 。
fromPromise #

将 Promise 转化为 Observable。

返回一个仅仅发出 Promise resolve 过的值然后完成的 Observable。
// 将 Fetch 返回的 Promise 转化为 Observable。
var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));
interval #

创建一个 Observable ,该 Observable 使用指定的 IScheduler ,并以指定时间间隔发出连续的数字。

定期发出自增的数字。
// 每1秒发出一个自增数
var numbers = Rx.Observable.interval(1000);
numbers.subscribe(x => console.log(x));
merge #

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中值。

通过把多个 Observables 的值混合到一个 Observable 中来将其打平。

合并两个 Observables: 时间间隔为1秒的 timer 和 clicks

var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = Rx.Observable.merge(clicks, timer);
clicksOrTimer.subscribe(x => console.log(x));

// 结果如下:
// 每隔1s发出一个自增值到控制台
// document被点击的时候MouseEvents会被打印到控制台
// 因为两个流被合并了,所以你当它们发生的时候你就可以看见.

合并3个Observables, 但是只并行运行2个

var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // the argument
var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

// 结果如下:
// - timer1和timer2将会并行运算
// - timer1每隔1s发出值,迭代10次
// - timer2每隔1s发出值,迭代6次
// - timer1达到迭代最大次数,timer2会继续,timer3开始和timer2并行运行
// - 当timer2达到最大迭代次数就停止,timer3将会继续每隔500ms发出数据直到结束
never

创建一个不向观察者发出任何项的 Observable 。

从不发出任何项的 Observable 。

这个静态操作符对于创建既不发出数据也不触发错误和完成通知的 Observable。 可以用来测试或 者和其他 Observables进行组合。 注意,由于不会发送完成通知,这个 Observable 的 subscription 不会被自动地清理。Subscriptions 需要手动清理。

of #

创建一个 Observable,它会依次发出由你提供的参数,最后发出完成通知。

发出你提供的参数,然后完成。

这个静态操作符适用于创建简单的 Observable ,该 Observable 只发出给定的参数, 在发送完这些参数后发出完成通知。它可以用来和其他 Observables 组合比如说concat。 默认情况下,它使用null调度器,这意味着next通知是同步发出的, 尽管使用不同的调度器可以决定这些通知何时送到。

// 发出 10、20、 30, 然后是 'a'、 'b'、 'c', 紧接着开始每秒发出。
var numbers = Rx.Observable.of(10, 20, 30);
var letters = Rx.Observable.of('a', 'b', 'c');
var interval = Rx.Observable.interval(1000);
var result = numbers.concat(letters).concat(interval);
result.subscribe(x => console.log(x));
range #

创建一个 Observable ,它发出指定范围内的数字序列。

发出区间范围内的数字序列。

range 操作符顺序发出一个区间范围内的连续整数, 你可以决定区间的开始和长度。 默认情况下, 不使用 调度器仅仅同步的发送通知, 但是也可以可选的使用可选的调度器来控制发送。

// 发出从1到10的数
var numbers = Rx.Observable.range(1, 10);
numbers.subscribe(x => console.log(x));
throw #

创建一个不发送数据给观察者并且立马发出错误通知的 Observable。

仅仅发出 error 通知,其他什么也不做。

这个静态操作符对于创建简单的只发出错误通知的 Observable 十分有用。 可以被用来和其他 Observables 组合, 比如在 mergeMap 中使用

// 映射并打平成字母序列abc,但当数字为13时抛出错误。
var interval = Rx.Observable.interval(1000);
var result = interval.mergeMap(x =>
    x === 13 ?
        Rx.Observable.throw('Thirteens are bad') :
        Rx.Observable.of('a', 'b', 'c')
);
result.subscribe(x => console.log(x), e => console.error(e));
timer #

创建一个 Observable,该 Observable 在初始延时(initialDelay)之后开始发送并且在每个时间周期( period)后发出自增的数字。

就像是interval, 但是你可以指定什么时候开始发送。
// 每隔1秒发出自增的数字,3秒后开始发送。
var numbers = Rx.Observable.timer(3000, 1000);
numbers.subscribe(x => console.log(x));

// 5秒后发出一个数字
var numbers = Rx.Observable.timer(5000);
numbers.subscribe(x => console.log(x));
webSocket

包装浏览器提供的兼容w3c的WebSocket对象.

// 包装浏览器的WebSocket

let socket$ = Observable.webSocket('ws://localhost:8081');

socket$.subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
);

socket$.next(JSON.stringify({op: 'hello'}));
zip #

将多个 Observable 组合以创建一个 Observable,该 Observable 的值是由所有输入 Observables 的值按顺序计算而来的。

如果最后一个参数是函数, 这个函数被用来计算最终发出的值.否则, 返回一个顺序包含所有输入值的数组.

// 从不同的源头结合年龄和名称

let age$ = Observable.of < number > (27, 25, 29);
let name$ = Observable.of < string > ('Foo', 'Bar', 'Beer');
let isDev$ = Observable.of < boolean > (true, true, false);

Observable
    .zip(age$,
        name$,
        isDev$,
        (age: number, name: string, isDev: boolean) => ({age, name, isDev}))
    .subscribe(x => console.log(x));

// 输出:
// { age: 27, name: 'Foo', isDev: true }
// { age: 25, name: 'Bar', isDev: true }
// { age: 29, name: 'Beer', isDev: false }

过滤操作符

audit

在另一个 Observable 决定的时间段里忽略源数据,然后发出源 Observable 中最新发出的值, 然后重复此过程。

就像是auditTime, 但是沉默持续时间段由第二个 Observable 决定。
// 以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.audit(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));
auditTime

duration 毫秒内忽略源值,然后发出源 Observable 的最新值, 并且重复此过程。

当它看见一个源值,它会在接下来的 duration 毫秒内忽略这个值以及接下来的源值,过后发出最新的源值。
// 以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.auditTime(1000);
result.subscribe(x => console.log(x));
buffer

缓冲源 Observable 的值直到 closingNotifier 发出。

将过往的值收集到一个数组中,并且仅当另一个 Observable 发出通知时才发出此数组。
// 每次点击发出 interval Observable 最新缓冲的数组。
var clicks = Rx.Observable.fromEvent(document, 'click');
var interval = Rx.Observable.interval(1000);
var buffered = interval.buffer(clicks);
buffered.subscribe(x => console.log(x));
bufferCount

缓冲源 Observable 的值直到缓冲数量到达设定的 bufferSize.

将过往的值收集到一个数组中,当数组数量到达设定的 bufferSize 时发出该数组。
// 将最后两次点击事件作为数组发出
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2);
buffered.subscribe(x => console.log(x));

// 在每次点击的时候, 以数组的形式发出最后两次点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferCount(2, 1);
buffered.subscribe(x => console.log(x));
bufferTime

在特定时间周期内缓冲源 Observable 的值。

将过往的值收集到数组中,并周期性地发出这些数组。
// 每一秒都发出最新点击事件的数组
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(1000);
buffered.subscribe(x => console.log(x));
// 每5秒钟,发出接下来2秒内的点击事件(译者注:后3秒内的点击会被忽略)
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferTime(2000, 5000);
buffered.subscribe(x => console.log(x));
bufferToggle

缓冲源 Observable 的值,openings 发送的时候开始缓冲,closingSelector 发送的时候结束缓冲。

将过往数据收集到数组中. 当opening发送的时候开始收集, 然后调用closingSelector 函数获取 Observable ,该Observable 告知什么时候关闭缓冲。
// 每隔一秒钟,发出接下来500毫秒内的点击事件。
var clicks = Rx.Observable.fromEvent(document, 'click');
var openings = Rx.Observable.interval(1000);
var buffered = clicks.bufferToggle(openings, i =>
    i % 2 ? Rx.Observable.interval(500) : Rx.Observable.empty()
);
buffered.subscribe(x => console.log(x));
bufferWhen

缓冲源 Observable 的值, 使用关闭 Observable 的工厂函数来决定何时关闭、发出和重置缓冲区。

将过往的值收集到数组中, 当开始收集数据的时候, 调用函数返回 Observable, 该 Observable 告知何时关闭缓冲区并重新开始收集。
// 发出每个随机秒(1-5秒)数内的最新点击事件数组。
var clicks = Rx.Observable.fromEvent(document, 'click');
var buffered = clicks.bufferWhen(() =>
    Rx.Observable.interval(1000 + Math.random() * 4000)
);
buffered.subscribe(x => console.log(x));

concatMap

将源值投射为一个合并到输出 Observable 的 Observable,以串行的方式等待前一个完成再合并下一个 Observable。
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.concatMap(ev => Rx.Observable.interval(1000).take(4));
result.subscribe(x => console.log(x));

// 结果如下:
// (结果是串行的)
// 对于"document"对象上的点击事件,都会以1秒的间隔发出从0到3的值
// one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3

count

计算源的发送数量,并当源完成时发出该数值。
// 记录第一次点击之前经过了几秒
var seconds = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var secondsBeforeClick = seconds.takeUntil(clicks);
var result = secondsBeforeClick.count();
result.subscribe(x => console.log(x));


// 记录1到7中间有多少个奇数
var numbers = Rx.Observable.range(1, 7);
var result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));
// 结果是:
// 4

debounce

只有在另一个 Observable 决定的一段特定时间经过后并且没有发出另一个源值之后,才从源 Observable 中发出一个值。
// 在一顿狂点后只发出最新的点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounce(() => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

debounceTime

只有在特定的一段时间经过后并且没有发出另一个源值,才从源 Observable 中发出一个值。
// 在一顿狂点后只发出最新的点击
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.debounceTime(1000);
result.subscribe(x => console.log(x));

defaultIfEmpty

如果源 Observable 在完成之前没有发出任何 next 值,则发出给定的值,否则返回 Observable 的镜像。
// 如果在5秒内没有点击事件发生,发出"no clicks"
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000));
var result = clicksBeforeFive.defaultIfEmpty('no clicks');
result.subscribe(x => console.log(x));

delay

通过给定的超时或者直到一个给定的时间来延迟源 Observable 的发送。
// 每次点击延迟1秒
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));

// 延时所有的点击直到到达未来的时间点
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));

delayWhen

在给定的时间范围内,延迟源 Observable 所有数据项的发送,该时间段由另一个 Observable 的发送决定。
// 将每次点击延迟0到5秒的随机时间
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delayWhen(event =>
    Rx.Observable.interval(Math.random() * 5000)
);
delayedClicks.subscribe(x => console.log(x));

distinct

返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。

如果提供了 keySelector 函数,那么它会将源 Observable 的每个值都投射成一个新的值,这个值会用来检查是否与先前投射的值相等。如果没有提供 keySelector 函数,它会直接使用源 Observable 的每个值来检查是否与先前的值相等。
// 使用数字的简单示例
Observable.of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
    .distinct()
    .subscribe(x => console.log(x)); // 1, 2, 3, 4


// 使用 keySelector 函数的示例
interface Person {
    age: number,
    name: string
}

Observable.of <Person>(
    {age: 4, name: 'Foo'},
    {age: 7, name: 'Bar'},
    {age: 5, name: 'Foo'})
    .distinct((p: Person) => p.name)
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }

distinctUntilChanged

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项。

如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。
// 使用数字的简单示例
Observable.of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4)
    .distinctUntilChanged()
    .subscribe(x => console.log(x)); // 1, 2, 1, 2, 3, 4

// 使用 compare 函数的示例
interface Person {
    age: number,
    name: string
}

Observable.of <Person>(
    {age: 4, name: 'Foo'},
    {age: 7, name: 'Bar'},
    {age: 5, name: 'Foo'},
    {age: 6, name: 'Foo'}
)
    .distinctUntilChanged((p: Person, q: Person) => p.name === q.name)
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }

distinctUntilKeyChanged

返回 Observable,它发出源 Observable 发出的所有与前一项不相同的项,使用通过提供的 key 访问到的属性来检查两个项是否不同。

如果提供了 compare 函数,那么每一项都会调用它来检验是否应该发出这个值。
// 比较人名的示例

interface Person {
    age: number,
    name: string
}

Observable.of <Person>(
    {age: 4, name: 'Foo'},
    {age: 7, name: 'Bar'},
    {age: 5, name: 'Foo'},
    {age: 6, name: 'Foo'})
    .distinctUntilKeyChanged('name')
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo' }

// 比较名字前三个字母的示例

interface Person {
    age: number,
    name: string
}

Observable.of <Person>(
    {age: 4, name: 'Foo1'},
    {age: 7, name: 'Bar'},
    {age: 5, name: 'Foo2'},
    {age: 6, name: 'Foo3'})
    .distinctUntilKeyChanged('name', (x: string, y: string) => x.substring(0, 3) === y.substring(0, 3))
    .subscribe(x => console.log(x));

// 显示:
// { age: 4, name: 'Foo1' }
// { age: 7, name: 'Bar' }
// { age: 5, name: 'Foo2' }

do

为源 Observable 上的每次发送执行副作用,但返回的 Observable 与源 Observable 是相同的。

拦截源 Observable 上的每次发送并且运行一个函数,但返回的输出 Observable 与 源 Observable 是相同的,只要不发生错误即可。
// 把每次点击映射成该点击的 clientX ,同时还输出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks
    .do(ev => console.log(ev))
    .map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

elementAt

只发出单个值,这个值位于源 Observable 的发送序列中的指定 index 处。
只发出第i个值, 然后完成。
// 只发出第三次的点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.elementAt(2);
result.subscribe(x => console.log(x));

// 结果:
// click 1 = nothing
// click 2 = nothing
// click 3 = 打印到控制台的 MouseEvent 对象

every

返回的 Observable 发出是否源 Observable 的每项都满足指定的条件。
// 一个简单示例:如果所有元素都小于5就发出 `true`,反之 `false`
Observable.of(1, 2, 3, 4, 5, 6)
    .every(x => x < 5)
    .subscribe(x => console.log(x)); // -> false

filter

通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。
// 只发出目标是 DIV 元素的点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var clicksOnDivs = clicks.filter(ev => ev.target.tagName === 'DIV');
clicksOnDivs.subscribe(x => console.log(x));

find

// 只发出源 Observable 所发出的值中第一个满足条件的值。

// 找到并发出第一个点击 DIV 元素的事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

findIndex

// 只发出源 Observable 所发出的值中第一个满足条件的值的索引。
// 它很像 find , 但发出的是找到的值的索引, 而不是值本身。

// 找到并发出第一个点击 DIV 元素的事件的索引
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.findIndex(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

first

// 只发出由源 Observable 所发出的值中第一个(或第一个满足条件的值)。

// 只发出第一次点击 DOM 的事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first();
result.subscribe(x => console.log(x));
// 只发出第一次点击 DIV 元素的事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.first(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));

groupBy

// 根据指定条件将源 Observable 发出的值进行分组,并将这些分组作为 GroupedObservables 发出,每一个分组都是一个 GroupedObservable 。
// 通过 id 分组并返回数组
Observable.of<Obj>({id: 1, name: 'aze1'},
    {id: 2, name: 'sf2'},
    {id: 2, name: 'dg2'},
    {id: 1, name: 'erg1'},
    {id: 1, name: 'df1'},
    {id: 2, name: 'sfqfb2'},
    {id: 3, name: 'qfs3'},
    {id: 2, name: 'qsgqsfg2'}
)
    .groupBy(p => p.id)
    .flatMap((group$) => group$.reduce((acc, cur) => [...acc, cur], []))
    .subscribe(p => console.log(p));

// 显示:
// [ { id: 1, name: 'aze1' },
//   { id: 1, name: 'erg1' },
//   { id: 1, name: 'df1' } ]
//
// [ { id: 2, name: 'sf2' },
//   { id: 2, name: 'dg2' },
//   { id: 2, name: 'sfqfb2' },
//   { id: 2, name: 'qsgqsfg2' } ]
//
// [ { id: 3, name: 'qfs3' } ]


// 以 id 字段为主组装数据
Observable.of<Obj>({id: 1, name: 'aze1'},
    {id: 2, name: 'sf2'},
    {id: 2, name: 'dg2'},
    {id: 1, name: 'erg1'},
    {id: 1, name: 'df1'},
    {id: 2, name: 'sfqfb2'},
    {id: 3, name: 'qfs1'},
    {id: 2, name: 'qsgqsfg2'}
)
    .groupBy(p => p.id, p => p.name)
    .flatMap((group$) => group$.reduce((acc, cur) => [...acc, cur], ["" + group$.key]))
    .map(arr => ({'id': parseInt(arr[0]), 'values': arr.slice(1)}))
    .subscribe(p => console.log(p));

// 显示:
// { id: 1, values: [ 'aze1', 'erg1', 'df1' ] }
// { id: 2, values: [ 'sf2', 'dg2', 'sfqfb2', 'qsgqsfg2' ] }
// { id: 3, values: [ 'qfs1' ] }

isEmpty

如果源 Observable 是空的话,它返回一个发出 true 的 Observable,否则发出 false 。

map

将给定的 project 函数应用于源 Observable 发出的每个值,并将结果值作为 Observable 发出。
// 将每次点击映射为这次点击的 clientX
var clicks = Rx.Observable.fromEvent(document, 'click');
var positions = clicks.map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

mapTo

每次源 Observble 发出值时,都在输出 Observable 上发出给定的常量值。
// 把每次点击映射成字符串 'Hi'
var clicks = Rx.Observable.fromEvent(document, 'click');
var greetings = clicks.mapTo('Hi');
greetings.subscribe(x => console.log(x));

max

max 操作符操作的 Observable 发出数字(或可以与提供的函数进行比较的项)并且当源 Observable 完成时它发出单一项:最大值的项。
// 获取一连串数字中的最大值
Rx.Observable.of(5, 4, 7, 2, 8)
    .max()
    .subscribe(x => console.log(x)); // -> 8

// 使用比较函数来获取最大值的项
interface Person {
    age: number,
    name: string
}

Observable.of<Person>({age: 7, name: 'Foo'},
    {age: 5, name: 'Bar'},
    {age: 9, name: 'Beer'})
    .max<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
    .subscribe((x: Person) => console.log(x.name)); // -> 'Beer'

merge

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。
通过把多个 Observables 的值混合到一个 Observable 中 来将其打平。
// 合并两个 Observables: 时间间隔为1秒的 timer 和 clicks
var clicks = Rx.Observable.fromEvent(document, 'click');
var timer = Rx.Observable.interval(1000);
var clicksOrTimer = clicks.merge(timer);
clicksOrTimer.subscribe(x => console.log(x));
// 合并三个 Observables ,但只能同时运行两个
var timer1 = Rx.Observable.interval(1000).take(10);
var timer2 = Rx.Observable.interval(2000).take(6);
var timer3 = Rx.Observable.interval(500).take(10);
var concurrent = 2; // 参数
var merged = timer1.merge(timer2, timer3, concurrent);
merged.subscribe(x => console.log(x));

min

min 操作符操作的 Observable 发出数字(或可以使用提供函数进行比较的项)并且当源 Observable 完成时它发出单一项:最小值的项。
// 获取一连串数字中的最小值
Rx.Observable.of(5, 4, 7, 2, 8)
    .min()
    .subscribe(x => console.log(x)); // -> 2
// 使用比较函数来获取最小值的项
interface Person {
    age: number,
    name: string
}

Observable.of<Person>({age: 7, name: 'Foo'},
    {age: 5, name: 'Bar'},
    {age: 9, name: 'Beer'})
    .min<Person>((a: Person, b: Person) => a.age < b.age ? -1 : 1)
    .subscribe((x: Person) => console.log(x.name)); // -> 'Bar'

subscribe

// 用 Observer 对象订阅
const sumObserver = {
    sum: 0,
    next(value) {
        console.log('Adding: ' + value);
        this.sum = this.sum + value;
    },
    error() { // We actually could just remote this method,
    },        // since we do not really care about errors right now.
    complete() {
        console.log('Sum equals: ' + this.sum);
    }
};

Rx.Observable.of(1, 2, 3) // 同步发出 1, 2, 3,然后完成。
    .subscribe(sumObserver);

// 日志:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"


// 用函数订阅
let sum = 0;
Rx.Observable.of(1, 2, 3)
    .subscribe(
        function (value) {
            console.log('Adding: ' + value);
            sum = sum + value;
        },
        undefined,
        function () {
            console.log('Sum equals: ' + sum);
        }
    );

// 日志:
// "Adding: 1"
// "Adding: 2"
// "Adding: 3"
// "Sum equals: 6"

take

只发出源 Observable 最初发出的的N个值 (N = count)。
// 获取时间间隔为1秒的 interval Observable 的最初的5秒
var interval = Rx.Observable.interval(1000);
var five = interval.take(5);
five.subscribe(x => console.log(x));

takeLast

只发出源 Observable 最后发出的的N个值 (N = count)。

takeUntil

发出源 Observable 发出的值,直到 notifier Observable 发出值。
// 每秒都发出值,直到第一次点击发生
var interval = Rx.Observable.interval(1000);
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = interval.takeUntil(clicks);
result.subscribe(x => console.log(x));

takeWhile

发出在源 Observable 中满足 predicate 函数的每个值,并且一旦出现不满足 predicate 的值就立即完成。

throttle

从源 Observable 中发出一个值,然后在由另一个 Observable 决定的期间内忽略 随后发出的源值,然后重复此过程。
//以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttle(ev => Rx.Observable.interval(1000));
result.subscribe(x => console.log(x));

throttleTime

从源 Observable 中发出一个值,然后在 duration 毫秒内忽略随后发出的源值, 然后重复此过程。
// 以每秒最多点击一次的频率发出点击事件
var clicks = Rx.Observable.fromEvent(document, 'click');
var result = clicks.throttleTime(1000);
result.subscribe(x => console.log(x));

toPromise

// 将 Observable 序列转换为符合 ES2015 标准的 Promise 。

// 使用普通的 ES2015
let source = Rx.Observable
        .of(42)
        .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

// 被拒的 Promise
// 使用标准的 ES2015
let source = Rx.Observable
        .throw(new Error('woops'))
        .toPromise();

source
        .then((value) => console.log('Value: %s', value))
        .catch((err) => console.log('Error: %s', err));
// => Error: Error: woops

Scheduler (调度器)

什么是调度器? - 调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。
调度器可以让你规定 Observable 在什么样的执行上下文中发送通知给它的观察者。

调度器 目的 null 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。

Rx.Scheduler.queue 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。

Rx.Scheduler.asap 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换。

Rx.Scheduler.async 使用 setInterval 的调度。用于基于时间的操作符。