Retrofit2+rxjava2源碼解析(二):rxjava2源碼解析

上一篇講了retrofit2的原理,這一篇咱們重點講講rxjava2的實現(xiàn)原理。不過呢,由于rxjava2博大精深,這里篇幅有限,而且精力有限,所以這里只講以下幾個點:
1.上游被觀察者Observable的創(chuàng)建
2.subscribeOn如何初始化調(diào)度線程者
3.observeOn如何初始化調(diào)度線程者
4.下游觀察者的創(chuàng)建,上游被觀察者的訂閱事件,subscribeOn如何調(diào)度上游被觀察者線程以及observeOn如何調(diào)度下游觀察者線程

        //第一步
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //log1
                LogUtil.d("subscribe : " + Thread.currentThread().getName());
                e.onNext("a");
                e.onComplete();
            }
        }).subscribeOn(Schedulers.io())//第二步
        .observeOn(AndroidSchedulers.mainThread())//第三步
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                 //log2
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String str) {
                 //log3
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
                 //log4
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                 //log5
                LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
            }
        });//第四步

一、上游被觀察者Observable的創(chuàng)建:

首先進入Observable的create方法里面去:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
public final class RxJavaPlugins {
    ...代碼省略...
    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

    ...代碼省略...
}

RxJavaPlugins.onAssembly 是個hook方法,在這里不做詳細介紹,我們可以

RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))

直接看成

new ObservableCreate<T>(source)

就可以了。
下面很多地方會有類似的調(diào)用,都可以以這種形式看待,就不在說明了。
這里就返回了ObservableCreate的對象,然后把source傳了進來,這里的source便是一開始new出來的ObservableOnSubscribe對象。

二、subscribeOn如何初始化調(diào)度線程者

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

1.我們可以看出此時返回的是一個ObservableSubscribeOn對象,參數(shù)一個是this說明是第一步new出來的ObservableCreate對象,這個就是當(dāng)前對象里面的source變量,第二個參數(shù)是scheduler即Schedulers.io()。
2.那么Schedulers.io()到底是什么呢,一起去看看吧:

    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
public final class RxJavaPlugins {
    ...代碼省略...

     /**
     * Calls the associated hook function.
     * @param defaultScheduler the hook's input value
     * @return the value returned by the hook
     */
    @NonNull
    public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
        Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }

    ...代碼省略...
}

其實這也是個hook方法,所以返回的就是IO。再來看下IO是個什么東西

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

上圖所知,這是個靜態(tài)塊,在程序加載的時候就會初始化出來

    /**
     * Calls the associated hook function.
     * @param defaultScheduler a {@link Callable} which returns the hook's input value
     * @return the value returned by the hook, not null
     * @throws NullPointerException if the callable parameter or its result are null
     */
    @NonNull
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) {
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }

然后根據(jù)hook function可以得知返回的Scheduler就是
callRequireNonNull(defaultScheduler)返回的對象。這個defaultScheduler是個IOTask對象,看下面代碼

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

IOTask有個重寫方法call,返回的是IoHolder.DEFAULT即IoScheduler。那么這個call方法是什么時候調(diào)用的呢,其實就是上面的callRequireNonNull方法里面調(diào)用。

所以可以得出Schedulers.io()獲取到的就是IoSchedulers對象。

三、observeOn如何初始化調(diào)度線程者

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
  
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

1.可以看出來這里返回的是一個ObservableObserveOn對象,這里的this參數(shù)就是第二步的ObservableSubscribeOn對象,這個就是當(dāng)前對象里面的source變量。而這個scheduler參數(shù)就是傳入的AndroidSchedulers.mainThread(),看名字大概能猜出來這是個主線程調(diào)度者
2.那么就隨我進入AndroidSchedulers的代碼里一探究竟吧:

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
  ...代碼省略...
}

mainThread方法里面返回的是MAIN_THREAD,而這個MAIN_THREAD是通過RxAndroidPlugins.initMainThreadScheduler返回的MainHolder.DEFAULT對象,也就是HandlerScheduler對象。這個對象傳入了主線程的handler,用來調(diào)度線程。

四、下游觀察者的創(chuàng)建,上游被觀察者的訂閱事件,subscribeOn如何調(diào)度上游被觀察者線程以及observeOn如何調(diào)度下游觀察者線程

最關(guān)鍵的一步來了,先上代碼:

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

1.ObjectHelper.requireNonNull已經(jīng)出現(xiàn)了很多次,其實就是為了判斷是否為空用的
2.這里的observer就是傳入的觀察者,即剛剛在最外層new出來的observer。
3.這個時候進入subscribeActual方法,發(fā)現(xiàn)這是個抽象方法,那么是誰實現(xiàn)了這個方法呢。還記得我們現(xiàn)在的subscribe是誰調(diào)用的嘛,沒錯就是第三步返回的ObservableObserveOn對象,所以O(shè)bservableObserveOn實現(xiàn)了subscribeActual方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

這里的source就是之前步驟3.1保存的ObservableSubscribeOn對象,而observer就是最外層傳入的觀察者對象,由此可以看出觀察者被內(nèi)部類ObserveOnObserver進行了裝飾。
w就是handler線程調(diào)度者創(chuàng)建的工作者主要用來將ObserveOnObserver(實現(xiàn)了runnable接口)通過handler發(fā)送到主線程上面去,從而實現(xiàn)線程調(diào)度
4.這個時候要調(diào)用source的subscribe了,跟ObservableObserveOn類似,之后進入到ObservableSubscribeOn的subscribeActual方法中:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

這里的s就是步驟4.3new出來的ObserveOnObserver對象,這里將s裝飾到SubscribeOnObserver對象中,然后調(diào)用ObserveOnObserver的onSubscribe方法 ↓↓↓

        @Override
        public void onSubscribe(Disposable s) {
                ...代碼省略...
                actual.onSubscribe(this);
            }
        }

這里的actual其實就是最外層的觀察者,然后就會調(diào)用最外層觀察者的onSubscribe方法,此時由于并沒有做任何的線程調(diào)度,所以當(dāng)前的操作處于主線程中。因此log5出打出來的線程應(yīng)該是主線程

好了,我們再回到subscribeActual方法中,此時調(diào)用scheduler.scheduleDirect方法,將SubscribeTask對象傳入:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

run就是傳入SubscribeTask的對象,將SubscribeTask的對象裝飾到DisposeTask中,
w是IO線程調(diào)度者創(chuàng)建的切換線程的實際工作者,此時調(diào)用createWorker就會執(zhí)行IoScheduler的createWork方法new 出EventLoopWorker對象:

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

而w.schedule也就是執(zhí)行了EventLoopWorker的schedule方法,將DisposeTask傳入:

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

接著調(diào)用threadWorker里面的scheduleActual方法:

   @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

因為之前傳入的delayTime是0,所以此時會走 f = executor.submit((Callable<Object>)sr);這條線。然后我們看下executor是什么時候初始化的:

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    ...代碼省略...
      /**
     * Creates a ScheduledExecutorService with the given factory.
     * @param factory the thread factory
     * @return the ScheduledExecutorService
     */
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }

由此可以得出這個executor是ScheduledExecutorService對象,然后調(diào)用ScheduledExecutorService的submit方法:

   public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }
...代碼省略...
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<Void> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit),
                                          sequencer.getAndIncrement()));
        delayedExecute(t);
        return t;
    }

這里先是ScheduledFutureTask的初始化,將command傳入,然后調(diào)用其父類的構(gòu)造方法->調(diào)用Executors.callable方法返回一個RunnableAdapter對象(將command傳入構(gòu)造方法內(nèi),賦值給task),然后賦值給callable對象。
然后調(diào)用delayedExecute以后就會執(zhí)行ScheduledFutureTask的run方法,調(diào)用父類的run方法:

    public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

接著就調(diào)用callable的call方法,這個callable就是上面提到的RunnableAdapter對象,調(diào)用RunnableAdapter的call方法->task.run,此處的task便是傳入的command。
那么往前追溯到NewThreadWorker的schedule中,發(fā)現(xiàn)傳入到ScheduledThreadPoolExecutor中的command是ScheduledRunnable對象,也就是說調(diào)用了ScheduledRunnable.run->DisposeTask.run->SubscribeTask.run->SubscribeOnObserver.run->source.subscribe(parent);
所以上面的一系列操作全部都在子線程中執(zhí)行

SubscribeOnObserver里面的source是之前保存的ObservableSubscribeOn的ObservableCreate對象。
parent是裝飾了下游ObservableObserveOn的內(nèi)部類對象(裝飾了外部觀察者)的ObservableSubscribeOn的內(nèi)部類,這句話比較拗口,多讀幾次理解一下。

5.這時候就執(zhí)行到ObservableCreate的subscribeActual方法了:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

observer是裝飾了下游ObservableSubscribeOn的內(nèi)部類SubscribeOnObserver對象,該對象裝飾了他下游ObservableObserveOn的內(nèi)部類ObserveOnObserver對象(該對象裝飾了外部觀察者)如下圖所示:


observer.png

source是一開始在創(chuàng)建ObservableCreate時就傳入的外部被觀察者對象
因此,最終source.subscribe(parent)會調(diào)用最外層被觀察者的subscribe方法,因此log1打印出來的線程應(yīng)該處于子線程中,

                //log1
                LogUtil.d("subscribe : " + Thread.currentThread().getName());
                e.onNext("a");
                e.onComplete();

e就是傳入的CreateEmitter對象,調(diào)用createEmitter的onNext方法:

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

會依次調(diào)用observer.png所展示的對象的onNext方法,重點看下ObservableObserveOn.ObserveOnObserver的onNext方法:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

worker就是上面步驟4.3里面調(diào)用HandlerScheduler.createWorker方法new出來的HandlerWorker對象,調(diào)用HandlerWorker的schedule方法:

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

將runnable封裝到消息對象里面發(fā)送給主線程,主線程執(zhí)行run方法,從而完成了線程調(diào)度,這時操作處理已經(jīng)進入到了主線程當(dāng)中執(zhí)行ScheduledRunnable的run方法:

        @Override
        public void run() {
            try {
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                    new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }

這里的delegate就是ObservableObserveOn.ObserveOnObserver對象,所以重新進入ObservableObserveOn.ObserveOnObserver代碼的run方法中:

        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }

                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }

run-> drainNormal->actual.onNext(v);
這里的actual便是最外層的觀察者,所以觀察者里面的log3,log4,log5都會是在主線程中打印出來。

接下來我們打印出來看看:

08-08 14:09:03.138 4928-4928/com.zkyl.zkyl D/(MainActivity.java:57): onSubscribe  onSubscribe : main
08-08 14:09:03.142 4928-4974/com.zkyl.zkyl D/(MainActivity.java:47): subscribe  subscribe : RxCachedThreadScheduler-1
08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:62): onNext  onSubscribe : main
08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:72): onComplete  onSubscribe : main

確實正如代碼分析一般只有l(wèi)og1是處于子線程中,其他都處于主線程當(dāng)中。

感想

rxjava2的設(shè)計實在是過于龐大,精髓遠遠不止這些。研究到這里已經(jīng)非常累了,所以rxjava2的研究道路還是非常遙遠漫長的

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容