RxJava 消息訂閱和線程切換的源碼分析

一、前言

這里就不詳細(xì)介紹怎么使用 RxJava 了,沒用過的自行去 github 瞅瞅 >>>>> 地址
本文源碼基于 rxjava:2.2.15

二、RxJava 的訂閱流程

咱們先來看個栗子:

        //步驟一:創(chuàng)建被觀察者Observable,定義要發(fā)送的事件
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("0");
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onComplete();
            }
        });
        //步驟二:創(chuàng)建觀察者Observer,接收事件并作處理
        Observer<String> observer = new Observer<String>() {
            @Override public void onSubscribe(Disposable d) {
                Log.d("RxJava", "onSubscribe");
            }

            @Override public void onNext(String s) {
                Log.d("RxJava", "onNext: " + s);
            }

            @Override public void onError(Throwable e) {
                Log.d("RxJava", "onError");
            }

            @Override public void onComplete() {
                Log.d("RxJava", "onComplete");
            }
        };
        //步驟三:觀察者訂閱被觀察者
        observable.subscribe(observer);

輸出結(jié)果:

onSubscribe
onNext: 0
onNext: 1
onNext: 2
onComplete

這里存在這么幾個角色,被觀察者(Observable)、觀察者(Observer)、事件(Event)、訂閱(Subscribe)。被觀察者是負(fù)責(zé)生產(chǎn)事件的,觀察者是負(fù)責(zé)接收事件并作處理,事件是被觀察者和觀察者的消息載體,也就是栗子中的 "0"、"1"、"2",訂閱是連接被觀察者和觀察者。

1、創(chuàng)建被觀察者過程

首先咱們來瞅瞅 Observable 的 create() 方法里面到底都干了什么事情

1.1、Observable 類的 create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null"); //這里就是一個判空處理
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

可以看到其實這里就是先創(chuàng)建了一個 ObservableCreate 對象,同時把我們定義好的 ObservableOnSubscribe 對象作為參數(shù)傳入進(jìn)去,最后調(diào)用了 RxJavaPlugins.onAssembly() 方法。

咱們先看看這個 ObservableCreate 類

1.2、ObservableCreate 類
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    // 省略無關(guān)代碼...
}

可以看到 ObservableCreate 類是繼承自 Observable 抽象類的, 然后把咱們傳入的 ObservableOnSubscribe 對象存儲了起來。

再看下這個方法 RxJavaPlugins.onAssembly()

1.3、RxJavaPlugins.onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        // 省略無關(guān)代碼...
        return source;
    }

最終僅僅是把我們 new 出的 ObservableCreate 對象給返回來了。

1.4、小結(jié)

所以 Observable.create() 方法僅僅是把我們定義好的 ObservableOnSubscribe 對象重新包裝成了一個 ObservableCreate 對象。

2、創(chuàng)建觀察者過程

Observer<String> observer = new Observer<String>() {
      @Override public void onSubscribe(Disposable d) {
                Log.d("RxJava", "onSubscribe");
            }

            @Override public void onNext(String s) {
                Log.d("RxJava", "onNext: " + s);
            }

            @Override public void onError(Throwable e) {
                Log.d("RxJava", "onError");
            }

            @Override public void onComplete() {
                Log.d("RxJava", "onComplete");
            }
};

很簡單,這里就是做了一個實現(xiàn)了 Observer 接口的匿名內(nèi)部類實例化。

3、訂閱過程

接下來我們一起看看訂閱過程,點進(jìn)去 observable.subscribe(observer);

public final void subscribe(Observer<? super T> observer) {
    // 省略無關(guān)代碼
    observer = RxJavaPlugins.onSubscribe(this, observer);

    subscribeActual(observer);
      
    // 省略無關(guān)代碼
}

先分析第一行代碼:

3.1、RxJavaPlugins.onSubscribe()
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        // 省略無關(guān)代碼
        return observer;
    }

跟之前代碼一樣,這里僅僅是把傳入的 Observer 對象給返回來了

再來分析第二行代碼:

3.2、Observable 類的 subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);

