2-RxJava源碼分析之 --- 訂閱過程和線程切換

RxJava訂閱過程和線程切換原理

1 - Observable.just("hello world").subscribe(observer)

這是RxJava中的生產(chǎn)/消費(fèi)模式中最簡(jiǎn)單的一種,就是生產(chǎn)發(fā)送“hello world"在用observer去監(jiān)聽消費(fèi)數(shù)據(jù),那么具體內(nèi)部RxJava是如何實(shí)現(xiàn)的呢?

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 1.創(chuàng)建實(shí)際的Observable類ObservableJust,并把數(shù)據(jù)傳進(jìn)去
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    
    // subscribe操作
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            // 2.Observable的subscribeActual是abstract,實(shí)現(xiàn)在ObservableJust中
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }   
}

// Observable.just("Hello world"),創(chuàng)建的Observable實(shí)現(xiàn)類
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    private final T value;
    public ObservableJust(final T value) {
        this.value = value;
    }

    // subscribe時(shí)具體實(shí)現(xiàn)邏輯
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
        // 3.先調(diào)用observer.onSubscribe()方法
        observer.onSubscribe(sd);
        // 4.執(zhí)行后續(xù)的調(diào)用
        sd.run();
    }

    @Override
    public T call() {
        return value;
    }
}

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
    @Override
    public void run() {
        // 5.如果時(shí)START,設(shè)置為ON_NEXT
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            // 6.調(diào)用observer.onNext()方法
            observer.onNext(value);
            if (get() == ON_NEXT) {
                // 7.設(shè)置為ON_COMPLETE
                lazySet(ON_COMPLETE);
                // 8.調(diào)用observer.onComplete()方法
                observer.onComplete();
            }
        }
    }
}

上面代碼的注釋可以清晰的了解到整個(gè)Observable.just("Hello world").subscribe(observer)的調(diào)用過程,首先是在Observable.just("Hello world")中創(chuàng)建實(shí)際的Observable對(duì)象ObservableJust實(shí)例,然后在subscribe(observer)時(shí),調(diào)用ObservableJust的subscribeActual方法,在subscribeActual中先調(diào)用obsever.onSubscribe(),再調(diào)用ScalarDisposable的run()方法,run()方法中處理了onNext()/onComplete()邏輯。

2 - Observable.just("hello world").subscribeOn(Schedulers.IO).subscribe(observer),subscribeOn(scheduler)的線程切換分析,數(shù)據(jù)生產(chǎn)方的線程切換

public abstract class Observable<T> implements ObservableSource<T> {
 public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 1.創(chuàng)建ObservableSubscribeOn,即把原始Observable轉(zhuǎn)為ObservableSubscribeOn對(duì)象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // 2.把原始Observable保存為source
        super(source);
        this.scheduler = scheduler;
    }
    
     @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 3.把原始o(jì)bserver轉(zhuǎn)成另一個(gè)代理類,代理類中對(duì)onSubscribe和dispose做了處理
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // 4.調(diào)用Observer.onSubscribe()方法
        s.onSubscribe(parent);
        // 5.此處是關(guān)鍵,創(chuàng)建SubscribeTask,它是個(gè)Runnable,給scheduler去調(diào)用,即進(jìn)行線程切換,
        // 在SubscribeTask中包含了orignalObservable.subscribe(orignalObserver)邏輯,
        // 這樣就使訂閱邏輯執(zhí)行在scheduler線程中。此處的scheduler.scheduleDirect()邏輯后面在分析,
        // 只要理解為把一個(gè)Runnable放在執(zhí)行線程執(zhí)行即可。
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        @Override
        public void run() {
            // 5.source即為原始Observable,parent為原始Observer的代理類(SubscribeOnObserver),
            //即這里實(shí)際就是執(zhí)行原始未切換線程邏輯下的Observable.subscribe(observer),
            // 注意因?yàn)樯厦嬉呀?jīng)調(diào)用了originObserver.onSubscribe(parent);
            // 所以在代理類onSubscribe方法中沒有調(diào)用originObserver.onSubscribe()
            source.subscribe(parent);
        }
    }
}

從上面代碼注釋可以了解到,RxJava subscribe(發(fā)送端) 切換線程subscribe(Schedulers.IO)邏輯會(huì)把原始Observable轉(zhuǎn)為另一個(gè)訂閱時(shí)線程切換的Observable(ObservableSubscribeOn),在ObservableSubscribeOn.subscribeActual()中把原始o(jì)bserver轉(zhuǎn)為一個(gè)代理對(duì)象parent,調(diào)用originObserver.onSubscribe()方法,并把未切換線程的subscribe邏輯包裝為Runnable,再把Runnable給Scheduler去調(diào)用執(zhí)行,從而達(dá)到切換線程執(zhí)行subscribe邏輯。

