本文是 "RxJava 沉思錄" 系列的第三篇分享。本系列所有分享:
在上一篇分享中,我們應該已經(jīng)對 Observable 在空間維度上重新組織事件的能力 印象深刻了,那么自然而然的,我們?nèi)菀茁?lián)想到時間維度,事實上就我個人而言,我認為 Observable 在時間維度上的重新組織事件的能力 相比較其空間維度的能力更為突出。與上一篇類似,本文接下來將通過列舉真實的例子來闡述這一論點。
點擊事件防抖動
這是一個比較常見的情景,用戶在手機比較卡頓的時候,點擊某個按鈕,正常應該啟動一個頁面,但是手機比較卡,沒有立即啟動,用戶就點了好幾下,結(jié)果等手機回過神來的時候,就會啟動好幾個一樣的頁面。
這個需求用 Callback 的方式比較難處理,但是相信用過 RxJava 的開發(fā)者都知道怎么處理:
RxView.clicks(btn)
.debounce(500, TimeUnit.MILLISECONDS)
.observerOn(AndroidSchedulers.mainThread())
.subscribe(o -> {
// handle clicks
})
debounce操作符產(chǎn)生一個新的Observable, 這個Observable只發(fā)射原Observable中時間間隔小于指定閾值的最大子序列的最后一個元素。 參考資料:Debounce
雖然這個例子比較簡單,但是它很好的表達了 Observable 可以在時間維度上對其發(fā)射的事件進行重新組織 , 從而做到之前 Callback 形式不容易做到的事情。
社交軟件上消息的點贊與取消點贊
點贊與取消點贊是社交軟件上經(jīng)常出現(xiàn)的需求,假設我們目前有下面這樣的點贊與取消點贊的代碼:
boolean like;
likeBtn.setOnClickListener(v -> {
if (like) {
// 取消點贊
sendCancelLikeRequest(postId);
} else {
// 點贊
sendLikeRequest(postId);
}
like = !like;
});
以下圖片素材資源來自 Dribbble

