Rxjava 2.x正式版出來已經(jīng)快兩個月了。在之前的項目中也在使用Rx。但卻一直沒有時間對整個的知識進行梳理,恰好今天抽出時間,也系統(tǒng)的再學(xué)習(xí)一遍RxJava/RxAndroid
RxJava的使用
一、觀察者/被觀察者
1、前奏:
在觀察者之前就要先提下backpressure這個概念。簡單來說,backpressure是在異步場景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度時,告訴被觀察者降低發(fā)送速度的策略。
2、在2.0中有以下幾種觀察者
- Observable/Observer
- Flowable/Subscriber
- Single/SingleObserver
- Completable/CompletableObserver
- Maybe/MaybeObserver
依次的來看一下:
Observable
Observable
.just(1, 2, 3)
.subscribe(new Observer < Integer > () {
@Override public void onSubscribe(Disposable d) {}
@Override public void onNext(Integer value) {}
@Override public void onError(Throwable e) {}
@Override public void onComplete() {}
});
這里要提的就是onSubscribe(Disposable d),disposable用于取消訂閱。
就用簡單的just這個操作符來分析一下。
@SuppressWarnings("unchecked")
@SchedulerSupport(SchedulerSupport.NONE)
public static < T > Observable < T > just(T item1, T item2, T item3, T item4) {
ObjectHelper.requireNonNull(item1, "The first item is null");
ObjectHelper.requireNonNull(item2, "The second item is null");
ObjectHelper.requireNonNull(item3, "The third item is null");
ObjectHelper.requireNonNull(item4, "The fourth item is null");
return fromArray(item1, item2, item3, item4);
}
@SchedulerSupport(SchedulerSupport.NONE)
public static < T > Observable < T > fromArray(T...items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items));
}
@Override
public void subscribeActual(Observer < ?super T > s) {
FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
just實際調(diào)用了fromArray方法,中創(chuàng)建了ObservableFromArray的實例,在這個實例中實現(xiàn)了Observable這個接口,在調(diào)用subscribe方法進行綁定之后,首先調(diào)用了subscribeActual方法,onSubscribe就會回調(diào)。
在取消綁定是我們可以將Disposable添加到CompositeDisposable中或者直接調(diào)用Disposable的dispose() 方法在流的任意位置取消。
此外, 為了簡化代碼,我使用了Consumer作為觀察者(可以當(dāng)成1.0時候的Action1 、ActionX)subscribe的返回值就是一個Disposable (subscribe 的返回值根據(jù)傳入的參數(shù)不同,也有不同)我把這個對象添加到CompositeDisposable,并在中途取消,但發(fā)射器仍然會把所有的數(shù)據(jù)全都發(fā)射完。因為LambdaSubscriber(也就是傳入Consumer 所構(gòu)造的觀察者)的dispose 和 isDispose 略有不同,并不是簡簡單單的true/false, 說實話,我沒看懂Consumer的這兩個方法干了什么...........尷尬
LambdaSubscriber 瞅瞅
@Override
public void dispose() {
cancel();
}
@Override
public boolean isDisposed() {
return get() == SubscriptionHelper.CANCELLED;
}
Flowable
是2.0之后用的最多的觀察者了,他與上一個的區(qū)別在于支持背壓,也就是說,下游會知道上游有多少數(shù)據(jù),所以他Subscriber會是這樣
Flowable
.just(1, 2, 3, 4)
.subscribe(new Subscriber < Integer > () {
@Override public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override public void onNext(Integer integer) {}
@Override public void onError(Throwable t) {}
@Override public void onComplete() {}
});
onSubscribe 這個回調(diào)傳出了一個Subscription, 我們要指定他傳出數(shù)據(jù)的大小, 調(diào)用他的request() 方法。如沒有要求可以傳入一個Long的最大數(shù)值Long.MAX_VALUE。
要說明一下,request這個方法若不調(diào)用,下游的onNext與OnComplete都不會調(diào)用;若你寫的數(shù)量小于,只會傳你的個數(shù),但是不會調(diào)用onComplete方法,可以看下FlowableFromArray的slowPath方法
@Override void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
Subscriber < ?super T > a = actual;
for (;;) {
while (e != r && i != f) {
if (cancelled) {
return;
}
T t = arr[i];
if (t == null) {
a.onError(new NullPointerException("array element is null"));
return;
} else {
a.onNext(t);
}
e++;
i++;
}
if (i == f) {
if (!cancelled) {
a.onComplete();
}
return;
}
r = get();
if (e == r) {
index = i;
r = addAndGet( - e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
需要if (i == f) f 是這個數(shù)據(jù)的大小,i是當(dāng)前發(fā)送數(shù)據(jù)的個數(shù),所以不會調(diào)用onComplete
休息一下
這是幾種被觀察者實現(xiàn)的接口
- Observable 接口 ObservableSource
- Flowable 接口 Publisher
- Single 接口 SingleSource
- Completable 接口 CompletableSource
- Maybe 接口 MaybeSource
梳理完了兩個被觀察者,發(fā)現(xiàn)Flowable支持背壓,父類是Publisher;Observable不支持背壓,父類是ObservableSource,他們的實現(xiàn)方式,與其的操作符,到最后的觀察者,都有些不同,他們是完全獨立開的。各自之間也互不影響。
Single
單值相應(yīng)的模式: 就是只有一個值唄?
Completable
表示沒有任何值但僅指示完成或異常的延遲計算。
Maybe
maybe 可以當(dāng)成上面兩個的合體吧!
后面的三種,就不細(xì)掰了,我就是這么不求甚解。
二、操作符
操作符基本就沒有改變,但還是會發(fā)現(xiàn),我擦,from沒了,可以使用fromIterable
之前的actionx 也替換了Action \ Consumer \ BiConsumer
Func也跟action一樣, 名字改變了Function
感覺這樣是正經(jīng)Rx了。
三、線程切換
當(dāng)然現(xiàn)場切換沒有發(fā)生改變,用法還是一樣,但是之前沒有看過源碼,不知道怎樣神奇的把線程切換了,難道是來自東方的神秘力量。趁著還有時間,擼一下代碼。
在調(diào)用subscribeOn(Schedulers.io())之后,會創(chuàng)建ObservableSubscribeOn
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}
));
在這個過程中,會把source也就是ObservableSource在線程中訂閱,同時也把把傳入的Observer變成SubscribeOnObserver。若指定的是io線程,可以在IoScheduler中看見對線程的管理
在調(diào)用observeOn(AndroidSchedulers.mainThread())時,會產(chǎn)生一個ObservableObserveOn,同時還會把Observer變成ObserveOnObserver,可以發(fā)現(xiàn)在HandlerScheduler,在ui線程調(diào)用了ObserveOnObserver的run方法
四、Rxjava的數(shù)據(jù)傳遞
Rxjava是我在工作這幾個月最喜歡的框架,沒有之一。完全解決了我這個有潔癖的人在打代碼時的玻璃心。
雖然重復(fù)造輪輪子是沒有必要的(我也造不出來),但是為了全面的了解Rxjava,我也準(zhǔn)備簡單的實現(xiàn)一下,數(shù)據(jù)在每個操作符之中傳輸?shù)恼麄€過程。
在實現(xiàn)之前先猜想一下大概的過程吧:
我的需求是在一個static方法中產(chǎn)生一個數(shù)值,并且通過一層層的接口傳遞下去,下面的操作符的人參是上一個的返回值,最后輸出,我就模仿著Rx的 Maybe 的名字實現(xiàn)吧。
- 首先我要一直‘點’下去的話Maybe 一定要返回自己
- 接口要一層層的傳進去,這樣的話就可以在發(fā)射數(shù)據(jù)時,發(fā)原始數(shù)據(jù)傳入這個一堆的接口,然后每個接口計算自己的實現(xiàn)。
- 最后返回結(jié)果
之后就是仿造源碼完成這段需求了,當(dāng)然這些方法也都簡單寫了,就是為了弄清楚思路:
1、創(chuàng)建一個MaybeSource,我們的Maybe 和 各個操作符都會實現(xiàn)它。
public interface MaybeSource {
void subscribe(MaybeObserver observer);
}
2、創(chuàng)建一個MaybeObserver, 這就是最后綁定的時候的接口
public interface MaybeObserver {
void onSuccess(int value);
}
3、創(chuàng)建Function, 這個在操作符中用于實現(xiàn)
public interface Function {
int apply(int t);
}
4、當(dāng)然少不了Maybe, 這里就實現(xiàn)just和map兩個方法吧
public abstract class Maybe implements MaybeSource {
public static Maybe just(int item) {
return new MaybeJust(item);
}
public final Maybe map(Function mapper) {
return new MaybeMap(this, mapper);
}
}
5、just實際返回的對象是MaybeJust,他的父類是Maybe
public class MaybeJust extends Maybe {
final int value;
public MaybeJust(int value) {
this.value = value;
}
@Override
public void subscribe(MaybeObserver observer) {
observer.onSuccess(value);
}
}
6、map實際返回的對象是MaybeMap,他的父類是Maybe
public class MaybeMap extends Maybe {
final Function mapper;
final MaybeSource source;
public MaybeMap(MaybeSource source, Function mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(MaybeObserver observer) {
source.subscribe(new MapMaybeObserver(observer, mapper));
}
static final class MapMaybeObserver implements MaybeObserver {
final MaybeObserver actual;
final Function mapper;
MapMaybeObserver(MaybeObserver actual, Function mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onSuccess(int value) {
this.actual.onSuccess(this.mapper.apply(value));
}
}
}
7、在main中可以這么運行
Maybe
.just(1)
.map(new Function() {
@Override
public int apply(int t) {
return t + 1;
}
}).map(new Function() {
@Override
public int apply(int t) {
return t * 4;
}
}).subscribe(new MaybeObserver() {
@Override
public void onSuccess(int value) {
System.out.println(value);
}
});
8、運行結(jié)果,傳入1,先+1, 在 * 4,最后結(jié)果應(yīng)該是8

得到了期望的結(jié)果
RxJava 2.0 + Retrofit 2 .0
之前做過一個項目,沒用什么架構(gòu),也沒什么封裝。但對我?guī)椭畲蟮氖?,之前是不能接受這樣的代碼的,感覺看上去腦袋都大了。但看習(xí)慣了, 也就習(xí)慣了。
但平時自己弄個小項目還是使用mvp,自己的潔癖可能更加強烈一點
在Retrofit 中選擇了Flowable作為返回值,支持背壓,在2.0之后應(yīng)該最為常用
@GET("/")
Flowable<ResponseBody> getText();
在RxJava 2.0 中使用CompositeDisposable做解除綁定的操作, Consumer 回調(diào)中使用了三個Consumer,作為成功、失敗、完成的回調(diào)
public <T> void addSubscription(Flowable flowable,
final RxSubscriber<T> subscriber) {
if (mCompositeDisposable == null) {
mCompositeDisposable = new CompositeDisposable();
}
if (subscriber == null) {
Log.e(TAG, "rx callback is null");
return;
}
Disposable disposable = flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<T>() {
@Override
public void accept(T o) throws Exception {
subscriber.onNext(o);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable)
throws Exception {
subscriber.onError(throwable);
}
},
new Action() {
@Override
public void run() throws Exception {
subscriber.onComplete();
}
});
此外,之前的項目后臺接口也是奇葩,同一個人寫的接口,接口的返回格式更是多種多樣,還不改,沒辦法,客戶端只能將就著服務(wù)端,誰叫我們是新來的呢。遇到這種問題,就不直接轉(zhuǎn)成對象格式了,先轉(zhuǎn)成ResponseBody得到Body,再拿出string來。
okhttp中response的body對象就是這個ResponseBody,他的string() 方法就可以獲得整個body,然后再做json解析吧

