我所理解的RxJava——上手其實很簡單(二)

前言

歡迎繼續(xù)收看《我所理解的RxJava--上手其實很簡單(二)》,上周出了第一篇,各位程序猿大大的反應還不錯,生平第一篇博文能獲得大家的肯定,內(nèi)心肯定是非常開心的,也堅定了我繼續(xù)寫下去的信念,總之,先謝謝大家送出的魚丸...哦不,是收藏。好了,題外話不多說,進入這一篇的主題,本文主要給大家補充一下上一篇遺留的Subject知識,沒看過上一篇的同學、忘了上一章寫什么的同學、還有其他同學,請進入時光機:我所理解的RxJava——上手其實很簡單(一)溫習一遍,俗話說,“書讀百遍,奇異自見”,看多一遍是一遍,多多益善嘛。溫習完的,請回來繼續(xù)聽講。


關于Subject

關于Subject,官方文檔的解釋是這樣的:Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現(xiàn)中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉發(fā)它收到(Observe)的數(shù)據(jù),也可以發(fā)射新的數(shù)據(jù)。從官方解釋中,我提取出三個要點:

  1. 它可以充當Observable;
  2. 它可以充當Observer;
  3. 它是Observable和Observer之間的橋梁;

接下來對這三個要點解釋一下,但在解釋之前,要先介紹一下Subject的種類, Subject是一個抽象類,不能通過new來實例化Subject,所以Subject有四個實現(xiàn)類,分別為AsyncSubject、BehaviorSubject、PublishSubjectReplaySubject,每個實現(xiàn)類都有特定的“技能”,下面結合代碼來介紹一下它們各自的“技能”。注意,所有的實現(xiàn)類都由create()方法實例化,無需new,所有的實現(xiàn)類調(diào)用onCompleted()onError(),它的Observer將不再接收數(shù)據(jù);


