Android拾萃 - RxJava2操作符匯總

幾種主要的需求:

  • 直接創(chuàng)建一個(gè)Observable(創(chuàng)建操作)
  • 組合多個(gè)Observable(組合操作)
  • 對(duì)Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
  • 從Observable發(fā)射的數(shù)據(jù)中取特定的值(過(guò)濾操作)
  • 轉(zhuǎn)發(fā)Observable的部分值(條件/布爾/過(guò)濾操作)
  • 對(duì)Observable發(fā)射的數(shù)據(jù)序列求值(算術(shù)/聚合操作)

創(chuàng)建操作符

名稱 解析
just() 將一個(gè)或多個(gè)對(duì)象轉(zhuǎn)換成發(fā)射這個(gè)或這些對(duì)象的一個(gè)Observable
fromArray() 將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable
repeat() 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable
repeatWhen() 創(chuàng)建一個(gè)重復(fù)發(fā)射指定數(shù)據(jù)或數(shù)據(jù)序列的Observable,它依賴于另一個(gè)Observable發(fā)射的數(shù)據(jù)
create() 使用一個(gè)函數(shù)從頭創(chuàng)建一個(gè)Observable
defer() 只有當(dāng)訂閱者訂閱才創(chuàng)建Observable;為每個(gè)訂閱創(chuàng)建一個(gè)新的Observable
range() 創(chuàng)建一個(gè)發(fā)射指定范圍的整數(shù)序列的Observable
interval() 創(chuàng)建一個(gè)按照給定的時(shí)間間隔發(fā)射整數(shù)序列的Observable
timer() 創(chuàng)建一個(gè)在給定的延時(shí)之后發(fā)射單個(gè)數(shù)據(jù)的Observable
empty() 創(chuàng)建一個(gè)什么都不做直接通知完成的Observable
error() 創(chuàng)建一個(gè)什么都不做直接通知錯(cuò)誤的Observable
never() 創(chuàng)建一個(gè)入不發(fā)射任何數(shù)據(jù)的Observable

變換操作符

操作符 解析
buffer() 緩存,可以簡(jiǎn)單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè)
map() 對(duì)序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)來(lái)變換Observable發(fā)射的數(shù)據(jù)序列
flatMap() , concatMap() , flatMapIterable() 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable
switchMap() 將Observable發(fā)射的數(shù)據(jù)集合變換為Observables集合,然后只發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
scan() 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射每一個(gè)值
groupBy() 將Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù)
buffer() 它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè)
window() 定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)
cast() 在發(fā)射之前強(qiáng)制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型

過(guò)濾操作符列表

方法 含義
filter() 過(guò)濾數(shù)據(jù)
takeLast() 只發(fā)射最后的N項(xiàng)數(shù)據(jù)
last() 只發(fā)射最后的一項(xiàng)數(shù)據(jù)
lastOrDefault() 只發(fā)射最后的一項(xiàng)數(shù)據(jù),如果Observable為空就發(fā)射默認(rèn)值
takeLastBuffer() 將最后的N項(xiàng)數(shù)據(jù)當(dāng)做單個(gè)數(shù)據(jù)發(fā)射
skip() 跳過(guò)開始的N項(xiàng)數(shù)據(jù)
skipLast() 跳過(guò)最后的N項(xiàng)數(shù)據(jù)
take() 只發(fā)射開始的N項(xiàng)數(shù)據(jù)
first() , takeFirst() 只發(fā)射第一項(xiàng)數(shù)據(jù),或者滿足某種條件的第一項(xiàng)數(shù)據(jù)
firstOrDefault() 只發(fā)射第一項(xiàng)數(shù)據(jù),如果Observable為空就發(fā)射默認(rèn)值
elementAt() 發(fā)射第N項(xiàng)數(shù)據(jù)
elementAtOrDefault() 發(fā)射第N項(xiàng)數(shù)據(jù),如果Observable數(shù)據(jù)少于N項(xiàng)就發(fā)射默認(rèn)值
sample() , throttleLast() 定期發(fā)射Observable最近的數(shù)據(jù)
throttleFirst() 定期發(fā)射Observable發(fā)射的第一項(xiàng)數(shù)據(jù)
throttleWithTimeout() , debounce() 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù),通俗的說(shuō),就是如果一段時(shí)間沒有操作,就執(zhí)行一次操作
timeout() 如果在一個(gè)指定的時(shí)間段后還沒發(fā)射數(shù)據(jù),就發(fā)射一個(gè)異常
distinct() 過(guò)濾掉重復(fù)數(shù)據(jù)
distinctUntilChanged() 過(guò)濾掉連續(xù)重復(fù)的數(shù)據(jù)
ofType() 只發(fā)射指定類型的數(shù)據(jù)
ignoreElements() 丟棄所有的正常數(shù)據(jù),只發(fā)射錯(cuò)誤或完成通知

