使用RxJava2也有7、8個月了,越來越迷上它,使用期間出現(xiàn)各種各樣的問題,有些是理解錯誤,每次都是去看一文檔、看一遍別人的博客,還是迷迷糊糊的,RxJava門檻相對來說還是高一點(diǎn),就像一把雙刃劍,用得好的人會用得很爽,用不好的人,就會覺得很難用。所以還是需要從源碼層面上理解,才能更好的使用它。
訂閱流程簡單使用
入門RxJava2時,最簡單的流程就是如下,如果你用過,相信你也一定很熟悉:
Observable.create()創(chuàng)建一個被觀察者,create()方法需要傳入一個ObservableOnSubscribe對象,ObservableOnSubscribe對象為被訂閱時的回調(diào),回調(diào)傳入一個emitter發(fā)射器,通過emitter發(fā)射器,可以對訂閱者發(fā)送數(shù)據(jù)。
emitter可以發(fā)射onNext()事件,onError()事件和onComplete()事件,其中onError()和onComplete()是互斥的,只能出現(xiàn)一個,onNext()則是可以發(fā)射無限個。
訂閱通過Observable.subscribe()方法進(jìn)行訂閱,可以傳入多個重載的參數(shù),這里介紹每種回調(diào)單獨(dú)配置的重載方法,第一個Consumer為發(fā)送onNext()時回調(diào),第二個Consumer<Throwable>為發(fā)送onError()回調(diào),以及Action,它是onComplete()時回調(diào),最后一個Consumer為取消訂閱時回調(diào)。
subscribe()訂閱方法返回一個Disposable類型對象,它相當(dāng)于是取消訂閱的一個憑證,通過它的dispose()方法,可以進(jìn)行取消訂閱,一般會在Activity的onDestroy()調(diào)用時調(diào)用注銷訂閱。
Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("a");
emitter.onNext("b");
emitter.onNext("c");
//onError()和onComplete()是互斥的,只能出現(xiàn)一個!
//emitter.onError(new RuntimeException("發(fā)生了異常"));
emitter.onComplete();
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String data) throws Exception {
Logger.d("收到數(shù)據(jù) data: " + data);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Logger.d("發(fā)生異常: " + throwable.getMessage());
}
}, new Action() {
@Override
public void run() throws Exception {
Logger.d("發(fā)送完畢");
}
}, new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Logger.d("取消訂閱");
}
});
//取消訂閱,一般會在Activity的onDestroy()調(diào)用時調(diào)用注銷訂閱
disposable.dispose();
//輸出
收到數(shù)據(jù) data: " + a
收到數(shù)據(jù) data: " + b
收到數(shù)據(jù) data: " + c
發(fā)送完畢
取消訂閱
create()方法流程
創(chuàng)建可觀察者是通過Observable.create()靜態(tài)工廠方法創(chuàng)建的,我們點(diǎn)進(jìn)去看一下。
create()方法一開始看起來有點(diǎn)復(fù)雜,我們一步步來:
- ObjectHelper.requireNonNull(source, "source is null"),就是非空檢查,如果傳入的source對象為空,則拋出異常。
- RxJavaPlugins.onAssembly(xxx),是RxJava提供的Hook操作,我們一般不會設(shè)置,可以先忽略
- new ObservableCreate<T>(source),創(chuàng)建了一個ObservableCreate類,將傳進(jìn)來的source作為參數(shù)傳入。(source對象是我們創(chuàng)建的ObservableOnSubscribe對象,被訂閱時的回調(diào)對象)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//重點(diǎn)是ObservableCreate類
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
將非空檢查和Hook操作去掉,則核心代碼是這樣的:
是不是一下子清爽了很多~那么我們繼續(xù)跟進(jìn)ObservableCreate類
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//重點(diǎn)是ObservableCreate類
return new ObservableCreate<T>(source);
}
- ObservableCreate類結(jié)構(gòu)
- ObservableCreate類繼承于Observable類。
- 構(gòu)造方法,將我們傳入的source對象保存(再復(fù)寫下:source對象是我們創(chuàng)建的ObservableOnSubscribe對象,被訂閱時的回調(diào)對象)
- 然后?就沒了!ObservableCreate類的創(chuàng)建只保存了我們傳進(jìn)來的source對象!
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//...省略其他方法
}
- 可觀察者的被訂閱時機(jī)
ObservableCreate的構(gòu)造方法執(zhí)行只保存了source對象,就直接返回出去了。那么ObservableCreate對象什么時候才被用到的呢?
答案是:subscribe()訂閱方法!我們的例子中,使用了Observable.create()方法創(chuàng)建出ObservableCreate對象后,我們接連調(diào)用了subscribe()訂閱方法。我們來看下它是神是鬼!
- 首先,我們是4個參數(shù)的subscribe()重載方法,最終會調(diào)用只有一個參數(shù)的Observer的subscribe(Observer)方法。
- subscribe()方法,第一步就是對傳入的參數(shù)都進(jìn)行非空判斷,如果為空,則拋出異常。
- 對傳進(jìn)來的onNext、onError、onComplete、onSubscribe組裝為一個Observer對象
- 調(diào)用subscribe(),傳入Observer對象。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
//1、一堆參數(shù)的非空判斷!
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//2、對傳進(jìn)來的onNext、onError、onComplete、onSubscribe組裝為一個Observer對象
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
//3、調(diào)用subscribe(),傳入Observer對象
subscribe(ls);
return ls;
}
- 最終subscribe(observer)方法
- 非空判斷,對傳入的observer對象,必須observer訂閱者對象不為空。
- Hook操作,提供被注冊的鉤子給RxJavaPlugins,沒有處理,則返回的還是原有的observer。
- 非空判斷,RxJavaPlugins.onSubscribe()處理后的observer必須不為空。
- 重點(diǎn):調(diào)用subscribeActual()方法!
- try-catch,對拋出的異常交給RxJavaPlugins處理,我們通過RxJavaPlugins可以設(shè)置一個全局異常處理。
最終邏輯落在了subscribeActual()方法,方法名翻譯過來是實(shí)際的訂閱,十有八九是這個方法做訂閱操作了。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
//非空判斷,對傳入的observer對象,必須observer訂閱者對象不為空
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//Hook操作,提供被注冊的鉤子給RxJavaPlugins,沒有處理,則返回的還是原有的observer
observer = RxJavaPlugins.onSubscribe(this, observer);
//非空判斷,RxJavaPlugins.onSubscribe()處理后的observer必須不為空
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//重點(diǎn):調(diào)用subscribeActual()方法!
subscribeActual(observer);
} catch (NullPointerException e) {
//捕獲拋出的空指針,再拋出
throw e;
} catch (Throwable e) {
//將所有拋出的異常,交給RxJavaPlugins處理,提一下,我們通過RxJavaPlugins可以設(shè)置RxJava中拋出的所有異常!
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
- subscribeActual()方法分析
- 納尼,subscribeActual()方法是個抽象方法,具體實(shí)現(xiàn)在哪呢?
先來理一下,一開始Observable.create()創(chuàng)建了ObservableCreate類對象并返回,我們再調(diào)用了subscribe()方法,subscribe()方法內(nèi)部再調(diào)用了subscribeActual()方法,而且它是抽象方法,那么subscribeActual()方法,肯定是被調(diào)用對象ObservableCreate這個對象里!
protected abstract void subscribeActual(Observer<? super T> observer);
- ObservableCreate類中的subscribeActual()方法
我們回到ObservableCreate類中,subscribeActual()方法的確被復(fù)寫了
- 創(chuàng)建發(fā)射器,將訂閱者傳給發(fā)射器
- 調(diào)用observer訂閱者的onSubscribe,通知訂閱者,開始訂閱
- 調(diào)用source對象的subscribe()方法,通知可觀察者被訂閱了(再復(fù)習(xí)一下,source對象為我們調(diào)用create()對象傳入的ObservableOnSubscribe回調(diào)對象),我們會在這里調(diào)用發(fā)射器的onNext()發(fā)射數(shù)據(jù)
- try-catch處理,如果訂閱過程中,發(fā)生異常,調(diào)用訂閱者的onError(),通知訂閱過程發(fā)生異常
subscribeActual()方法很簡短,主要是通知可觀察者被訂閱和訂閱者開始訂閱,異常處理等。
下一步的重點(diǎn)在source.subscribe(parent)中,parent為CreateEmitter類對象,它包裹了被訂閱者,我們在subscribe()方法中,就是Observable.create()中提供的回調(diào)對象的subscribe()方法中,調(diào)用了發(fā)射器emitter的onNext()發(fā)射數(shù)據(jù)給訂閱者,所以下一步的邏輯應(yīng)該是在CreateEmitter發(fā)射器類中。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//創(chuàng)建發(fā)射器,將訂閱者傳給發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//調(diào)用observer訂閱者的onSubscribe,通知訂閱者,開始訂閱
observer.onSubscribe(parent);
try {
//再復(fù)習(xí)一下,source對象為我們調(diào)用create()對象傳入的ObservableOnSubscribe回調(diào)對象
//重點(diǎn):調(diào)用source對象的subscribe()方法,通知可觀察者被訂閱了。我們會在這里調(diào)用發(fā)射器的onNext()發(fā)射數(shù)據(jù)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
//調(diào)用訂閱者的onError(),通知訂閱過程發(fā)生異常
parent.onError(ex);
}
}
- CreateEmitter發(fā)射器
- CreateEmitter發(fā)射器,實(shí)現(xiàn)了ObservableEmitter發(fā)射器接口和Disposable切斷訂閱接口
- 構(gòu)造方法,就是保存了observer訂閱者。
- onNext(),先空檢查,如果為null,會拋出異常,證明RxJava2后不能發(fā)射null了。沒有解除訂閱,則調(diào)用observer訂閱者的onNext()。
- onError()事件,try-catch處理,如果事件中發(fā)生異常則交給RxJavaPlugins處理。
- tryOnError(),上面onError()事件的,try-catch處理,如果為空或訂閱者的onError()方法發(fā)生異常,返回true,否則返回false。
- onComplete()完成,調(diào)用訂閱者的observer.onComplete()
- setCancellable(),提供一個Cancellable回調(diào)對象,當(dāng)被dispose()時回調(diào),可以在dispose()時做解除訂閱時需要做的事情
- dispose(),切斷訂閱。
- isDisposed(),判斷是否已經(jīng)切斷了訂閱。
到這里,RxJava的訂閱流程就接觸了,下面來總結(jié)一下。
//發(fā)射器,實(shí)現(xiàn)了ObservableEmitter發(fā)射器接口和Disposable切斷接口
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
//訂閱者
final Observer<? super T> observer;
//構(gòu)造方法,就是保存了observer訂閱者
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//先空檢查,如果為null,會拋出異常,證明RxJava2后不能發(fā)射null了
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//沒有解除訂閱,則調(diào)用observer訂閱者的onNext()
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
//onError()事件,try-catch處理,如果事件中發(fā)生異常則交給RxJavaPlugins處理
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
//上面onError()事件的,try-catch處理,如果為空或訂閱者的onError()方法發(fā)生異常,返回true,否則返回false
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
//onComplete()完成,調(diào)用訂閱者的observer.onComplete()
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
//提供一個Cancellable回調(diào)對象,當(dāng)被dispose()時回調(diào),可以在dispose()時做解除訂閱時需要做的事情
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
//序列化發(fā)射的數(shù)據(jù),使用一個SerializedEmitter類來包裹CreateEmitter,裝飾者模式,如果你發(fā)射的數(shù)據(jù)在不同線程中,可以調(diào)用serialize()來讓發(fā)射的數(shù)據(jù)按發(fā)射順序發(fā)射,不是本篇重點(diǎn),就不分析SerializedEmitter類
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
//切斷訂閱
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
//判斷是否已經(jīng)切斷了訂閱
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
}
流程總結(jié)
RxJava2的總體流程:
RxJava2的訂閱流程還算清晰,主要是要清楚幾個切入點(diǎn):

- 創(chuàng)建被觀察者時機(jī)
- 訂閱時機(jī)
- 調(diào)用發(fā)射器發(fā)射數(shù)據(jù)時機(jī)