RxJava源碼解析(二)

前言

本篇主要解析RxJava的線程切換的原理實(shí)現(xiàn)

subscribeOn

首先, 我們先看下subscribeOn()方法, 老樣子, 先上Demo

Observable<Integer> observable =
                Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(123);
                        emitter.onComplete();
                    }
                });

observable
        .subscribeOn(Schedulers.io())
        .subscribe(getObserver());

subscribeOn操作符源碼里其實(shí)是返回了一個(gè)ObservableSubscribeOn對(duì)象, 而從上篇我們已經(jīng)知道, 訂閱的動(dòng)作其實(shí)在每個(gè)ObservablesubscribeActual(observer)中執(zhí)行, 所以我們直接去看ObservableSubscribeOn中的對(duì)應(yīng)重載方法就行了.

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

SubscribeTask是一個(gè)Runnable的實(shí)現(xiàn)類(lèi), 執(zhí)行內(nèi)容就是修飾后的Observer訂閱上游的動(dòng)作, 我們先看scheduler.scheduleDirect(runable)方法

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}

這里createWorker是個(gè)抽象方法, 我們需要找到對(duì)應(yīng)的修飾類(lèi), 我們返回去看Schedulers.io(), IOIoScheduler的實(shí)例, 它的重載方法代碼如下

final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

可以看到IO線程實(shí)際使用的是一個(gè)有線程緩存的線程調(diào)度器.它內(nèi)部通過(guò)ScheduledExecutorService實(shí)例來(lái)嘗試重用之前worker開(kāi)始使用的實(shí)例, 由于本篇著重在流程實(shí)現(xiàn)原理, 所以略過(guò)細(xì)節(jié)處.
EventLoopWorker中, 我們看下對(duì)應(yīng)的重載方法

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

繼續(xù)往下, 其實(shí)這個(gè)時(shí)候已經(jīng)是在線程池目標(biāo)線程執(zhí)行相關(guān)的工作了. 再深入就是線程池的操作了, 所以這里我們不再贅述

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

由此我們可以看出來(lái), 每次每個(gè)subscribeOn操作符執(zhí)行的時(shí)候, 其實(shí)在source.subscribe(parent);訂閱動(dòng)作就做了線程切換, 所以在多次調(diào)subscribeOn的時(shí)候, 就會(huì)一直切換線程, 直到離ObservableSource最近的subscribeOn線程切換生效.

observeOn

廢話不說(shuō), 我們直接看ObservableObserveOn.subscribeActual(observer)

protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

熟悉的配方, 當(dāng)相同想成的時(shí)候, 直接訂閱, 而當(dāng)不同線程的時(shí)候, 可以看到我們獲取目標(biāo)切換線程對(duì)應(yīng)的worker實(shí)例以及裝飾對(duì)應(yīng)的ObserverObserveOnOberver,后面的流程我們心知肚明, 就是Observer層層訂閱上去, 然后我們看當(dāng)碰到最上流的ObservableSource往下執(zhí)行的時(shí)候, 做什么操作.具體我們看ObserveOnOberver代碼, 我們這里著重看下onSubscribeonNext方法

@Override
public void onSubscribe(Disposable s) {
    if (DisposableHelper.validate(this.s, s)) {
        this.s = s;
        // 發(fā)送的數(shù)據(jù)是集合隊(duì)列形式的時(shí)候
        if (s instanceof QueueDisposable) {
            @SuppressWarnings("unchecked")
            QueueDisposable<T> qd = (QueueDisposable<T>) s;

            int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
            //是同步模式的時(shí)候
            if (m == QueueDisposable.SYNC) {
                sourceMode = m;
                queue = qd;
                done = true;
                actual.onSubscribe(this);
                // 線程調(diào)度
                schedule();
                return;
            }
            // 異步模式
            if (m == QueueDisposable.ASYNC) {
                sourceMode = m;
                queue = qd;
                actual.onSubscribe(this);
                return;
            }
        }

        queue = new SpscLinkedArrayQueue<T>(bufferSize);

        actual.onSubscribe(this);
    }
}

