Rxjs

響應(yīng)式編程簡(jiǎn)介

響應(yīng)式編程是一種面向數(shù)據(jù)流和變化傳播的編程方式,即可以在編程中很方便的表達(dá)靜態(tài)或動(dòng)態(tài)的數(shù)據(jù)流,相關(guān)的計(jì)算模型會(huì)自動(dòng)將變化的值通過(guò)數(shù)據(jù)流進(jìn)行傳播。
Rxjs就是一個(gè)透過(guò)Observable組合各種非同步行為的Library,它是一個(gè)使用可觀察數(shù)據(jù)流進(jìn)行異步編程的編程接口,結(jié)合了觀察者模式、迭代器模式和函數(shù)式編程。

Rxjs概念

Rxjs全稱Reactive Extension for JavaScript,JavaScript的響應(yīng)式擴(kuò)展,響應(yīng)式的思路就是把隨時(shí)間不斷變化的數(shù)據(jù)、狀態(tài)、事件等等轉(zhuǎn)成可被觀察的序列(Observable Sequence),然后訂閱序列中那些Observable對(duì)象的變化,一旦有變化,就會(huì)執(zhí)行事先安排好的各種轉(zhuǎn)換和操作。

核心:可觀察序列(Observable),訂閱(subscribe),操作符(operators),調(diào)度(schedulers)

適用場(chǎng)景

  • 同步和異步
  • 獲取與訂閱
  • 查詢數(shù)據(jù)的組合
  • 已有數(shù)據(jù)和未來(lái)數(shù)據(jù)的更新策略

Promise異步編程的缺點(diǎn):1.單一數(shù)據(jù)模式;2.對(duì)異步事件的控制力很弱

Observable (建立與訂閱)

observable在未被訂閱之前只是個(gè)實(shí)例對(duì)象,不會(huì)有任何輸出。在被訂閱(subscribe)時(shí)進(jìn)行數(shù)據(jù)操作,具有多種operators。operator是Observable的方法,可以對(duì)元素進(jìn)行封裝處理。一般是返回一個(gè)新的observable實(shí)例。

observer:用來(lái)訂閱observable的實(shí)例對(duì)象,內(nèi)含next,complete,error三個(gè)參數(shù),next一定要有,其他兩項(xiàng)可選。

subscription:observable訂閱返回的實(shí)例,可以用來(lái)退訂(unsubscribe),也可以與其他訂閱合并。

// 建立(發(fā)布)
let mouseMove = Observable.fromEvent(DOM,'mousemove');
// 訂閱
let subscription = mouseMove.subscribe(x => console.log(x));
// 移除
subscription.unsubscribe();

如果有一個(gè)observable內(nèi)部還是一個(gè)observable,可以用mergeAll()將其展開,類似吧二維數(shù)組轉(zhuǎn)成一位數(shù)組,一般會(huì)和map一起使用,所以通常簡(jiǎn)寫為mergeMap,但在序列元素處理過(guò)程中我只想去最新處理的元素可以使用switchMap(),如果是需要保留正在處理的,讓新的即將處理暫時(shí)等待舊的處理完再執(zhí)行可以用exhaustMap()

Observable
  .fromEvent(DOM,'click')
  .switchMap(()=>
    ajax('url')
  )

對(duì)于請(qǐng)求的處理,可以設(shè)置在多少次請(qǐng)求失敗之后再拋出異常。

const defaultData = {success:false,data:[]};
const getPostObservable = () => 
    Observable.ajax('url')
      .retry(3) // 若三次都請(qǐng)求失敗再進(jìn)入到catch
      .catch(() => Observable.of(defaultData));
      // 簡(jiǎn)化catch
      .catch(() => [defaultData]);
  • Rxjs使得我們能夠完全依賴于聲明語(yǔ)句來(lái)獲取動(dòng)態(tài)的值。
// js常規(guī)代碼
let a = 3;
let b = a * 10;

a = 9
b = a * 10 
console.log(b)

// rxjs代碼
import { interval } from 'rxjs'; 
import { map } from 'rxjs/operators';

const a = interval(1000).pipe(
  map(_ => Math.random().toFixed(1)),
);

const b = a.pipe(
  map(item => Number(item) * 10)
)
// 實(shí)時(shí)監(jiān)聽a依賴的變化
b.subscribe(v => console.log(v))
  • Rxjs監(jiān)聽DOM事件

