經(jīng)典例子:小試牛刀 (輕而易舉搞定異步處理)
場景:input 輸入框?qū)崟r搜索,根據(jù)輸入的關(guān)鍵字,實時發(fā)送異步請求進行搜索;
過程拆解:
- 監(jiān)聽 input 元素的 keyup 事件;(監(jiān)聽 keyup 是有意為之)
- 觸發(fā) keyup 事件,送出 value 值;
- 根據(jù) value 值進行異步請求搜索;
- 將返回的數(shù)據(jù)渲染到 DOM;
面臨的問題:
- 不停的打字會連續(xù)觸發(fā)異步請求,占用服務(wù)器端資源;
- 相鄰的 keyup 事件,有可能發(fā)出一樣的 value 值(先輸入了
abc,請求出去了,又輸入了abcde,然后迅速的刪除了de,于是又發(fā)出了一個abc的異步請求;又或者按下了鍵盤上不會改變 value 的按鍵); - 發(fā)出多個異步請求以后,每個事件所消費的時間不一定相同。如果前一個異步的事件過程很慢,那么當它返回時,有可能把后面的異步事件率先返回的結(jié)果覆蓋;
基礎(chǔ)解決方案:
- 場景 1: debounce 搞得定;
- 場景 2: 有點難度,debounce 已經(jīng)搞不定了,因為 相同的 value 值是在 debounce 處理之后發(fā)出的;
- 場景 3: 很麻煩,由于不同的請求所消耗的時間并不一樣,所以無法保證異步返回的先后順序,而我們期待的僅僅只是最后一個異步請求返回的結(jié)果,其它的都不關(guān)心;
1,2,3 單獨來看,都是有解的,雖然麻煩一點,但如果 123 的問題要同時解決,非常麻煩。
RxJS 怎么做?
核心代碼示例:
// 初始化一個 input 事件流
const input$ = fromEvent($("input"), "keyup");
const resultEle = $('result');
input$.pipe(
// 拋出 e.target.value
map((e) => e.target.value.trim()),
// 過濾不合法的 value
filter(value => !!value),
// 去抖動
debounceTime(400),
// distinctUntilChanged: 對于任意相鄰的事件,如果它們的返回值一樣,則只要取前一個 (可類比 React 中的 shouldComponentDidUpdate 生命周期的作用)
distinctUntilChanged(),
// switchMap 其實是 map 和 switch 的組合功能,所以這里做了兩個事情:
// 1 map. 將輸入的 input dom 事件流裝換為 searchRepoInfo$ 事件流,從而發(fā)起異步搜索請求
// 2 switch. 如果前一個事件流(從輸入到異步請求是一次完整的流)沒有結(jié)束又再次輸入了新的值,則拋棄之前的(流),從而取消異步請求
switchMap(searchRepoInfo$),
// 處理響應(yīng)結(jié)果
map(response => {
if (response.status !== 200) {
throw "出錯誤了";
}
return response.response;
}),
// 渲染響應(yīng)結(jié)果
tap(repos => {
resultEle.innerHTML = repos.items.map(item => `<p>${item.full_name}</p>`).join('');
}),
// 異常捕獲
catchError(error => {
console.error(error);
return of("error");
}),
// 如果出錯了,可以無限重試
repeat()
).subscribe();
彈珠圖:
input: -(space)--(400ms)--r
-(space)--(400ms)--r-e
-(space)--(400ms)--r-e--a
-(space)--(400ms)--r-e--a--c
-(space)--(400ms)--r-e--a--c--t
-(space)--(400ms)--r-e--a--c--t--(400ms)--(->(鍵盤右鍵))---
debounceTime(400) 篩選出時間間隔不少于 400 ms 的事件
input$ -(space)--(400ms)--react--(400ms)--react(該值由于鍵盤右鍵觸發(fā))---
filter(text => !!text) 去除掉無效值
input$ ------react--react(該值由于鍵盤右鍵觸發(fā))---
distinctUntilChanged 去除重復(fù)的值,對于任意相鄰的事件,如果它們的返回值一樣,則只要取前一個
input$ ------react-----
switchMap(() => searchUserInfo$(react)) 發(fā)起異步請求
result response
如果回到 JQuery 的時代,要實現(xiàn)上面的功能,聽說要寫 900 多行代碼。而使用 RxJS 來實現(xiàn),不費吹灰之力。
RxJS 是什么?
關(guān)于 RxJS 我們可能了解到很多不同的描述,官方的解釋是使用 Observables 的響應(yīng)式編程庫。它是 Reactive Extensions 思想在 JavaScript 中的實現(xiàn),而 Rx 最開始是微軟 .NET 的一個響應(yīng)式擴展,最后經(jīng)由 RxJava 發(fā)揚光大。
那什么是響應(yīng)式編程呢?目前有很多的解釋和爭論,甚至對于 Rx 到底是不是純粹的響應(yīng)式編程也是存在不同的聲音的。 外網(wǎng)有一篇被廣泛閱讀的文章 What is Reactive Programming(一個 gist 片段,坐擁 15K start) 中提到:Reactive programming is programming with asynchronous data streams.(響應(yīng)式編程就是使用異步數(shù)據(jù)流進行編程) 。
如果我們從時間的維度去理解 FRP,那么:
FRP is about 'datatypes that represent a value 'over time'
參看 stackoverflow-What is (functional) reactive programming?。
即 FRP 是隨 時間推移(over time) 而代表 值(value) 的 數(shù)據(jù)類型(datatypes),這里 時間 是一個非常關(guān)鍵的要素。
在 FRP 出現(xiàn)之前,幾乎沒有軟件思想考慮過時間這個維度,在舊有的程序編碼觀念中,變量(狀態(tài))隨著時間的推移,雖然會更據(jù)事件的觸發(fā)而發(fā)生變化,但只是平面上孤立,離散的點(舉個例子,拖拽的例子)。 而擁有了 FRP(RxJS) 后,程序設(shè)計中便擁有了對 時間 維度的掌控,業(yè)務(wù)邏輯不過是起始值隨著時間的流動而推演出來的流(Stream),我們可以站時間的尺度上去思考過去,現(xiàn)在,和未來。
《三體》中領(lǐng)悟了操縱高維時空能力的女巫能從遠處透過腦殼拿出敵人的大腦---別人驚訝她是如何做到的,而她只不過是看到了伸手直接拿了而已。
掌控時間
被祭獻的少女

