RxAndroid 2.0 學(xué)習(xí)筆記

Rxjava 2.x正式版出來已經(jīng)快兩個月了。在之前的項目中也在使用Rx。但卻一直沒有時間對整個的知識進行梳理,恰好今天抽出時間,也系統(tǒng)的再學(xué)習(xí)一遍RxJava/RxAndroid


RxJava的使用

一、觀察者/被觀察者

1、前奏:
在觀察者之前就要先提下backpressure這個概念。簡單來說,backpressure是在異步場景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度時,告訴被觀察者降低發(fā)送速度的策略。

2、在2.0中有以下幾種觀察者

  • Observable/Observer
  • Flowable/Subscriber
  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

依次的來看一下:

Observable

Observable
.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer value) {}
@Override public void onError(Throwable e) {}
@Override public void onComplete() {}
});

這里要提的就是onSubscribe(Disposable d),disposable用于取消訂閱。
就用簡單的just這個操作符來分析一下。

@SuppressWarnings("unchecked")
@SchedulerSupport(SchedulerSupport.NONE) 
public static < T > Observable < T > just(T item1, T item2, T item3, T item4) {
    ObjectHelper.requireNonNull(item1, "The first item is null");
    ObjectHelper.requireNonNull(item2, "The second item is null");
    ObjectHelper.requireNonNull(item3, "The third item is null");
    ObjectHelper.requireNonNull(item4, "The fourth item is null");

    return fromArray(item1, item2, item3, item4);
}
@SchedulerSupport(SchedulerSupport.NONE) 
public static < T > Observable < T > fromArray(T...items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items));
}
@Override 
public void subscribeActual(Observer < ?super T > s) {
    FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array);
    s.onSubscribe(d);
    if (d.fusionMode) {
        return;
    }
    d.run();
}

@Override 
public void dispose() {
    disposed = true;
}

@Override 
public boolean isDisposed() {
    return disposed;
}

void run() {
    T[] a = array;
    int n = a.length;
    for (int i = 0; i < n && !isDisposed(); i++) {
        T value = a[i];
        if (value == null) {
            actual.onError(new NullPointerException("The " + i + "th element is null"));
            return;
        }
        actual.onNext(value);
    }
    if (!isDisposed()) {
        actual.onComplete();
    }
}

just實際調(diào)用了fromArray方法,中創(chuàng)建了ObservableFromArray的實例,在這個實例中實現(xiàn)了Observable這個接口,在調(diào)用subscribe方法進行綁定之后,首先調(diào)用了subscribeActual方法,onSubscribe就會回調(diào)。

在取消綁定是我們可以將Disposable添加到CompositeDisposable中或者直接調(diào)用Disposable的dispose() 方法在流的任意位置取消。

此外, 為了簡化代碼,我使用了Consumer作為觀察者(可以當(dāng)成1.0時候的Action1 、ActionX)subscribe的返回值就是一個Disposable (subscribe 的返回值根據(jù)傳入的參數(shù)不同,也有不同)我把這個對象添加到CompositeDisposable,并在中途取消,但發(fā)射器仍然會把所有的數(shù)據(jù)全都發(fā)射完。因為LambdaSubscriber(也就是傳入Consumer 所構(gòu)造的觀察者)的disposeisDispose 略有不同,并不是簡簡單單的true/false, 說實話,我沒看懂Consumer的這兩個方法干了什么...........尷尬

LambdaSubscriber 瞅瞅

@Override
public void dispose() { 
cancel();
}

@Override
public boolean isDisposed() {  
  return get() == SubscriptionHelper.CANCELLED;
}

Flowable

是2.0之后用的最多的觀察者了,他與上一個的區(qū)別在于支持背壓,也就是說,下游會知道上游有多少數(shù)據(jù),所以他Subscriber會是這樣

Flowable
.just(1, 2, 3, 4)
.subscribe(new Subscriber < Integer > () {
@Override public void onSubscribe(Subscription s) {
  s.request(Long.MAX_VALUE);
}
@Override public void onNext(Integer integer) {}
@Override public void onError(Throwable t) {}
@Override public void onComplete() {}
});

onSubscribe 這個回調(diào)傳出了一個Subscription, 我們要指定他傳出數(shù)據(jù)的大小, 調(diào)用他的request() 方法。如沒有要求可以傳入一個Long的最大數(shù)值Long.MAX_VALUE。
要說明一下,request這個方法若不調(diào)用,下游的onNext與OnComplete都不會調(diào)用;若你寫的數(shù)量小于,只會傳你的個數(shù),但是不會調(diào)用onComplete方法,可以看下FlowableFromArrayslowPath方法

