RxJava2訂閱流程淺析

使用RxJava2也有7、8個月了,越來越迷上它,使用期間出現(xiàn)各種各樣的問題,有些是理解錯誤,每次都是去看一文檔、看一遍別人的博客,還是迷迷糊糊的,RxJava門檻相對來說還是高一點(diǎn),就像一把雙刃劍,用得好的人會用得很爽,用不好的人,就會覺得很難用。所以還是需要從源碼層面上理解,才能更好的使用它。

訂閱流程簡單使用

入門RxJava2時,最簡單的流程就是如下,如果你用過,相信你也一定很熟悉:

Observable.create()創(chuàng)建一個被觀察者,create()方法需要傳入一個ObservableOnSubscribe對象,ObservableOnSubscribe對象為被訂閱時的回調(diào),回調(diào)傳入一個emitter發(fā)射器,通過emitter發(fā)射器,可以對訂閱者發(fā)送數(shù)據(jù)。

emitter可以發(fā)射onNext()事件,onError()事件和onComplete()事件,其中onError()和onComplete()是互斥的,只能出現(xiàn)一個,onNext()則是可以發(fā)射無限個。

訂閱通過Observable.subscribe()方法進(jìn)行訂閱,可以傳入多個重載的參數(shù),這里介紹每種回調(diào)單獨(dú)配置的重載方法,第一個Consumer為發(fā)送onNext()時回調(diào),第二個Consumer<Throwable>為發(fā)送onError()回調(diào),以及Action,它是onComplete()時回調(diào),最后一個Consumer為取消訂閱時回調(diào)。

subscribe()訂閱方法返回一個Disposable類型對象,它相當(dāng)于是取消訂閱的一個憑證,通過它的dispose()方法,可以進(jìn)行取消訂閱,一般會在Activity的onDestroy()調(diào)用時調(diào)用注銷訂閱。

Disposable disposable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("a");
        emitter.onNext("b");
        emitter.onNext("c");
        //onError()和onComplete()是互斥的,只能出現(xiàn)一個!
        //emitter.onError(new RuntimeException("發(fā)生了異常"));
        emitter.onComplete();
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String data) throws Exception {
        Logger.d("收到數(shù)據(jù) data: " + data);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Logger.d("發(fā)生異常: " + throwable.getMessage());
    }
}, new Action() {
    @Override
    public void run() throws Exception {
        Logger.d("發(fā)送完畢");
    }
}, new Consumer<Disposable>() {
    @Override
    public void accept(Disposable disposable) throws Exception {
        Logger.d("取消訂閱");
    }
});
//取消訂閱,一般會在Activity的onDestroy()調(diào)用時調(diào)用注銷訂閱
disposable.dispose();

//輸出
收到數(shù)據(jù) data: " + a
收到數(shù)據(jù) data: " + b
收到數(shù)據(jù) data: " + c
發(fā)送完畢
取消訂閱

create()方法流程

創(chuàng)建可觀察者是通過Observable.create()靜態(tài)工廠方法創(chuàng)建的,我們點(diǎn)進(jìn)去看一下。

create()方法一開始看起來有點(diǎn)復(fù)雜,我們一步步來:

  1. ObjectHelper.requireNonNull(source, "source is null"),就是非空檢查,如果傳入的source對象為空,則拋出異常。
  2. RxJavaPlugins.onAssembly(xxx),是RxJava提供的Hook操作,我們一般不會設(shè)置,可以先忽略
  3. new ObservableCreate<T>(source),創(chuàng)建了一個ObservableCreate類,將傳進(jìn)來的source作為參數(shù)傳入。(source對象是我們創(chuàng)建的ObservableOnSubscribe對象,被訂閱時的回調(diào)對象)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    //重點(diǎn)是ObservableCreate類
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

將非空檢查和Hook操作去掉,則核心代碼是這樣的:

是不是一下子清爽了很多~那么我們繼續(xù)跟進(jìn)ObservableCreate類

@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //重點(diǎn)是ObservableCreate類
    return new ObservableCreate<T>(source);
}
  • ObservableCreate類結(jié)構(gòu)
  1. ObservableCreate類繼承于Observable類。
  2. 構(gòu)造方法,將我們傳入的source對象保存(再復(fù)寫下:source對象是我們創(chuàng)建的ObservableOnSubscribe對象,被訂閱時的回調(diào)對象)
  3. 然后?就沒了!ObservableCreate類的創(chuàng)建只保存了我們傳進(jìn)來的source對象!
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    //...省略其他方法
}
  • 可觀察者的被訂閱時機(jī)