很明顯,這是抽象類 Observable 類的一個抽象方法,那它的具體實現(xiàn)在哪呢?其實它的具體實現(xiàn)類就是我們在前面創(chuàng)建被觀察者時創(chuàng)建的 ObservableCreate 類,它就是 Observable 的子類,現(xiàn)在來看它的具體實現(xiàn)

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

這里第一步創(chuàng)建了一個 CreateEmitter 對象,第二步調(diào)用了 Observer 類的 onSubscribe() 方法,第三步調(diào)用了 ObservableOnSubscribe 類的 subscribe() 方法,其中這個 source 就是我們之前創(chuàng)建 ObservableCreate 對象傳入進(jìn)去的 ObservableOnSubscribe 對象。

同樣地,先看這個 CreateEmitter 類的創(chuàng)建過程:

3.3、CreateEmitter 類
static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        // 省略無關(guān)代碼
    }

CreateEmitter 類繼承了原子引用類 AtomicReference,實現(xiàn)了 ObservableEmitter 和 Disposable 接口,把我們傳入的 Observer 對象存儲了起來,又是一個重新包裝新對象的用法。

3.4、Observer 類的 onSubscribe()
observer.onSubscribe(parent);

這個 onSubscribe() 回調(diào)的含義其實就是告訴觀察者已經(jīng)成功訂閱了被觀察者

3.5、ObservableOnSubscribe 接口的 subscribe()
 source.subscribe(parent);

這個 source 就是我們一開始傳入的 ObservableOnSubscribe 對象,即這里會調(diào)用 ObservableOnSubscribe 的 subscribe() 方法,它的方法如下:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("0");
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onComplete();
            }
        });

subscribe() 里的 onNext() 是用于將事件流發(fā)送出去,最后調(diào)用 onComplete() 方法代表完成了訂閱過程。這里的 ObservableEmitter 接口其具體實現(xiàn)為 CreateEmitter 類,所以我們需要看看 CreateEmitter 類里的 onNext() 和 onComplete() 方法的實現(xiàn)

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        @Override
        public void onNext(T t) {
             // 省略無關(guān)代碼...
            if (!isDisposed()) {
                // 調(diào)用觀察者的 onNext()
                observer.onNext(t);
            }
        }

        // 省略無關(guān)代碼...

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    // 調(diào)用觀察者的 onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

      // 省略無關(guān)代碼...

可以看到,最終就是會調(diào)用觀察者的 onNext() 和 onComplete() 方法。至此,一個完整的消息訂閱流程就完成了。

三、RxJava 的線程切換

先給出線程切換的栗子:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("0");
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onComplete();
            }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<String>() {
                @Override public void onSubscribe(Disposable d) {
                    Log.d("RxJava", "onSubscribe");
                }

                @Override public void onNext(String s) {
                    Log.d("RxJava", "onNext: " + s);
                }

                @Override public void onError(Throwable e) {
                    Log.d("RxJava", "onError");
                }

                @Override public void onComplete() {
                    Log.d("RxJava", "onComplete");
                }
        });

四、總結(jié)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 0.版權(quán)聲明 本文由玉剛說寫作平臺提供寫作贊助,版權(quán)歸玉剛說微信公眾號所有原作者:四月葡萄版權(quán)聲明:未經(jīng)玉剛說許可...
    四月葡萄閱讀 834評論 0 3
  • ??樓主最近在找實習(xí)工作,由于簡歷上說了解RxJava,所以在面試的時候應(yīng)該會問到RxJava的知識,于是樓主結(jié)合...
    瓊珶和予閱讀 3,233評論 3 5
  • 本篇文章已授權(quán)微信公眾號 YYGeeker 獨家發(fā)布轉(zhuǎn)載請標(biāo)明出處 CSDN學(xué)院課程地址RxJava2從入門到精通...
    Hensen_閱讀 641評論 0 1
  • 前言 通過前一篇的從觀察者模式出發(fā),聊聊RxJava,我們大致理解了RxJava的實現(xiàn)原理,在RxJava中可以非...
    CuiTao閱讀 655評論 0 4
  • 我今年25歲了,人生路已經(jīng)走了不知道是三分之一還是四分之一了,而且25歲這年也剩下只有100天了??墒窃谶@個年齡,...
    高袁閱讀 204評論 0 0

友情鏈接更多精彩內(nèi)容