@Override void slowPath(long r) {
    long e = 0;
    T[] arr = array;
    int f = arr.length;
    int i = index;
    Subscriber < ?super T > a = actual;
    for (;;) {
        while (e != r && i != f) {
            if (cancelled) {
                return;
            }
            T t = arr[i];
            if (t == null) {
                a.onError(new NullPointerException("array element is null"));
                return;
            } else {
                a.onNext(t);
            }
            e++;
            i++;
        }
        if (i == f) {
            if (!cancelled) {
                a.onComplete();
            }
            return;
        }
        r = get();
        if (e == r) {
            index = i;
            r = addAndGet( - e);
            if (r == 0L) {
                return;
            }
            e = 0L;
        }
    }
}
}

需要if (i == f) f 是這個數(shù)據(jù)的大小,i是當(dāng)前發(fā)送數(shù)據(jù)的個數(shù),所以不會調(diào)用onComplete

休息一下

這是幾種被觀察者實現(xiàn)的接口

  • Observable 接口 ObservableSource
  • Flowable 接口 Publisher
  • Single 接口 SingleSource
  • Completable 接口 CompletableSource
  • Maybe 接口 MaybeSource

梳理完了兩個被觀察者,發(fā)現(xiàn)Flowable支持背壓,父類是Publisher;Observable不支持背壓,父類是ObservableSource,他們的實現(xiàn)方式,與其的操作符,到最后的觀察者,都有些不同,他們是完全獨立開的。各自之間也互不影響。

Single

單值相應(yīng)的模式: 就是只有一個值唄?

Completable

表示沒有任何值但僅指示完成或異常的延遲計算。

Maybe

maybe 可以當(dāng)成上面兩個的合體吧!

后面的三種,就不細(xì)掰了,我就是這么不求甚解。

二、操作符

操作符基本就沒有改變,但還是會發(fā)現(xiàn),我擦,from沒了,可以使用fromIterable
之前的actionx 也替換了Action \ Consumer \ BiConsumer
Func也跟action一樣, 名字改變了Function

感覺這樣是正經(jīng)Rx了。

三、線程切換

當(dāng)然現(xiàn)場切換沒有發(fā)生改變,用法還是一樣,但是之前沒有看過源碼,不知道怎樣神奇的把線程切換了,難道是來自東方的神秘力量。趁著還有時間,擼一下代碼。
在調(diào)用subscribeOn(Schedulers.io())之后,會創(chuàng)建ObservableSubscribeOn

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
        source.subscribe(parent);
    }
}
));

在這個過程中,會把source也就是ObservableSource在線程中訂閱,同時也把把傳入的Observer變成SubscribeOnObserver。若指定的是io線程,可以在IoScheduler中看見對線程的管理
在調(diào)用observeOn(AndroidSchedulers.mainThread())時,會產(chǎn)生一個ObservableObserveOn,同時還會把Observer變成ObserveOnObserver,可以發(fā)現(xiàn)在HandlerScheduler,在ui線程調(diào)用了ObserveOnObserver的run方法

四、Rxjava的數(shù)據(jù)傳遞

Rxjava是我在工作這幾個月最喜歡的框架,沒有之一。完全解決了我這個有潔癖的人在打代碼時的玻璃心。
雖然重復(fù)造輪輪子是沒有必要的(我也造不出來),但是為了全面的了解Rxjava,我也準(zhǔn)備簡單的實現(xiàn)一下,數(shù)據(jù)在每個操作符之中傳輸?shù)恼麄€過程。

在實現(xiàn)之前先猜想一下大概的過程吧:
我的需求是在一個static方法中產(chǎn)生一個數(shù)值,并且通過一層層的接口傳遞下去,下面的操作符的人參是上一個的返回值,最后輸出,我就模仿著Rx的 Maybe 的名字實現(xiàn)吧。

  • 首先我要一直‘點’下去的話Maybe 一定要返回自己
  • 接口要一層層的傳進去,這樣的話就可以在發(fā)射數(shù)據(jù)時,發(fā)原始數(shù)據(jù)傳入這個一堆的接口,然后每個接口計算自己的實現(xiàn)。
  • 最后返回結(jié)果