結(jié)合操作符

操作符 解析
and() , then() , when() 通過(guò)模式(And條件)和計(jì)劃(Then次序)組合兩個(gè)或多個(gè)Observable發(fā)射的數(shù)據(jù)集
combineLatest() 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果
join() , groupJoin() 無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi),就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射
merge() 將多個(gè)Observable合并為一個(gè)
startWith() 在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù),在發(fā)射原來(lái)的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng)
switch() 將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù)
switchOnNext() 將一個(gè)發(fā)射Observables的Observable轉(zhuǎn)換成另一個(gè)Observable,后者發(fā)射這些Observables最近發(fā)射的數(shù)據(jù)
zip() 打包,使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射
mergeDelayError() 合并多個(gè)Observables,讓沒有錯(cuò)誤的Observable都完成后再發(fā)射錯(cuò)誤通知

錯(cuò)誤處理操作符

用于對(duì)Observable發(fā)射的 onError 通知做出響應(yīng)或者從錯(cuò)誤中恢復(fù),例如,你
可以:

  • 吞掉這個(gè)錯(cuò)誤,切換到一個(gè)備用的Observable繼續(xù)發(fā)射數(shù)據(jù)
  • 吞掉這個(gè)錯(cuò)誤然后發(fā)射默認(rèn)值
  • 吞掉這個(gè)錯(cuò)誤并立即嘗試重啟這個(gè)Observable
  • 吞掉這個(gè)錯(cuò)誤,在一些回退間隔后重啟這個(gè)Observable
名稱 解析
onErrorResumeNext() 指示Observable在遇到錯(cuò)誤時(shí)發(fā)射一個(gè)數(shù)據(jù)序列
onErrorReturn() 讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止。方法返回一個(gè)鏡像原有Observable行為的新Observable,后者會(huì)忽略前者的onError調(diào)用,不會(huì)將錯(cuò)誤傳遞給觀察者,作為替代,它會(huì)發(fā)發(fā)射一個(gè)特殊的項(xiàng)并調(diào)用觀察者的onCompleted方法。
onExceptionResumeNext() 指示Observable遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射數(shù)據(jù)
retry() 指示Observable遇到錯(cuò)誤時(shí)重試
retryWhen() 指示Observable遇到錯(cuò)誤時(shí),將錯(cuò)誤傳遞給另一個(gè)Observable來(lái)決定是否要重新給訂閱這個(gè)Observable
retryUntil() 指示Observable遇到錯(cuò)誤時(shí),是否讓Observable重新訂閱

輔助操作符

用于處理Observable的操作符,例如延遲、定時(shí)等。

名稱 解析
materialize() 將Observable轉(zhuǎn)換成一個(gè)通知列表
dematerialize() 將上面的結(jié)果逆轉(zhuǎn)回一個(gè)Observable
timestamp() 給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳
serialize() 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的
cache() 記住Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者
observeOn() 指定觀察者觀察Observable的調(diào)度器
subscribeOn() 指定Observable執(zhí)行任務(wù)的調(diào)度器
doOnEach() 注冊(cè)一個(gè)動(dòng)作,對(duì)Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)使用
doOnCompleted() 注冊(cè)一個(gè)動(dòng)作,對(duì)正常完成的Observable使用
doOnError() 注冊(cè)一個(gè)動(dòng)作,對(duì)發(fā)生錯(cuò)誤的Observable使用
doOnTerminate() Observable終止之前會(huì)被調(diào)用,無(wú)論是正常還是異常終止
doOnSubscribe() 注冊(cè)一個(gè)動(dòng)作,在觀察者訂閱時(shí)使用
doOnUnsubscribe() 注冊(cè)一個(gè)動(dòng)作,在觀察者取消訂閱時(shí)使用
doOnNext() 在onNext前執(zhí)行
doAfterNext() 在onNext之后執(zhí)行
doAfterTerminate 終止發(fā)送時(shí)候調(diào)用
doOnLifecycle 可以在訂閱之后 設(shè)置是否取消訂閱
doFinally 在最后執(zhí)行
finallyDo() 注冊(cè)一個(gè)動(dòng)作,在Observable完成時(shí)使用
delay() 延時(shí)發(fā)射Observable的結(jié)果
delaySubscription() 延時(shí)處理訂閱請(qǐng)求
timeInterval() 轉(zhuǎn)換獲取數(shù)據(jù)發(fā)送的時(shí)間間隔
using() 創(chuàng)建一個(gè)只在Observable生命周期存在的資源
single() 強(qiáng)制返回單個(gè)數(shù)據(jù),否則拋出異常
singleOrDefault() 如果Observable完成時(shí)返回了單個(gè)數(shù)據(jù),就返回它,否則返回默認(rèn)數(shù)據(jù)
toFuture(),toIterable(),toList() 將Observable轉(zhuǎn)換為其它對(duì)象或數(shù)據(jù)結(jié)構(gòu)