游戲廳里八稚女的出招順序是: ↓ ↘ → ↘ ↓ ↙ ← + A 或 C,方便鍵盤模擬我們簡化成:↓ → ↓ ← + a (下 右 下 左 a):
核心代碼:
// window keyup 事件流
const keyup$ = fromEvent(window, "keyup");
// 必殺技事件流
const ultimateKeys$ = keyup$.pipe(
// 以 3s 的時間周期為尺度,對數(shù)據(jù)流進行緩存截取
bufferTime(3000),
// 必殺技的按鍵組合必須大于 五個 鍵位
filter(events => events.length >= 5),
// 將歷次的 event 對象的 key 映射出來
map(events => events.map(e => e.key)),
// 過濾掉不屬于可以觸發(fā)必殺技的按鍵集合
filter(isultimateKeyboradCollections),
// 發(fā)出必殺技
tap(console.log),
tap(() => alert('fire ultimate')),
).subscribe();
根據(jù) keyup 在一定 時間閾值 內(nèi) filter 的按鍵組合來產(chǎn)生判斷是否可以使用必殺技, 彈珠圖:
彈珠圖:
keyup: ---左---下---上---下-右--下-左--a-右
bufferTime(3000)
buffer: ------左下上------下右下左a右-----
filter(isUltimateKeyboradCollections)
filter: -----------------下右下左a右----
八稚女: ---F---F-----F-------T------F---

bufferTime 緩沖特定時間段內(nèi)從源 Observable 發(fā)出的可觀察值。
掌控空間
開篇的 小試牛刀 已經(jīng)展示了一個 Observable 在空間維度上進行轉(zhuǎn)換的例子。
# switchMap 是 map + switch 的快捷組合操作符
input$.pipe(switchMap(searchUserInfo$))
<==== 等價于 ====>
input$.pipe(
map(searchUserInfo$),
switch()
)
彈珠圖:
input: ---Jack--- # 一維 Observable
map: ---searchUserInfo$(Jack)--- # 二維 Observable
switch: ---res$(jack)--- # 一維 Observable
再來看一個拖拽的場景,鼠標在元素上方按下并且移動鼠標時,元素會跟隨鼠標移動,鼠標抬起時,拖拽結(jié)束:
const $ = id => document.getElementById(id);
const dragBox = $("drag-box");
// mouse down 事件流
const mouseDown$ = fromEvent(dragBox, "mousedown");
// mouse move 事件流
const mouseMove$ = fromEvent(window, "mousemove");
// mouse up 事件流
const mouseUp$ = fromEvent(window, "mouseup");
mouseDown$
.pipe(
switchMap(() => mouseMove$.pipe(takeUntil(mouseUp$))),
withLatestFrom(mouseDown$, (mouse: MouseEvent, down: MouseEvent) => {
return {
offetX: mouse.clientX - down.offsetX,
offetY: mouse.clientY - down.offsetY
};
}),
tap(offset => {
dragBox.style.left = offset.offetX + "px";
dragBox.style.top = offset.offetY + "px";
})
)
.subscribe();
代碼簡單且通俗易懂,并輕而易舉同時擁有了對空間和時間的掌控。
mouseDown$.switchMap(() => mouseMove$.takeUntil(mouseUp$))), mouseDown 觸發(fā)的時候(過去),轉(zhuǎn)換為 mouseMove 事件(現(xiàn)在),而 mouseMove 將在 mouseUp 觸發(fā)的時候結(jié)束(未來)
takeUntil 允許源 Observable 一直發(fā)出值,直到 Notify Observable 有值發(fā)出時完成。