之后就是仿造源碼完成這段需求了,當(dāng)然這些方法也都簡單寫了,就是為了弄清楚思路:

1、創(chuàng)建一個MaybeSource,我們的Maybe 和 各個操作符都會實現(xiàn)它。

public interface MaybeSource {
     void subscribe(MaybeObserver observer);
}

2、創(chuàng)建一個MaybeObserver, 這就是最后綁定的時候的接口

public interface MaybeObserver {
    void onSuccess(int value);
}

3、創(chuàng)建Function, 這個在操作符中用于實現(xiàn)

public interface Function {
    int apply(int t);
}

4、當(dāng)然少不了Maybe, 這里就實現(xiàn)just和map兩個方法吧

public abstract class Maybe implements MaybeSource {
    public static Maybe just(int item) {
        return new MaybeJust(item);
    }

    public final Maybe map(Function mapper) {
        return new MaybeMap(this, mapper);
    }
}

5、just實際返回的對象是MaybeJust,他的父類是Maybe

public class MaybeJust extends Maybe {
    final int value;

    public MaybeJust(int value) {
        this.value = value;
    }

    @Override
    public void subscribe(MaybeObserver observer) {
        observer.onSuccess(value);
    }
}

6、map實際返回的對象是MaybeMap,他的父類是Maybe

public class MaybeMap extends Maybe {
    final Function mapper;
    final MaybeSource source;

    public MaybeMap(MaybeSource source, Function mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    public void subscribe(MaybeObserver observer) {
        source.subscribe(new MapMaybeObserver(observer, mapper));
    }

    static final class MapMaybeObserver implements MaybeObserver {
        final MaybeObserver actual;

        final Function mapper;

        MapMaybeObserver(MaybeObserver actual, Function mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

        @Override
        public void onSuccess(int value) {
            this.actual.onSuccess(this.mapper.apply(value));
        }
    }
}

7、在main中可以這么運行

Maybe
.just(1)
.map(new Function() {

    @Override
    public int apply(int t) {
        return t + 1;
    }
}).map(new Function() {

    @Override
    public int apply(int t) {
        return t * 4;
    }
}).subscribe(new MaybeObserver() {

    @Override
    public void onSuccess(int value) {
        System.out.println(value);
    }
});

8、運行結(jié)果,傳入1,先+1, 在 * 4,最后結(jié)果應(yīng)該是8

Paste_Image.png

得到了期望的結(jié)果


RxJava 2.0 + Retrofit 2 .0

之前做過一個項目,沒用什么架構(gòu),也沒什么封裝。但對我?guī)椭畲蟮氖?,之前是不能接受這樣的代碼的,感覺看上去腦袋都大了。但看習(xí)慣了, 也就習(xí)慣了。
但平時自己弄個小項目還是使用mvp,自己的潔癖可能更加強烈一點

在Retrofit 中選擇了Flowable作為返回值,支持背壓,在2.0之后應(yīng)該最為常用

@GET("/")  
Flowable<ResponseBody> getText();

在RxJava 2.0 中使用CompositeDisposable做解除綁定的操作, Consumer 回調(diào)中使用了三個Consumer,作為成功、失敗、完成的回調(diào)

    public <T> void addSubscription(Flowable flowable,
        final RxSubscriber<T> subscriber) {
        if (mCompositeDisposable == null) {
            mCompositeDisposable = new CompositeDisposable();
        }

        if (subscriber == null) {
            Log.e(TAG, "rx callback is null");

            return;
        }

        Disposable disposable = flowable.subscribeOn(Schedulers.io())
                                        .observeOn(AndroidSchedulers.mainThread())
                                        .subscribe(new Consumer<T>() {
                    @Override
                    public void accept(T o) throws Exception {
                        subscriber.onNext(o);
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable)
                        throws Exception {
                        subscriber.onError(throwable);
                    }
                },
                new Action() {
                    @Override
                    public void run() throws Exception {
                        subscriber.onComplete();
                    }
                });

此外,之前的項目后臺接口也是奇葩,同一個人寫的接口,接口的返回格式更是多種多樣,還不改,沒辦法,客戶端只能將就著服務(wù)端,誰叫我們是新來的呢。遇到這種問題,就不直接轉(zhuǎn)成對象格式了,先轉(zhuǎn)成ResponseBody得到Body,再拿出string來。
okhttp中response的body對象就是這個ResponseBody,他的string() 方法就可以獲得整個body,然后再做json解析吧

Paste_Image.png
Paste_Image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容