Skip to content

Latest commit

 

History

History
355 lines (274 loc) · 10.4 KB

04、创建数据流.md

File metadata and controls

355 lines (274 loc) · 10.4 KB

创建数据流

同步数据流

create

函数签名: create(subscribe: function)
使用给定的订阅函数来创建 observable 。

示例 1: 发出多个值的 observable

import { Observable } from 'rxjs';

const { create } = Observable;

// 创建在订阅函数中发出 'Hello' 和 'World' 的 observable 。
const hello$: Observable<string> = create(observer => {
  observer.next('hello');
  observer.next('world');
});

hello$.subscribe(value => console.log(value));

示例 2: 基于定时器发出偶数的 observable

import { Observable } from 'rxjs';

const { create } = Observable;

const evenNumbers$: Observable<number> = create(observer => {
  let time = 0;
  const handleInterval = setInterval(() => {
    if (time % 2 === 0) {
      observer.next(time);
    }
    time++;
  }, 500);

  // 用于取消订阅
  return () => clearInterval(handleInterval);
});

const subscribe$ = evenNumbers$.subscribe(value => console.log(value));

setTimeout(() => {
  subscribe$.unsubscribe();
}, 5000);

of

列举数据
函数签名: function of<T>(...args: T[]): Observable<T>

demo1: 发出数字序列

/**
 * 发出数字序列
 */
import { Observable, of } from 'rxjs';
const source$: Observable<number> = of(1, 2, 3, 4, 5, 6, 7);
source$.subscribe(value => console.log(value));

demo2: 示例 2: 发出对象、数组和函数

import { of } from 'rxjs';

const source$ = of({name: 'yanle'}, [1,2,3,4,5,6], function(){console.log('hello rxjs')});

source$.subscribe(value => console.log(value));

range

指定范围
函数签名: function range(start: number = 0, count?: number, scheduler?: SchedulerLike): Observable<number>

/**
 * 依次发出给定区间内的数字。
 */
import { range } from 'rxjs';

const source$ = range(1, 10);
source$.subscribe(value => console.log(value));

empty

立即完成的observable

/**
 * 立即完成
 */
import { empty } from 'rxjs';

const source$ = empty();
source$.subscribe(
  () => console.log('next'),
  null,
  () => console.log('complete'),
);

throwError

throwError产⽣的Observable对象也是什么都不做,直接出错,抛出的错误就是throw的参数,下⾯是使⽤throw的⽰例代码

/**
 * 示例 1: 在订阅上抛出错误
 */
import { throwError } from 'rxjs';

const source$ = throwError('this is an error!');
source$.subscribe(
  value => console.log(value),
  err => console.log('error: ', err),
  () => console.log('complete!'),
);

异步数据流

定时产生数据interval和timer

interval 基于给定时间间隔发出数字序列

import { interval } from 'rxjs';

const source$ = interval(1000);
source$.subscribe(value=>console.log(value));

timer 给定持续时间后,再按照指定间隔时间依次发出数字。

import { timer } from 'rxjs';

// 如果只有一个参数, 那么发出 0 之后, 就立马结束了
const source$ = timer(1000);
source$.subscribe(value => console.log('source$: ', value));

// timer 接收第⼆个参数,它决定了发出序列值的频率,在本例中我们在1秒发出第⼀个值,
// 然后每0.5秒发出序列值
// 所以结果是 timer 1秒后发出值,然后每0.5秒发出值
const source2$ = timer(1000, 500);
source2$.subscribe(value => console.log('source2$: ', value));

from

将数组、promise 或迭代器转换成observable

import { from } from 'rxjs';

// 数组转为 observable
const source$ = from([1,2,3,4,5]);
source$.subscribe(value => console.log(value));

// promise 转为 observable
const source2$ = from(
  new Promise(resolve => resolve('hello world!'))
);
source2$.subscribe(value=>console.log(value));

// 集合转为observable
const map = new Map();
map.set(1, 'hi');
map.set(2, 'bye');

const source3$ = from(map);
source3$.subscribe(value => console.log(value));
// 输出: [1, Hi'], [2, Bye]

// 字符串转为 observable
const source4$ = from('hello world');
source4$.subscribe(value => console.log(value));
// 输出: 'H','e','l','l','o',' ','W','o','r','l','d'

fromPromise

如果from的参数是Promise对象,那么这个Promise成功结束,from产⽣的Observable对象就会吐出Promise成功的结果,并且⽴刻结束

实际上这个地方 formPromise 可以换做 from , 效果是一样的

import { of } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';
import { fromPromise } from 'rxjs/internal-compatibility';

const myPromise = willReject => new Promise((resolve, reject) => {
  if (willReject) {
    reject('rejected!');
  } else {
    resolve('resolved!');
  }
});