3 - Observable.just("hello world").observeOn(Schedulers.IO).subscribe(observer), observeOn(scheduler)的線程切換分析,數(shù)據(jù)消費(fèi)方線程切換

public abstract class Observable<T> implements ObservableSource<T> {
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //  1.創(chuàng)建ObservableObserveOn,即把原始Observable轉(zhuǎn)為ObservableObserveOn對(duì)象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
}


public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 2.創(chuàng)建ObserveOnObserver,即把原始o(jì)bserver轉(zhuǎn)為ObserveOnObserver對(duì)象,
            // ObserveOnObserver中會(huì)把observer相關(guān)的回調(diào)通過worker切換到指定線程去調(diào)用
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 3.切換線程調(diào)用
            schedule();
        }
        
         void schedule() {
            if (getAndIncrement() == 0) {
                // 4.切換到worker線程調(diào)用,里面具體調(diào)用哪個(gè)方法執(zhí)行邏輯較多,總體來說到這里就達(dá)到了observer回調(diào)切換線程的目的
                worker.schedule(this);
            }
        }
    }
}

從上面代碼注釋可以了解到,RxJava observer(接收端) 切換線程observerOn(Schedulers.IO)邏輯會(huì)把原始Observable轉(zhuǎn)為另一個(gè)訂閱時(shí)線程切換的Observable(ObservableObserveOn),在ObservableObserveOn.subscribeActual()中把原始o(jì)bserver轉(zhuǎn)為一個(gè)代理對(duì)象ObserveOnObserver,在ObserveOnObserver中對(duì)observer相關(guān)的回調(diào)做線程切換處理,從而達(dá)到observer回調(diào)切換線程的目的。

4 - 生產(chǎn)端和消費(fèi)端都切換線程的分析, Observable.just("hello world").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer)

public abstract class Observable<T> implements ObservableSource<T> {
    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 1.創(chuàng)建實(shí)際的Observable類ObservableJust,并把數(shù)據(jù)傳進(jìn)去
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        // 2.創(chuàng)建ObservableSubscribeOn,即把ObservableJust轉(zhuǎn)為ObservableSubscribeOn對(duì)象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
     public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        // 3.創(chuàng)建ObservableObserveOn,即把ObservableSubscribeOn轉(zhuǎn)為ObservableObserveOn對(duì)象
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
}

public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    @Override
    protected void subscribeActual(Observer<? super T> s) {
        // 4.這是ObservableJust執(zhí)行subscribe的邏輯。記:TAG-4
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
    }
}

public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
    public void run() {
        // 5.ObservableJust發(fā)送端邏輯,這里會(huì)調(diào)線程切換轉(zhuǎn)化后的Observer,記后面的TAG-10位置
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
}

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        // 6.把原始Observable保存為source
        super(source);
        this.scheduler = scheduler;
    }
    
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // 注意此處的s不是原始的observer,是在ObservableObserveOn中轉(zhuǎn)化過的observer,可以見下面分析
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        //7.把observable.subscribe(observer)包裝進(jìn)Runnable中,并給scheduler去執(zhí)行,留意此處在哪調(diào)用的,后面會(huì)分析。記:TAG-7,這里會(huì)調(diào)用到第六步TAG-8
        //注意注意注意?。。。哼@里達(dá)到發(fā)送端切換線程目的
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    final class SubscribeTask implements Runnable {
        @Override
        public void run() {
            // 8.在io線程中執(zhí)行observable.subscribe(observer),注意此處的source是ObservableJust,它會(huì)調(diào)用TAG-4。記:TAG-8
            source.subscribe(parent);
        }
    }
}


public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // 9.創(chuàng)建ObserveOnObserver,即把原始o(jì)bserver轉(zhuǎn)為ObserveOnObserver對(duì)象,并調(diào)用到上面第7步的流程,即TAG-7的地方
            // 注意注意注意!?。。篛bserveOnObserver會(huì)把原始o(jì)bserver的回調(diào)放在指定線程去回調(diào),達(dá)到接收端切換線程目的
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            // 10.observer切換線程的地方。記TAG-10
            schedule();
        }
    }
}

從上面代碼注釋可以了解到,其實(shí)RxJava對(duì)于發(fā)送端和接收端切換線程的邏輯就是把對(duì)應(yīng)的Observable轉(zhuǎn)為另一個(gè)Observable,且對(duì)于observeOn(scheduler)會(huì)把observer轉(zhuǎn)為切換線程調(diào)用的oberver,當(dāng)subscribe時(shí),就會(huì)從轉(zhuǎn)化后的Observable一級(jí)一級(jí)調(diào)用到原始的Observable方法,當(dāng)然中間做了subscribe的切換線程操作,在原始Observable上再調(diào)用轉(zhuǎn)化后的可線程切換的Observer的回調(diào),在切換線程Observer中對(duì)原始Observer回調(diào)進(jìn)行線程切換后調(diào)用。

RxJava源碼分析系列文章主題目錄:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容