FluxJava 新增 RxJava2 的支援功能

FluxJava 最初的設(shè)計(jì)就是以 Add-on 的方式來(lái)提供對(duì)于 RxJava 的支持,所以這次增加 RxJava2 的部份也依照相同的模式,在 Project 中加上了 fluxjava-rx2 的 Module。新的 Module 功能上與 fluxjava-rx 大致上相同,只是原本以 RxJava 規(guī)格運(yùn)作的部份,改為使用 RxJava2。

由于 RxJava 與 RxJava2 不太有機(jī)會(huì)共存在同一個(gè) Module 里,所以 fluxjava-rx2 沿用了 fluxjava-rx 的 Package 名稱,在使用上這二個(gè) Add-on 必須要擇一引用。不過(guò)也帶來(lái)了一個(gè)額外的好處,如果想要由 fluxjava-rx 升級(jí)到 fluxjava-rx2 時(shí),只要修改成 RxJava2 的調(diào)用規(guī)格,不用再特別調(diào)整 Import 的內(nèi)容。

配合 RxJava2 的更新,按照慣例增加了一篇文章手把手教你如何一起用 FluxJava 與 RxJava2 做為使用上的參考。更多深入的文章請(qǐng)參閱簡(jiǎn)書(shū) http://m.itdecent.cn/u/fea63707e07fwznote.blogspot.com。

在 fluxjava-rx2 中與 fluxjava-rx 最大的差異,主要是因應(yīng) RxJava2 把原本的 Observable 分成了有背壓版本的 Flowable 與沒(méi)有背壓版本的 Observable。因此 RxBus 與 RxStore 中都分別再提供了 toFlowable 的 Method 來(lái)取得 Flowable,不過(guò) Observable 本來(lái)就可以再轉(zhuǎn)換為 Flowable,此處的功能只是為了增加便利性、簡(jiǎn)化源代碼之用。

同時(shí),利用這篇文章補(bǔ)充說(shuō)明一下一些使用上的技術(shù)細(xì)節(jié)。在 RxStore 中使用的是沒(méi)有背壓版本的 Observable 來(lái)接收外部傳來(lái)的信息,只是接收到之后就會(huì)被分派到不同的 Thread 上去處理后續(xù)的工作,所以在這個(gè)部份是不大有機(jī)會(huì)遇上 MissingBackpressureException 問(wèn)題的。但是,這并不代表背壓所造成的情況就不存在了,只是瓶頸移到了 ThreadPool 的承受能力或是執(zhí)行的環(huán)境可以產(chǎn)生 Thread 的數(shù)量上。

就算是使用有背壓功能的 Flowable,也不代表就可以高枕無(wú)憂了。說(shuō)穿了,背壓本身不是什么神奇的黑科技,原理上只是在上下游中間加了個(gè)水池,讓下游有喘息的空間。水池畢竟還是有一定的物理限制,沒(méi)有控制好,依舊會(huì)讓水池承載不下而出現(xiàn)錯(cuò)誤。

因此,不論使用哪一種方法,前端發(fā)送的數(shù)量仍然應(yīng)該要被謹(jǐn)慎地控制,避免海量的信息把接收端給淹沒(méi)了。像是把 RecyclerView 滾動(dòng)時(shí)產(chǎn)生位移的 Event 毫無(wú)選擇地往 RxStore 送。

在發(fā)送端就要篩選信息,除了要減少 MissingBackpressureException 可能會(huì)出現(xiàn)的機(jī)會(huì)外,還有一個(gè)目的是要節(jié)省執(zhí)行成本。當(dāng)信息數(shù)量大到無(wú)法處理時(shí),能做的只有挑選值得處理的部份來(lái)進(jìn)行。當(dāng)挑選的工作被移到發(fā)送后,RxJava 的傳送機(jī)制表面上看起來(lái)就是簡(jiǎn)單地轉(zhuǎn)了一手,但是如果去追蹤其源代碼,就可以發(fā)現(xiàn)其實(shí)底下做了不少的工作。而每一次傳送都要運(yùn)行這些內(nèi)容,可以想見(jiàn)在數(shù)量到達(dá)一定的程度之后,就會(huì)顯現(xiàn)出可觀的效能差距。

由于 Observable 在定義上所形成的限制使然,同一個(gè)發(fā)送源無(wú)法把的信息分配至不同的 Thread 上送出。RxStore 為了要讓每個(gè)資料處理要求可以獨(dú)立、同步地進(jìn)行,所以才會(huì)在接收到信息后,以不同的 Thread 進(jìn)行后續(xù)的工作。

在 RxStore 里提供了一個(gè) getExecutor Method,可以使用 ThreadPool 來(lái)做為背壓的替代方案,但是并沒(méi)有像背壓一樣有可以控制上游的功能。在實(shí)作 getExecutor 時(shí)要注意,不要直接在回傳時(shí) New 一個(gè) ThreadPool 的 Instance。因?yàn)?getExecutor 是在每一次收到信息后調(diào)用一次。以上的做法,也等同于每一次收到信息就拿到一個(gè) ThreadPool 的 Instance,每一個(gè) ThreadPool 都只會(huì)產(chǎn)生一個(gè) Thread,這就失去了使用 ThreadPool 的用意。

最容易出現(xiàn)以上問(wèn)題的情況是使用 Executors 來(lái)取得 ThreadPool,一般的情況下很容易忽略 Executors 其實(shí)是每次調(diào)用就產(chǎn)生一個(gè) Instance。所以當(dāng)以 return Executors.newFixedThreadPool(poolSize); 的方式回傳 getExecutor 時(shí),一開(kāi)始也許顯示不出問(wèn)題而被略過(guò),但是一但信息爆量后,就會(huì)因?yàn)楫a(chǎn)生 Thread 的數(shù)量到達(dá)上限而中止運(yùn)作。

以上,是這次補(bǔ)充的內(nèi)容,讓使用 FluxJava 的朋友做為參考。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容