ObservableCreate的構(gòu)造方法執(zhí)行只保存了source對象,就直接返回出去了。那么ObservableCreate對象什么時候才被用到的呢?

答案是:subscribe()訂閱方法!我們的例子中,使用了Observable.create()方法創(chuàng)建出ObservableCreate對象后,我們接連調(diào)用了subscribe()訂閱方法。我們來看下它是神是鬼!

  1. 首先,我們是4個參數(shù)的subscribe()重載方法,最終會調(diào)用只有一個參數(shù)的Observer的subscribe(Observer)方法。
  2. subscribe()方法,第一步就是對傳入的參數(shù)都進(jìn)行非空判斷,如果為空,則拋出異常。
  3. 對傳進(jìn)來的onNext、onError、onComplete、onSubscribe組裝為一個Observer對象
  4. 調(diào)用subscribe(),傳入Observer對象。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    //1、一堆參數(shù)的非空判斷!
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    //2、對傳進(jìn)來的onNext、onError、onComplete、onSubscribe組裝為一個Observer對象
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    //3、調(diào)用subscribe(),傳入Observer對象
    subscribe(ls);
    return ls;
}
  • 最終subscribe(observer)方法
  1. 非空判斷,對傳入的observer對象,必須observer訂閱者對象不為空。
  2. Hook操作,提供被注冊的鉤子給RxJavaPlugins,沒有處理,則返回的還是原有的observer。
  3. 非空判斷,RxJavaPlugins.onSubscribe()處理后的observer必須不為空。
  4. 重點(diǎn):調(diào)用subscribeActual()方法!
  5. try-catch,對拋出的異常交給RxJavaPlugins處理,我們通過RxJavaPlugins可以設(shè)置一個全局異常處理。

最終邏輯落在了subscribeActual()方法,方法名翻譯過來是實(shí)際的訂閱,十有八九是這個方法做訂閱操作了。

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    //非空判斷,對傳入的observer對象,必須observer訂閱者對象不為空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //Hook操作,提供被注冊的鉤子給RxJavaPlugins,沒有處理,則返回的還是原有的observer
        observer = RxJavaPlugins.onSubscribe(this, observer);
        //非空判斷,RxJavaPlugins.onSubscribe()處理后的observer必須不為空
        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
        //重點(diǎn):調(diào)用subscribeActual()方法!
        subscribeActual(observer);
    } catch (NullPointerException e) {
        //捕獲拋出的空指針,再拋出
        throw e;
    } catch (Throwable e) {
        //將所有拋出的異常,交給RxJavaPlugins處理,提一下,我們通過RxJavaPlugins可以設(shè)置RxJava中拋出的所有異常!
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
  • subscribeActual()方法分析
  1. 納尼,subscribeActual()方法是個抽象方法,具體實(shí)現(xiàn)在哪呢?

先來理一下,一開始Observable.create()創(chuàng)建了ObservableCreate類對象并返回,我們再調(diào)用了subscribe()方法,subscribe()方法內(nèi)部再調(diào)用了subscribeActual()方法,而且它是抽象方法,那么subscribeActual()方法,肯定是被調(diào)用對象ObservableCreate這個對象里!

protected abstract void subscribeActual(Observer<? super T> observer);
  • ObservableCreate類中的subscribeActual()方法

我們回到ObservableCreate類中,subscribeActual()方法的確被復(fù)寫了

  1. 創(chuàng)建發(fā)射器,將訂閱者傳給發(fā)射器
  2. 調(diào)用observer訂閱者的onSubscribe,通知訂閱者,開始訂閱
  3. 調(diào)用source對象的subscribe()方法,通知可觀察者被訂閱了(再復(fù)習(xí)一下,source對象為我們調(diào)用create()對象傳入的ObservableOnSubscribe回調(diào)對象),我們會在這里調(diào)用發(fā)射器的onNext()發(fā)射數(shù)據(jù)
  4. try-catch處理,如果訂閱過程中,發(fā)生異常,調(diào)用訂閱者的onError(),通知訂閱過程發(fā)生異常

subscribeActual()方法很簡短,主要是通知可觀察者被訂閱和訂閱者開始訂閱,異常處理等。

下一步的重點(diǎn)在source.subscribe(parent)中,parent為CreateEmitter類對象,它包裹了被訂閱者,我們在subscribe()方法中,就是Observable.create()中提供的回調(diào)對象的subscribe()方法中,調(diào)用了發(fā)射器emitter的onNext()發(fā)射數(shù)據(jù)給訂閱者,所以下一步的邏輯應(yīng)該是在CreateEmitter發(fā)射器類中。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //創(chuàng)建發(fā)射器,將訂閱者傳給發(fā)射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //調(diào)用observer訂閱者的onSubscribe,通知訂閱者,開始訂閱
    observer.onSubscribe(parent);
    try {
        //再復(fù)習(xí)一下,source對象為我們調(diào)用create()對象傳入的ObservableOnSubscribe回調(diào)對象
        //重點(diǎn):調(diào)用source對象的subscribe()方法,通知可觀察者被訂閱了。我們會在這里調(diào)用發(fā)射器的onNext()發(fā)射數(shù)據(jù)
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        //調(diào)用訂閱者的onError(),通知訂閱過程發(fā)生異常
        parent.onError(ex);
    }
}
  • CreateEmitter發(fā)射器
  1. CreateEmitter發(fā)射器,實(shí)現(xiàn)了ObservableEmitter發(fā)射器接口和Disposable切斷訂閱接口
  2. 構(gòu)造方法,就是保存了observer訂閱者。
  3. onNext(),先空檢查,如果為null,會拋出異常,證明RxJava2后不能發(fā)射null了。沒有解除訂閱,則調(diào)用observer訂閱者的onNext()。
  4. onError()事件,try-catch處理,如果事件中發(fā)生異常則交給RxJavaPlugins處理。
  5. tryOnError(),上面onError()事件的,try-catch處理,如果為空或訂閱者的onError()方法發(fā)生異常,返回true,否則返回false。
  6. onComplete()完成,調(diào)用訂閱者的observer.onComplete()
  7. setCancellable(),提供一個Cancellable回調(diào)對象,當(dāng)被dispose()時回調(diào),可以在dispose()時做解除訂閱時需要做的事情
  8. dispose(),切斷訂閱。
  9. isDisposed(),判斷是否已經(jīng)切斷了訂閱。