fromEvent將指定元素上的事件轉(zhuǎn)換成事件流

const btn = document.getElementById('btn');

// js常規(guī)代碼
let pre = new Date().getTime();

btn.addEventListener('click',function(event){
  const now = new Date().getTime();
  const silence = now - pre;
  if(silence > 1000){
    pre = now;
    console.log(event);
  }
})

// rxjs代碼
// fromEvent將制定元素上的事件轉(zhuǎn)換成事件流
import { fromEvent } from 'rxjs'; 
import { throttleTime } from 'rxjs/operators';

const obs = fromEvent(btn,'click').pipe(
  throttleTime(1000),
);
obs.subscribe(v => console.log(v))
  • Rxjs處理網(wǎng)絡(luò)請(qǐng)求
import { fromEvent,interval,from,merge } from 'rxjs'; 
import { mergeMap } from 'rxjs/operators';

const btn = document.getElementById('btn')
// DOM事件流---
const click = fromEvent(btn,'click');
// 程序邏輯流---
const inner = interval(3000);

const request = from(fetch('https://api.github.com/users').then(res => res.json()))
// 合并事件流
const response = merge(
  click,
  inner,
).pipe(
  mergeMap(_ => request)
)
response.subscribe(v => console.log(v))

操作符(Operators)

操作符是 Observable 類型上的方法,比如 .map(...)、.filter(...)、.merge(...),等等。其本質(zhì)上是純函數(shù),當(dāng)操作符被調(diào)用時(shí),它們不會(huì)改變已經(jīng)存在的 Observable 實(shí)例,是一個(gè)無(wú)副作用的操作。相反,它們返回一個(gè)新的 Observable ,它的 subscription 邏輯基于第一個(gè) Observable 。

操作符有著不同的用途,它們可作如下分類:創(chuàng)建、轉(zhuǎn)換、過(guò)濾、組合、錯(cuò)誤處理、工具,等等。這里介紹一些常用的操作符。

一、創(chuàng)造observabl類

  • create
// RxJS v6+
import { Observable } from 'rxjs'; 
/*
  創(chuàng)建在訂閱函數(shù)中發(fā)出 'Hello' 和 'World' 的 observable 。
*/
const hello = Observable.create(function(observer) {
  observer.next('Hello');
  observer.next('World');
});

// 輸出: 'Hello'...'World'
const subscribe = hello.subscribe(val => console.log(val));
  • of

of類似于一個(gè)迭代器,將參數(shù)迭代然后發(fā)出。它接收任意多個(gè)參數(shù),參數(shù)可以是任意類型,然后它會(huì)把這些參數(shù)逐個(gè)放入流中。

// RxJS v6+
import { of } from 'rxjs';
// 依次發(fā)出提供的任意數(shù)量的值
const source = of(1, 2, 3, 4, 5, { name: 'Brian' }, [1, 2, 3], function hello() {
  return 'Hello';
});
// 輸出: 1,2,3,4,5,{name: 'Brian}, [1,2,3], function hello() { return 'Hello' }
const subscribe = source.subscribe(val => console.log(val));
  • from

from的參數(shù)必須是一個(gè)類數(shù)組(set,iterator,promise等),其他和of一樣

// RxJS v6+
import { from } from 'rxjs';

// 將數(shù)組作為值的序列發(fā)出
const arraySource = from([1, 2, 3, 4, 5]);
// 輸出: 1,2,3,4,5
const subscribe = arraySource.subscribe(val => console.log(val));
  • fromEvent

將事件轉(zhuǎn)換成 observable 序列。

// RxJS v6+
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

// 創(chuàng)建發(fā)出點(diǎn)擊事件的 observable
const source = fromEvent(document, 'click');
// 映射成給定的事件時(shí)間戳
const example = source.pipe(map(event => `Event time: ${event.timeStamp}`));
// 輸出 (示例中的數(shù)字以運(yùn)行時(shí)為準(zhǔn)): 'Event time: 7276.390000000001'
const subscribe = example.subscribe(val => console.log(val));
  • fromPromise

創(chuàng)建由 promise 轉(zhuǎn)換而來(lái)的 observable,并發(fā)出 promise 的結(jié)果。

import { of } from 'rxjs/observable/of';
import { fromPromise } from 'rxjs/observable/fromPromise';
import { mergeMap, catchError } from 'rxjs/operators';

