由于RxJava1.x版本已經(jīng)在2018.3.31不再更新了,因此本文是基于的2.x版本的源碼。至于2.x和1.x有什么不同可以去他們Github主頁了解。
RxJava 中常用的幾種線程,通過 Schedulers 入口創(chuàng)建
- newThread():一個(gè)新線程
- io():異步IO線程
- computation():計(jì)算密集型線程
還有一個(gè)Android中主線程(UI線程) - AndroidSchedulers.mainThread()
newThread()
通過Schedulers來調(diào)用(精簡后):
#Schedulers.java
public static Scheduler newThread() {
Scheduler default = new NewThreadScheduler();
return default;
}
返回的 NewThreadScheduler,繼承自Scheduler,NewThreadScheduler內(nèi)部里有一個(gè)自定義的ThreadFactory:RxThreadFactory,給線程起了名字并且設(shè)置了優(yōu)先級(jí)。
為什么要使用線程工廠來創(chuàng)建線程呢? 目的有4個(gè):
- 可以給線程起名字(方便debug/profile時(shí)做分析)
- 可以選擇線程類型,daemon or user thread
- 設(shè)定線程優(yōu)先級(jí)
- 按需處理未捕獲的異常
#NewThreadScheduler.java
public final class NewThreadScheduler extends Scheduler {
...
...
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);//此行打斷點(diǎn)A
}
}
NewThreadScheduler 中最重要的方法是重寫 createWorker() 返回了 NewThreadWorker,2.x把原先1.x的Subscription 重命名為 Disposable 了,因此現(xiàn)在的 NewThreadWorker 是實(shí)現(xiàn)自 Disposable 接口.
#NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);//斷點(diǎn)A的調(diào)轉(zhuǎn)
}
}
SchedulerPoolFactory 以工廠模式來生成ScheduledExecutorService(一個(gè)單線程的線程池),同時(shí)把每個(gè)單線程的線程池放入到一個(gè)集合ConcurrentHashMap中,并且啟動(dòng)了一個(gè)PURGE線程(也是個(gè)ScheduledExecutorService)通過定時(shí)任務(wù)來遍歷這個(gè)集合中的executors,如果是isShutdown == true就把它從集合中刪除,如果沒有shutdown就調(diào)用purge()方法,但是這個(gè)purge方法只能用于移除隊(duì)列中已被取消的通過submit()轉(zhuǎn)化的Future類型任務(wù)。
ScheduledExecutorService extends ThreadPoolExecutor implements ExecutorService,是一個(gè)可以執(zhí)行定時(shí)任務(wù)的線程池。
#SchedulerPoolFactory.java
public final class SchedulerPoolFactory {
//內(nèi)部維護(hù)一個(gè)放創(chuàng)建ScheduledExecutorService的HashMap
static final Map<ScheduledThreadPoolExecutor, Object> POOLS =
new ConcurrentHashMap<ScheduledThreadPoolExecutor, Object>();
//斷點(diǎn)A的最終調(diào)轉(zhuǎn)方法:create(),每次都生成新的含有單線程的線程池并且放入POOL里,
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
return exec;
}
//開啟purge線程(非完整源碼),此方法在SchedulerPoolFactory的static塊中調(diào)用
public static void start() {
ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge"));
next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS);
}
static final class ScheduledTask implements Runnable {
@Override
public void run() {
for (ScheduledThreadPoolExecutor e : new ArrayList<ScheduledThreadPoolExecutor>(POOLS.keySet())) {
if (e.isShutdown()) {
POOLS.remove(e);
} else {
e.purge();
}
}
}
}
工廠生成了 ScheduledExecutorService 了以后回給 NewThreadWorker,NewThreadWorker主要重寫了schedulerXXX(Runnable, delayTime, TimeUnit),方法內(nèi)部根據(jù)delaytime將異步任務(wù)submit 或者 schedule 。
總結(jié):打斷點(diǎn)可以分析到,上層每次調(diào)用Schedulers.newThread()時(shí),都會(huì)生成新的線程池,而創(chuàng)建線程池是很耗費(fèi)內(nèi)存開銷的。打開Profile來觀察下頻繁地調(diào)用Schedulers.newThread()的CPU及內(nèi)存情況,可以看到此時(shí)有29個(gè)線程存在,有一大部分是RxNewThreadScheduler,內(nèi)存也很吃緊,而且如源碼看到的那樣,每個(gè)線程只執(zhí)行一個(gè)任務(wù)而且也沒有被及時(shí)清除。


