RxJava RxAndroid - 觀察模式 + 異步 + 函數(shù)式

一、前言

閱讀本文前,建議詳細(xì)閱讀并掌握什么是觀察者模式。

https://github.com/ReactiveX/RxJava
https://github.com/ReactiveX/RxAndroid
RxJava是ReactiveX的一種Java的實(shí)現(xiàn)形式,擁有三大部分

  • 觀察者模式,即定義對(duì)象間一種一對(duì)多的依賴關(guān)系,當(dāng)一個(gè)對(duì)象改變狀態(tài)時(shí),則所有依賴它的對(duì)象都會(huì)被改變。
  • Iterator模式,即迭代流式編程模式。
  • 函數(shù)式編程模式,即提供一系列函數(shù)樣式的方法供快速開(kāi)發(fā)。

首先RxAndroid基于RxJava,RxAndroid結(jié)合Android添加了很少的類,讓我們?cè)贏ndroid更方便和簡(jiǎn)單的使用RxJava。RxJava和RxAndroid已經(jīng)來(lái)到了3.0+版本

RxJava核心思想:觀察者模式 + 異步 來(lái)處理事件

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'

二、基本用法

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext("one");
                emitter.onNext("two");
                //emitter.onComplete();
                emitter.onError(new Throwable("error"));
            }
        });
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d("yink","onSubscribe ...");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d("yink","onNext = " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("yink","onError ... = " + e);
            }

            @Override
            public void onComplete() {
                Log.d("yink","onComplete ...");
            }
        };
        observable.subscribeOn(Schedulers.newThread());
        observable.observeOn(AndroidSchedulers.mainThread());
        Log.d("yink","subscribe ...");
        observable.subscribe(observer);

首先,RxJava基于觀察者模式,所以它的結(jié)構(gòu)很簡(jiǎn)單,無(wú)非觀察者模式三步驟:

  • Observable被觀察者,ObservableEmitter為被觀察者的幾個(gè)事件。(onComplete和onError唯一且互斥)
  • Observer觀察者,觀察幾個(gè)方法
  • observable.subscribe(observer)提交觀察者到被觀察者

其次,RxJava還額外實(shí)現(xiàn)了線程調(diào)度,即控制觀察者和被觀察者的代碼在哪個(gè)線程執(zhí)行:

  • observable.subscribeOn(Schedulers.newThread());設(shè)置被觀察者開(kāi)啟新線程來(lái)調(diào)度
  • observable.observeOn(AndroidSchedulers.mainThread());設(shè)置觀察者在主線程調(diào)度
    所以被觀察者中我們就可以做一些耗時(shí)操作,然后ObservableEmitter通知到觀察者,觀察者中可以刷新UI等操作。
2020-03-18 10:36:27.841 12678-12678/com.example.demo D/yink: subscribe ...
2020-03-18 10:36:27.841 12678-12678/com.example.demo D/yink: onSubscribe ...
2020-03-18 10:36:27.841 12678-12678/com.example.demo D/yink: onNext = one
2020-03-18 10:36:27.841 12678-12678/com.example.demo D/yink: onNext = two
2020-03-18 10:36:27.841 12678-12678/com.example.demo D/yink: onError ... = java.lang.Throwable: error

整體看來(lái)RxJava是為了讓我們更加優(yōu)雅的異步,異步指定線程方便,過(guò)程好控制,還可多個(gè)觀察者,指定不同線程,這樣異步真的太方便了。

三、詳細(xì)介紹

現(xiàn)在我們知道RxJava大體框架,觀察者模式+異步。下面我們來(lái)看詳細(xì)介紹。詳細(xì)介紹分為下面幾個(gè)部分:

  • Observer 觀察者的各種擴(kuò)展寫法
  • Observable 被觀察者的各種拓展寫法
  • 線程調(diào)度,指定線程操作相關(guān)

3.1、觀察者Observer