// 基于輸入來(lái)決定是 resolve 還是 reject 的示例 promise
const myPromise = willReject => {
  return new Promise((resolve, reject) => {
    if (willReject) {
      reject('Rejected!');
    }
    resolve('Resolved!');
  });
};
// 先發(fā)出 true,然后是 false
const source = of(true, false);
const example = source.pipe(
  mergeMap(val =>
    fromPromise(myPromise(val)).pipe(
      // 捕獲并優(yōu)雅地處理 reject 的結(jié)果
      catchError(error => of(`Error: ${error}`))
    )
  )
);
// 輸出: 'Error: Rejected!', 'Resolved!'
const subscribe = example.subscribe(val => console.log(val));
  • timer

timer有兩個(gè)參數(shù),第一個(gè)參數(shù)表示到發(fā)送第一個(gè)值的間隔時(shí)間,第二個(gè)參數(shù)表示從發(fā)送第二個(gè)參數(shù)開始,沒(méi)發(fā)送一個(gè)值的間隔時(shí)間,如果第二個(gè)參數(shù)為空則發(fā)送第一個(gè)參數(shù)后,終止,執(zhí)行complete函數(shù)。

// RxJS v6+
import { timer } from 'rxjs';
/*
  timer 接收第二個(gè)參數(shù),它決定了發(fā)出序列值的頻率,在本例中我們?cè)?秒發(fā)出第一個(gè)值,
  然后每2秒發(fā)出序列值
*/
const source = timer(1000, 2000);
// 輸出: 0,1,2,3,4,5......
const subscribe = source.subscribe(val => console.log(val));
  • interval

基于給定時(shí)間間隔發(fā)出數(shù)字序列。相當(dāng)于 timer(1000, 1000),也就是說(shuō)初始等待時(shí)間和間隔時(shí)間是一樣的。

// RxJS v6+
import { interval } from 'rxjs';

// 每1秒發(fā)出數(shù)字序列中的值
const source = interval(1000);
// 數(shù)字: 0,1,2,3,4,5....
const subscribe = source.subscribe(val => console.log(val));

二、合并創(chuàng)建器

一般有DOM事件流和程序邏輯流,我們可以對(duì)這些流進(jìn)行不同形式的合并,創(chuàng)建一個(gè)新的流,常見的合并方式有三種:并聯(lián)、串聯(lián)、拉鏈。

  • merge - 并聯(lián)
// RxJS v6+
import { merge } from 'rxjs/operators';
import { interval, merge } from 'rxjs';

// 每2.5秒發(fā)出值
const first = interval(2500);
// 每1秒發(fā)出值
const second = interval(1000);
// 作為實(shí)例方法使用
const example = first.pipe(merge(second));
// 輸出: 0,1,0,2....
const subscribe = example.subscribe(val => console.log(val));

// 從一個(gè) observable 中發(fā)出輸出值
const example1 = merge(
  first.pipe(mapTo('FIRST!')),
  second.pipe(mapTo('SECOND!')),
);
// 輸出: "SECOND!", "FIRST!"
const subscribe1 = example1.subscribe(val => console.log(val));
  • concat - 串聯(lián)

例如;兩個(gè)流中的內(nèi)容被按照順序放進(jìn)了輸出流中,前面的流尚未結(jié)束時(shí),后面的流就會(huì)一直等待。

// RxJS v6+
import { of, concat } from 'rxjs';

// 發(fā)出 1,2,3
const sourceOne = of(1, 2, 3);
// 發(fā)出 4,5,6
const sourceTwo = of(4, 5, 6);

// 作為靜態(tài)方法使用
const example = concat(sourceOne, sourceTwo);
// 輸出: 1,2,3,4,5,6
const subscribe = example.subscribe(val => console.log(val));
  • zip - 拉鏈

在所有 observables 發(fā)出后,將它們的值作為數(shù)組發(fā)出,通常用于合并兩個(gè)數(shù)據(jù)有對(duì)應(yīng)關(guān)系的數(shù)據(jù)源。

// RxJS v6+
import { delay } from 'rxjs/operators';
import { of, zip } from 'rxjs';

const sourceOne = of('Hello');
const sourceTwo = of('World!');
// 一直等到所有 observables 都發(fā)出一個(gè)值,才將所有值作為數(shù)組發(fā)出
const example = zip(
  sourceOne,
  sourceTwo.pipe(delay(1000)),
);
// 輸出: ["Hello", "World!"]
const subscribe = example.subscribe(val => console.log(val));