io()
依然通過Schedulers來調(diào)用(精簡后):
#Schedulers.java
public static Scheduler io() {
Scheduler default = new IoScheduler();
return default;
}
IoScheduler內(nèi)部的緩存了threads in pool / thread in pools,可以對(duì)空閑thread進(jìn)行復(fù)用,而不是像Schedulers.newThread()那樣每次創(chuàng)建新線程來執(zhí)行異步任務(wù)。
#IoScheduler .java
public final class IoScheduler extends Scheduler {
final AtomicReference<CachedWorkerPool> pool;
...
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
CachedWorkerPool update = new CachedWorkerPool(60, TimeUnit.SECONDS, threadFactory);
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
}
IoScheduler 的構(gòu)造函數(shù)中創(chuàng)建了一個(gè)CachedWorkerPool,創(chuàng)建線程的方式是返回一個(gè)EventLoopWorker(CachedWorkerPool pool),也是繼承自Scheduler.Worker,創(chuàng)建時(shí)傳入了一個(gè)參數(shù) pool.get(),參數(shù)的類型是CachedWorkerPool implements Runnable,get() 方法的代碼如下
#IoScheduler .java
static final class CachedWorkerPool implements Runnable {
...
...
ThreadWorker get() {
if(allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
}
CachedWorkerPool的獲得是先試圖從一個(gè) expiringWorkerQueue 中取得,如果有緩存的 worker 則 poll()(隊(duì)頭的ThreadWorker,Queue:FIFO),如果沒有(1.初始時(shí) 2.queque 中的 worker 都被 pool() 出去執(zhí)行任務(wù)但是還沒執(zhí)行完)則新建一個(gè)ThreadWorker返回用以去執(zhí)行異步任務(wù)。
expiringWorkerQueue 的類型是ConcurrentLinkedQueue<ThreadWorker>,它儲(chǔ)存著內(nèi)部類 ThreadWorker,繼承自 NewThreadWorker,但是多了一可以 get/set 的過期時(shí)間的字段:expirationTime當(dāng)異步任務(wù)執(zhí)行完畢會(huì)通過回調(diào) EventLoopWorker 的 dispose() 方法來更新當(dāng)前 worker 的過期時(shí)間字段,并調(diào)用 offer() 把它放入到expiringWorkerQueue中。代碼如下:
#IoScheduler .java
static final class EventLoopWorker extends Scheduler.Worker {
...
...
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
//更新expirationTime +60s
threadWorker.setExpirationTime(now() + keepAliveTime);
//將更新完的threadWorker放回隊(duì)列中
expiringWorkerQueue.offer(threadWorker);
}
}
}
而 CachedWorkerPool 內(nèi)部維護(hù)了一個(gè)60秒執(zhí)行的定時(shí)任務(wù)去循環(huán) expiringWorkerQueue 中的 ThreadWorker ,將它的 expirationTime 跟當(dāng)前時(shí)間比,如果小于當(dāng)前時(shí)間(證明已過期)就將它從 expiringWorkerQueue 移除,且從 allWorkers 移除。ConcurrentLinkedQueue是一個(gè)線程安全的無邊界的queque,如果并發(fā)的任務(wù)足夠多且每個(gè)已緩存的 worker 都被 poll() 出去執(zhí)行任務(wù)了,那么便會(huì)不斷地 new ThreadWorker(),隊(duì)列也會(huì)越來越長,通過定時(shí)任務(wù)來清理掉已經(jīng)過期的worker避免占用內(nèi)存。代碼如下:
static final class CachedWorkerPool implements Runnable {
...
...
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
ScheduledExecutorService evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
Future<?> task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
@Override
public void run() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
break;
}
}
}
}
}
值得注意的是,由于ThreadWorker是繼承自NewThreadWorker的,而每一個(gè)NewThreadWorker的是通過SchedulerPoolFactory生成并且用PURGE線程來對(duì)它們進(jìn)行管理的。
通過Profile可以看到,緩存的線程得到了復(fù)用,且復(fù)用線程來執(zhí)行任務(wù)時(shí),CPU/MEMORY較上一種Schedulers.newThread()每次都創(chuàng)建新線程得到了很大的改善。


