幾種主要的需求:
- 直接創(chuàng)建一個(gè)Observable(創(chuàng)建操作)
- 組合多個(gè)Observable(組合操作)
- 對(duì)Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
- 從Observable發(fā)射的數(shù)據(jù)中取特定的值(過(guò)濾操作)
- 轉(zhuǎn)發(fā)Observable的部分值(條件/布爾/過(guò)濾操作)
- 對(duì)Observable發(fā)射的數(shù)據(jù)序列求值(算術(shù)/聚合操作)
創(chuàng)建操作符
| 名稱 | 解析 |
|---|---|
| just() | 將一個(gè)或多個(gè)對(duì)象轉(zhuǎn)換成發(fā)射這個(gè)或這些對(duì)象的一個(gè)Observable |
| fromArray() | 將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable |
| repeat() | 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable |
| repeatWhen() | 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable,它依賴于另一個(gè)Observable發(fā)射的數(shù)據(jù) |
| create() | 使用一個(gè)函數(shù)從頭創(chuàng)建一個(gè)Observable |
| defer() | 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable;為每個(gè)訂閱創(chuàng)建一個(gè)新的Observable |
| range() | 創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable |
| interval() | 創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable |
| timer() | 創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的Observable |
| empty() | 創(chuàng)建一個(gè)什么都不做直接通知完成的Observable |
| error() | 創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的Observable |
| never() | 創(chuàng)建一個(gè)入不發(fā)射任何數(shù)據(jù)的Observable |
變換操作符
| 操作符 | 解析 |
|---|---|
| buffer() | 緩存,可以簡(jiǎn)單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè) |
| map() | 對(duì)序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)來(lái)變換Observable發(fā)射的數(shù)據(jù)序列 |
| flatMap() , concatMap() , flatMapIterable() | 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable |
| switchMap() | 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù) |
| scan() | 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值 |
| groupBy() | 將Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù) |
| buffer() | 它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè) |
| window() | 定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng) |
| cast() | 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型 |
過(guò)濾操作符列表
| 方法 | 含義 |
|---|---|
| filter() | 過(guò)濾數(shù)據(jù) |
| takeLast() | 只發(fā)射最后的N項(xiàng)數(shù)據(jù) |
| last() | 只發(fā)射最后的一項(xiàng)數(shù)據(jù) |
| lastOrDefault() | 只發(fā)射最后的一項(xiàng)數(shù)據(jù),如果Observable為空就發(fā)射默認(rèn)值 |
| takeLastBuffer() | 將最后的N項(xiàng)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)射 |
| skip() | 跳過(guò)開始的N項(xiàng)數(shù)據(jù) |
| skipLast() | 跳過(guò)最后的N項(xiàng)數(shù)據(jù) |
| take() | 只發(fā)射開始的N項(xiàng)數(shù)據(jù) |
| first() , takeFirst() | 只發(fā)射第一項(xiàng)數(shù)據(jù),或者滿足某種條件的第一項(xiàng)數(shù)據(jù) |
| firstOrDefault() | 只發(fā)射第一項(xiàng)數(shù)據(jù),如果Observable為空就發(fā)射默認(rèn)值 |
| elementAt() | 發(fā)射第N項(xiàng)數(shù)據(jù) |
| elementAtOrDefault() | 發(fā)射第N項(xiàng)數(shù)據(jù),如果Observable數(shù)據(jù)少于N項(xiàng)就發(fā)射默認(rèn)值 |
| sample() , throttleLast() | 定期發(fā)射Observable最近的數(shù)據(jù) |
| throttleFirst() | 定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù) |
| throttleWithTimeout() , debounce() | 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù),通俗的說(shuō),就是如果一段時(shí)間沒有操作,就執(zhí)行一次操作 |
| timeout() | 如果在一個(gè)指定的時(shí)間段后還沒發(fā)射數(shù)據(jù),就發(fā)射一個(gè)異常 |
| distinct() | 過(guò)濾掉重復(fù)數(shù)據(jù) |
| distinctUntilChanged() | 過(guò)濾掉連續(xù)重復(fù)的數(shù)據(jù) |
| ofType() | 只發(fā)射指定類型的數(shù)據(jù) |
| ignoreElements() | 丟棄所有的正常數(shù)據(jù),只發(fā)射錯(cuò)誤或完成通知 |
結(jié)合操作符
| 操作符 | 解析 |
|---|---|
| and() , then() , when() | 通過(guò)模式(And條件)和計(jì)劃(Then次序)組合兩個(gè)或多個(gè)Observable發(fā)射的數(shù)據(jù)集 |
| combineLatest() | 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果 |
| join() , groupJoin() | 無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi),就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射 |
| merge() | 將多個(gè)Observable合并為一個(gè) |
| startWith() | 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù),在發(fā)射原來(lái)的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng) |
| switch() | 將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù) |
| switchOnNext() | 將一個(gè)發(fā)射Observables的Observable轉(zhuǎn)換成另一個(gè)Observable,后者發(fā)射這些Observables最近發(fā)射的數(shù)據(jù) |
| zip() | 打包,使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射 |
| mergeDelayError() | 合并多個(gè)Observables,讓沒有錯(cuò)誤的Observable都完成后再發(fā)射錯(cuò)誤通知 |
錯(cuò)誤處理操作符
用于對(duì)Observable發(fā)射的 onError 通知做出響應(yīng)或者從錯(cuò)誤中恢復(fù),例如,你
可以:
- 吞掉這個(gè)錯(cuò)誤,切換到一個(gè)備用的Observable繼續(xù)發(fā)射數(shù)據(jù)
- 吞掉這個(gè)錯(cuò)誤然后發(fā)射默認(rèn)值
- 吞掉這個(gè)錯(cuò)誤并立即嘗試重啟這個(gè)Observable
- 吞掉這個(gè)錯(cuò)誤,在一些回退間隔后重啟這個(gè)Observable
| 名稱 | 解析 |
|---|---|
| onErrorResumeNext() | 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)數(shù)據(jù)序列 |
| onErrorReturn() | 讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止。方法返回一個(gè)鏡像原有Observable行為的新Observable,后者會(huì)忽略前者的onError調(diào)用,不會(huì)將錯(cuò)誤傳遞給觀察者,作為替代,它會(huì)發(fā)發(fā)射一個(gè)特殊的項(xiàng)并調(diào)用觀察者的onCompleted方法。 |
| onExceptionResumeNext() | 指示Observable遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射數(shù)據(jù) |
| retry() | 指示Observable遇到錯(cuò)誤時(shí)重試 |
| retryWhen() | 指示Observable遇到錯(cuò)誤時(shí),將錯(cuò)誤傳遞給另一個(gè)Observable來(lái)決定是否要重新給訂閱這個(gè)Observable |
| retryUntil() | 指示Observable遇到錯(cuò)誤時(shí),是否讓Observable重新訂閱 |
輔助操作符
用于處理Observable的操作符,例如延遲、定時(shí)等。
| 名稱 | 解析 |
|---|---|
| materialize() | 將Observable轉(zhuǎn)換成一個(gè)通知列表 |
| dematerialize() | 將上面的結(jié)果逆轉(zhuǎn)回一個(gè)Observable |
| timestamp() | 給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳 |
| serialize() | 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的 |
| cache() | 記住Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者 |
| observeOn() | 指定觀察者觀察Observable的調(diào)度器 |
| subscribeOn() | 指定Observable執(zhí)行任務(wù)的調(diào)度器 |
| doOnEach() | 注冊(cè)一個(gè)動(dòng)作,對(duì)Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用 |
| doOnCompleted() | 注冊(cè)一個(gè)動(dòng)作,對(duì)正常完成的Observable使用 |
| doOnError() | 注冊(cè)一個(gè)動(dòng)作,對(duì)發(fā)生錯(cuò)誤的Observable使用 |
| doOnTerminate() | Observable終止之前會(huì)被調(diào)用,無(wú)論是正常還是異常終止 |
| doOnSubscribe() | 注冊(cè)一個(gè)動(dòng)作,在觀察者訂閱時(shí)使用 |
| doOnUnsubscribe() | 注冊(cè)一個(gè)動(dòng)作,在觀察者取消訂閱時(shí)使用 |
| doOnNext() | 在onNext前執(zhí)行 |
| doAfterNext() | 在onNext之后執(zhí)行 |
| doAfterTerminate | 終止發(fā)送時(shí)候調(diào)用 |
| doOnLifecycle | 可以在訂閱之后 設(shè)置是否取消訂閱 |
| doFinally | 在最后執(zhí)行 |
| finallyDo() | 注冊(cè)一個(gè)動(dòng)作,在Observable完成時(shí)使用 |
| delay() | 延時(shí)發(fā)射Observable的結(jié)果 |
| delaySubscription() | 延時(shí)處理訂閱請(qǐng)求 |
| timeInterval() | 轉(zhuǎn)換獲取數(shù)據(jù)發(fā)送的時(shí)間間隔 |
| using() | 創(chuàng)建一個(gè)只在Observable生命周期存在的資源 |
| single() | 強(qiáng)制返回單個(gè)數(shù)據(jù),否則拋出異常 |
| singleOrDefault() | 如果Observable完成時(shí)返回了單個(gè)數(shù)據(jù),就返回它,否則返回默認(rèn)數(shù)據(jù) |
| toFuture(),toIterable(),toList() | 將Observable轉(zhuǎn)換為其它對(duì)象或數(shù)據(jù)結(jié)構(gòu) |
條件操作符
根據(jù)條件發(fā)射或變換Observables
| 名稱 | 解析 |
|---|---|
| amb() | 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù) |
| defaultIfEmpty() | 發(fā)射來(lái)自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)數(shù)據(jù) |
| switchIfEmpty() | 如果原始Observable沒有發(fā)射數(shù)據(jù),它發(fā)射一個(gè)備用Observable的發(fā)射物 |
| (rxjava-computation-expressions) doWhile() | 發(fā)射原始Observable的數(shù)據(jù)序列,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止 |
| (rxjava-computation-expressions) ifThen() | 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列 |
| skipUntil() | 丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù) |
| skipWhile() | 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個(gè)特定的條件為假,然后發(fā)射原始Observable剩余的數(shù)據(jù) |
| (rxjava-computation-expressions) switchCase() | 基于一個(gè)計(jì)算結(jié)果,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列 |
| takeUntil() | 發(fā)射來(lái)自原始Observable的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知 |
| takeWhile(),takeWhileWithIndex() | 發(fā)射原始Observable的數(shù)據(jù),直到一個(gè)特定的條件為真,然后跳過(guò)剩余的數(shù)據(jù) |
| (rxjava-computation-expressions)whileDo() | 如果滿足一個(gè)條件,發(fā)射原始Observable的數(shù)據(jù),然后重復(fù)發(fā)射直到不滿足這個(gè)條件為止 |
(rxjava-computation-expressions): 表示這個(gè)操作符當(dāng)前是可選包 rxjava-
computation-expressions 的一部分,還沒有包含在標(biāo)準(zhǔn)RxJava的操作符集合里,需要自己導(dǎo)包,但是這個(gè)包用的是Rxjava1.0.0-rc3的版本,所以在Rxjava2.0用的話,會(huì)出現(xiàn)問(wèn)題。 可以使用repeat操作符代替
compile 'io.reactivex:rxjava-computation-expressions:0.21.0'
布爾操作符列表
對(duì)原始數(shù)據(jù)發(fā)射源進(jìn)行布爾操作,經(jīng)過(guò)布爾操作之后,接收者就是觀察者接收到的數(shù)據(jù)是布爾值。
| 名稱 | 解析 |
|---|---|
| all() | 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件 |
| contains() | 判斷Observable是否會(huì)發(fā)射一個(gè)指定的值 |
| isEmpty() | 判斷Observable是否發(fā)射了一個(gè)值 |
| sequenceEqual() | 判斷兩個(gè)Observables發(fā)射的序列是否相等 |
算術(shù)操作符
這個(gè)模塊需要導(dǎo)下面這個(gè)包,不支持Rxjava2,所以Rxjava2用不了噢:
compile 'io.reactivex:rxjava-math:1.0.0'
| 名稱 | 解析 |
|---|---|
| averageInteger() | 求序列平均數(shù)并發(fā)射 |
| averageLong() | 求序列平均數(shù)并發(fā)射 |
| averageFloat() | 求序列平均數(shù)并發(fā)射 |
| averageDouble() | 求序列平均數(shù)并發(fā)射 |
| max() | 求序列最大值并發(fā)射 |
| maxBy() | 求最大key對(duì)應(yīng)的值并發(fā)射 |
| min() | 求最小值并發(fā)射 |
| minBy() | 求最小Key對(duì)應(yīng)的值并發(fā)射 |
| sumInteger() | 求和并發(fā)射 |
| sumLong() | 求和并發(fā)射 |
| sumFloat() | 求和并發(fā)射 |
| sumDouble() | 求和并發(fā)射 |
聚合操作符:
| 名稱 | 解析 |
|---|---|
| concat() / concatArray | 順序連接多個(gè)Observables |
| count() | 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果 |
| reduce() | 按順序?qū)bservable發(fā)射的每項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)并發(fā)射最終的值 |
| collect() | 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable |
| toList() | 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表,然后返回這個(gè)列表 |
| toSortedList() | 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表 |
| toMap() | 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的 |
| toMultiMap() | 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表,同時(shí)也是一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的 |
異步操作符
異步操作符屬于單獨(dú)的rxjava-async模塊,它們用于將同步對(duì)象轉(zhuǎn)換為Observable。不支持Rxjava2.0,如果使用Rxjava1.0的話,可以導(dǎo)入下面的包就可以使用異步操作符了。
compile 'io.reactivex:rxjava-async-util:0.21.0'
| 名稱 | 解析 |
|---|---|
| start() | 創(chuàng)建一個(gè)Observable,它發(fā)射一個(gè)函數(shù)的返回值 |
| toAsync() / asyncAction()/ asyncFunc() | 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值 |
| startFuture() | 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable,它發(fā)射Future的返回值 |
| deferFuture() | 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable,但是并不嘗試獲取這個(gè)Future返回的Observable,直到有訂閱者訂閱它 |
| forEachFuture() | 傳遞Subscriber方法給一個(gè)Subscriber,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成 |
| fromAction() | 將一個(gè)Action轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)action并發(fā)射它的返回值 |
| fromCallable() | 將一個(gè)Callable轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值,或者發(fā)射異常 |
| fromRunnable() | 將一個(gè)Runnable轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值 |
| runAsync() | 返回一個(gè)StoppableObservable,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions |
ps: 由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù)),它們對(duì)于非常長(zhǎng)或者無(wú)限的序列來(lái)說(shuō)是危險(xiǎn)的,不推薦使用。
連接操作符
一個(gè)可連接的Observable與普通的Observable差不多,除了這一點(diǎn):可連接的Observabe在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù),只有在它的connect()被調(diào)用時(shí)才開始。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開始發(fā)射數(shù)據(jù)。
| 名稱 | 解析 |
|---|---|
| ConnectableObservable.connect() | 指示一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù) |
| Observable.publish() | 將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable |
| Observable.replay() | 確保所有的訂閱者看到相同的數(shù)據(jù)序列,即使它們?cè)贠bservable開始發(fā)射數(shù)據(jù)之后才訂閱 |
| ConnectableObservable.refCount() | 讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable |
阻塞操作符列表
在Rxjava1中的BlockingObservable已經(jīng)在Rxjava2中去掉了,在Rxjava2中已經(jīng)集成到了Observable中。
官方說(shuō)明:
https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0
BlockingObservable的不同可以看這里:
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html
| 名稱 | 解析 |
|---|---|
| blockingForEach() | 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)調(diào)用一個(gè)方法,會(huì)阻塞直到Observable完成 |
| blockingFirst() | 阻塞直到Observable發(fā)射了一個(gè)數(shù)據(jù),然后返回第一項(xiàng)數(shù)據(jù) |
| blockingMostRecent() | 返回一個(gè)總是返回Observable最近發(fā)射的數(shù)據(jù)的iterable |
| blockingLatest() | 返回一個(gè)iterable,會(huì)阻塞直到或者除非Observable發(fā)射了一個(gè)iterable沒有返回的值,然后返回這個(gè)值 |
| blockingNext() | 返回一個(gè)iterable,阻塞直到返回另外一個(gè)值 |
| blockingLast() | 阻塞直到Observable終止,然后返回最后一項(xiàng)數(shù)據(jù) |
| blockingIterable() | 將Observable轉(zhuǎn)換返回一個(gè)iterable. |
| blockingSingle() | 如果Observable終止時(shí)只發(fā)射了一個(gè)值,返回那個(gè)值,否則拋出異常 |
| blockingSubscribe() | 在當(dāng)前線程訂閱,和forEach類似 |