withLatestFrom 是具有主從關(guān)系的操作符,只有在主 Observable 有值發(fā)出時,附屬的 Observable 才會跟著被執(zhí)行。

RxJS 背后的設(shè)計體系
事務(wù)與響應(yīng)式
我們從的時間維度認識到了 FRP 在面對復(fù)雜邏輯上的一些處理方式,我們再回過頭來,從事務(wù)的角度理解響應(yīng)式編程。
看看下面這段代碼:
// 使用 DOM API 構(gòu)建 UI
const data = 'hello world';
const div = document.createElement('div')
const p = document.createElement('p')
p.textContent = data
const UI = div.append(p)
上面通過 JavaScript 的 DOM API 來將數(shù)據(jù)渲染到 UI 中,但是其實我們想要不是這種命令式的賦值動作,而是能建立 數(shù)據(jù) 與 UI 之間的一種永恒的關(guān)聯(lián),即當數(shù)據(jù)發(fā)生改變時,UI 會自動更新,就像 React 一樣:
// React 構(gòu)建 UI 與數(shù)據(jù)的關(guān)聯(lián)
const [data] = useState('hello world')
const h = React.craeteElement
const UI = h('div', null, h('p', null, data))
React 背后的思想其實就是響應(yīng)式,而響應(yīng)式的核心思想是通過某種方式來構(gòu)建 事務(wù) 之間的關(guān)聯(lián)關(guān)系。
事務(wù)是一個寬泛的概念,它可以是一個變量,一個對象,一段代碼,或者就是一段業(yè)務(wù)邏輯(下文沒有特殊說明統(tǒng)一指業(yè)務(wù)邏輯)
UI = f(State) ,React 通過 h 渲染函數(shù)構(gòu)建了 UI 與 State 之間一種永恒的關(guān)聯(lián)。這里的事務(wù)就是 UI 和 State。
我們再來重新理解下 Reactive programming is programming with asynchronous data streams.(響應(yīng)式編程就是使用異步數(shù)據(jù)流進行編程) 這句話,對此一個更加具象的解釋其實是:響應(yīng)式編程是一種通過 異步 和 數(shù)據(jù)流 來 構(gòu)建事務(wù)關(guān)聯(lián) 的編程模型。事務(wù)的關(guān)聯(lián) 是響應(yīng)式編程的核心理念,數(shù)據(jù)流 和 異步 是實現(xiàn)這個核心理念的關(guān)鍵。
- 事務(wù)關(guān)聯(lián):可以理解就是業(yè)務(wù)邏輯之前的關(guān)聯(lián)。比如,UI 與數(shù)據(jù),組件初始化獲取數(shù)據(jù)加載,系統(tǒng)權(quán)限等。
- 數(shù)據(jù)流(Stream):數(shù)據(jù)流是相關(guān)事務(wù)之間溝通的橋梁,就像一條河,它可以被觀測,被過濾,被操作,或者與另外一條流合并為一條新的流。但是只有數(shù)據(jù)流,還不能完美實現(xiàn)事務(wù)之間關(guān)聯(lián)關(guān)系的構(gòu)建,我們還需要異步。
- 異步:異步會盡可能挖掘設(shè)備的運行效率,同時可以讓無關(guān)的事務(wù)之間相互獨立。
使用 RxJS 構(gòu)建事務(wù)之間的關(guān)聯(lián)
RxJS 提供了一系列的 API,來為我們實現(xiàn)事務(wù)之間的關(guān)聯(lián)關(guān)系:
- Observable: 被觀察對象,一個值或者事件的集合;
- Observer:觀察者,根據(jù) Observable 進行處理;
- Subject:可以是 Observable,也可以是 Observer,會對內(nèi)部的 Observers 進行多播(multicast);
- Operators: RxJS 的提供的一系列操作符,用于創(chuàng)建,組合,過濾 Observable;
import { Observable } from 'rxjs';
// 定義一個觀察者
const observer = (subscriber) => {
console.log('hello world');
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
}
// 構(gòu)建一個 Observable
const observable = new Observable(observer);
console.log('just before subscribe');
// 通過 Observable 的 subscribe 方法進行調(diào)用,并返回一個 subscription 用于資源釋放
const subscription = observable.subscribe({
next(x) { console.log('got value ' + x); },
error(err) { console.error('something wrong occurred: ' + err); },
complete() { console.log('done'); }
});
console.log('just after subscribe');
// 取消訂閱
subscription.unsubscribe();
我們創(chuàng)建了一個 Observable,通過 Observable 提供的 subscribe 方法對其進行訂閱(調(diào)用)。
在這里 Observable 是一個可以接收 Observer 對象作為參數(shù)的函數(shù),并返回一個函數(shù)用來取消訂閱。Observer 對象可以聲明 next、err、complete 方法來處理流的不同狀態(tài)。
Rx 是結(jié)合了觀察者模式、迭代器模式、函數(shù)式編程,以使用可觀察數(shù)據(jù)流進行異步編程,從上面的代碼可略窺一二,前三個我們先不多講,對于 Observable 中的異步 我們看下這段代碼的輸出結(jié)果:
just before subscribe
hello world
got value 1
got value 2
got value 3
just after subscribe
got value 4 // 異步
done
當調(diào)用 subscribe 后,Observable 可以同步地返回一個值,或異步地返回一個值,從這個角度我們可以看到,Observable 基于這樣的實現(xiàn),磨平了同步和異步的差異。
我們把數(shù)據(jù)流(Stream)比喻成一條河流,而這河里流動的就是 Observable。
觀察者模式和迭代器模式的融合將 Observable 推向了一個全新的 Push 體系。
Push Vs Pull
Push 和 Pull 是生產(chǎn)者與消費者通信的兩種模型:
| _ | 生產(chǎn)者(Producer) | 消費者(Consumer) |
|---|---|---|
| Pull | 被動:被請求的時候產(chǎn)生數(shù)據(jù) | 主動:決定何時請求數(shù)據(jù) |
| Push | 主動:按自己的節(jié)奏產(chǎn)生數(shù)據(jù) | 被動:對接收到的數(shù)據(jù)進行處理 |
Pull 模型中,消費者決定何時從生產(chǎn)者那里接收數(shù)據(jù),但是生產(chǎn)者本身并不知道自己什么時候會將數(shù)據(jù)發(fā)送給消費者。
- JavaScript 中函數(shù)是 Pull 模型,函數(shù)是生產(chǎn)者,執(zhí)行函數(shù)的代碼通過從調(diào)用中 拉 出 一個值 來消耗數(shù)據(jù)。
- 迭代器和生成器是另一種 Pull 模型,調(diào)用
iterator.next的是消費者,和函數(shù)的區(qū)別是可以 拉 取 多個值。
Push 模型中,生產(chǎn)者決定何時像消費者發(fā)送數(shù)據(jù),消費者并不知道自己何時會接收到數(shù)據(jù)。在 Push 模型中,消費者變主動為被動,消費者只管吃,反正來了食物我就吃,而不用再去操心食物是怎么來的。
- JS 的"事件系統(tǒng)"是一個 Push 模型:
window.addEventListener('click', () => {
// do something
})
- Promise 也是 JavaScript 中常見的 Push 模型,一個 Promise(生產(chǎn)者)發(fā)送一個 resolved,來執(zhí)行一個回調(diào)(consumer),Promise 決定何時才將數(shù)據(jù)推送給消費者。
function sleep() {
return new Promise((resolve) => {
setTimeout(() => resolve('Beef'), 1000)
})
}
sleep().then((food) => console.log(`Wake up to eat ${food} !!!`));
RxJS 中的 Observable 是一個全新的 Push 模型,被觀察者是一個可以產(chǎn)生多值的生產(chǎn)者,當產(chǎn)生新數(shù)據(jù)時,會主動推(異步或同步推送)給"觀察者"(消費者)。
| - | Single(單值-在時間序列上只能有一個值) | Multiple(多值-在時間序列上可以有多個值) |
|---|---|---|
| Pull | Function | Iterator |
| Push | Promise | Observable |
-
Function: 惰性求值,調(diào)用時會 同步 返回一個單一值。 -
Generator: 惰性求值,調(diào)用時會 同步 地返回零到(有可能的)無限多個值。 -
Promise:可能(或肯能不)返回單個值,整個過程不可取消,要么resolve要么reject并且只響應(yīng)一次。 -
Observable:惰性求值,并且隨著時間的推移可以 同步或者異步的發(fā)出零到無窮多個值,可取消,組合,過濾,扭轉(zhuǎn)。
RxJS 的異常處理-恢復(fù)/重試-優(yōu)雅的錯誤扭轉(zhuǎn)
傳統(tǒng)異常處理的方式和局限性
-
try/catch:無法處理異步場景
// 無法捕獲異常
try {
setTimeout(() => JSON.parse('invalid json'), 1000)
} catch (e) {
console.error(e);
}
callback 回調(diào)函數(shù): 可以解決
try/catch只能支持同步操作的問題,但是如果事務(wù)之間存在依賴關(guān)聯(lián),則會造成 "回調(diào)地獄";promise 的異常處理機制
比較 try/catch 和callback,promise 的異常處理機制確實進步了很多,但是依然有不足之處,不可取消,不支持重試,以及不強制要求異常捕獲。
- rxjs 異常處理-兼具以上方式的所有優(yōu)點
場景:實現(xiàn)一個登錄控件,用戶連續(xù) N 次輸入密碼提交以后發(fā)生了異常(網(wǎng)絡(luò)異?;蛘呙艽a不對),登錄流程需要暫停 P 秒以后再才能繼續(xù)重試。
核心代碼示例 :
const [handleLogin] = useEventCallback(
($event: Observable<LoginFormValues>) =>
$event.pipe(
debounceTime(500),
tap((payload) => {
setLoading(true);
setErrMsg('');
}),
switchMap((payload) =>
from(requset(payload.username, payload.password))
),
tap((res) => {
setLoading(false);
if (res.code !== 200) {
return setErrMsg(res.message);
}
message.success('登錄成功!')
}),
catchError((err) => {
setErrMsg(err.message)
return _throw(err);
}),
finalize(() => setLoading(false)),
retry(retryCount),
catchError((err) => {
setSubmitButtonDisabled(true);
return _throw(err);
}),
retryWhen((errors) =>
errors.pipe(
tap(() => {
message.warn({
type: 'warning',
content: `重試次數(shù)已經(jīng)超過${retryCount}次,${retryDelayTime}后秒再試`
});
}),
delayWhen(() => timer(1000 * retryDelayTime).pipe(tap(() => setSubmitButtonDisabled(false))))
)
)
),
{ code: -10001 }
);
小結(jié)
RxJS 中概念眾多,初次學習門檻和使用成本都很高,而其學習曲線說是斷崖式的,有圖為證:

RxJS 打開了一個新的編程世界的大門,我們可以站在更高階的時間維度去思考和組織代碼,我們不在糾結(jié)于同步與異步的區(qū)別,其對響應(yīng)編程的拓展,可以輕而易舉的實現(xiàn)實世界中事務(wù)之間的關(guān)聯(lián)構(gòu)建,我們從命令式編程的瑣碎與龐雜中解脫出來,其一切皆流的思想更是對以往編程思維的革新,面對紛繁復(fù)雜的業(yè)務(wù)邏輯,我們從未于此清晰,也從未如此迷惘。
本文初略介紹了 RxJS,所涉相關(guān)點不過其冰山一角,RxJS 的強大決于開發(fā)者的想象力。
參考文章/學習資料
推薦幾個 RxJS 學習資源和幾篇 RxJS 相關(guān)的精彩文章:
流動的數(shù)據(jù)——使用 RxJS 構(gòu)造復(fù)雜單頁應(yīng)用的數(shù)據(jù)邏輯
teambition-sdk 競品 teambiton 技術(shù)窺探,使用 RxJS 構(gòu)建一套前端數(shù)據(jù)層 sdk,不依賴任何展現(xiàn)框架,可以被任何展現(xiàn)框架使用,甚至可以在NodeJS中使用,對外提供了一整套 Reactive 的 API。
基于 RxJs 的前端數(shù)據(jù)層實踐 類似 teambition-sdk 思路,使用 RxJS 大致復(fù)雜業(yè)務(wù)場景下的前端數(shù)據(jù)層。
Cycle.js Cycle.js 是個不一樣的 Web 框架,基于函數(shù)式和響應(yīng)式,完全使用響應(yīng)式編程作為編程范式,Cycle 提出了一種新的 MVI 的分形(fractal) 架構(gòu)模式,定義了整個應(yīng)用的代碼組織方式和開發(fā)范式。