總結(jié):如上面提到的,內(nèi)部維護(hù)的ConcurrentLinkedQueue是一個(gè)線程安全但無邊界的queque,如果并發(fā)足夠多就會(huì)大量創(chuàng)建線程(池),因此這種方式如同它的名字Schedulers.io()一樣,適合做非CPU敏感型的IO操作,比如訪問文件系統(tǒng)、執(zhí)行網(wǎng)絡(luò)請求、數(shù)據(jù)庫操作等等。
computation()
Schedulers.computation()是一個(gè)線程池,池內(nèi)線程數(shù)(準(zhǔn)確來說是單線程的ScheduledExecutorService)為CPU核數(shù),且采用輪詢調(diào)度分配算法(a round-robin fashion)來分配池中的線程。構(gòu)造方法的代碼如下:
#Schedulers.java
public static Scheduler computation() {
Scheduler default = new ComputationScheduler();
return default;
}
#ComputationScheduler.java
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
...
static {
//CPU核數(shù)
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
}
public ComputationScheduler(ThreadFactory threadFactory) {
//線程池的大小是固定的(CPU核數(shù))
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
}
}
FixedSchedulerPool就這個(gè)線程池,源碼很簡單,創(chuàng)建MAX_THREADS個(gè)PoolWorker,并且 createWorker() 就是以輪詢調(diào)度 pool 中 PoolWorker,去執(zhí)行各種異步的即時(shí)/定時(shí)任務(wù)。
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
//createWorker(worker)中,參數(shù)worker就是通過這個(gè)方法調(diào)用獲得
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}
}
總結(jié):Schedulers.computation()的源碼倒是挺簡單的,如文檔所說適合做一些CPU密集的操作,比如大量數(shù)據(jù)的解析和圖片的處理等。
除了newThread()、io()、computation()外,RxJava還提供了
- single():一個(gè)單線程在主線程上執(zhí)行
- trampoline():在當(dāng)前的主線程執(zhí)行,所有任務(wù)被排列等候上一個(gè)任務(wù)執(zhí)行完再執(zhí)行。
AndroidSchedulers.mainThread()
#AndroidSchedulers.java
public static Scheduler mainThread() {
Scheduler default = new HandlerScheduler(new Handler(Looper.getMainLooper()));
return default;
}
可以看到調(diào)用AndroidSchedulers.mainThread()是返回了一個(gè)以mainLooper為參數(shù)構(gòu)造的HandlerScheduler。根據(jù)名字也能猜想的到是用的Handler機(jī)制來更新主線程UI。
createWorker() 方法返回了HandlerWorker(handler),這個(gè)handler就是剛才外部傳進(jìn)來的那個(gè) new Handler(Looper.getMainLooper())。HandlerWorker源碼如下:
private static final class HandlerWorker extends Worker {
private final Handler handler;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//根據(jù)RxJavaPlugins的源碼,這句只是把run包了一層try..catch..
run = RxJavaPlugins.onSchedule(run);
//ScheduledRunnable 是個(gè)內(nèi)部類,主要功能就是當(dāng)dispose()時(shí)執(zhí)行handler.removeCallbacks(runnable)
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
}