前言
RxJava 是在今年年初的時候上的車,接觸也快要滿一年了。從最初只知道幾個操作符,寫寫 Demo ,或者跟著別人的項目和經(jīng)驗依葫蘆畫瓢,到目前終于有點初窺門徑的地步。
RxJava 對于 Android 來說,最直觀地便利就在于線程切換。所以本篇內(nèi)容就是學習 RxJava 是如何實現(xiàn)切換線程。
希望讀者閱讀此篇文章,是有用過 RxJava 的童鞋。
本章內(nèi)容基于源碼版本
RxJava: 1.2.4
- 文章來源:itsCoder 的 WeeklyBolg 項目
- itsCoder主頁:http://itscoder.com/
- 作者:謝三弟
- 審閱者:
目錄
準備
答案我會放在文章末尾
先來一道開胃菜:
指出下列程序操作符所運行的線程。
Observable.just() //1
.subscribeOn(Schedulers.newThread())
.map() //2
.subscribeOn(Schedulers.io())
.map() //3
.observeOn(Schedulers.computation())
.map() //4
.observeOn(Schedulers.newThread())
.subscribe() //5
開胃菜就到上面結束,如果你能夠清楚明白每個操作運行的線程,說明對于 RxJava 的線程切換的理解很正確。
再具體分析 RxJava 是如何線程切換的,希望能清楚以下幾個 RxJava 中名詞的意思。
- Create()
- OnSubscribe
- Operator
如果你特別明白這幾個 RxJava 類/方法的作用,可以直接跳過看切換這部分。
-
Create()
/** * Returns an Observable that will execute the specified function when a {@link Subscriber} subscribes to * it. */ public static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(RxJavaHooks.onCreate(f)); }方法注釋上說明,當訂閱者訂閱之后,該函數(shù)會返回將會執(zhí)行具體功能的流。操作符進入源碼會發(fā)現(xiàn)他們最終都會調(diào)用到
create()函數(shù)。 -
OnSubscribe
/** * Invoked when Observable.subscribe is called. * @param <T> the output value type */ public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {}首先我們知道這是一個繼承
Action1的接口,并且是在Observable.subscribe流進行訂閱操作后回調(diào)。而且回顧剛剛create()源碼中也發(fā)現(xiàn)參數(shù)就是這個OnSubscribe。Action的作用就是執(zhí)行其中的call()方法。Observable.OnSubscribe 有點像 Todo List ,里面都是一個一個待處理的事務,并且這個 List 是有序的(這個很關鍵)。
-
Operator
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> { // cover for generics insanity }簡單來說它的職責就是將一個
Subscriber變成另外一個Subscriber。
切換
上面知識點是一些小鋪墊,因為后面的內(nèi)容核心其實就是上面幾個類的作用。
SubscribeOn
追蹤這個方法,核心是在這個類:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
}
我先貼出這個類的,構造方法和成員變量,因為很重要,我們先把前因弄清楚。
首先我們發(fā)現(xiàn)這個類是實現(xiàn)了 OnSubscribe 接口,之前復習到這個的作用就是在該流被訂閱之后執(zhí)行 call() 方法,這里面就是后果,待會我們來看。
前因其實很簡單,就是傳入兩個參數(shù):
一個是
Scheduler,調(diào)度器,它的具體實現(xiàn)在Schedulers里。-
Observable<T> source這個其實就是當前這個流。public final Observable<T> subscribeOn(Scheduler scheduler) { if (this instanceof ScalarSynchronousObservable) { return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); } return create(new OperatorSubscribeOn<T>(this, scheduler)); }
接下來看看 call() 核心代碼里做的事情:
// 因為是 OnSubscribe 類,這里 call() 中傳入的參數(shù)是 Observable.subscribe(s) 中的 s
@Override
public void call(final Subscriber<? super T> subscriber) {
// 根據(jù)傳入的調(diào)度器,創(chuàng)建一個 Worker 對象 inner
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
// 在 Worker 對象 inner 中執(zhí)行(意思就是,在我們指定的調(diào)度器創(chuàng)建的線程中運行)
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
// 對訂閱者包裝
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
······
};
// 這一句位置很關鍵
// 首先 source 是之前傳入的流(也就是當前流),在 Worker 內(nèi)部進行了訂閱操作,所以該流所有操作都執(zhí)行在其中
source.unsafeSubscribe(s);
}
});
}
通過我們指定的調(diào)度器,創(chuàng)建好 Worker ,之前傳入的流在 Worker 內(nèi)部,對重新包裹的 subscriber 進行訂閱操作。
所以 SubscribeOn()最關鍵的地方其實是因為這行代碼在調(diào)度器創(chuàng)建的 Worker 的 call() 中
source.unsafeSubscribe(s);
總結:
subscribeOn其實是改變了調(diào)用前序列所運行的線程。
ObserveOn
同樣的方法來分析,最終的回調(diào)會到:
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
其實看到關鍵字 lift 和 operator 就大約可以猜到是做什么的了。
接下來我們進入到 OperatorObserveOn 類中:
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
// 省略不必要的代碼
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
// 省略 ···
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
}
我們首先會注意到它是一個 Operator ,并且沒有對上層 Observale 做任何修改和包裝。那么它的作用就是將一個 Subscriber 變成另外一個 Subscriber。所以接下來我們的首要任務就是看轉(zhuǎn)換后的 Subscriber 做了什么改變。
關鍵代碼在
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
child 是改變前的 Subscriber ,最后返回了 parent 。
我們發(fā)現(xiàn) ObserveOnSubscriber 同樣也是一個 Subscriber 類,所以肯定含有 onNext/onError/onComplete 這三個標準方法,重要的肯定是 onNext ,所以我只貼上了該類三個有關函數(shù)。
void init() {
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
// 執(zhí)行
schedule();
}
}
});
// recursiveScheduler 這個是構造函數(shù)時傳入調(diào)度器創(chuàng)建的 worker
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
// 條件判斷里先將之前流的結果緩存進隊列
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
// 執(zhí)行
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
// 在當前 worker 上執(zhí)行該類的 call 方法
recursiveScheduler.schedule(this);
}
}
call() 方法有點冗長,做的事情其實很簡單,就是取出我們緩存之前流的所有值,然后在 Worker 工作線程中傳下去。
總結:
- ObserveOn 不會關心之前的流的線程
- ObserveOn 會先將之前的流的值緩存起來,然后再在指定的線程上,將緩存推送給后面的
Subscriber
共用時各自的作用域
Observable.just() //1
.subscribeOn(Schedulers.newThread())
.map() //2
.map() //3
.observeOn(Schedulers.computation())
.map() //4
.observeOn(Schedulers.newThread())
.subscribe() //5
如果分析這個流各個操作符的執(zhí)行線程,我們先把第一個 subscribeOn() 之前和第一個 observeOn() 之前的 Todo Items 找出來然后求并集:
得到的結果就是 subscribeOn() 的作用域。

之后的線程切換簡單了,遇到 observeOn() 就切換一次。
思考
為什么subscribeOn 只有第一次調(diào)用生效?
我的理解如下:
subscribeOn 的作用域就是調(diào)用前序列中所有的 Todo List 任務清單(Observable.OnSubscribe),當我們執(zhí)行 subscribe() 時,這些任務清單就會執(zhí)行在 subscribeOn 指定的工作線程,而第二個 subscribeOn 早就沒有任務可做了,所以無法生效。
知乎里這段說的比我專業(yè):
正像 StackOverflow 上那段描述的,整個 Observable 數(shù)據(jù)流工作起來是分為兩個階段(或者說是兩個 lifecycle):upstream 的 subscription-time 和 downstream 的 runtime。
subscription-time 的階段,是為了發(fā)起和驅(qū)動數(shù)據(jù)流的啟動,在內(nèi)部實現(xiàn)上體現(xiàn)為 OnSubscribe 向上游的逐級調(diào)用(控制流向上游傳遞)。支持 backpressure 的 producer request 也屬于這個階段。除了 producer request 的情況之外,subscription-time 階段一般就是從下游到上游調(diào)用一次就結束了,最終到達生產(chǎn)者(以最上游的那個 OnSubscribe 來體現(xiàn))。接下來數(shù)據(jù)流就開始向下游流動了。
Rxjava 中, subscribeOn 及 observeOn 方法切換線程發(fā)生的位置為什么設計為不同的? - 知乎
doOnSubscribe 的例外
我們再改動下開胃菜的代碼:
Observable.just() //1
.subscribeOn(Schedulers.newThread())
.map() //2
.subscribeOn(Schedulers.io())
.map() //3
.observeOn(Schedulers.computation())
.map() //4
.doOnSubscribe() //6
.observeOn(Schedulers.newThread())
.subscribe() //5
只添加了一行.doOnSubscribe() //6 ,也是探討這個操作符執(zhí)行的線程。
public class OperatorDoOnSubscribe<T> implements Operator<T, T> {
private final Action0 subscribe;
public OperatorDoOnSubscribe(Action0 subscribe) {
this.subscribe = subscribe;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// 執(zhí)行我們的 Action
subscribe.call();
// Wrap 里面是包裝成一個新的 Subscriber 返回,不對這個流做任何改變
return Subscribers.wrap(child);
}
}
doOnSubscribe 執(zhí)行的線程其實就是 subscribe.call(); 所在的線程。這里觸發(fā)的時機就是,當我們進行 Observable.subscribe() 時,如果我們沒有在緊接之后SubscribeOn 指定線程,那么它就會運行在默認線程,然后返回一個新的流。
關于 doOnSubscribe() 留一個問題
Observable.just()
.doOnSubscribe() // 1
.doOnSubscribe() // 2
.subscribe()
問題是,對于 1 和 2 的執(zhí)行順序?
在開發(fā)中,我們肯定不會像問題那樣寫代碼,只是自己在看 doOnSubscribe 源碼的時候,在問自己為什么它在其他操作符之前,拓展到了 RxJava 流的一個執(zhí)行順序,也是自己想要明白的地方。所以下次準備探討學習。
對了,老司機說 RxJava 很像洋蔥,一層一層。
進行分析學習的時候可以類比幫助理解。
參考
Thomas Nield: RxJava- Understanding observeOn() and subscribeOn()
SubscribeOn 和 ObserveOn |Piasy Blog
答案:
1 newThread
2 newThread
3 newThread
4 computation
5 newThread