Subject的分類解析

  • AsyncSubject
    Observer會接收AsyncSubject的```onComplete()``之前的最后一個數(shù)據(jù),如果因異常而終止,AsyncSubject將不會釋放任何數(shù)據(jù),但是會向Observer傳遞一個異常通知。示例代碼如下:
        AsyncSubject<String> asyncSubject = AsyncSubject.create();
        asyncSubject.onNext("asyncSubject1");
        asyncSubject.onNext("asyncSubject2");
        asyncSubject.onNext("asyncSubject3");  
        asyncSubject.onCompleted();
        asyncSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

                LogUtil.log("asyncSubject onCompleted");  //輸出 asyncSubject onCompleted
            }

            @Override
            public void onError(Throwable e) {

                LogUtil.log("asyncSubject onError");  //不輸出(異常才會輸出)
            }

            @Override
            public void onNext(String s) {

                LogUtil.log("asyncSubject:"+s);  //輸出asyncSubject:asyncSubject3
            }
        });

以上代碼,Observer只會接收asyncSubject的onCompleted()被調(diào)用前的最后一個數(shù)據(jù),即“asyncSubject3”,如果不調(diào)用onCompleted(),Subscriber將不接收任何數(shù)據(jù)。

  • BehaviorSubject
    Observer會接收到BehaviorSubject被訂閱之前的最后一個數(shù)據(jù),再接收其他發(fā)射過來的數(shù)據(jù),如果BehaviorSubject被訂閱之前沒有發(fā)送任何數(shù)據(jù),則會發(fā)送一個默認數(shù)據(jù)。(注意跟AsyncSubject的區(qū)別,AsyncSubject要手動調(diào)用onCompleted(),且它的Observer會接收到onCompleted()前發(fā)送的最后一個數(shù)據(jù),之后不會再接收數(shù)據(jù),而BehaviorSubject不需手動調(diào)用onCompleted(),它的Observer接收的是BehaviorSubject被訂閱前發(fā)送的最后一個數(shù)據(jù),兩個的分界點不一樣,且之后還會繼續(xù)接收數(shù)據(jù)。)示例代碼如下:
    BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
    behaviorSubject.onNext("behaviorSubject1");
    behaviorSubject.onNext("behaviorSubject2");
        behaviorSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {

                LogUtil.log("behaviorSubject:complete");
            }

            @Override
            public void onError(Throwable e) {

                LogUtil.log("behaviorSubject:error");
            }

            @Override
            public void onNext(String s) {

                LogUtil.log("behaviorSubject:"+s);
            }
        });

        behaviorSubject.onNext("behaviorSubject3");
        behaviorSubject.onNext("behaviorSubject4");

以上代碼,Observer會接收到behaviorSubject2、behaviorSubject3、behaviorSubject4,如果在behaviorSubject.subscribe()之前不發(fā)送behaviorSubject1、behaviorSubject2,則Observer會先接收到default,再接收behaviorSubject3、behaviorSubject4。

  • PublishSubject
    PublishSubject比較容易理解,相對比其他Subject常用,它的Observer只會接收到PublishSubject被訂閱之后發(fā)送的數(shù)據(jù)。示例代碼如下:
    PublishSubject<String> publishSubject = PublishSubject.create();
    publishSubject.onNext("publishSubject1");
    publishSubject.onNext("publishSubject2");
    publishSubject.subscribe(new Observer<String>() {
           @Override
           public void onCompleted() {
               
           }
    
           @Override
           public void onError(Throwable e) {
    
           }
    
           @Override
           public void onNext(String s) {
               LogUtil.log("publishSubject observer1:"+s);
           }
       });
    publishSubject.onNext("publishSubject3");
    publishSubject.onNext("publishSubject4");
    

以上代碼,Observer只會接收到"behaviorSubject3"、"behaviorSubject4"。

  • ReplaySubject
    ReplaySubject會發(fā)射所有數(shù)據(jù)給觀察者,無論它們是何時訂閱的。也有其它版本的ReplaySubject,在重放緩存增長到一定大小的時候或過了一段時間后會丟棄舊的數(shù)據(jù)。示例代碼如下:
ReplaySubject<String>replaySubject = ReplaySubject.create(); //創(chuàng)建默認初始緩存容量大小為16的ReplaySubject,當數(shù)據(jù)條目超過16會重新分配內(nèi)存空間,使用這種方式,不論ReplaySubject何時被訂閱,Observer都能接收到數(shù)據(jù)
//replaySubject = ReplaySubject.create(100);//創(chuàng)建指定初始緩存容量大小為100的ReplaySubject
//replaySubject = ReplaySubject.createWithSize(2);//只緩存訂閱前最后發(fā)送的2條數(shù)據(jù) 
 //replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());  //replaySubject被訂閱前的前1秒內(nèi)發(fā)送的數(shù)據(jù)才能被接收     
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
                LogUtil.log("replaySubject:" + s);
        }
    });
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");

以上代碼,由于情況比較多,注釋也已解釋的相當清楚,就不對輸出結果一一表述了,有疑問的自行copy代碼去測試一下。至此,四種Subject類型已經(jīng)介紹完畢,但是需要注意,如果你把 Subject 當作一個 Subscriber 使用,不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調(diào)用,這會違反Observable協(xié)議,給Subject的結果增加了不確定性。要避免此類問題,官方提出了“串行化”,你可以將 Subject 轉換為一個 SerializedSubject ,類似于這樣:

SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);

要點解答

接下來,我們繼續(xù)前面提出的問題,為什么說Subject既可充當Observable,又可充當Observer,是它們兩個之間的橋梁呢?經(jīng)過前面的例子,也許有些人已經(jīng)大概理解了,不理解的且聽我細細道來。首先,從理論上講,Subject繼承了Observable,又實現(xiàn)了Observer接口,所以說它既是Observable又是Observer,完全合理。從實際應用上講,Subject也能實現(xiàn)Observable和Observer相同的功能,口說無憑,我們還是通過代碼來證實比較有說服力。

  • 創(chuàng)建Observable并發(fā)射數(shù)據(jù):
    Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("I'm Observable");
                subscriber.onCompleted();
            }
        });

用Subject實現(xiàn)為:

PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();
  • 創(chuàng)建Observer訂閱Observable并接收數(shù)據(jù):
mObservable.subscribe(new Observer<String>() {
        @Override
         public void onCompleted() {
                
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(String s) {
  
                //接收數(shù)據(jù)
        }
});

用Subject實現(xiàn)為:

     publishSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

            }
        });

也許有人會問,不是說Subject也可以作為Observer,不能把Subject當作Observer傳入subscribe()中嗎?回答是:當然可以!就象這樣:

PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {

            subscriber.onNext("as Observer"); 
            subscriber.onCompleted();
        }
}).subscribe(publishSubject);

有沒有發(fā)現(xiàn)問題?publishSubject沒有重寫onNext()方法啊,在哪接收的數(shù)據(jù)?這就是前面說的“橋梁”的問題了,盡管把Subject作為Observer傳入subscribe(),但接收數(shù)據(jù)還是要通過Observer來接收,借用Subject來連接Observable和Observer,整體代碼如下:

PublishSubject<String> publishSubject = PublishSubject.create();
     Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {

                subscriber.onNext("as Bridge");
                subscriber.onCompleted();
            }
        }).subscribe(publishSubject);

        publishSubject.subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {

                LogUtil.log("subject:"+s); //接收到 as Bridge
            }
        });
    ```
沒錯,這很橋梁!
***
##總結
關于Subject,到此就介紹完了。也許你會跟我一樣困惑,為什么又要多個Subject出來,除了有幾個特定功能之外,其他所有的一切,Observable和Observer也都有,而且寫法上也沒有原來的簡便。確實如此,對于幾個特定功能,我也還想不到有什么應用場景,至少我還沒發(fā)現(xiàn)有什么場景必須得用Subject來實現(xiàn)不可,那么問題又來了,我為什么要花這么大篇幅來介紹Subject,理由有三。其一,既然官方推出Subject,必有其道理,還沒遇到不代表以后不會遇到,更不能代表你不會遇到這樣的應用場景;其二,“一千個讀者有一千個哈姆雷特”,我所看到的并不是全部,也許你會發(fā)掘出更有意思的東西可不是?其三,我可不想當你看完我所有關于RxJava的文章,自信已上手RxJava,當有人跟你提起Subject的時候,你一臉茫然不知道Subject是什么東西,豈不哀哉?所以呢,介紹一下Subject還是很有意義的,最起碼學了比沒學好,“養(yǎng)兵千日用兵一時”,知識不嫌多,突然哪天就用上了呢。對于Subject的理解,有異議的歡迎底下評論,一起交流進步。下一篇文章,進入RxJava操作符的使用講解。
***
本猿已轉行,勿念。
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容