一、前言
這里就不詳細(xì)介紹怎么使用 RxJava 了,沒用過的自行去 github 瞅瞅 >>>>> 地址
本文源碼基于 rxjava:2.2.15
二、RxJava 的訂閱流程
咱們先來看個栗子:
//步驟一:創(chuàng)建被觀察者Observable,定義要發(fā)送的事件
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("0");
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
}
});
//步驟二:創(chuàng)建觀察者Observer,接收事件并作處理
Observer<String> observer = new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
Log.d("RxJava", "onSubscribe");
}
@Override public void onNext(String s) {
Log.d("RxJava", "onNext: " + s);
}
@Override public void onError(Throwable e) {
Log.d("RxJava", "onError");
}
@Override public void onComplete() {
Log.d("RxJava", "onComplete");
}
};
//步驟三:觀察者訂閱被觀察者
observable.subscribe(observer);
輸出結(jié)果:
onSubscribe
onNext: 0
onNext: 1
onNext: 2
onComplete
這里存在這么幾個角色,被觀察者(Observable)、觀察者(Observer)、事件(Event)、訂閱(Subscribe)。被觀察者是負(fù)責(zé)生產(chǎn)事件的,觀察者是負(fù)責(zé)接收事件并作處理,事件是被觀察者和觀察者的消息載體,也就是栗子中的 "0"、"1"、"2",訂閱是連接被觀察者和觀察者。
1、創(chuàng)建被觀察者過程
首先咱們來瞅瞅 Observable 的 create() 方法里面到底都干了什么事情
1.1、Observable 類的 create()
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null"); //這里就是一個判空處理
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
可以看到其實這里就是先創(chuàng)建了一個 ObservableCreate 對象,同時把我們定義好的 ObservableOnSubscribe 對象作為參數(shù)傳入進(jìn)去,最后調(diào)用了 RxJavaPlugins.onAssembly() 方法。
咱們先看看這個 ObservableCreate 類
1.2、ObservableCreate 類
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// 省略無關(guān)代碼...
}
可以看到 ObservableCreate 類是繼承自 Observable 抽象類的, 然后把咱們傳入的 ObservableOnSubscribe 對象存儲了起來。
再看下這個方法 RxJavaPlugins.onAssembly()
1.3、RxJavaPlugins.onAssembly()
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 省略無關(guān)代碼...
return source;
}
最終僅僅是把我們 new 出的 ObservableCreate 對象給返回來了。
1.4、小結(jié)
所以 Observable.create() 方法僅僅是把我們定義好的 ObservableOnSubscribe 對象重新包裝成了一個 ObservableCreate 對象。
2、創(chuàng)建觀察者過程
Observer<String> observer = new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
Log.d("RxJava", "onSubscribe");
}
@Override public void onNext(String s) {
Log.d("RxJava", "onNext: " + s);
}
@Override public void onError(Throwable e) {
Log.d("RxJava", "onError");
}
@Override public void onComplete() {
Log.d("RxJava", "onComplete");
}
};
很簡單,這里就是做了一個實現(xiàn)了 Observer 接口的匿名內(nèi)部類實例化。
3、訂閱過程
接下來我們一起看看訂閱過程,點進(jìn)去 observable.subscribe(observer);
public final void subscribe(Observer<? super T> observer) {
// 省略無關(guān)代碼
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
// 省略無關(guān)代碼
}
先分析第一行代碼:
3.1、RxJavaPlugins.onSubscribe()
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
// 省略無關(guān)代碼
return observer;
}
跟之前代碼一樣,這里僅僅是把傳入的 Observer 對象給返回來了
再來分析第二行代碼:
3.2、Observable 類的 subscribeActual()
protected abstract void subscribeActual(Observer<? super T> observer);
很明顯,這是抽象類 Observable 類的一個抽象方法,那它的具體實現(xiàn)在哪呢?其實它的具體實現(xiàn)類就是我們在前面創(chuàng)建被觀察者時創(chuàng)建的 ObservableCreate 類,它就是 Observable 的子類,現(xiàn)在來看它的具體實現(xiàn)
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這里第一步創(chuàng)建了一個 CreateEmitter 對象,第二步調(diào)用了 Observer 類的 onSubscribe() 方法,第三步調(diào)用了 ObservableOnSubscribe 類的 subscribe() 方法,其中這個 source 就是我們之前創(chuàng)建 ObservableCreate 對象傳入進(jìn)去的 ObservableOnSubscribe 對象。
同樣地,先看這個 CreateEmitter 類的創(chuàng)建過程:
3.3、CreateEmitter 類
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// 省略無關(guān)代碼
}
CreateEmitter 類繼承了原子引用類 AtomicReference,實現(xiàn)了 ObservableEmitter 和 Disposable 接口,把我們傳入的 Observer 對象存儲了起來,又是一個重新包裝新對象的用法。
3.4、Observer 類的 onSubscribe()
observer.onSubscribe(parent);
這個 onSubscribe() 回調(diào)的含義其實就是告訴觀察者已經(jīng)成功訂閱了被觀察者
3.5、ObservableOnSubscribe 接口的 subscribe()
source.subscribe(parent);
這個 source 就是我們一開始傳入的 ObservableOnSubscribe 對象,即這里會調(diào)用 ObservableOnSubscribe 的 subscribe() 方法,它的方法如下:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("0");
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
}
});
subscribe() 里的 onNext() 是用于將事件流發(fā)送出去,最后調(diào)用 onComplete() 方法代表完成了訂閱過程。這里的 ObservableEmitter 接口其具體實現(xiàn)為 CreateEmitter 類,所以我們需要看看 CreateEmitter 類里的 onNext() 和 onComplete() 方法的實現(xiàn)
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
@Override
public void onNext(T t) {
// 省略無關(guān)代碼...
if (!isDisposed()) {
// 調(diào)用觀察者的 onNext()
observer.onNext(t);
}
}
// 省略無關(guān)代碼...
@Override
public void onComplete() {
if (!isDisposed()) {
try {
// 調(diào)用觀察者的 onComplete()
observer.onComplete();
} finally {
dispose();
}
}
}
// 省略無關(guān)代碼...
可以看到,最終就是會調(diào)用觀察者的 onNext() 和 onComplete() 方法。至此,一個完整的消息訂閱流程就完成了。
三、RxJava 的線程切換
先給出線程切換的栗子:
Observable.create(new ObservableOnSubscribe<String>() {
@Override public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("0");
emitter.onNext("1");
emitter.onNext("2");
emitter.onComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override public void onSubscribe(Disposable d) {
Log.d("RxJava", "onSubscribe");
}
@Override public void onNext(String s) {
Log.d("RxJava", "onNext: " + s);
}
@Override public void onError(Throwable e) {
Log.d("RxJava", "onError");
}
@Override public void onComplete() {
Log.d("RxJava", "onComplete");
}
});