Rx基本流程
RxJava是一個(gè)異步框架,基于觀察者設(shè)計(jì)模式,被觀察者Observable,通過(guò)subscribe方法訂閱Observer。觀察者Observer,可以接收Observable的通知。
概念,被觀察者,觀察者,數(shù)據(jù)源,發(fā)射器。
常用方法,flatMap,map,subscribe,subscribeOn,observeOn。
回調(diào)方法,onNext,onCompleted,onError。
Observable類(lèi),抽象類(lèi),實(shí)現(xiàn)ObservableSource接口,subscribeActual方法,抽象方法。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("onNext-one");//數(shù)據(jù)發(fā)射
e.onComplete();
}
});
創(chuàng)建一個(gè)ObservableOnSubscribe對(duì)象,它是一個(gè)接口,在被觀察者內(nèi)部,用于通知觀察者的數(shù)據(jù)源source,即發(fā)射數(shù)據(jù)。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
被觀察者是ObservableCreate類(lèi),Observable的子類(lèi)。

Observable類(lèi)實(shí)現(xiàn)ObservableSource接口的訂閱方法,subscribe方法,注冊(cè)一個(gè)Observer。
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}
Observable的子類(lèi)ObservableCreate實(shí)現(xiàn)subscribeActual抽象方法。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//數(shù)據(jù)發(fā)射器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//觀察者第一個(gè)方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
創(chuàng)建一個(gè)CreateEmitter數(shù)據(jù)發(fā)射器,封裝觀察者,調(diào)用觀察者的onSubscribe方法,這是觀察者調(diào)用的第一個(gè)方法,通知觀察者已經(jīng)注冊(cè),將發(fā)射器告訴觀察者,可以在觀察者中阻斷發(fā)射。
調(diào)用數(shù)據(jù)源source(ObservableOnSubscribe)的subscribe方法,將發(fā)射器暴漏給外部,外部可以通過(guò)發(fā)射器發(fā)射數(shù)據(jù),如onNext方法。

發(fā)射器實(shí)現(xiàn)Disposable接口,觀察者可以拿到Disposable,當(dāng)觀察者onNext接收時(shí),可以通過(guò)Disposable的#dispose方法,結(jié)束通知,CreateEmitter下次發(fā)射時(shí),查看isDisposed狀態(tài)。
在外部,調(diào)用發(fā)射器onNext方法,onComplete方法。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
代表數(shù)據(jù)發(fā)射,將通知到觀察者onNext方法,調(diào)用一次發(fā)射器onNext方法,會(huì)通知一次觀察者onNext方法。
調(diào)用發(fā)射器onComplete方法,會(huì)通知觀察者onComplete方法。
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
dispose時(shí),觀察者onNext和onComplete方法都將不會(huì)再執(zhí)行。
Observer是一個(gè)接口,外部實(shí)現(xiàn)四個(gè)方法。
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
Rx核心原理是基于觀察者-被觀察者訂閱設(shè)計(jì)模式。
任重而道遠(yuǎn)