條件操作符

根據(jù)條件發(fā)射或變換Observables

名稱 解析
amb() 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
defaultIfEmpty() 發(fā)射來(lái)自原始Observable的數(shù)據(jù),如果原始Observable沒有發(fā)射數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)數(shù)據(jù)
switchIfEmpty() 如果原始Observable沒有發(fā)射數(shù)據(jù),它發(fā)射一個(gè)備用Observable的發(fā)射物
(rxjava-computation-expressions) doWhile() 發(fā)射原始Observable的數(shù)據(jù)序列,然后重復(fù)發(fā)射這個(gè)序列直到不滿足這個(gè)條件為止
(rxjava-computation-expressions) ifThen() 只有當(dāng)某個(gè)條件為真時(shí)才發(fā)射原始Observable的數(shù)據(jù)序列,否則發(fā)射一個(gè)空的或默認(rèn)的序列
skipUntil() 丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù)
skipWhile() 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個(gè)特定的條件為假,然后發(fā)射原始Observable剩余的數(shù)據(jù)
(rxjava-computation-expressions) switchCase() 基于一個(gè)計(jì)算結(jié)果,發(fā)射一個(gè)指定Observable的數(shù)據(jù)序列
takeUntil() 發(fā)射來(lái)自原始Observable的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知
takeWhile(),takeWhileWithIndex() 發(fā)射原始Observable的數(shù)據(jù),直到一個(gè)特定的條件為真,然后跳過(guò)剩余的數(shù)據(jù)
(rxjava-computation-expressions)whileDo() 如果滿足一個(gè)條件,發(fā)射原始Observable的數(shù)據(jù),然后重復(fù)發(fā)射直到不滿足這個(gè)條件為止

(rxjava-computation-expressions): 表示這個(gè)操作符當(dāng)前是可選包 rxjava-
computation-expressions 的一部分,還沒有包含在標(biāo)準(zhǔn)RxJava的操作符集合里,需要自己導(dǎo)包,但是這個(gè)包用的是Rxjava1.0.0-rc3的版本,所以在Rxjava2.0用的話,會(huì)出現(xiàn)問(wèn)題。 可以使用repeat操作符代替

compile 'io.reactivex:rxjava-computation-expressions:0.21.0'

布爾操作符列表

對(duì)原始數(shù)據(jù)發(fā)射源進(jìn)行布爾操作,經(jīng)過(guò)布爾操作之后,接收者就是觀察者接收到的數(shù)據(jù)是布爾值。

名稱 解析
all() 判斷是否所有的數(shù)據(jù)項(xiàng)都滿足某個(gè)條件
contains() 判斷Observable是否會(huì)發(fā)射一個(gè)指定的值
isEmpty() 判斷Observable是否發(fā)射了一個(gè)值
sequenceEqual() 判斷兩個(gè)Observables發(fā)射的序列是否相等

算術(shù)操作符

這個(gè)模塊需要導(dǎo)下面這個(gè)包,不支持Rxjava2,所以Rxjava2用不了噢:

    compile 'io.reactivex:rxjava-math:1.0.0'

名稱 解析
averageInteger() 求序列平均數(shù)并發(fā)射
averageLong() 求序列平均數(shù)并發(fā)射
averageFloat() 求序列平均數(shù)并發(fā)射
averageDouble() 求序列平均數(shù)并發(fā)射
max() 求序列最大值并發(fā)射
maxBy() 求最大key對(duì)應(yīng)的值并發(fā)射
min() 求最小值并發(fā)射
minBy() 求最小Key對(duì)應(yīng)的值并發(fā)射
sumInteger() 求和并發(fā)射
sumLong() 求和并發(fā)射
sumFloat() 求和并發(fā)射
sumDouble() 求和并發(fā)射

聚合操作符:

名稱 解析
concat() / concatArray 順序連接多個(gè)Observables
count() 計(jì)算數(shù)據(jù)項(xiàng)的個(gè)數(shù)并發(fā)射結(jié)果
reduce() 按順序?qū)bservable發(fā)射的每項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù)并發(fā)射最終的值
collect() 將原始Observable發(fā)射的數(shù)據(jù)放到一個(gè)單一的可變的數(shù)據(jù)結(jié)構(gòu)中,然后返回一個(gè)發(fā)射這個(gè)數(shù)據(jù)結(jié)構(gòu)的Observable
toList() 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)列表,然后返回這個(gè)列表
toSortedList() 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個(gè)有序列表,然后返回這個(gè)列表
toMap() 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的
toMultiMap() 將序列數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表,同時(shí)也是一個(gè)Map,Map的key是根據(jù)一個(gè)函數(shù)計(jì)算的

