RxJava中幾種線程的源碼分析

由于RxJava1.x版本已經(jīng)在2018.3.31不再更新了,因此本文是基于的2.x版本的源碼。至于2.x和1.x有什么不同可以去他們Github主頁了解。

RxJava 中常用的幾種線程,通過 Schedulers 入口創(chuàng)建

  • newThread():一個(gè)新線程
  • io():異步IO線程
  • computation():計(jì)算密集型線程
    還有一個(gè)Android中主線程(UI線程)
  • AndroidSchedulers.mainThread()

newThread()

通過Schedulers來調(diào)用(精簡后):

#Schedulers.java
public static Scheduler newThread() {  
    Scheduler default = new NewThreadScheduler();
    return default;
}

返回的 NewThreadScheduler,繼承自Scheduler,NewThreadScheduler內(nèi)部里有一個(gè)自定義的ThreadFactory:RxThreadFactory,給線程起了名字并且設(shè)置了優(yōu)先級(jí)。

為什么要使用線程工廠來創(chuàng)建線程呢? 目的有4個(gè):

  • 可以給線程起名字(方便debug/profile時(shí)做分析)
  • 可以選擇線程類型,daemon or user thread
  • 設(shè)定線程優(yōu)先級(jí)
  • 按需處理未捕獲的異常
#NewThreadScheduler.java
public final class NewThreadScheduler extends Scheduler {
  ...
  ...
  @Override
  public Worker createWorker() {
      return new NewThreadWorker(threadFactory);//此行打斷點(diǎn)A
  }
}

NewThreadScheduler 中最重要的方法是重寫 createWorker() 返回了 NewThreadWorker,2.x把原先1.x的Subscription 重命名為 Disposable 了,因此現(xiàn)在的 NewThreadWorker 是實(shí)現(xiàn)自 Disposable 接口.

#NewThreadWorker.java
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;
    public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);//斷點(diǎn)A的調(diào)轉(zhuǎn)
    }
}

SchedulerPoolFactory 以工廠模式來生成ScheduledExecutorService(一個(gè)單線程的線程池),同時(shí)把每個(gè)單線程的線程池放入到一個(gè)集合ConcurrentHashMap中,并且啟動(dòng)了一個(gè)PURGE線程(也是個(gè)ScheduledExecutorService)通過定時(shí)任務(wù)來遍歷這個(gè)集合中的executors,如果是isShutdown == true就把它從集合中刪除,如果沒有shutdown就調(diào)用purge()方法,但是這個(gè)purge方法只能用于移除隊(duì)列中已被取消的通過submit()轉(zhuǎn)化的Future類型任務(wù)。

ScheduledExecutorService extends ThreadPoolExecutor implements ExecutorService,是一個(gè)可以執(zhí)行定時(shí)任務(wù)的線程池。

 #SchedulerPoolFactory.java
public final class SchedulerPoolFactory {
    //內(nèi)部維護(hù)一個(gè)放創(chuàng)建ScheduledExecutorService的HashMap
    static final Map<ScheduledThreadPoolExecutor, Object> POOLS =
       new ConcurrentHashMap<ScheduledThreadPoolExecutor, Object>();
    //斷點(diǎn)A的最終調(diào)轉(zhuǎn)方法:create(),每次都生成新的含有單線程的線程池并且放入POOL里,
    public static ScheduledExecutorService create(ThreadFactory factory) {
       final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
       ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
       POOLS.put(e, exec);
       return exec;
    }
    //開啟purge線程(非完整源碼),此方法在SchedulerPoolFactory的static塊中調(diào)用
    public static void start() {
       ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge"));
       next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS);
    }

    static final class ScheduledTask implements Runnable {
        @Override
        public void run() {
            for (ScheduledThreadPoolExecutor e : new ArrayList<ScheduledThreadPoolExecutor>(POOLS.keySet())) {
                if (e.isShutdown()) {
                    POOLS.remove(e);
                } else {
                    e.purge();
                }
             }
     }
 }

工廠生成了 ScheduledExecutorService 了以后回給 NewThreadWorkerNewThreadWorker主要重寫了schedulerXXX(Runnable, delayTime, TimeUnit),方法內(nèi)部根據(jù)delaytime將異步任務(wù)submit 或者 schedule 。

總結(jié):打斷點(diǎn)可以分析到,上層每次調(diào)用Schedulers.newThread()時(shí),都會(huì)生成新的線程池,而創(chuàng)建線程池是很耗費(fèi)內(nèi)存開銷的。打開Profile來觀察下頻繁地調(diào)用Schedulers.newThread()的CPU及內(nèi)存情況,可以看到此時(shí)有29個(gè)線程存在,有一大部分是RxNewThreadScheduler,內(nèi)存也很吃緊,而且如源碼看到的那樣,每個(gè)線程只執(zhí)行一個(gè)任務(wù)而且也沒有被及時(shí)清除。


newthread cpu.png
newthread memory.png

io()

依然通過Schedulers來調(diào)用(精簡后):

#Schedulers.java
public static Scheduler io() {  
    Scheduler default = new IoScheduler();
    return default;
}

