Rx異步


Rx基本流程

RxJava是一個(gè)異步框架,基于觀察者設(shè)計(jì)模式,被觀察者Observable,通過(guò)subscribe方法訂閱Observer。觀察者Observer,可以接收Observable的通知。
概念,被觀察者,觀察者,數(shù)據(jù)源,發(fā)射器。

常用方法,flatMap,map,subscribe,subscribeOn,observeOn。
回調(diào)方法,onNext,onCompleted,onError。

Observable類(lèi),抽象類(lèi),實(shí)現(xiàn)ObservableSource接口,subscribeActual方法,抽象方法。

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext("onNext-one");//數(shù)據(jù)發(fā)射
        e.onComplete();
    }
});

創(chuàng)建一個(gè)ObservableOnSubscribe對(duì)象,它是一個(gè)接口,在被觀察者內(nèi)部,用于通知觀察者的數(shù)據(jù)源source,即發(fā)射數(shù)據(jù)。

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

被觀察者是ObservableCreate類(lèi),Observable的子類(lèi)。

被觀察者和數(shù)據(jù)源

Observable類(lèi)實(shí)現(xiàn)ObservableSource接口的訂閱方法,subscribe方法,注冊(cè)一個(gè)Observer。

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
    }
}

Observable的子類(lèi)ObservableCreate實(shí)現(xiàn)subscribeActual抽象方法。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    //數(shù)據(jù)發(fā)射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //觀察者第一個(gè)方法
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

創(chuàng)建一個(gè)CreateEmitter數(shù)據(jù)發(fā)射器,封裝觀察者,調(diào)用觀察者的onSubscribe方法,這是觀察者調(diào)用的第一個(gè)方法,通知觀察者已經(jīng)注冊(cè),將發(fā)射器告訴觀察者,可以在觀察者中阻斷發(fā)射。
調(diào)用數(shù)據(jù)源source(ObservableOnSubscribe)的subscribe方法,將發(fā)射器暴漏給外部,外部可以通過(guò)發(fā)射器發(fā)射數(shù)據(jù),如onNext方法。

數(shù)據(jù)源和發(fā)射器

發(fā)射器實(shí)現(xiàn)Disposable接口,觀察者可以拿到Disposable,當(dāng)觀察者onNext接收時(shí),可以通過(guò)Disposable的#dispose方法,結(jié)束通知,CreateEmitter下次發(fā)射時(shí),查看isDisposed狀態(tài)。
在外部,調(diào)用發(fā)射器onNext方法,onComplete方法。

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

代表數(shù)據(jù)發(fā)射,將通知到觀察者onNext方法,調(diào)用一次發(fā)射器onNext方法,會(huì)通知一次觀察者onNext方法。
調(diào)用發(fā)射器onComplete方法,會(huì)通知觀察者onComplete方法。

@Override
public void onComplete() {
    if (!isDisposed()) {
        try {
            observer.onComplete();
        } finally {
            dispose();
        }
    }
}

dispose時(shí),觀察者onNext和onComplete方法都將不會(huì)再執(zhí)行。
Observer是一個(gè)接口,外部實(shí)現(xiàn)四個(gè)方法。

void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();

Rx核心原理是基于觀察者-被觀察者訂閱設(shè)計(jì)模式。


任重而道遠(yuǎn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容