// 先发出true , 然后在发出 false
const source$ = of(false, true);
const example$ = source$.pipe(
  // 这个地方, 如果先发射出true, 那么 promise 就会走到rejected 里面
  // 这个时候实际上状态已经停止了
  mergeMap(value => fromPromise(myPromise(value))),
  catchError(
    error => of(`Error: ${error}`), // 如果出现了, 那么重新返回一个 observable
  ),
);
example$.subscribe(
  value => console.log('next: ', value),
  err => console.log('err: ', err),
  () => console.log('complete'),
);

fromEvent

如果从事⽹页开发,fromEvent是最可能会被⽤到的操作符,因为⽹页应⽤总是要获取⽤户在⽹页中的操作事件, ⽽fromEvent最常见的⽤法就是把DOM中的事件转化为Observable对象中的数据

fromEvent的第⼀个参数是⼀个事件源,在浏览器中,最常见的事件源就是特定的DOM元素, 第⼆个参数是事件的名称,对应DOM事件就是click、mousemove这样的字符串。

import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const source$ = fromEvent(document, 'click');
const resource$ = source$.pipe(
  map(event => `Event time: ${event.timeStamp}`)
);

resource$.subscribe(value => console.log(value));

fromEvent除了可以从DOM中获得数据,还可以从Node.js的events中获得数据,下⾯是⼀段⽰例代码:

import * as EventEmitter from 'events';
import { fromEvent } from 'rxjs';

const emitter = new EventEmitter();
const source$ = fromEvent(emitter, 'message');
source$.subscribe(value => console.log(value));

emitter.emit('message', 1);
emitter.emit('message', 2);
emitter.emit('other', 'oops');
emitter.emit('message', 3);

fromEventPattern

fromEvent能够从事件源产⽣Observable,但是要求数据源表现得像是浏览器的DOM或者Node.js的EventEmitter。

fromEventPattern接受两个函数参数,分别对应产⽣的Observable对象被订阅和退订时的动作, 因为这两个参数是函数,具体的动作可以任意定义,所以可以⾮常灵活。

import { EventEmitter } from 'events';
import { fromEventPattern } from 'rxjs';

const emitter = new EventEmitter();

const addHandler = handler => emitter.addListener('msg', handler);
const removeHandler = handler => emitter.removeListener('msg', handler);

const source$ = fromEventPattern(addHandler, removeHandler);

const subscription$ = source$.subscribe(value => console.log(value));

emitter.emit('msg', 'hello');
emitter.emit('msg', 'world');

subscription$.unsubscribe();
emitter.emit('msg', 'end');

ajax - 略过

用一个简单的例子表现一下ajax的用法

// <div>
//    <button id="getStar">Get RxJS Star Count</button>
//    <div id="text"></div>
// </div>

// 希望用ajax 请求
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';

fromEvent(document.querySelector('#getStar'), 'click')
  .subscribe(
    ()=> ajax.get('xxxxxx', {responseType: 'json'})
      .subscribe(value=>{
        const starCount = value.response;
        document.querySelector('#text').innerHTML = starCount;
      })
  );

但是一般别这么用, 因为实际项目中可能会有各种符合自己场景的ajax 封装, 拿到数据直接用 from 就可以创建 observable 了。

defer

Observable只是⼀个代理(Proxy),在创建之时并不会做分配资源的⼯作,只有当被订阅的时候,才会去创建真正占⽤资源的Observable, 之前产⽣的代理Observable会把所有⼯作都转交给真正占⽤资源的Observable。

这种推迟占⽤资源的⽅法是⼀个惯⽤的模式,在RxJS中,defer这个操作符实现的就是这种模式。

defer接受⼀个函数作为参数,当defer产⽣的Observable对象被订阅的时候,defer的函数参数就会被调⽤, 预期这个函数会返回另⼀个Observable对象,也就是defer转嫁所有⼯作的对象。因为Promise和Observable的关系, defer也很贴⼼地⽀持返回Promise对象的函数参数,当参数函数返回Promise对象的时候,省去了应⽤层开发者使⽤fromPromise转化⼀次的劳动。

// 最基本的使用
import { defer, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const observableFactory$ = () => of('http://xxx');
const source$ = defer(observableFactory$);
source$.subscribe(value => console.log(value));


// 因为本身of产生数据流占用的内存资源是极少的, 看不出来效果
// ⽐如,我们希望通过AJAX来获取服务器端的数据,可是并不想在程 序启动阶段就把AJAX请求发送出去,
// 就可以利⽤defer产⽣⼀个Observable对象,
// 当这个Observable对象被订阅的时候才发送AJAX请求,
const observableFactory2$ = () => ajax.get('https://api.github.com/repos/ReactiveX/rxjs');
const source2$ = defer(observableFactory2$);
setTimeout(()=> {
  source2$.subscribe(
    value => console.log(value.response.stargazers_count),
    () => console.log('error'),
    () => console.log('complete'),
  );
}, 3000);