觀察者的擴(kuò)展寫法就比較簡(jiǎn)單

public final Disposable subscribe() 
void subscribe(@NonNull Observer<? super T> observer);
public final Disposable subscribe(@NonNull Consumer<? super T> onNext) 
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, 
      @NonNull Consumer<? super Throwable> onError) 
public final Disposable subscribe(@NonNull Consumer<? super T> onNext, 
      @NonNull Consumer<? super Throwable> onError,
      @NonNull Action onComplete) 
public final Disposable subscribe(@NonNull Action onComplete) {
public final Completable doOnSubscribe(@NonNull Consumer<? super Disposable> onSubscribe) {
public final void blockingSubscribe(@NonNull Action onComplete) {
...
  • 可以看到我們提交觀察者時(shí),觀察者的幾個(gè)變種
  • subscribe() 實(shí)際提交的是一個(gè)Functions.emptyConsumer()
  • onNext事件:Consumer<? super T>
  • onError事件:Consumer<? super Throwable>
  • onComplete事件:Action
  • Observer<? super T> observer:多種繼承自O(shè)bserver的類也可以
    所以,我們?cè)诒O(jiān)聽(tīng)上,就可以簡(jiǎn)單只監(jiān)聽(tīng)部分事件。例如
Observable.just("one").subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {

            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Throwable {
                
            }
        });

這個(gè)例子就提交了兩個(gè)觀察者。一個(gè)觀察者只監(jiān)聽(tīng)onNext事件,一個(gè)觀察者只監(jiān)聽(tīng)onError事件。當(dāng)然你也可以只提交一個(gè)觀察者。源碼如下,很好理解,我隨便點(diǎn)了一個(gè)subscribe的實(shí)現(xiàn)

public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,
           ...
        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
        subscribe(ls);
        return ls;
    }

public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }

@Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }

3.2、被觀察者Observable

Observable是被觀察者產(chǎn)生的地方,分下面幾個(gè)方面介紹:

  • Observable的創(chuàng)建
  • Observable操作符歸類
  • Observable擴(kuò)展:Flowable、Maybe、Single、Completable

3.2.1、Observable的創(chuàng)建

事件產(chǎn)生理解上很簡(jiǎn)單,就是創(chuàng)建被觀察者,我們以just的為例,看看它的整體思路

Observable.just("a","b").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d("yink","onSubscribe ...");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.d("yink","onNext = " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d("yink","onError ... = " + e);
            }

            @Override
            public void onComplete() {
                Log.d("yink","onComplete ...");
            }
        });

2020-03-24 09:30:13.562 31403-31403/com.example.demo D/yink: onSubscribe ...
2020-03-24 09:30:13.562 31403-31403/com.example.demo D/yink: onNext = a
2020-03-24 09:30:13.562 31403-31403/com.example.demo D/yink: onNext = b
2020-03-24 09:30:13.562 31403-31403/com.example.demo D/yink: onComplete ...

將對(duì)象或者對(duì)象集合轉(zhuǎn)換為一個(gè)會(huì)發(fā)射這些對(duì)象的Observable
just用法很簡(jiǎn)單,直接把數(shù)組以onNext事件傳遞,傳遞完后 調(diào)用onComplete事件
我們接著來(lái)看看just方法的實(shí)現(xiàn),篇幅原因,我省去了一些代碼。

 public static <T> Observable<T> just(@NonNull T item1, @NonNull T item2) {
        ...
        return fromArray(item1, item2);
    }

 public static <T> Observable<T> fromArray(@NonNull T... items) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableFromArray<>(items));
    }