三、錯(cuò)誤處理

  • retry - 失敗時(shí)重試

有些錯(cuò)誤是可以通過(guò)重試進(jìn)行恢復(fù)的,比如臨時(shí)性的網(wǎng)絡(luò)丟包。甚至一些流程的設(shè)計(jì)還會(huì)故意借助重試機(jī)制,比如當(dāng)你發(fā)起請(qǐng)求時(shí),如果后端發(fā)現(xiàn)你沒(méi)有登錄過(guò),就會(huì)給你一個(gè) 401 錯(cuò)誤,然后你可以完成登錄并重新開始整個(gè)流程。

retry 操作符就是負(fù)責(zé)在失敗時(shí)自動(dòng)發(fā)起重試的,它可以接受一個(gè)參數(shù),用來(lái)指定最大重試次數(shù)。

// RxJS v6+
import { interval, of, throwError } from 'rxjs';
import { mergeMap, retry } from 'rxjs/operators';

// 每1秒發(fā)出值
const source = interval(1000);
const example = source.pipe(
  mergeMap(val => {
    // 拋出錯(cuò)誤以進(jìn)行演示
    if (val > 5) {
      return throwError('Error!');
    }
    return of(val);
  }),
  // 出錯(cuò)的話可以重試2次
  retry(2)
);
/*
  輸出:
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  0..1..2..3..4..5..
  "Error!: Retried 2 times then quit!"
*/
const subscribe = example.subscribe({
  next: val => console.log(val),
  error: val => console.log(`${val}: Retried 2 times then quit!`)
});

四、選擇器類

  • take

有的時(shí)候我門希望獲取Observable前幾個(gè)數(shù)然后結(jié)束(執(zhí)行complete方法)

var source = interval(1000);
var example = source.pipe(take(3));

example.subscribe({
    next: (value) => { console.log(value); },
    error: (err) => { console.log('Error: ' + err); },
    complete: () => { console.log('complete'); }
});
// 0
// 1
// 2
// complete

取得首次點(diǎn)擊的坐標(biāo)

<div id="locationDisplay">
  Where would you click first?
</div>

// RxJS v6+
import { fromEvent } from 'rxjs';
import { take, tap } from 'rxjs/operators';

const oneClickEvent = fromEvent(document, 'click').pipe(
  take(1),
  tap(v => {
    document.getElementById(
      'locationDisplay'
    ).innerHTML = `Your first click was on location ${v.screenX}:${v.screenY}`;
  })
);

const subscribe = oneClickEvent.subscribe();
  • takeUntil

發(fā)出值,直到提供的 observable 發(fā)出值,它便完成。

// RxJS v6+
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// 每1秒發(fā)出值
const source = interval(1000);
// 5秒后發(fā)出值
const timer$ = timer(5000);
// 當(dāng)5秒后 timer 發(fā)出值時(shí), source 則完成
const example = source.pipe(takeUntil(timer$));
// 輸出: 0,1,2,3
const subscribe = example.subscribe(val => console.log(val));
  • filter

過(guò)濾出符合給定條件的值。

// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

// 發(fā)出 ({name: 'Joe', age: 31}, {name: 'Bob', age:25})
const source = from([{ name: 'Joe', age: 31 }, { name: 'Bob', age: 25 }]);
// 過(guò)濾掉年齡小于30歲的人
const example = source.pipe(filter(person => person.age >= 30));
// 輸出: "Over 30: Joe"
const subscribe = example.subscribe(val => console.log(`Over 30: ${val.name}`));
  • first

發(fā)出第一個(gè)值或第一個(gè)通過(guò)給定表達(dá)式的值,與之相對(duì)應(yīng)的還有l(wèi)ast。

// RxJS v6+
import { from } from 'rxjs';
import { first } from 'rxjs/operators';

const source = from([1, 2, 3, 4, 5]);
// 沒(méi)有參數(shù)則發(fā)出第一個(gè)值
const example = source.pipe(first());
// 發(fā)出通過(guò)測(cè)試的第一項(xiàng)
const example = source.pipe(first(num => num === 5));
// 輸出: "First to pass test: 5"
// 沒(méi)有值通過(guò)的話則發(fā)出默認(rèn)值
const example = source.pipe(first(val => val > 5, 'Nothing'));
// 輸出: 'Nothing'
const subscribe = example.subscribe(val =>
  console.log(`First to pass test: ${val}`)
);

  • skip