到這里,RxJava的訂閱流程就接觸了,下面來總結(jié)一下。

//發(fā)射器,實(shí)現(xiàn)了ObservableEmitter發(fā)射器接口和Disposable切斷接口
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;

    //訂閱者
    final Observer<? super T> observer;

    //構(gòu)造方法,就是保存了observer訂閱者
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        //先空檢查,如果為null,會拋出異常,證明RxJava2后不能發(fā)射null了
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        //沒有解除訂閱,則調(diào)用observer訂閱者的onNext()
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        //onError()事件,try-catch處理,如果事件中發(fā)生異常則交給RxJavaPlugins處理
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

    @Override
    public boolean tryOnError(Throwable t) {
        //上面onError()事件的,try-catch處理,如果為空或訂閱者的onError()方法發(fā)生異常,返回true,否則返回false
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
            return true;
        }
        return false;
    }

    @Override
    public void onComplete() {
        //onComplete()完成,調(diào)用訂閱者的observer.onComplete()
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }

    @Override
    public void setCancellable(Cancellable c) {
        //提供一個Cancellable回調(diào)對象,當(dāng)被dispose()時回調(diào),可以在dispose()時做解除訂閱時需要做的事情
        setDisposable(new CancellableDisposable(c));
    }

    @Override
    public ObservableEmitter<T> serialize() {
        //序列化發(fā)射的數(shù)據(jù),使用一個SerializedEmitter類來包裹CreateEmitter,裝飾者模式,如果你發(fā)射的數(shù)據(jù)在不同線程中,可以調(diào)用serialize()來讓發(fā)射的數(shù)據(jù)按發(fā)射順序發(fā)射,不是本篇重點(diǎn),就不分析SerializedEmitter類
        return new SerializedEmitter<T>(this);
    }

    @Override
    public void dispose() {
        //切斷訂閱
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        //判斷是否已經(jīng)切斷了訂閱
        return DisposableHelper.isDisposed(get());
    }

    @Override
    public String toString() {
        return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
    }
}

流程總結(jié)

RxJava2的總體流程:

RxJava2的訂閱流程還算清晰,主要是要清楚幾個切入點(diǎn):

RxJava2訂閱流程.png
  • 創(chuàng)建被觀察者時機(jī)
  • 訂閱時機(jī)
  • 調(diào)用發(fā)射器發(fā)射數(shù)據(jù)時機(jī)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容