IoScheduler內(nèi)部的緩存了threads in pool / thread in pools,可以對(duì)空閑thread進(jìn)行復(fù)用,而不是像Schedulers.newThread()那樣每次創(chuàng)建新線程來執(zhí)行異步任務(wù)。

#IoScheduler .java
public final class IoScheduler extends Scheduler {
  final AtomicReference<CachedWorkerPool> pool;
  ...
  public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    CachedWorkerPool update = new CachedWorkerPool(60, TimeUnit.SECONDS, threadFactory);
  }
  @Override
  public Worker createWorker() {
      return new EventLoopWorker(pool.get());
  }
}

IoScheduler 的構(gòu)造函數(shù)中創(chuàng)建了一個(gè)CachedWorkerPool,創(chuàng)建線程的方式是返回一個(gè)EventLoopWorker(CachedWorkerPool pool),也是繼承自Scheduler.Worker,創(chuàng)建時(shí)傳入了一個(gè)參數(shù) pool.get(),參數(shù)的類型是CachedWorkerPool implements Runnable,get() 方法的代碼如下

 #IoScheduler .java
 static final class CachedWorkerPool implements Runnable {
     ...
     ...
     ThreadWorker get() {
        if(allWorkers.isDisposed()) {
              return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
              ThreadWorker threadWorker = expiringWorkerQueue.poll();
              if (threadWorker != null) {
                  return threadWorker;
              }
       }
       // No cached worker found, so create a new one.
       ThreadWorker w = new ThreadWorker(threadFactory);
       allWorkers.add(w);
       return w;
     } 
 }

CachedWorkerPool的獲得是先試圖從一個(gè) expiringWorkerQueue 中取得,如果有緩存的 worker 則 poll()(隊(duì)頭的ThreadWorker,Queue:FIFO),如果沒有(1.初始時(shí) 2.queque 中的 worker 都被 pool() 出去執(zhí)行任務(wù)但是還沒執(zhí)行完)則新建一個(gè)ThreadWorker返回用以去執(zhí)行異步任務(wù)。
expiringWorkerQueue 的類型是ConcurrentLinkedQueue<ThreadWorker>,它儲(chǔ)存著內(nèi)部類 ThreadWorker,繼承自 NewThreadWorker,但是多了一可以 get/set 的過期時(shí)間的字段:expirationTime當(dāng)異步任務(wù)執(zhí)行完畢會(huì)通過回調(diào) EventLoopWorker 的 dispose() 方法來更新當(dāng)前 worker 的過期時(shí)間字段,并調(diào)用 offer() 把它放入到expiringWorkerQueue中。代碼如下:

#IoScheduler .java
static final class EventLoopWorker extends Scheduler.Worker {
  ...
  ...
  @Override
      public void dispose() {
          if (once.compareAndSet(false, true)) {
              tasks.dispose();
              //更新expirationTime +60s
              threadWorker.setExpirationTime(now() + keepAliveTime);
              //將更新完的threadWorker放回隊(duì)列中
              expiringWorkerQueue.offer(threadWorker);
          }
      }
 }

CachedWorkerPool 內(nèi)部維護(hù)了一個(gè)60秒執(zhí)行的定時(shí)任務(wù)去循環(huán) expiringWorkerQueue 中的 ThreadWorker ,將它的 expirationTime 跟當(dāng)前時(shí)間比,如果小于當(dāng)前時(shí)間(證明已過期)就將它從 expiringWorkerQueue 移除,且從 allWorkers 移除。ConcurrentLinkedQueue是一個(gè)線程安全的無邊界的queque,如果并發(fā)的任務(wù)足夠多且每個(gè)已緩存的 worker 都被 poll() 出去執(zhí)行任務(wù)了,那么便會(huì)不斷地 new ThreadWorker(),隊(duì)列也會(huì)越來越長,通過定時(shí)任務(wù)來清理掉已經(jīng)過期的worker避免占用內(nèi)存。代碼如下:

  static final class CachedWorkerPool implements Runnable {
      ...
      ...
      CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
          ScheduledExecutorService evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
          Future<?> task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
      }

      @Override
      public void run() {
          if (!expiringWorkerQueue.isEmpty()) {
              long currentTimestamp = now();
              for (ThreadWorker threadWorker : expiringWorkerQueue) {
                  if (threadWorker.getExpirationTime() <= currentTimestamp) {
                      if (expiringWorkerQueue.remove(threadWorker)) {
                          allWorkers.remove(threadWorker);
                      }
                  } else {
                      break;
                  }
              }
          }
      }
  }

值得注意的是,由于ThreadWorker是繼承自NewThreadWorker的,而每一個(gè)NewThreadWorker的是通過SchedulerPoolFactory生成并且用PURGE線程來對(duì)它們進(jìn)行管理的。
通過Profile可以看到,緩存的線程得到了復(fù)用,且復(fù)用線程來執(zhí)行任務(wù)時(shí),CPU/MEMORY較上一種Schedulers.newThread()每次都創(chuàng)建新線程得到了很大的改善。

RxCachedThreadScheduler cpu.png