public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }

    @Override
    public void subscribeActual(Observer<? super T> observer) {
        FromArrayDisposable<T> d = new FromArrayDisposable<>(observer, array);

        observer.onSubscribe(d);

        if (d.fusionMode) {
            return;
        }

        d.run();
    }

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        ....
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.downstream = actual;
            this.array = array;
        }
        ...
        void run() {
            T[] a = array;
            int n = a.length;

            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    downstream.onError(new NullPointerException("The element at index " + i + " is null"));
                    return;
                }
                downstream.onNext(value);
            }
            if (!isDisposed()) {
                downstream.onComplete();
            }
        }
    }
}
  • 可以看到調(diào)用流程也很簡(jiǎn)單,把數(shù)組挨個(gè)調(diào)用就可以了,最后發(fā)出onComplete事件
  • 事件的產(chǎn)生代碼都極其類似,最后定位實(shí)現(xiàn)都會(huì)定位到類似ObservableFromArray.java這樣的實(shí)現(xiàn)
  • 事件產(chǎn)生實(shí)現(xiàn)的代碼路徑在“/RxJava-3.x/src/main/java/io/reactivex/rxjava3/internal/operators/observable/**”,此路徑下有很多種實(shí)現(xiàn),包括from系列,just,create,error,timer等等,只是RxJava源碼為我們默認(rèn)添加的一些實(shí)現(xiàn)。
  • 創(chuàng)建觀察者思想都類似,就不一一詳述了,簡(jiǎn)單介紹如下:
用法 簡(jiǎn)單介紹
create 通過(guò)調(diào)用觀察者的方法從頭創(chuàng)建一個(gè)Observable
frome 封裝Iterable、Array、Callable、Action、Runnable、Future,以執(zhí)行為核心,調(diào)用封裝的類型的內(nèi)部事件。將其它的對(duì)象或數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)換為Observable
targetType.from{sourceType}()
這幾種類型都是為了解決特定的問(wèn)題而存在,3.1.3會(huì)簡(jiǎn)單介紹
just 將對(duì)象或者對(duì)象集合轉(zhuǎn)換為一個(gè)會(huì)發(fā)射這些對(duì)象的Observable
defer 普通observable創(chuàng)建對(duì)象時(shí)就確定了參數(shù),defer則是我們提交觀察者時(shí)才去創(chuàng)建被觀察者和參數(shù),相當(dāng)于一個(gè)懶加載,在觀察者訂閱之前不創(chuàng)建這個(gè)Observable,為每一個(gè)觀察者創(chuàng)建一個(gè)新的Observable
range rang(0,10)直接生成0-10事件,創(chuàng)建發(fā)射指定范圍的整數(shù)序列的Observable
interval interval(1, TimeUnit.SECONDS);周期性生成一個(gè)無(wú)限的、永遠(yuǎn)增長(zhǎng)的數(shù)(長(zhǎng)整型)
timer Observable.timer(5, TimeUnit.MINUTES)延時(shí)五分鐘發(fā)送事件
empty 創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable,只調(diào)用一個(gè)onCompleted方法
never 創(chuàng)建一個(gè)什么事件都不發(fā)送的被觀察者
error 發(fā)送error事件

3.2.2、ObservableObservable操作符歸類

Rxjava提供的操作符真的蠻多的。主要分為下面幾個(gè)大類

  • 直接創(chuàng)建一個(gè)Observable(創(chuàng)建操作)
  • 組合多個(gè)Observable(組合操作)
  • 對(duì)Observable發(fā)射的數(shù)據(jù)執(zhí)行變換操作(變換操作)
  • 從Observable發(fā)射的數(shù)據(jù)中取特定的值(過(guò)濾操作)
  • 轉(zhuǎn)發(fā)Observable的部分值(條件/布爾/過(guò)濾操作)
  • 對(duì)Observable發(fā)射的數(shù)據(jù)序列求值(算術(shù)/聚合操作)

這段歸類描述引用自知乎的一個(gè)回答,Rxjava、rxandroid中的操作,這里我將其制成表格,供大家查閱使用,上面有創(chuàng)建操作符表格,所以下面是省略這部分。

變換操作

用法 簡(jiǎn)單介紹
Buffer 緩存,可以簡(jiǎn)單的理解為緩存,它定期從Observable收集數(shù)據(jù)到一個(gè)集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個(gè)
FlatMap 扁平映射,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進(jìn)一個(gè)單獨(dú)的Observable,可以認(rèn)為是一個(gè)將嵌套的數(shù)據(jù)結(jié)構(gòu)展開(kāi)的過(guò)程。
GroupBy 分組,將原來(lái)的Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個(gè)Observable發(fā)射一組不同的數(shù)據(jù)
Map 映射,通過(guò)對(duì)序列的每一項(xiàng)都應(yīng)用一個(gè)函數(shù)變換Observable發(fā)射的數(shù)據(jù),實(shí)質(zhì)是對(duì)序列中的每一項(xiàng)執(zhí)行一個(gè)函數(shù),函數(shù)的參數(shù)就是這個(gè)數(shù)據(jù)項(xiàng)
Scan 掃描,對(duì)Observable發(fā)射的每一項(xiàng)數(shù)據(jù)應(yīng)用一個(gè)函數(shù),然后按順序依次發(fā)射這些值
Window 窗口,定期將來(lái)自O(shè)bservable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項(xiàng)。類似于Buffer,但Buffer發(fā)射的是數(shù)據(jù),Window發(fā)射的是Observable,每一個(gè)Observable發(fā)射原始Observable的數(shù)據(jù)的一個(gè)子集
lift 把事件處理一次后再發(fā)送到Observer

過(guò)濾操作

用法 簡(jiǎn)單介紹
Debounce 只有在空閑了一段時(shí)間后才發(fā)射數(shù)據(jù),通俗的說(shuō),就是如果一段時(shí)間沒(méi)有操作,就執(zhí)行一次操作
Distinct 去重,過(guò)濾掉重復(fù)數(shù)據(jù)項(xiàng)
ElementAt 取值,取特定位置的數(shù)據(jù)項(xiàng)
Filter 過(guò)濾,過(guò)濾掉沒(méi)有通過(guò)謂詞測(cè)試的數(shù)據(jù)項(xiàng),只發(fā)射通過(guò)測(cè)試的
First 首項(xiàng),只發(fā)射滿足條件的第一條數(shù)據(jù)
IgnoreElements 忽略所有的數(shù)據(jù),只保留終止通知(onError或onCompleted)
Last 末項(xiàng),只發(fā)射最后一條數(shù)據(jù)
Sample 取樣,定期發(fā)射最新的數(shù)據(jù),等于是數(shù)據(jù)抽樣,有的實(shí)現(xiàn)里叫ThrottleFirst
Skip 跳過(guò)前面的若干項(xiàng)數(shù)據(jù)
SkipLast 跳過(guò)后面的若干項(xiàng)數(shù)據(jù)
Take 只保留前面的若干項(xiàng)數(shù)據(jù)
TakeLast 只保留后面的若干項(xiàng)數(shù)據(jù)

組合操作

用法 簡(jiǎn)單介紹
And/Then/When 通過(guò)模式(And條件)和計(jì)劃(Then次序)組合兩個(gè)或多個(gè)Observable發(fā)射的數(shù)據(jù)集
CombineLatest 當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了一個(gè)數(shù)據(jù)時(shí),通過(guò)一個(gè)指定的函數(shù)組合每個(gè)Observable發(fā)射的最新數(shù)據(jù)(一共兩個(gè)數(shù)據(jù)),然后發(fā)射這個(gè)函數(shù)的結(jié)果
Join 無(wú)論何時(shí),如果一個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)項(xiàng),只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)項(xiàng)定義的時(shí)間窗口內(nèi),就將兩個(gè)Observable發(fā)射的數(shù)據(jù)合并發(fā)射
Merge 將兩個(gè)Observable發(fā)射的數(shù)據(jù)組合并成一個(gè)
StartWith 在發(fā)射原來(lái)的Observable的數(shù)據(jù)序列之前,先發(fā)射一個(gè)指定的數(shù)據(jù)序列或數(shù)據(jù)項(xiàng)
Switch 將一個(gè)發(fā)射Observable序列的Observable轉(zhuǎn)換為這樣一個(gè)Observable:它逐個(gè)發(fā)射那些Observable最近發(fā)射的數(shù)據(jù)
Zip 打包,使用一個(gè)指定的函數(shù)將多個(gè)Observable發(fā)射的數(shù)據(jù)組合在一起,然后將這個(gè)函數(shù)的結(jié)果作為單項(xiàng)數(shù)據(jù)發(fā)射

錯(cuò)誤處理
這些操作符用于從錯(cuò)誤通知中恢復(fù)

用法 簡(jiǎn)單介紹
Catch 捕獲,繼續(xù)序列操作,將錯(cuò)誤替換為正常的數(shù)據(jù),從onError通知中恢復(fù)
Retry 重試,如果Observable發(fā)射了一個(gè)錯(cuò)誤通知,重新訂閱它,期待它正常終止

輔助操作

用法 簡(jiǎn)單介紹
Delay 延遲一段時(shí)間發(fā)射結(jié)果數(shù)據(jù)
Do 注冊(cè)一個(gè)動(dòng)作占用一些Observable的生命周期事件,相當(dāng)于Mock某個(gè)操作
Materialize/Dematerialize 將發(fā)射的數(shù)據(jù)和通知都當(dāng)做數(shù)據(jù)發(fā)射,或者反過(guò)來(lái)
ObserveOn 指定觀察者觀察Observable的調(diào)度程序(工作線程)
Serialize 強(qiáng)制Observable按次序發(fā)射數(shù)據(jù)并且功能是有效的
Subscribe 收到Observable發(fā)射的數(shù)據(jù)和通知后執(zhí)行的操作
SubscribeOn 指定Observable應(yīng)該在哪個(gè)調(diào)度程序上執(zhí)行
TimeInterval 將一個(gè)Observable轉(zhuǎn)換為發(fā)射兩個(gè)數(shù)據(jù)之間所耗費(fèi)時(shí)間的Observable
Timeout 添加超時(shí)機(jī)制,如果過(guò)了指定的一段時(shí)間沒(méi)有發(fā)射數(shù)據(jù),就發(fā)射一個(gè)錯(cuò)誤通知
Timestamp 給Observable發(fā)射的每個(gè)數(shù)據(jù)項(xiàng)添加一個(gè)時(shí)間戳
Using 創(chuàng)建一個(gè)只在Observable的生命周期內(nèi)存在的一次性資源

條件和布爾操作

用法 簡(jiǎn)單介紹
All 判斷Observable發(fā)射的所有的數(shù)據(jù)項(xiàng)是否都滿足某個(gè)條件
Amb 給定多個(gè)Observable,只讓第一個(gè)發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù)
Contains 判斷Observable是否會(huì)發(fā)射一個(gè)指定的數(shù)據(jù)項(xiàng)
DefaultIfEmpty 發(fā)射來(lái)自原始Observable的數(shù)據(jù),如果原始Observable沒(méi)有發(fā)射數(shù)據(jù),就發(fā)射一個(gè)默認(rèn)數(shù)據(jù)
SequenceEqual 判斷兩個(gè)Observable是否按相同的數(shù)據(jù)序列
SkipUntil 丟棄原始Observable發(fā)射的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù),然后發(fā)射原始Observable的剩余數(shù)據(jù)
SkipWhile 丟棄原始Observable發(fā)射的數(shù)據(jù),直到一個(gè)特定的條件為假,然后發(fā)射原始Observable剩余的數(shù)據(jù)
TakeUntil 發(fā)射來(lái)自原始Observable的數(shù)據(jù),直到第二個(gè)Observable發(fā)射了一個(gè)數(shù)據(jù)或一個(gè)通知
TakeWhile 發(fā)射原始Observable的數(shù)據(jù),直到一個(gè)特定的條件為真,然后跳過(guò)剩余的數(shù)據(jù)

算術(shù)和聚合操作

用法 簡(jiǎn)單介紹
Average 計(jì)算Observable發(fā)射的數(shù)據(jù)序列的平均值,然后發(fā)射這個(gè)結(jié)果
Concat 不交錯(cuò)的連接多個(gè)Observable的數(shù)據(jù)
Count 計(jì)算Observable發(fā)射的數(shù)據(jù)個(gè)數(shù),然后發(fā)射這個(gè)結(jié)果
Max 計(jì)算并發(fā)射數(shù)據(jù)序列的最大值
Min 計(jì)算并發(fā)射數(shù)據(jù)序列的最小值
Reduce 按順序?qū)?shù)據(jù)序列的每一個(gè)應(yīng)用某個(gè)函數(shù),然后返回這個(gè)值
Sum 計(jì)算并發(fā)射數(shù)據(jù)序列的和

連接操作

用法 簡(jiǎn)單介紹
Connect 指示一個(gè)可連接的Observable開(kāi)始發(fā)射數(shù)據(jù)給訂閱者
Publish 將一個(gè)普通的Observable轉(zhuǎn)換為可連接的
RefCount 使一個(gè)可連接的Observable表現(xiàn)得像一個(gè)普通的Observable
Replay 確保所有的觀察者收到同樣的數(shù)據(jù)序列,即使他們?cè)贠bservable開(kāi)始發(fā)射數(shù)據(jù)之后才訂閱

轉(zhuǎn)換操作

用法 簡(jiǎn)單介紹
To 將Observable轉(zhuǎn)換為其它的對(duì)象或數(shù)據(jù)結(jié)構(gòu)
Blocking 阻塞Observable的操作符

這幾個(gè)表格的目的是方便大家查閱,我們不需要一口氣學(xué)習(xí)所有操作符,知其思想即可。這些操作符目的加工Observable
由于篇幅原因這里只舉一個(gè)map例子,上面所有操作符都很容易上手使用,就不過(guò)多闡述。

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "map add " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("yink", s);
        }
    });

2020-03-24 14:59:30.518 8467-8467/com.example.demo D/yink: map add 1
2020-03-24 14:59:30.518 8467-8467/com.example.demo D/yink: map add 2
2020-03-24 14:59:30.518 8467-8467/com.example.demo D/yink: map add 3

自定義操作符
自定義操作符有兩個(gè)方向:

  • 對(duì)Observable被觀察者中的數(shù)據(jù)動(dòng)刀,讓數(shù)據(jù)轉(zhuǎn)換一次,最后再發(fā)送到觀察者。
  • 對(duì)Observable被觀察者直接動(dòng)刀,轉(zhuǎn)換一次被觀察者
    1、實(shí)現(xiàn)ObservableOperator接口,轉(zhuǎn)換數(shù)據(jù)
@FunctionalInterface
public interface ObservableOperator<@NonNull Downstream, @NonNull Upstream> {
    @NonNull
    Observer<? super Upstream> apply(@NonNull Observer<? super Downstream> observer) throws Throwable;
}

Observable.just("1","2")
                  .lift(new YourClass<String>())

2、實(shí)現(xiàn)ObservableOperator

public interface ObservableTransformer<Upstream, Downstream> {
    @NonNull
    ObservableSource<Downstream> apply(@NonNull Observable<Upstream> upstream);
}

Observable.just("1","2")
                  .compose(new YourTransformer())
                  .subscribe...

3.2.3、Observable擴(kuò)展

Flowable、Maybe、Single、Completable

Flowable
Flowable為了解決背壓的問(wèn)題而存在。直接上例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for(int j = 0; j <= 100; j++){
                    e.onNext(j);
                    Log.i("yink"," send id = " + j);
                    try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
                }
            }
        }, BackpressureStrategy.MISSING)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE); //觀察者設(shè)置接收事件的數(shù)量,如果不設(shè)置接收不到事件
                    }
                    @Override
                    public void onNext(Integer integer) {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Log.e("yink","onNext = " + integer);
                    }
                    @Override
                    public void onError(Throwable t) {
                        Log.e("yink","onError = " + t.toString());
                    }
                    @Override
                    public void onComplete() {
                        Log.e("yink","onComplete");
                    }
                });
  • 背壓:被觀察者發(fā)送事件的速度大于觀察者接收的速度時(shí),觀察者內(nèi)部是創(chuàng)建一個(gè)無(wú)限大的緩存來(lái)暫存沒(méi)有處理的事件。如果事件太多會(huì)導(dǎo)致OOM。
  • Flowable提供了幾種方式來(lái)避免無(wú)限放大的緩存
  • Flowable 在使用上并無(wú)太大差別,多了一個(gè)BackpressureStrategy參數(shù),必須指定接收事件數(shù)量
  • Flowable 緩存池默認(rèn)大小是128
  • BackpressureStrategy參數(shù)就是Flowable設(shè)置在背壓發(fā)生時(shí)處理的方式,參數(shù)類型如下:
public enum BackpressureStrategy {
    MISSING,
    ERROR,
    BUFFER,
    DROP,
    LATEST
}
類型 簡(jiǎn)單介紹
MISSING 不采取任何背壓策略(既不丟棄,也不緩存),超出隊(duì)列大小會(huì)拋出onError事件,不影響事件發(fā)送,拋出MissingBackpressureException: Queue is full?!
ERROR 萬(wàn)一下游跟不上,拋出MissingBackpressureException: create: could not emit value due to lack of requests
BUFFER 緩存所有
DROP 下游處理跟不上,刪除最新
LATEST 下游處理跟不上,只保留最新

Single
Single,用法如其名,它只有onSuccess和onError事件,只發(fā)送一次事件。例子如下比較簡(jiǎn)單:

      Single.create(new SingleOnSubscribe<String>() {

            @Override
            public void subscribe(@NonNull SingleEmitter<String> e) throws Exception {

                e.onSuccess("1");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.d("yink","s = " + s.toString());
            }
        });

public interface SingleEmitter<@NonNull T> {
    void onSuccess(@NonNull T t);
    void onError(@NonNull Throwable t);
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    boolean tryOnError(@NonNull Throwable t);
}

Completable
Completable不發(fā)送任何數(shù)據(jù),發(fā)送onComplete和onError,用法也比較簡(jiǎn)單,下面例子就是,線程執(zhí)行完了,接著可以做別的。因?yàn)榫€程執(zhí)行時(shí)也不用發(fā)出什么事件,或者發(fā)送值。
Completable還支持toXXX,轉(zhuǎn)換成Flowable/Single/Maybe/Observable

Completable.create(new CompletableOnSubscribe() {
            @Override
            public void subscribe(@NonNull CompletableEmitter emitter) throws Exception {
                try{
                        Thread.sleep(50);
                    }catch (Exception ex){
                    }
            }
        }).andThen(Observable.range(1, 10))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.d("yink","integer = " + integer);
            }
        });

public interface CompletableEmitter {
    void onComplete();
    void onError(@NonNull Throwable t);
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
}

Maybe
Maybe可以理解為Single和Completable的結(jié)合。看事件就理解了,Maybe只能傳一次值onSuccess,然后支持onComplete和onError事件。

      Maybe.create(new MaybeOnSubscribe<String>() {

            @Override
            public void subscribe(@NonNull MaybeEmitter<String> e) throws Exception {
                e.onSuccess("testA");
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.d("yink", " s = " + s);
            }
        });

public interface MaybeEmitter<@NonNull T> {
    void onSuccess(@NonNull T t);
    void onError(@NonNull Throwable t);
    void onComplete();
    void setDisposable(@Nullable Disposable d);
    void setCancellable(@Nullable Cancellable c);
    boolean isDisposed();
    boolean tryOnError(@NonNull Throwable t);
}

所以Rxjava提供五種被觀察者寫法:Observable、Flowable、Maybe、Single、Completable??梢苑奖阄覀?cè)俸线m的地方用合適的類,寫出更優(yōu)美的代碼。

3.3、線程調(diào)度

此小結(jié)為RxJava最騷的地方,正是因?yàn)镽xJava在觀察者模式上增加了線程調(diào)度,所以讓異步變得如此優(yōu)美。

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                Log.e("yink", "Observable call: " + Thread.currentThread().getName());
                emitter.onNext("one");
                emitter.onNext("two");
            }
        })
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Throwable {
                Log.e("yink", "Observer call: " + Thread.currentThread().getName());
                Log.d("yink","s = " + s);
            }
        });

2020-03-25 09:31:38.388 29612-29724/com.example.demo E/yink: Observable call: RxNewThreadScheduler-1
2020-03-25 09:31:38.417 29612-29612/com.example.demo E/yink: Observer call: main
2020-03-25 09:31:38.417 29612-29612/com.example.demo D/yink: s = one
2020-03-25 09:31:38.417 29612-29612/com.example.demo E/yink: Observer call: main
2020-03-25 09:31:38.417 29612-29612/com.example.demo D/yink: s = two
方法 簡(jiǎn)單介紹
Schedulers.io() 用于IO密集型的操作,線程緩存(有空閑則復(fù)用,否則無(wú)限增加)
Schedulers.newThread() 每次創(chuàng)建一個(gè)新線程,不具有緩存
Schedulers.single() 單線程,先進(jìn)先出
Schedulers.computation() cpu密集型計(jì)算任務(wù),具有固定的線程池,大小為CPU核數(shù),不可以用于IO操作,因?yàn)镮O操作的等待時(shí)間會(huì)浪費(fèi)cpu
Schedulers.trampoline() 立即執(zhí)行當(dāng)前添加任務(wù)A,若有一個(gè)B任務(wù)正在執(zhí)行,則暫停B,執(zhí)行完A后再接著執(zhí)行B
Schedulers.from(Executor) 提供帶入線程池的方式
AndroidSchedulers.mainThread() RxAndroid擴(kuò)展,Android中UI線程中執(zhí)行
AndroidSchedulers.from(Looper) RxAndroid擴(kuò)展,Looper當(dāng)前循環(huán)線程執(zhí)行

RxJava/RxAndroid默認(rèn)給我們線程調(diào)度方式基本涵蓋了我們絕大部分會(huì)使用的情況。

四、RxAndroid

RxAndroid中的代碼很少,它擴(kuò)展的意義在于方便我們結(jié)合Android特性,在UI線程中調(diào)度?;蛘呓Y(jié)合Handler調(diào)度。舉兩個(gè)例子

public class ReactiveFragment extends Fragment {//在UI線程中的例子
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(/* an Observer */);
    }

new Thread(new Runnable() {//在其他線程中的例子
    @Override
    public void run() {
        final Handler handler = new Handler(); //綁定到這個(gè)線程的Handler
        Observable.just("one", "two", "three", "four", "five")
                .subscribeOn(Schedulers.newThread())
                .observeOn(HandlerScheduler.from(handler))
                .subscribe(/* an Observer */)
    }
}, "custom-thread-1").start();

五、寫在最后

到此RxJava和RxAndroid介紹的差不多了。本文主要目的是全方位的介紹一下RxJava,主要拆分成:被觀察者 + 觀察者 + 異步,分部分來(lái)講解。來(lái)達(dá)到對(duì)RxJava有一個(gè)整體上,不光是用法,還有思想上的理解。希望本文對(duì)你有所幫助。有啥對(duì)或者不對(duì)的地方,歡迎交流,指正。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容