異步操作符

異步操作符屬于單獨(dú)的rxjava-async模塊,它們用于將同步對(duì)象轉(zhuǎn)換為Observable。不支持Rxjava2.0,如果使用Rxjava1.0的話,可以導(dǎo)入下面的包就可以使用異步操作符了。

compile 'io.reactivex:rxjava-async-util:0.21.0'
名稱 解析
start() 創(chuàng)建一個(gè)Observable,它發(fā)射一個(gè)函數(shù)的返回值
toAsync() / asyncAction()/ asyncFunc() 將一個(gè)函數(shù)或者Action轉(zhuǎn)換為已Observable,它執(zhí)行這個(gè)函數(shù)并發(fā)射函數(shù)的返回值
startFuture() 將一個(gè)返回Future的函數(shù)轉(zhuǎn)換為一個(gè)Observable,它發(fā)射Future的返回值
deferFuture() 將一個(gè)返回Observable的Future轉(zhuǎn)換為一個(gè)Observable,但是并不嘗試獲取這個(gè)Future返回的Observable,直到有訂閱者訂閱它
forEachFuture() 傳遞Subscriber方法給一個(gè)Subscriber,但是同時(shí)表現(xiàn)得像一個(gè)Future一樣阻塞直到它完成
fromAction() 將一個(gè)Action轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)action并發(fā)射它的返回值
fromCallable() 將一個(gè)Callable轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Callable并發(fā)射Callable的返回值,或者發(fā)射異常
fromRunnable() 將一個(gè)Runnable轉(zhuǎn)換為Observable,當(dāng)一個(gè)訂閱者訂閱時(shí),它執(zhí)行這個(gè)Runnable并發(fā)射Runnable的返回值
runAsync() 返回一個(gè)StoppableObservable,它發(fā)射某個(gè)Scheduler上指定的Action生成的多個(gè)actions

ps: 由于這些操作必須等待數(shù)據(jù)發(fā)射完成(通常也必須緩存這些數(shù)據(jù)),它們對(duì)于非常長(zhǎng)或者無(wú)限的序列來(lái)說(shuō)是危險(xiǎn)的,不推薦使用。

連接操作符

一個(gè)可連接的Observable與普通的Observable差不多,除了這一點(diǎn):可連接的Observabe在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù),只有在它的connect()被調(diào)用時(shí)才開始。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個(gè)Observable之后才開始發(fā)射數(shù)據(jù)。

名稱 解析
ConnectableObservable.connect() 指示一個(gè)可連接的Observable開始發(fā)射數(shù)據(jù)
Observable.publish() 將一個(gè)Observable轉(zhuǎn)換為一個(gè)可連接的Observable
Observable.replay() 確保所有的訂閱者看到相同的數(shù)據(jù)序列,即使它們?cè)贠bservable開始發(fā)射數(shù)據(jù)之后才訂閱
ConnectableObservable.refCount() 讓一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable

阻塞操作符列表

在Rxjava1中的BlockingObservable已經(jīng)在Rxjava2中去掉了,在Rxjava2中已經(jīng)集成到了Observable中。

官方說(shuō)明:

https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0

這里寫圖片描述

BlockingObservable的不同可以看這里:

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.html

這里寫圖片描述
名稱 解析
blockingForEach() 對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)調(diào)用一個(gè)方法,會(huì)阻塞直到Observable完成
blockingFirst() 阻塞直到Observable發(fā)射了一個(gè)數(shù)據(jù),然后返回第一項(xiàng)數(shù)據(jù)
blockingMostRecent() 返回一個(gè)總是返回Observable最近發(fā)射的數(shù)據(jù)的iterable
blockingLatest() 返回一個(gè)iterable,會(huì)阻塞直到或者除非Observable發(fā)射了一個(gè)iterable沒有返回的值,然后返回這個(gè)值
blockingNext() 返回一個(gè)iterable,阻塞直到返回另外一個(gè)值
blockingLast() 阻塞直到Observable終止,然后返回最后一項(xiàng)數(shù)據(jù)
blockingIterable() 將Observable轉(zhuǎn)換返回一個(gè)iterable.
blockingSingle() 如果Observable終止時(shí)只發(fā)射了一個(gè)值,返回那個(gè)值,否則拋出異常
blockingSubscribe() 在當(dāng)前線程訂閱,和forEach類似
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容