RxCachedThreadScheduler memory.png

總結(jié):如上面提到的,內(nèi)部維護(hù)的ConcurrentLinkedQueue是一個(gè)線程安全但無邊界的queque,如果并發(fā)足夠多就會(huì)大量創(chuàng)建線程(池),因此這種方式如同它的名字Schedulers.io()一樣,適合做非CPU敏感型的IO操作,比如訪問文件系統(tǒng)、執(zhí)行網(wǎng)絡(luò)請求、數(shù)據(jù)庫操作等等。

computation()

Schedulers.computation()是一個(gè)線程池,池內(nèi)線程數(shù)(準(zhǔn)確來說是單線程的ScheduledExecutorService)為CPU核數(shù),且采用輪詢調(diào)度分配算法(a round-robin fashion)來分配池中的線程。構(gòu)造方法的代碼如下:

#Schedulers.java
public static Scheduler computation() {  
    Scheduler default = new ComputationScheduler();
    return default;
}

#ComputationScheduler.java
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
   ...
   static {
      //CPU核數(shù)
      MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
   }
   public ComputationScheduler(ThreadFactory threadFactory) {
    //線程池的大小是固定的(CPU核數(shù))
      FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
   }
}

FixedSchedulerPool就這個(gè)線程池,源碼很簡單,創(chuàng)建MAX_THREADS個(gè)PoolWorker,并且 createWorker() 就是以輪詢調(diào)度 pool 中 PoolWorker,去執(zhí)行各種異步的即時(shí)/定時(shí)任務(wù)。

 static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
    final int cores;

    final PoolWorker[] eventLoops;
    long n;

    FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
        // initialize event loops
        this.cores = maxThreads;
        this.eventLoops = new PoolWorker[maxThreads];
        for (int i = 0; i < maxThreads; i++) {
            this.eventLoops[i] = new PoolWorker(threadFactory);
        }
    }
    //createWorker(worker)中,參數(shù)worker就是通過這個(gè)方法調(diào)用獲得
    public PoolWorker getEventLoop() {
        int c = cores;
        if (c == 0) {
            return SHUTDOWN_WORKER;
        }
        // simple round robin, improvements to come
        return eventLoops[(int)(n++ % c)];
    }
}

總結(jié):Schedulers.computation()的源碼倒是挺簡單的,如文檔所說適合做一些CPU密集的操作,比如大量數(shù)據(jù)的解析和圖片的處理等。

除了newThread()、io()、computation()外,RxJava還提供了

  • single():一個(gè)單線程在主線程上執(zhí)行
  • trampoline():在當(dāng)前的主線程執(zhí)行,所有任務(wù)被排列等候上一個(gè)任務(wù)執(zhí)行完再執(zhí)行。

AndroidSchedulers.mainThread()

#AndroidSchedulers.java
public static Scheduler mainThread() {
    Scheduler default = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    return default;
}

可以看到調(diào)用AndroidSchedulers.mainThread()是返回了一個(gè)以mainLooper為參數(shù)構(gòu)造的HandlerScheduler。根據(jù)名字也能猜想的到是用的Handler機(jī)制來更新主線程UI。
createWorker() 方法返回了HandlerWorker(handler),這個(gè)handler就是剛才外部傳進(jìn)來的那個(gè) new Handler(Looper.getMainLooper())。HandlerWorker源碼如下:

private static final class HandlerWorker extends Worker {
    private final Handler handler;

    HandlerWorker(Handler handler) {
        this.handler = handler;
    }

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        //根據(jù)RxJavaPlugins的源碼,這句只是把run包了一層try..catch..
        run = RxJavaPlugins.onSchedule(run);
        //ScheduledRunnable 是個(gè)內(nèi)部類,主要功能就是當(dāng)dispose()時(shí)執(zhí)行handler.removeCallbacks(runnable)
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.

        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 通過前一篇的從觀察者模式出發(fā),聊聊RxJava,我們大致理解了RxJava的實(shí)現(xiàn)原理,在RxJava中可以非...
    CuiTao閱讀 655評(píng)論 0 4
  • 前言 通過前一篇的從觀察者模式出發(fā),聊聊RxJava,我們大致理解了RxJava的實(shí)現(xiàn)原理,在RxJava中可以非...
    IAM四十二閱讀 16,245評(píng)論 11 37
  • RxJava的被觀察者在使用操作符時(shí)可以利用線程調(diào)度器--Scheduler來切換線程,例如 被觀察者(Obser...
    fengzhizi715閱讀 5,049評(píng)論 0 8
  • 前言 JDK中為我們提供了一個(gè)并發(fā)線程框架,它是的我們可以在有異步任務(wù)或大量并發(fā)任務(wù)需要執(zhí)行時(shí)可以使用它提供的線程...
    Justlearn閱讀 1,909評(píng)論 0 10
  • 本周拿到了心儀的offer,500強(qiáng),業(yè)務(wù)不涉及中國,一年32天年假,工資在本土我這個(gè)年紀(jì)的白人里都算中上的,老板...
    Catman閱讀 889評(píng)論 2 50

友情鏈接更多精彩內(nèi)容