跳過(guò)N個(gè)(由參數(shù)提供)發(fā)出值。

// RxJS v6+
import { from } from 'rxjs';
import { skip, filter } from 'rxjs/operators';

const numArrayObs = from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// 3,4,5...
const skipObs = numArrayObs.pipe(skip(2)).subscribe(console.log);

// 3,4,5...
const filterObs = numArrayObs
  .pipe(filter((val, index) => index > 1))
  .subscribe(console.log);
// 同樣的輸出!
  • takeWhile

發(fā)出值,直到提供的表達(dá)式結(jié)果為 false 。

takeWhile() 和 filter() 的區(qū)別

// RxJS v6+
import { of } from 'rxjs';
import { takeWhile, filter } from 'rxjs/operators';

// 發(fā)出 3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3
const source = of(3, 3, 3, 9, 1, 4, 5, 8, 96, 3, 66, 3, 3, 3);

// 允許值通過(guò)直到源發(fā)出的值不等于3,然后完成
// 輸出: [3, 3, 3]
source
  .pipe(takeWhile(it => it === 3))
  .subscribe(val => console.log('takeWhile', val));

// 輸出: [3, 3, 3, 3, 3, 3, 3]
source
  .pipe(filter(it => it === 3))
  .subscribe(val => console.log('filter', val));

  • reduce

將源 observalbe 的值歸并為單個(gè)值,當(dāng)源 observable 完成時(shí)將這個(gè)值發(fā)出。

// RxJS v6+
import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';

const source = of(1, 2, 3, 4);
const example = source.pipe(reduce((acc, val) => acc + val));
// 輸出: Sum: 10'
const subscribe = example.subscribe(val => console.log('Sum:', val));
  • scan

隨著時(shí)間的推移進(jìn)行歸并。

// RxJS v6+
import { Subject } from 'rxjs';
import { scan } from 'rxjs/operators';

const subject = new Subject();
// scan 示例,隨著時(shí)間的推移構(gòu)建對(duì)象
const example = subject.pipe(
  scan((acc, curr) => Object.assign({}, acc, curr), {})
);
// 輸出累加值
const subscribe = example.subscribe(val =>
  console.log('Accumulated object:', val)
);
// subject 發(fā)出的值會(huì)被添加成對(duì)象的屬性
// {name: 'Joe'}
subject.next({ name: 'Joe' });
// {name: 'Joe', age: 30}
subject.next({ age: 30 });
// {name: 'Joe', age: 30, favoriteLanguage: 'JavaScript'}
subject.next({ favoriteLanguage: 'JavaScript' });
  • concatMap 和 mergeMap

concatMap將值映射成內(nèi)部 observable,并按順序訂閱和發(fā)出。mergeMap映射成 observable 并發(fā)出值。

concatMap 和 mergeMap 之間的區(qū)別

// RxJS v6+
import { of } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

// 發(fā)出延遲值
const source = of(2000, 1000);
// 將內(nèi)部 observable 映射成 source,當(dāng)前一個(gè)完成時(shí)發(fā)出結(jié)果并訂閱下一個(gè)
const example = source.pipe(
  concatMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
);
// 輸出: With concatMap: Delayed by: 2000ms, With concatMap: Delayed by: 1000ms
const subscribe = example.subscribe(val =>
  console.log(`With concatMap: ${val}`)
);

// 展示 concatMap 和 mergeMap 之間的區(qū)別
const mergeMapExample = source
  .pipe(
    // 只是為了確保 meregeMap 的日志晚于 concatMap 示例
    delay(5000),
    mergeMap(val => of(`Delayed by: ${val}ms`).pipe(delay(val)))
  )
  .subscribe(val => console.log(`With mergeMap: ${val}`));

使用 promise 進(jìn)行 mergeMap

