RxJava訂閱過程和線程切換原理
1 - Observable.just("hello world").subscribe(observer)
這是RxJava中的生產(chǎn)/消費(fèi)模式中最簡(jiǎn)單的一種,就是生產(chǎn)發(fā)送“hello world"在用observer去監(jiān)聽消費(fèi)數(shù)據(jù),那么具體內(nèi)部RxJava是如何實(shí)現(xiàn)的呢?
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 1.創(chuàng)建實(shí)際的Observable類ObservableJust,并把數(shù)據(jù)傳進(jìn)去
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
// subscribe操作
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 2.Observable的subscribeActual是abstract,實(shí)現(xiàn)在ObservableJust中
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
}
// Observable.just("Hello world"),創(chuàng)建的Observable實(shí)現(xiàn)類
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
// subscribe時(shí)具體實(shí)現(xiàn)邏輯
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
// 3.先調(diào)用observer.onSubscribe()方法
observer.onSubscribe(sd);
// 4.執(zhí)行后續(xù)的調(diào)用
sd.run();
}
@Override
public T call() {
return value;
}
}
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
@Override
public void run() {
// 5.如果時(shí)START,設(shè)置為ON_NEXT
if (get() == START && compareAndSet(START, ON_NEXT)) {
// 6.調(diào)用observer.onNext()方法
observer.onNext(value);
if (get() == ON_NEXT) {
// 7.設(shè)置為ON_COMPLETE
lazySet(ON_COMPLETE);
// 8.調(diào)用observer.onComplete()方法
observer.onComplete();
}
}
}
}
上面代碼的注釋可以清晰的了解到整個(gè)Observable.just("Hello world").subscribe(observer)的調(diào)用過程,首先是在Observable.just("Hello world")中創(chuàng)建實(shí)際的Observable對(duì)象ObservableJust實(shí)例,然后在subscribe(observer)時(shí),調(diào)用ObservableJust的subscribeActual方法,在subscribeActual中先調(diào)用obsever.onSubscribe(),再調(diào)用ScalarDisposable的run()方法,run()方法中處理了onNext()/onComplete()邏輯。
2 - Observable.just("hello world").subscribeOn(Schedulers.IO).subscribe(observer),subscribeOn(scheduler)的線程切換分析,數(shù)據(jù)生產(chǎn)方的線程切換
public abstract class Observable<T> implements ObservableSource<T> {
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 1.創(chuàng)建ObservableSubscribeOn,即把原始Observable轉(zhuǎn)為ObservableSubscribeOn對(duì)象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 2.把原始Observable保存為source
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 3.把原始o(jì)bserver轉(zhuǎn)成另一個(gè)代理類,代理類中對(duì)onSubscribe和dispose做了處理
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
// 4.調(diào)用Observer.onSubscribe()方法
s.onSubscribe(parent);
// 5.此處是關(guān)鍵,創(chuàng)建SubscribeTask,它是個(gè)Runnable,給scheduler去調(diào)用,即進(jìn)行線程切換,
// 在SubscribeTask中包含了orignalObservable.subscribe(orignalObserver)邏輯,
// 這樣就使訂閱邏輯執(zhí)行在scheduler線程中。此處的scheduler.scheduleDirect()邏輯后面在分析,
// 只要理解為把一個(gè)Runnable放在執(zhí)行線程執(zhí)行即可。
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
@Override
public void run() {
// 5.source即為原始Observable,parent為原始Observer的代理類(SubscribeOnObserver),
//即這里實(shí)際就是執(zhí)行原始未切換線程邏輯下的Observable.subscribe(observer),
// 注意因?yàn)樯厦嬉呀?jīng)調(diào)用了originObserver.onSubscribe(parent);
// 所以在代理類onSubscribe方法中沒有調(diào)用originObserver.onSubscribe()
source.subscribe(parent);
}
}
}
從上面代碼注釋可以了解到,RxJava subscribe(發(fā)送端) 切換線程subscribe(Schedulers.IO)邏輯會(huì)把原始Observable轉(zhuǎn)為另一個(gè)訂閱時(shí)線程切換的Observable(ObservableSubscribeOn),在ObservableSubscribeOn.subscribeActual()中把原始o(jì)bserver轉(zhuǎn)為一個(gè)代理對(duì)象parent,調(diào)用originObserver.onSubscribe()方法,并把未切換線程的subscribe邏輯包裝為Runnable,再把Runnable給Scheduler去調(diào)用執(zhí)行,從而達(dá)到切換線程執(zhí)行subscribe邏輯。
3 - Observable.just("hello world").observeOn(Schedulers.IO).subscribe(observer), observeOn(scheduler)的線程切換分析,數(shù)據(jù)消費(fèi)方線程切換
public abstract class Observable<T> implements ObservableSource<T> {
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// 1.創(chuàng)建ObservableObserveOn,即把原始Observable轉(zhuǎn)為ObservableObserveOn對(duì)象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 2.創(chuàng)建ObserveOnObserver,即把原始o(jì)bserver轉(zhuǎn)為ObserveOnObserver對(duì)象,
// ObserveOnObserver中會(huì)把observer相關(guān)的回調(diào)通過worker切換到指定線程去調(diào)用
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 3.切換線程調(diào)用
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
// 4.切換到worker線程調(diào)用,里面具體調(diào)用哪個(gè)方法執(zhí)行邏輯較多,總體來說到這里就達(dá)到了observer回調(diào)切換線程的目的
worker.schedule(this);
}
}
}
}
從上面代碼注釋可以了解到,RxJava observer(接收端) 切換線程observerOn(Schedulers.IO)邏輯會(huì)把原始Observable轉(zhuǎn)為另一個(gè)訂閱時(shí)線程切換的Observable(ObservableObserveOn),在ObservableObserveOn.subscribeActual()中把原始o(jì)bserver轉(zhuǎn)為一個(gè)代理對(duì)象ObserveOnObserver,在ObserveOnObserver中對(duì)observer相關(guān)的回調(diào)做線程切換處理,從而達(dá)到observer回調(diào)切換線程的目的。
4 - 生產(chǎn)端和消費(fèi)端都切換線程的分析, Observable.just("hello world").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(observer)
public abstract class Observable<T> implements ObservableSource<T> {
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 1.創(chuàng)建實(shí)際的Observable類ObservableJust,并把數(shù)據(jù)傳進(jìn)去
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 2.創(chuàng)建ObservableSubscribeOn,即把ObservableJust轉(zhuǎn)為ObservableSubscribeOn對(duì)象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// 3.創(chuàng)建ObservableObserveOn,即把ObservableSubscribeOn轉(zhuǎn)為ObservableObserveOn對(duì)象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
}
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
@Override
protected void subscribeActual(Observer<? super T> s) {
// 4.這是ObservableJust執(zhí)行subscribe的邏輯。記:TAG-4
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
}
public static final class ScalarDisposable<T> extends AtomicInteger implements QueueDisposable<T>, Runnable {
public void run() {
// 5.ObservableJust發(fā)送端邏輯,這里會(huì)調(diào)線程切換轉(zhuǎn)化后的Observer,記后面的TAG-10位置
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
// 6.把原始Observable保存為source
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 注意此處的s不是原始的observer,是在ObservableObserveOn中轉(zhuǎn)化過的observer,可以見下面分析
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
//7.把observable.subscribe(observer)包裝進(jìn)Runnable中,并給scheduler去執(zhí)行,留意此處在哪調(diào)用的,后面會(huì)分析。記:TAG-7,這里會(huì)調(diào)用到第六步TAG-8
//注意注意注意?。。。哼@里達(dá)到發(fā)送端切換線程目的
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
@Override
public void run() {
// 8.在io線程中執(zhí)行observable.subscribe(observer),注意此處的source是ObservableJust,它會(huì)調(diào)用TAG-4。記:TAG-8
source.subscribe(parent);
}
}
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
// 9.創(chuàng)建ObserveOnObserver,即把原始o(jì)bserver轉(zhuǎn)為ObserveOnObserver對(duì)象,并調(diào)用到上面第7步的流程,即TAG-7的地方
// 注意注意注意!?。。篛bserveOnObserver會(huì)把原始o(jì)bserver的回調(diào)放在指定線程去回調(diào),達(dá)到接收端切換線程目的
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
// 10.observer切換線程的地方。記TAG-10
schedule();
}
}
}
從上面代碼注釋可以了解到,其實(shí)RxJava對(duì)于發(fā)送端和接收端切換線程的邏輯就是把對(duì)應(yīng)的Observable轉(zhuǎn)為另一個(gè)Observable,且對(duì)于observeOn(scheduler)會(huì)把observer轉(zhuǎn)為切換線程調(diào)用的oberver,當(dāng)subscribe時(shí),就會(huì)從轉(zhuǎn)化后的Observable一級(jí)一級(jí)調(diào)用到原始的Observable方法,當(dāng)然中間做了subscribe的切換線程操作,在原始Observable上再調(diào)用轉(zhuǎn)化后的可線程切換的Observer的回調(diào),在切換線程Observer中對(duì)原始Observer回調(diào)進(jìn)行線程切換后調(diào)用。
RxJava源碼分析系列文章主題目錄:
- 1. RxJava源碼分析-----初始篇
- 2. RxJava源碼分析之 --- 訂閱過程和線程切換
- RxJava源碼分析之 --- 操作符
- RxJava源碼分析之 --- Backpressure
- RxJava源碼分析之 --- hook