@Override
public void onNext(T t) {
    // 是否已經(jīng)調(diào)用到onComplete 或者 onError, 如果是, 則不再執(zhí)行后面的onNext
    if (done) {
        return;
    }
    // 如果是非異步操作, 將數(shù)據(jù)添加到隊(duì)列中
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    // 線程調(diào)度
    schedule();
}

大概的注釋都加在代碼上了, 我們?cè)傺a(bǔ)充看下看onSubscribe方法, 首先判斷發(fā)送的數(shù)據(jù)是否屬于QueueDisposable, 如果不是, 直接執(zhí)行下游的onSubscribe,這里我卡了一下, 看不到他的線程切換是在哪里做, 后來(lái)往回看, 發(fā)現(xiàn)在我們執(zhí)行ObservableSubscribeOn.subscribeActual(observer)的時(shí)候, onSubscribe()方法本身的確不是在切換后的線程內(nèi)執(zhí)行的. 但是, 當(dāng)我們發(fā)送的是集合數(shù)據(jù), 那么我們需要判斷是哪種線程模式進(jìn)行線程調(diào)度.

我們來(lái)看具體的schedule()方法代碼

void schedule() {
            // 判斷當(dāng)前自增值是否為0, 原子性保證worker.schedule(this);不會(huì)在調(diào)用結(jié)束前被重復(fù)調(diào)用
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

這個(gè)時(shí)候就是在指定線程內(nèi)run了, Disposable schedule(@NonNull Runnable run)傳入的是個(gè)Runnable的實(shí)現(xiàn)類(lèi), 我們來(lái)找重載的run方法

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    // 無(wú)限循環(huán)
    for (;;) {
        // 判斷是否被取消, 或者調(diào)用onError 或者調(diào)用onComplete則退出循環(huán)
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        // 無(wú)限循環(huán)
        for (;;) {
            boolean d = done;
            T v;

            try {
                // 隊(duì)列數(shù)據(jù)分發(fā)
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            // 判斷是否應(yīng)該被終止
            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }
            a.onNext(v);
        }
        // 原子性保證worker.schedule(this)的調(diào)用
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

// 判斷循環(huán)是否終止
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
    // 如果訂閱已經(jīng)被取消, 則清除隊(duì)列, 終止
    if (cancelled) {
        queue.clear();
        return true;
    }
    // 如果調(diào)用過(guò)onError 或者 onComplete
    if (d) {
        Throwable e = error;
        // 默認(rèn)false
        if (delayError) {
            // 等到隊(duì)列為空的時(shí)候再調(diào)用onError或者onComplete
            if (empty) {
                if (e != null) {
                    a.onError(e);
                } else {
                    a.onComplete();
                }
                worker.dispose();
                return true;
            }
        } else {
            // 如果有拋出異常, 走下游的onError
            // 線程任務(wù)停止
            if (e != null) {
                queue.clear();
                a.onError(e);
                worker.dispose();
                return true;
            }
            // 沒(méi)有, 走下游的onComplete
            // 線程任務(wù)停止
            else if (empty) {
                a.onComplete();
                worker.dispose();
                return true;
            }
        }
    }
    // 否則不結(jié)束
    return false;
}

由此我們可以得出結(jié)論, observeOn的操作符可以保證我們下流操作線程切換生效

總結(jié)

到這里, 我們線程切換的原理大體流程就基本分析完畢了, 可以看出subscribeOn操作符只對(duì)上游生效, 而且因?yàn)樗窃谟嗛喌臅r(shí)候進(jìn)行線程切換, 而我們每個(gè)操作符中間都有訂閱動(dòng)作, 所以越接近我們的ObservableSource的訂閱的subscribeOn越是最后生效的. 而observeOn生效在我們的onNext,onComplete, onError方法內(nèi), 所以每次的observeOn針對(duì)它的下游都可以生效.

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容