前言
本篇主要解析RxJava的線程切換的原理實(shí)現(xiàn)
subscribeOn
首先, 我們先看下subscribeOn()方法, 老樣子, 先上Demo
Observable<Integer> observable =
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(123);
emitter.onComplete();
}
});
observable
.subscribeOn(Schedulers.io())
.subscribe(getObserver());
subscribeOn操作符源碼里其實(shí)是返回了一個(gè)ObservableSubscribeOn對(duì)象, 而從上篇我們已經(jīng)知道, 訂閱的動(dòng)作其實(shí)在每個(gè)Observable的subscribeActual(observer)中執(zhí)行, 所以我們直接去看ObservableSubscribeOn中的對(duì)應(yīng)重載方法就行了.
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask是一個(gè)Runnable的實(shí)現(xiàn)類(lèi), 執(zhí)行內(nèi)容就是修飾后的Observer訂閱上游的動(dòng)作, 我們先看scheduler.scheduleDirect(runable)方法
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
這里createWorker是個(gè)抽象方法, 我們需要找到對(duì)應(yīng)的修飾類(lèi), 我們返回去看Schedulers.io(), IO是IoScheduler的實(shí)例, 它的重載方法代碼如下
final AtomicReference<CachedWorkerPool> pool;
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
可以看到IO線程實(shí)際使用的是一個(gè)有線程緩存的線程調(diào)度器.它內(nèi)部通過(guò)ScheduledExecutorService實(shí)例來(lái)嘗試重用之前worker開(kāi)始使用的實(shí)例, 由于本篇著重在流程實(shí)現(xiàn)原理, 所以略過(guò)細(xì)節(jié)處.
在EventLoopWorker中, 我們看下對(duì)應(yīng)的重載方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
繼續(xù)往下, 其實(shí)這個(gè)時(shí)候已經(jīng)是在線程池目標(biāo)線程執(zhí)行相關(guān)的工作了. 再深入就是線程池的操作了, 所以這里我們不再贅述
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
由此我們可以看出來(lái), 每次每個(gè)subscribeOn操作符執(zhí)行的時(shí)候, 其實(shí)在source.subscribe(parent);訂閱動(dòng)作就做了線程切換, 所以在多次調(diào)subscribeOn的時(shí)候, 就會(huì)一直切換線程, 直到離ObservableSource最近的subscribeOn線程切換生效.
observeOn
廢話不說(shuō), 我們直接看ObservableObserveOn.subscribeActual(observer)
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
熟悉的配方, 當(dāng)相同想成的時(shí)候, 直接訂閱, 而當(dāng)不同線程的時(shí)候, 可以看到我們獲取目標(biāo)切換線程對(duì)應(yīng)的worker實(shí)例以及裝飾對(duì)應(yīng)的Observer成ObserveOnOberver,后面的流程我們心知肚明, 就是Observer層層訂閱上去, 然后我們看當(dāng)碰到最上流的ObservableSource往下執(zhí)行的時(shí)候, 做什么操作.具體我們看ObserveOnOberver代碼, 我們這里著重看下onSubscribe和onNext方法
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
// 發(fā)送的數(shù)據(jù)是集合隊(duì)列形式的時(shí)候
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//是同步模式的時(shí)候
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
// 線程調(diào)度
schedule();
return;
}
// 異步模式
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
// 是否已經(jīng)調(diào)用到onComplete 或者 onError, 如果是, 則不再執(zhí)行后面的onNext
if (done) {
return;
}
// 如果是非異步操作, 將數(shù)據(jù)添加到隊(duì)列中
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 線程調(diào)度
schedule();
}
大概的注釋都加在代碼上了, 我們?cè)傺a(bǔ)充看下看onSubscribe方法, 首先判斷發(fā)送的數(shù)據(jù)是否屬于QueueDisposable, 如果不是, 直接執(zhí)行下游的onSubscribe,這里我卡了一下, 看不到他的線程切換是在哪里做, 后來(lái)往回看, 發(fā)現(xiàn)在我們執(zhí)行ObservableSubscribeOn.subscribeActual(observer)的時(shí)候, onSubscribe()方法本身的確不是在切換后的線程內(nèi)執(zhí)行的. 但是, 當(dāng)我們發(fā)送的是集合數(shù)據(jù), 那么我們需要判斷是哪種線程模式進(jìn)行線程調(diào)度.
我們來(lái)看具體的schedule()方法代碼
void schedule() {
// 判斷當(dāng)前自增值是否為0, 原子性保證worker.schedule(this);不會(huì)在調(diào)用結(jié)束前被重復(fù)調(diào)用
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
這個(gè)時(shí)候就是在指定線程內(nèi)run了, Disposable schedule(@NonNull Runnable run)傳入的是個(gè)Runnable的實(shí)現(xiàn)類(lèi), 我們來(lái)找重載的run方法
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
// 無(wú)限循環(huán)
for (;;) {
// 判斷是否被取消, 或者調(diào)用onError 或者調(diào)用onComplete則退出循環(huán)
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
// 無(wú)限循環(huán)
for (;;) {
boolean d = done;
T v;
try {
// 隊(duì)列數(shù)據(jù)分發(fā)
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
// 判斷是否應(yīng)該被終止
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
// 原子性保證worker.schedule(this)的調(diào)用
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
// 判斷循環(huán)是否終止
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
// 如果訂閱已經(jīng)被取消, 則清除隊(duì)列, 終止
if (cancelled) {
queue.clear();
return true;
}
// 如果調(diào)用過(guò)onError 或者 onComplete
if (d) {
Throwable e = error;
// 默認(rèn)false
if (delayError) {
// 等到隊(duì)列為空的時(shí)候再調(diào)用onError或者onComplete
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
// 如果有拋出異常, 走下游的onError
// 線程任務(wù)停止
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
}
// 沒(méi)有, 走下游的onComplete
// 線程任務(wù)停止
else if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
// 否則不結(jié)束
return false;
}
由此我們可以得出結(jié)論, observeOn的操作符可以保證我們下流操作線程切換生效
總結(jié)
到這里, 我們線程切換的原理大體流程就基本分析完畢了, 可以看出subscribeOn操作符只對(duì)上游生效, 而且因?yàn)樗窃谟嗛喌臅r(shí)候進(jìn)行線程切換, 而我們每個(gè)操作符中間都有訂閱動(dòng)作, 所以越接近我們的ObservableSource的訂閱的subscribeOn越是最后生效的. 而observeOn生效在我們的onNext,onComplete, onError方法內(nèi), 所以每次的observeOn針對(duì)它的下游都可以生效.