如果你碰巧實現(xiàn)了一個非??犰诺狞c贊動畫,用戶可能會玩得不亦樂乎,這個時候可能會對后端服務器造成一定的壓力,因為每次點贊與取消點贊都會發(fā)起網(wǎng)絡請求,假如很多用戶同時在玩這個點贊動畫,服務器可能會不堪重負。
和前一個例子的防抖動思路差不多,我們首先想到需要防抖動:
boolean like;
PublishSubject<Boolean> likeAction = PublishSubject.create();
likeBtn.setOnClickListener(v -> {
likeAction.onNext(like);
like = !like;
});
likeAction.debounce(1000, TimeUnit.MILLISECONDS)
.observerOn(AndroidSchedulers.mainThread())
.subscribe(like -> {
if (like) {
sendCancelLikeRequest(postId);
} else {
sendLikeRequest(postId);
}
});
寫到這個份上,其實已經(jīng)可以解決服務器壓力過大的問題了,但是還是有優(yōu)化空間,假設當前是已贊狀態(tài),用戶快速點擊 2 下,按照上面的代碼,還是會發(fā)送一次點贊的請求,由于當前是已贊狀態(tài),再發(fā)送一次點贊請求是沒有意義的,所以我們優(yōu)化的目標就是將這一類事件過濾掉:
Observable<Boolean> debounced = likeAction.debounce(1000, TimeUnit.MILLISECONDS);
debounced.zipWith(
debounced.startWith(like),
(last, current) -> last == current ? new Pair<>(false, false) : new Pair<>(true, current)
)
.flatMap(pair -> pair.first ? Observable.just(pair.second) : Observable.empty())
.subscribe(like -> {
if (like) {
sendCancelLikeRequest(postId);
} else {
sendLikeRequest(postId);
}
});
zipWith操作符可以把兩個Observable發(fā)射的相同序號(同為第 x 個)的元素,進行運算轉(zhuǎn)換,得到的新元素作為新的Observable對應序號所發(fā)射的元素。參考資料:ZipWith
上面的代碼,我們可以看到,首先我們對事件流做了一次 debounce 操作,得到 debounced 事件流,然后我們把 debounced 事件流和 debounced.startWith(like) 事件流做了一次 zipWith 操作。相當于新的這個 Observable 中發(fā)射的第 n 個元素(n >= 2)是由 debounced 事件流中的第 n 和 第 n-1 個元素運算得到的(新的這個 Observable 中發(fā)射的第 1 個元素是由 debounced 事件流中的第 1 個元素和原始點贊狀態(tài) like 運算而來)。
運算的結(jié)果是得到一個 Pair 對象,它是一個雙布爾類型二元組,二元組第一個元素為 true 代表這個事件不該被忽略,應該被觀察者觀察到;若為 false 則應該被忽略。二元組的第二個元素僅在第一個元素為 true 的情況下才有意義,true 表示應該發(fā)起一次點贊操作,而 false 表示應該發(fā)起一次取消點贊操作。上面提到的“運算”具體運算的規(guī)則是,比較兩個元素,若相等,則把二元組的第一個元素置為 false,若不相等,則把二元組的第一個元素置為 true, 同時把二元組的第二個元素置為 debounced 事件流發(fā)射的那個元素。
隨后的 flatMap 操作符完成了兩個邏輯,一是過濾掉二元組第一個元素為 false 的二元組,二是把二元組轉(zhuǎn)化回最初的 Boolean 事件流。其實這個邏輯也可由 filter 和 map 兩個操作符配合完成,這里為了簡單用了一個操作符。
雖然上面用了不少篇幅解釋了每個操作符的意義,但其實核心思想是簡單的,就是在原先 debounce 操作符的基礎上,把得到的事件流里每個元素和它的上一個元素做比較,如果這個元素和上個元素相同(例如在已贊狀態(tài)下再次發(fā)起點贊操作), 就把這個元素過濾掉,這樣最終的觀察者里只會在在真正需要改變點贊狀態(tài)的時候才會發(fā)起網(wǎng)絡請求了。
我們考慮用 Callback 實現(xiàn)相同邏輯,雖然比較本次操作與上次操作這樣的邏輯通過 Callback 也可以做到,但是 debounce 這個操作符完成的任務,如果要使用 Callback 來實現(xiàn)就非常復雜了,我們需要定義一個計時器,還要負責啟動與關(guān)閉這個計時器,我們的 Callback 內(nèi)部會摻雜進很多和觀察者本身無關(guān)的邏輯,相比 RxJava 版本的純粹相去甚遠。
檢測雙擊事件
首先,我們需要定義雙擊事件,不妨先規(guī)定兩次點擊小于 500 毫秒則為一次雙擊事件。我們先使用 Callback 的方式實現(xiàn):
long lastClickTimeStamp;
btn.setOnClickListener(v -> {
if (System.currentTimeMillis() - lastClickTimeStamp < 500) {
// handle double click
}
});
上面的代碼很容易理解,我們引入一個中間變量 lastClickTimeStamp, 通過比較點擊事件發(fā)生時和上一次點擊事件的時間差是否小于 500 毫秒,來確認是否發(fā)生了一次雙擊事件。那么如何通過 RxJava 來實現(xiàn)呢?就和上一個例子一樣,我們可以在時間維度對 Observable 發(fā)射的事件進行重新組織,只過濾出與上次點擊事件間隔小于 500 毫秒的點擊事件,代碼如下:
Observable<Long> clicks = RxView.clicks(btn)
.map(o -> System.currentTimeMillis())
.share();
clicks.zipWith(clicks.skip(1), (t1, t2) -> t2 - t1)
.filter(interval -> interval < 500)
.subscribe(o -> {
// handle double click
});
我們再一次用到了 zipWith 操作符來對事件流自身相鄰的兩個元素做比較,另外這次代碼中使用了 share 操作符,用來保證點擊事件的 Observable 被轉(zhuǎn)為 Hot Observable。
在
RxJava中,Observable可以被分為Hot Observable與Cold Observable,引用《Learning Reactive Programming with Java 8》中一個形象的比喻(翻譯后的意思):我們可以這樣認為,Cold Observable在每次被訂閱的時候為每一個Subscriber單獨發(fā)送可供使用的所有元素,而Hot Observable始終處于運行狀態(tài)當中,在它運行的過程中,向它的訂閱者發(fā)射元素(發(fā)送廣播、事件),我們可以把Hot Observable比喻成一個電臺,聽眾從某個時刻收聽這個電臺開始就可以聽到此時播放的節(jié)目以及之后的節(jié)目,但是無法聽到電臺此前播放的節(jié)目,而Cold Observable就像音樂 CD ,人們購買 CD 的時間可能前后有差距,但是收聽 CD 時都是從第一個曲目開始播放的。也就是說同一張 CD ,每個人收聽到的內(nèi)容都是一樣的, 無論收聽時間早或晚。
僅僅是上面這個雙擊檢測的例子,還不能體現(xiàn) RxJava 的優(yōu)越性,我們把需求改得更復雜一點:如果用戶在“短時間”內(nèi)連續(xù)多次點擊,只能算一次雙擊操作。這個需求是合理的,因為如果按照上面 Callback 的寫法,雖然可以檢測出雙擊操作,但是如果用戶快速點擊 n 次(間隔均小于 500 毫秒,n >= 2), 就會觸發(fā) n - 1 次雙擊事件,假設雙擊處理函數(shù)里需要發(fā)起網(wǎng)絡請求,會對服務器造成壓力。要實現(xiàn)這個需求其實也簡單,和上一個例子類似,我們用到了 debounce 操作符:
Observable<Object> clicks = RxView.clicks(btn).share()
clicks.buffer(clicks.debounce(500, TimeUnit.MILLISECONDS))
.filter(events -> events.size >= 2)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(o -> {
// handle double click
});
buffer操作符接受一個Observable為參數(shù),這個Observable所發(fā)射的元素是什么不重要,重要的是這些元素發(fā)射的時間點,這些時間點會在時間維度上把原來那個Observable所發(fā)射的元素劃分為一系列元素的組,buffer操作符返回的新的Observable發(fā)射的元素即為那些“組”。
參考資料: Buffer
上面的代碼通過 buffer 和 debounce 兩個操作符很巧妙的把點擊事件流轉(zhuǎn)化為了我們關(guān)心的 “短時間內(nèi)點擊次數(shù)超過 2 次” 的事件流,而且新的事件流中任意兩個相鄰事件間隔必定大于 500 毫秒。
在這個例子中,如果我們想要使用 Callback 去實現(xiàn)相似邏輯,代碼量肯定是巨大的,而且魯棒性也無法保證。
搜索提示
我們平時使用的搜索框中,常常是當用戶輸入一部分內(nèi)容后,下方就會顯示對應的搜索提示,以支付寶為例,當在搜索框輸入“螞蟻”關(guān)鍵詞后,下方自動刷新和關(guān)鍵詞相關(guān)的結(jié)果:

為了簡化這個例子,我們不妨定義根據(jù)關(guān)鍵詞搜索的接口如下:
public interface Api {
@GET("path/to/api")
Observable<List<String>> queryKeyword(String keyword);
}
查詢接口現(xiàn)在已經(jīng)確定下來,我們考慮一下在實現(xiàn)這個需求的過程中需要考慮哪些因素:
- 防止用戶輸入過快,觸發(fā)過多網(wǎng)絡請求,需要對輸入事件做一下防抖動。
- 用戶在輸入關(guān)鍵詞過程中可能觸發(fā)多次請求,那么,如果后一次請求的結(jié)果先返回,前一次請求的結(jié)果后返回,這種情況應該保證界面展示的是后一次請求的結(jié)果。
- 用戶在輸入關(guān)鍵詞過程中可能觸發(fā)多次請求,那么,如果后一次請求的結(jié)果返回時,前一次請求的結(jié)果尚未返回的情況下,就應該取消前一次請求。
綜合考慮上面的因素以后,我們使用 RxJava 實現(xiàn)的對應的代碼如下:
RxTextView.textChanges(input)
.debounce(300, TimeUnit.MILLISECONDS)
.switchMap(text -> api.queryKeyword(text.toString()))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(results -> {
// handle results
});
switchMap這個操作符與flatMap操作符類似,但是區(qū)別是如果原Observable中的兩個元素,通過switchMap操作符都轉(zhuǎn)為Observable之后,如果后一個元素對應的Observable發(fā)射元素時,前一個元素對應的Observable尚未發(fā)射完所有元素,那么前一個元素對應的Observable會被自動取消訂閱,尚未發(fā)射完的元素也不會體現(xiàn)在switchMap操作符調(diào)用后產(chǎn)生的新的Observable發(fā)射的元素中。
參考資料:SwitchMap
我們分析上面的代碼,可以發(fā)現(xiàn): debounce 操作符解決了問題 1,switchMap 操作符解決了問題 2、3。這個例子可以很好的說明,RxJava 的 Observable 可以通過一系列操作符從時間的維度上重新組織事件,從而簡化觀察者的邏輯。這個例子如果使用 Callback 來實現(xiàn),肯定是十分復雜的,需要設置計時器以及一堆中間變量,觀察者中也會摻雜進很多額外的邏輯,用來保證事件與事件的依賴關(guān)系。
(未完待續(xù))
本文屬于 "RxJava 沉思錄" 系列,歡迎閱讀本系列的其他分享: