RxJava 線程切換源碼的一些體會和思考

前言

RxJava 是在今年年初的時候上的車,接觸也快要滿一年了。從最初只知道幾個操作符,寫寫 Demo ,或者跟著別人的項目和經(jīng)驗依葫蘆畫瓢,到目前終于有點初窺門徑的地步。

RxJava 對于 Android 來說,最直觀地便利就在于線程切換。所以本篇內(nèi)容就是學習 RxJava 是如何實現(xiàn)切換線程

希望讀者閱讀此篇文章,是有用過 RxJava 的童鞋。

本章內(nèi)容基于源碼版本

RxJava: 1.2.4

目錄

準備

答案我會放在文章末尾

先來一道開胃菜:

指出下列程序操作符所運行的線程。

Observable.just() //1
          .subscribeOn(Schedulers.newThread())
          .map() //2
          .subscribeOn(Schedulers.io())
          .map() //3
          .observeOn(Schedulers.computation())
          .map() //4
          .observeOn(Schedulers.newThread())
          .subscribe() //5

開胃菜就到上面結束,如果你能夠清楚明白每個操作運行的線程,說明對于 RxJava 的線程切換的理解很正確。

再具體分析 RxJava 是如何線程切換的,希望能清楚以下幾個 RxJava 中名詞的意思。

  • Create()
  • OnSubscribe
  • Operator

如果你特別明白這幾個 RxJava 類/方法的作用,可以直接跳過看切換這部分。

  1. Create()

    /**
     * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to
     * it.
     */
    
    public static <T> Observable<T> create(OnSubscribe<T> f) {
       return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    

    方法注釋上說明,當訂閱者訂閱之后,該函數(shù)會返回將會執(zhí)行具體功能的流。操作符進入源碼會發(fā)現(xiàn)他們最終都會調(diào)用到 create() 函數(shù)。

  2. OnSubscribe

    /**
     * Invoked when Observable.subscribe is called.
     * @param <T> the output value type
     */
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}
    

    首先我們知道這是一個繼承 Action1 的接口,并且是在 Observable.subscribe 流進行訂閱操作后回調(diào)。而且回顧剛剛 create() 源碼中也發(fā)現(xiàn)參數(shù)就是這個 OnSubscribe 。 Action 的作用就是執(zhí)行其中的 call() 方法。

    Observable.OnSubscribe 有點像 Todo List ,里面都是一個一個待處理的事務,并且這個 List 是有序的(這個很關鍵)。

  3. Operator

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
      // cover for generics insanity
    }
    

    簡單來說它的職責就是將一個 Subscriber 變成另外一個 Subscriber。

切換

上面知識點是一些小鋪墊,因為后面的內(nèi)容核心其實就是上面幾個類的作用。

SubscribeOn

追蹤這個方法,核心是在這個類:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

    final Scheduler scheduler;
    final Observable<T> source;

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }
}

我先貼出這個類的,構造方法和成員變量,因為很重要,我們先把前因弄清楚。

首先我們發(fā)現(xiàn)這個類是實現(xiàn)了 OnSubscribe 接口,之前復習到這個的作用就是在該流被訂閱之后執(zhí)行 call() 方法,這里面就是后果,待會我們來看。

前因其實很簡單,就是傳入兩個參數(shù):

  1. 一個是 Scheduler ,調(diào)度器,它的具體實現(xiàn)在 Schedulers 里。

  2. Observable<T> source 這個其實就是當前這個流。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
      if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
      }
      return create(new OperatorSubscribeOn<T>(this, scheduler));
    }
    

接下來看看 call() 核心代碼里做的事情:


    // 因為是 OnSubscribe 類,這里 call() 中傳入的參數(shù)是 Observable.subscribe(s) 中的 s
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        // 根據(jù)傳入的調(diào)度器,創(chuàng)建一個 Worker 對象 inner
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);

        // 在 Worker 對象 inner 中執(zhí)行(意思就是,在我們指定的調(diào)度器創(chuàng)建的線程中運行)
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();

                // 對訂閱者包裝
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    ······
                };

                // 這一句位置很關鍵
                // 首先 source 是之前傳入的流(也就是當前流),在 Worker 內(nèi)部進行了訂閱操作,所以該流所有操作都執(zhí)行在其中
                source.unsafeSubscribe(s);
            }
        });
    }

通過我們指定的調(diào)度器,創(chuàng)建好 Worker ,之前傳入的流在 Worker 內(nèi)部,對重新包裹的 subscriber 進行訂閱操作。

所以 SubscribeOn()最關鍵的地方其實是因為這行代碼在調(diào)度器創(chuàng)建的 Worker 的 call()

source.unsafeSubscribe(s);

總結:

subscribeOn 其實是改變了調(diào)用前序列所運行的線程。

ObserveOn

同樣的方法來分析,最終的回調(diào)會到:

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  if (this instanceof ScalarSynchronousObservable) {
    return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
  }
  return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}

其實看到關鍵字 lift 和 operator 就大約可以猜到是做什么的了。

接下來我們進入到 OperatorObserveOn 類中:

public final class OperatorObserveOn<T> implements Operator<T, T> {

    private final Scheduler scheduler;
    // 省略不必要的代碼

    @Override
    public Subscriber<? super T> call(Subscriber<? super T> child) {
            // 省略 ···
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
            parent.init();
            return parent;
        }
    }
}

我們首先會注意到它是一個 Operator ,并且沒有對上層 Observale 做任何修改和包裝。那么它的作用就是將一個 Subscriber 變成另外一個 Subscriber。所以接下來我們的首要任務就是看轉(zhuǎn)換后的 Subscriber 做了什么改變。

關鍵代碼在

ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();

child 是改變前的 Subscriber ,最后返回了 parent 。

我們發(fā)現(xiàn) ObserveOnSubscriber 同樣也是一個 Subscriber 類,所以肯定含有 onNext/onError/onComplete 這三個標準方法,重要的肯定是 onNext ,所以我只貼上了該類三個有關函數(shù)。

void init() {
    Subscriber<? super T> localChild = child;

    localChild.setProducer(new Producer() {

        @Override
        public void request(long n) {
            if (n > 0L) {
                BackpressureUtils.getAndAddRequest(requested, n);
                // 執(zhí)行
                schedule();
            }
        }

    });
    // recursiveScheduler 這個是構造函數(shù)時傳入調(diào)度器創(chuàng)建的 worker
    localChild.add(recursiveScheduler);
    localChild.add(this);
}

@Override
public void onNext(final T t) {
  if (isUnsubscribed() || finished) {
    return;
  }
  // 條件判斷里先將之前流的結果緩存進隊列
  if (!queue.offer(on.next(t))) {
    onError(new MissingBackpressureException());
    return;
  }
  // 執(zhí)行
  schedule();
}


protected void schedule() {
    if (counter.getAndIncrement() == 0) {
        // 在當前 worker 上執(zhí)行該類的 call 方法
        recursiveScheduler.schedule(this);
    }
}

call() 方法有點冗長,做的事情其實很簡單,就是取出我們緩存之前流的所有值,然后在 Worker 工作線程中傳下去。

總結:

  1. ObserveOn 不會關心之前的流的線程
  2. ObserveOn 會先將之前的流的值緩存起來,然后再在指定的線程上,將緩存推送給后面的 Subscriber

共用時各自的作用域

 Observable.just() //1
            .subscribeOn(Schedulers.newThread())
            .map() //2
            .map() //3
            .observeOn(Schedulers.computation())
            .map() //4
            .observeOn(Schedulers.newThread())
            .subscribe() //5

如果分析這個流各個操作符的執(zhí)行線程,我們先把第一個 subscribeOn() 之前和第一個 observeOn() 之前的 Todo Items 找出來然后求并集:

得到的結果就是 subscribeOn() 的作用域。

之后的線程切換簡單了,遇到 observeOn() 就切換一次。

思考

為什么subscribeOn 只有第一次調(diào)用生效?

我的理解如下:

subscribeOn 的作用域就是調(diào)用前序列中所有的 Todo List 任務清單(Observable.OnSubscribe),當我們執(zhí)行 subscribe() 時,這些任務清單就會執(zhí)行在 subscribeOn 指定的工作線程,而第二個 subscribeOn 早就沒有任務可做了,所以無法生效。


知乎里這段說的比我專業(yè):

正像 StackOverflow 上那段描述的,整個 Observable 數(shù)據(jù)流工作起來是分為兩個階段(或者說是兩個 lifecycle):upstream 的 subscription-time 和 downstream 的 runtime。

subscription-time 的階段,是為了發(fā)起和驅(qū)動數(shù)據(jù)流的啟動,在內(nèi)部實現(xiàn)上體現(xiàn)為 OnSubscribe 向上游的逐級調(diào)用(控制流向上游傳遞)。支持 backpressure 的 producer request 也屬于這個階段。除了 producer request 的情況之外,subscription-time 階段一般就是從下游到上游調(diào)用一次就結束了,最終到達生產(chǎn)者(以最上游的那個 OnSubscribe 來體現(xiàn))。接下來數(shù)據(jù)流就開始向下游流動了。

Rxjava 中, subscribeOn 及 observeOn 方法切換線程發(fā)生的位置為什么設計為不同的? - 知乎

doOnSubscribe 的例外

我們再改動下開胃菜的代碼:

Observable.just() //1
          .subscribeOn(Schedulers.newThread())
          .map() //2
          .subscribeOn(Schedulers.io())
          .map() //3
          .observeOn(Schedulers.computation())
          .map() //4
          .doOnSubscribe() //6
          .observeOn(Schedulers.newThread())
          .subscribe() //5

只添加了一行.doOnSubscribe() //6 ,也是探討這個操作符執(zhí)行的線程。

public class OperatorDoOnSubscribe<T> implements Operator<T, T> {
    private final Action0 subscribe;

    public OperatorDoOnSubscribe(Action0 subscribe) {
        this.subscribe = subscribe;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> child) {
        // 執(zhí)行我們的 Action
        subscribe.call();
        // Wrap 里面是包裝成一個新的 Subscriber 返回,不對這個流做任何改變
        return Subscribers.wrap(child);
    }
}

doOnSubscribe 執(zhí)行的線程其實就是 subscribe.call(); 所在的線程。這里觸發(fā)的時機就是,當我們進行 Observable.subscribe() 時,如果我們沒有在緊接之后SubscribeOn 指定線程,那么它就會運行在默認線程,然后返回一個新的流。


關于 doOnSubscribe() 留一個問題

Observable.just()
          .doOnSubscribe() // 1
          .doOnSubscribe() // 2
          .subscribe()

問題是,對于 1 和 2 的執(zhí)行順序?

在開發(fā)中,我們肯定不會像問題那樣寫代碼,只是自己在看 doOnSubscribe 源碼的時候,在問自己為什么它在其他操作符之前,拓展到了 RxJava 流的一個執(zhí)行順序,也是自己想要明白的地方。所以下次準備探討學習。

對了,老司機說 RxJava 很像洋蔥,一層一層。

進行分析學習的時候可以類比幫助理解。

參考

Thomas Nield: RxJava- Understanding observeOn() and subscribeOn()

SubscribeOn 和 ObserveOn |Piasy Blog

答案:

1 newThread

2 newThread

3 newThread

4 computation

5 newThread

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容