// RxJS v6+
import { of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// 發(fā)出 'Hello'
const source = of('Hello');
// mergeMap 還會(huì)發(fā)出 promise 的結(jié)果
const myPromise = val =>
  new Promise(resolve => resolve(`${val} World From Promise!`));
// 映射成 promise 并發(fā)出結(jié)果
const example = source.pipe(mergeMap(val => myPromise(val)));
// 輸出: 'Hello World From Promise'
const subscribe = example.subscribe(val => console.log(val));

  • switchMap

映射成 observable,完成前一個(gè)內(nèi)部 observable,發(fā)出值。switchMap 和其他打平操作符的主要區(qū)別是它具有取消效果。在每次發(fā)出時(shí),會(huì)取消前一個(gè)內(nèi)部 observable (你所提供函數(shù)的結(jié)果) 的訂閱,然后訂閱一個(gè)新的 observable 。你可以通過(guò)短語(yǔ)切換成一個(gè)新的 observable來(lái)記憶它。

每次點(diǎn)擊時(shí)重置

// RxJS v6+
import { interval, fromEvent } from 'rxjs';
import { switchMap, mapTo } from 'rxjs/operators';

// 發(fā)出每次點(diǎn)擊
const source = fromEvent(document, 'click');
// 如果3秒內(nèi)發(fā)生了另一次點(diǎn)擊,則消息不會(huì)被發(fā)出
const example = source.pipe(
  switchMap(val => interval(3000).pipe(mapTo('Hello, I made it!')))
);
// (點(diǎn)擊)...3s...'Hello I made it!'...(點(diǎn)擊)...2s(點(diǎn)擊)...
const subscribe = example.subscribe(val => console.log(val));

  • exhaustMap

映射成內(nèi)部 observable,忽略其他值直到該 observable 完成。

// RxJS v6+
import { interval, merge, of } from 'rxjs';
import { delay, take, exhaustMap } from 'rxjs/operators';

const sourceInterval = interval(1000);
const delayedInterval = sourceInterval.pipe(delay(10), take(4));

const exhaustSub = merge(
  // 延遲10毫秒,然后開始 interval 并發(fā)出4個(gè)值
  delayedInterval,
  // 立即發(fā)出
  of(true)
)
  .pipe(exhaustMap(_ => sourceInterval.pipe(take(5))))
  /*
   *  第一個(gè)發(fā)出的值 (of(true)) 會(huì)被映射成每秒發(fā)出值、 
   *  5秒后完成的 interval observable 。
   *  因?yàn)?delayedInterval 的發(fā)送是晚于前者的,雖然 observable 
   *  仍然是活動(dòng)的,但它們會(huì)被忽略。
   *
   *  與類似的操作符進(jìn)行下對(duì)比:
   *  concatMap 會(huì)進(jìn)行排隊(duì)
   *  switchMap 會(huì)在每次發(fā)送時(shí)切換成新的內(nèi)部 observable
   *  mergeMap 會(huì)為每個(gè)發(fā)出值維護(hù)新的 subscription
   */
  // 輸出: 0, 1, 2, 3, 4
  .subscribe(val => console.log(val));

五、工具類

  • delay

根據(jù)給定時(shí)間延遲發(fā)出值。

  • let

擁有完整的 observable 。

// 將數(shù)組作為序列發(fā)出
const source = Rx.Observable.from([1, 2, 3, 4, 5]);

// 傳入你自己的函數(shù)來(lái)將操作符添加到 observable 
const obsArrayPlusYourOperators = yourAppliedOperators => {
  return source.map(val => val + 1).let(yourAppliedOperators);
};
const addTenThenTwenty = obs => obs.map(val => val + 10).map(val => val + 20);
const subscribe = obsArrayPlusYourOperators(addTenThenTwenty)
    // 輸出: 32, 33, 34, 35, 36
    .subscribe(val => console.log('let FROM FUNCTION:', val));
  • toPromise

將 observable 轉(zhuǎn)換成 promise 。

// 返回基礎(chǔ)的 observable
const sample = val => Rx.Observable.of(val).delay(5000);
// 將基礎(chǔ)的 observable 轉(zhuǎn)換成 promise
const example = sample('First Example')
  .toPromise()
  // 輸出: 'First Example'
  .then(result => {
    console.log('From Promise:', result);
  });

總結(jié):

響應(yīng)式優(yōu)點(diǎn):使用聲明語(yǔ)句來(lái)獲取動(dòng)態(tài)的值,語(yǔ)義化,狀態(tài)隔離;

響應(yīng)式思維:Everything is observable; 無(wú)論是數(shù)組,DOM事件還是網(wǎng)絡(luò)請(qǐng)求,都可以被抽象成按照時(shí)間序列發(fā)生的事件。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容