本文目錄
- 為什么要使用線程池?
- 線程池參數(shù)詳解
- 6種常見的線程池
- 為什么不能直接自動創(chuàng)建線程
- 如果自定義合適的線程池?
- 如何正確關閉線程池?
- 線程池實現(xiàn)線程復用原理
為什么要使用線程池?
為什么要使用線程池?
反復創(chuàng)建線程系統(tǒng)開銷比較大,而且每個線程的創(chuàng)建和銷毀都需要時間,如果任務比較簡單,那么有可能導致線程的創(chuàng)建和銷毀占用的資源超過執(zhí)行任務所消耗的資源.
如果當要執(zhí)行的任務比較多時,每個線程負責一個任務,那么需要創(chuàng)建很多線程去執(zhí)行任務,過多的線程會占用過多的內(nèi)存資源等,還會帶來上下文切換,同時還會導致系統(tǒng)不穩(wěn)定.
線程池好處
線程池解決了線程生命周期的系統(tǒng)開銷問題,線程池中的線程可以反復使用,可以用少量的線程去執(zhí)行大量的任務,減少了線程創(chuàng)建和銷毀的開銷,而且線程都是創(chuàng)建好的,來任務就可以執(zhí)行.
通過設置合適的線程池的線程數(shù),可以避免資源使用不當,線程池可以通過線程數(shù)和任務靈活的控制線程數(shù)量,任務多的時候可以繼續(xù)創(chuàng)建線程,任務少的時候只保留核心線程,這樣可以避免系統(tǒng)資源浪費和線程過多導致內(nèi)存溢出.
線程池可以統(tǒng)一管理資源,通過線程書和任務隊列,可以統(tǒng)一開始和結束,并設置相關的拒絕策略.
線程池參數(shù)詳解
介紹線程池各個參數(shù)含義
corePoolSize:核心線程數(shù),常駐線程池的線程數(shù)量
maxPoolSize:線程池最大線程數(shù)量,當任務特別多的時候,corePoolSize線程數(shù)量無法滿足需求的時候,就會繼續(xù)創(chuàng)建線程,最大不超過maxPoolSize.
KeepAliveTime+時間單位:空閑線程的存活時間
ThreadFactory:線程工廠,用來創(chuàng)建線程
WorkQueue:任務隊列,用來存放任務
Handler:處理被拒絕的策略
線程池處理任務流程圖:
如上圖所示,流程如下:
當提交任務后,,線程池首先會檢查當前線程數(shù),如果當前線程數(shù)小于核心線程數(shù),則新建線程并執(zhí)行任務.
隨著任務不斷增加,線程數(shù)達到了核心線程數(shù)的數(shù)量,此時任務依然在增加,那么新來的任務將會放到workQueue等待隊列中,等核心線程執(zhí)行完任務后重新從隊列中提取出等待被執(zhí)行的任務
如果已經(jīng)達到了核心線程數(shù),且任務隊列也滿了,則線程池就會繼續(xù)創(chuàng)建線程來執(zhí)行任務,如果任務不斷提交,線程池會持續(xù)創(chuàng)建線程直到達到maximumPoolSize最大線程數(shù),當達到了最大線程數(shù)后,任務仍不斷提交,那么此時就超過了線程池的最大處理能力,這個時候線程池就會拒絕處理這些任務,處理策略就是handler.
corePoolSize和maximumPoolSize:
從上面的流程中可以看出,線程池初始化時,默認的線程數(shù)是0,當有任務提交后,開始創(chuàng)建核心線程去執(zhí)行任務,當線程數(shù)達到核心線程數(shù)時且任務隊列滿了后,開始創(chuàng)建非核心線程執(zhí)行任務,最大可以達到maximumPoolSize,如果這是任務不提交了,線程開始空閑,那么默認情況下大于corePoolSize的線程在超過設置的KeepAliveTime時間后會被合理的收回,所以默認情況下,線程池中的線程數(shù)量處于corePoolSize和maximumPoolSize之間.
KeepAliveTime+時間單位:
默認情況下,當線程池中的數(shù)量多于核心線程數(shù)時,而此時有沒有任務可做,那么線程池就會檢測線程的KeepAliveTime,如果超過了規(guī)定的時間,則無事可做的線程就會被銷毀,以便減少內(nèi)存的占用和資源消耗,如果后期任務又多了起來,則線程池根據(jù)規(guī)則重新創(chuàng)建線程,通過這個可伸縮的功能,可以實現(xiàn)對資源的合理使用,我們可以通過setKeepAliveTime設置keepAliveTime時間,還可以通過設置allowCoreThreadTimeOut參數(shù),這個參數(shù)默認是false,如果設置成ture,則會給核心線程數(shù)設置超時等待時間, 如果超過時間了核心線程就會銷毀.
ThreadFactory:
ThreadFactory是一個線程工廠,負責生產(chǎn)線程去執(zhí)行任務,默認的線程工廠,創(chuàng)建的線程會在同一個線程組,并且擁有一樣的優(yōu)先級,且都不是守護線程,我們也可自定義線程工廠,以便給線程自定義名字.
workQueue:
阻塞隊列,用來存放任務,我們主要分析一下5種阻塞隊列:
ArrayBlockingQueue是基于數(shù)組的有界阻塞隊列,按照FIFO排序,新來的隊列會放到隊列尾部,有界的數(shù)組可以防止資源被耗盡問題,當線程達到了核心線程數(shù),再來任務的時候就放到隊列的尾部,當隊列滿了的時候,則繼續(xù)創(chuàng)建非核心線程,如果線程數(shù)量達到了maxPoolSize,則會執(zhí)行拒絕策略.
LinkedBlockingQueue是基于鏈表的無界阻塞隊列(最大容量是Integer.MAX),按照FIFO排序,當線程池中線程數(shù)量達到核心線程數(shù)時,繼續(xù)來了新任務會一直存放到隊列中,而不會創(chuàng)建新線程.因此使用此隊列時,maxPoolSize是不起做的
SynchronousQueue是一個不緩存任務的阻塞隊列,當來了新任務的時候,不會緩存到隊列中,而是直接被線程執(zhí)行該任務,如果沒有核心線程可用就創(chuàng)建新線程去執(zhí)行任務,達到了maxPoolSize時,就執(zhí)行拒絕策略.
PriorityBlockingQueue是一個具有優(yōu)先級的無界阻塞隊列,優(yōu)先級通過參數(shù)Comparator實現(xiàn)
DelayedWorkQueu隊列的特點是內(nèi)部的任務并不是按照放入的時間排序,而是會按照延遲的時間長短對任務進行排序,內(nèi)部采用的是“堆”數(shù)據(jù)結構.而且它也是一個無界隊列.
handler:
拒絕策略是當線程池中任務達到了隊列最大容量,且線程數(shù)量也達到了最大maxPoolSize的時候,如果繼續(xù)有新任務來了,則執(zhí)行這個拒絕策略來處理新來的任務,jdk提供4種拒絕策略,它們都實現(xiàn)了RejectedExecutionHandler接口:
CallRunsPolicy:該策略下,在調(diào)用者線程中直接執(zhí)行被拒絕任務的run方法,就是誰提交的任務,誰負責執(zhí)行任務,這樣任務不會丟失,而且執(zhí)行任務比較費時,那么提交任務的線程也會被占用,就可以減緩任務提交速度.
AbortPolicy:該策略下,直接拋棄任務,并拋RejectedExecutionException異常.
DiscardPolicy:該策略下,直接拋棄任務.
DiscardOldestPolicy:該策略下,拋棄最早進入隊列中的那個任務,然后嘗試把這次拒絕的任務放入隊列.
除此之外,我們還可以通過實現(xiàn) RejectedExecutionHandler 接口來實現(xiàn)自己的拒絕策略,在接口中我們需要實現(xiàn)rejectedExecution方法,在rejectedExecution方法中,執(zhí)行例如暫存任務、重新執(zhí)行等自定義拒絕策略.
六種常見的線程池
FixedThreadPool
這個線程池的核心線程數(shù)和最大線程數(shù)是一樣的,所以可以看作是固定線程數(shù)的線程池,特點是當線程達到核心線程數(shù)后,如果任務隊列滿了,也不會創(chuàng)建額外的非核心線程去執(zhí)行任務,而是執(zhí)行拒絕策略.
CachedThreadPool
這個線程池叫做緩存線程池,特點是線程數(shù)幾乎是可以無限增加的(最大值是Integer.MAX_VALUE,基本不會達到),當線程閑置時還可以進行回收,而且它采用的存儲任務的隊列是SynchronousQueue隊列,隊列容量是0,實際不存儲任務,只負責對任務的中轉(zhuǎn)和傳遞,所以來一個任務線程池就看是否有空閑的線程,有的話就用空閑的線程去執(zhí)行任務,否則就創(chuàng)建一個線程去執(zhí)行,效率比較高.
ScheduledThreadPool
通過這個線程池的名字可以看出,它支持定時或者周期性執(zhí)行任務,實現(xiàn)這種功能的方法主要有三種:
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
service.schedule(new Task(), 10, TimeUnit.SECONDS);
service.scheduleAtFixedRate(new Task(), 10, 10, TimeUnit.SECONDS);
service.scheduleWithFixedDelay(new Task(), 10, 10, TimeUnit.SECONDS);
第一種是schedule,通過延遲指定時間后執(zhí)行一次任務,代碼中設置的是10秒,所以10秒后執(zhí)行一次任務就結束.
第二種是scheduleAtFixedRate,通過名稱我們可以看出,第二種是以固定頻率去執(zhí)行任務,它的第二個參數(shù)initialDelay表示第一次延遲時間,第三個參數(shù)period表示周期,總體按照上面的代碼意思就是,第一次延遲10秒后執(zhí)行任務,然后,每次延遲10秒執(zhí)行一次任務.
第三種方法是scheduleWithFixeddelay這種與第二種方法類似,也是周期執(zhí)行任務,不同的是對周期的定義,之前的scheduleAtFixedRate是以任務的開始時間為起點開始計時,時間到了就開始執(zhí)行第二次任務,而不管任務需要多久執(zhí)行,而scheduleWithFixeddelay是以任務結束的時間作為下一次循環(huán)開始的時間起點.
SingleThreadExecutor
第四種線程池中只有一個線程去執(zhí)行任務,如果執(zhí)行任務過程中發(fā)生了異常,則線程池會創(chuàng)建一個新線程來執(zhí)行后續(xù)任務,這個線程因為只有一個線程,所以可以保證任務執(zhí)行的有序性.
SingleThreadScheduleExecutor
這個線程池它和ScheduledThreadPool很相似,只不過它的內(nèi)部也只有一個線程,他只是將核心線程數(shù)設置為了1,如果執(zhí)行期間發(fā)生異常,同樣會創(chuàng)建一個新線程去執(zhí)行任務.
ForkJoinPool
最后一種線程池是ForkJoinPool,這個線程池是來支持將一個任務拆分成多個“小任務”并行計算,這個線程池是在jdk1.7之后加入的,它主要用于實現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù),這里只是對ForkJoinPool做了一個簡單的介紹,我們先來介紹一下ForkJoinPool和之前的線程池主要的兩個特點。
第一點是fork和join:
我們現(xiàn)來看看fork和join的含義,fork就是將任務分解成多個子任務,多個子任務互相獨立,不受影響,執(zhí)行的時候可以利用 CPU 的多核優(yōu)勢,并行計算,計算完成后各個子任務在調(diào)用join方法進行結果匯總,第一步是拆分也就是 Fork,第二步是匯總也就是 Join,我們通過下圖來理解:
我們通過舉例斐波那契數(shù)列來展示這個線程池的使用。
1.首先我們創(chuàng)建任務類FibonacciTask繼承RecursiveTask類,重寫compute方法。其中的ForkJoinTask代表一個可以并行、合并的任務,F(xiàn)orkJoinTask是一個抽象類,它還有兩個抽象子類:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任務,而RecusiveAction代表沒有返回值的任務,
2.我們在compute方法中實現(xiàn)斐波那契數(shù)列計算并獲取返回值。
3.在main方法中創(chuàng)建ForkJoinPool,并調(diào)用線程池的submit(ForkJoinTask<T>task)方法,通過獲取返回的task.get()方法獲取計算的返回值。
任務類:FibonacciTask
/**
* 這里我們的定義任務類繼承RecursiveTask,需要重寫一個compute方法,或者任務執(zhí)行的返回值
* RecursiveAction和RecursiveTask是ForkJoinTask的兩個抽象子類,
* 其中的ForkJoinTask,代表一個可以并行、合并的任務其中RecursiveAction
* 表示沒有返回值的任務,RecursiveTask是有返回值的任務
*/
public class FibonacciTask extends RecursiveTask<Integer> {
private int i;
FibonacciTask(int i){
this.i=i;
}
@Override
protected Integer compute() {
if(i<=1){
return i;
}
FibonacciTask f1=new FibonacciTask(i-1);
//用 fork() 方法分裂任務并分別執(zhí)行
f1.fork();
FibonacciTask f2=new FibonacciTask(i-2);
f2.fork();
//使用 join() 方法把結果匯總
return f1.join()+f2.join();
}
}
main方法:
public static void main(String[] args) {
ForkJoinPool forkJoinPool=new ForkJoinPool();
for(int i=0;i<10;i++){
ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(i));
try {
System.out.println(task.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
計算結果如下:
第二點是內(nèi)部結構不同:
之前的線程池所有的線程共用一個隊列,但 ForkJoinPool 線程池中每個線程都有自己獨立的任務隊列,這個隊列是雙端隊列,如圖下所示:
ForkJoinPool 線程池內(nèi)部除了有一個共用的任務隊列之外,每個線程還有一個對應的雙端隊列 deque,這時一旦線程中的任務被 Fork 分裂了,分裂出來的子任務放入線程自己的 deque 里,而不是放入公共的任務隊列中(公共任務隊列采用數(shù)組存放),如果此時有三個子任務放入線程 t1 的 deque 隊列中,對于線程 t1 而言獲取任務的成本就降低了,可以直接在自己的任務隊列中獲取而不必去公共隊列中爭搶也不會發(fā)生阻塞(除了后面會講到的 steal 情況外),減少了線程間的競爭和切換,是非常高效的。
我們再考慮一種情況,此時線程有多個,而線程 t1 的任務特別繁重,分裂了數(shù)十個子任務,但是 t0 此時卻無事可做,它自己的 deque 隊列為空,這時為了提高效率,t0 就會想辦法幫助 t1 執(zhí)行任務,這就是“work-stealing”的含義。
雙端隊列 deque 中,線程 t1 獲取任務的邏輯是后進先出,也就是LIFO(Last In Frist Out),而線程 t0 在“steal”偷線程 t1 的 deque 中的任務的邏輯是先進先出,也就是FIFO(Fast In Frist Out),如圖所示,圖中很好的描述了兩個線程使用雙端隊列分別獲取任務的情景。你可以看到,使用 “work-stealing” 算法和雙端隊列很好地平衡了各線程的負載。
最后,我們用一張全景圖來描述 ForkJoinPool 線程池的內(nèi)部結構,你可以看到 ForkJoinPool 線程池和其他線程池很多地方都是一樣的,但重點區(qū)別在于它每個線程都有一個自己的雙端隊列來存儲分裂出來的子任務。ForkJoinPool 非常適合用于遞歸的場景,例如樹的遍歷、最優(yōu)路徑搜索等場景。
為什么不能直接自動創(chuàng)建線程池
首先自動創(chuàng)建線程池通過直接調(diào)用Executors.newCachedThreadPool()方法直接創(chuàng)建線程池.但是開發(fā)中我們不能直接使用創(chuàng)建的線程池,原因如下:
FixedThreadPool
通過下面FiexdThreadPool內(nèi)部代碼可以看出,FixedThreadPool內(nèi)部調(diào)用的是ThreadPoolExecutor的構造函數(shù),構造函數(shù)中是的的阻塞隊列是LinkedBlockingQueue,那么這就帶來了問題,當任務處理速度比較慢的時候,雖然新增任務越來越多,隊列中堆積的任務就越來越多,最終會占用大量內(nèi)存,并發(fā)生OOM,就會嚴重影響到程序運行.
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
SingleThreadExecutor
通過看下面SingleThreadExecutor的內(nèi)部代碼可以發(fā)現(xiàn),newSingleThreadExecutor和newFixedThreadPool的原理是一樣的,只不過是核心線程數(shù)和最大線程數(shù)都設置成了1,但是任務隊列還是無界的LinkedBlockingQueue,所以也會導致任務堆積,發(fā)生OOM問題.
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
CachedThreadPool
繼續(xù)看下圖CachedThreadPool內(nèi)部代碼,從代碼中可以看出,CachedThreadPool使用的任務阻塞隊列是SynchronousQueue,SynchronousQueue隊列我們前面介紹過,并不存儲任務,只是對任務進行直接轉(zhuǎn)發(fā),這個隊列不會引發(fā)OOM問題,但是我們在看最大線程數(shù)設置成了Integer.MAX_VALUE,所以CachedThreadPool線程池并不線程的數(shù)量,那么任務特別多的時候,就會創(chuàng)建非常多的線程,進而導致系統(tǒng)內(nèi)存不足.
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
ScheduledThreadPool和SingleThreadScheduledExector
ScheduledThreadPool和SingleThreadScheduledExector差不多,只不過是后者線程池中只有一個線程,ScheduledThreadPool的源碼如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
我們在進入ScheduledThreadPoolExecutor構造方法中去,從下圖可以看出,它采用的任務隊列是DelayWorkQueue,上面我們說過這個隊列一個延遲隊列同時也是一個無界隊列,所以它和LinkedBlockingQueue一樣,如果任務過多就可能OOM,代碼如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
SingleThreadExecutor
第四種線程池中只有一個線程去執(zhí)行任務,如果執(zhí)行任務過程中發(fā)生了異常,則線程池會創(chuàng)建一個新線程來執(zhí)行后續(xù)任務,這個線程因為只有一個線程,所以可以保證任務執(zhí)行的有序性.
如何自定義合適的線程池
這個也是面試中會被問到的問題,如果自定義合適的線程池呢?首先我們要調(diào)整線程池中的線程數(shù)量以便充分并合理的使用CPU和內(nèi)存資源,從而最大限度的提高性能.
CPU密集型任務
如果任務是一些列比較消耗CPU資源的任務,比如加密、解密、壓縮、計算等,那么最佳線程數(shù)是CPU核心數(shù)的1~2倍,過多很導致占用大量CPU資源,這時每個CPU的核心工作基本都是滿負荷,設置過多的線程會造成不必要的上下文切換,而導致性能下降,而且在同一臺機器上,我們還要考慮到其他會占用較多CPU資源的程序運行,然后做整體平衡.
耗時IO任務
例如數(shù)據(jù)庫、文件的讀寫,網(wǎng)絡通信等任務,這種任務的特點是不會消耗很多CPU資源,但是IO操作很費時.這個時候可以設置最大線程數(shù)一般會大于CPU核心線程數(shù)很多倍,因為IO速度相比于CPU速度比較慢,我們設置較少的線程數(shù),就會浪費CPU資源,如果設置更多的線程數(shù),那么一部分線程正在等待IO的時候,他們此時不需要CPU計算,就能有更多線程去執(zhí)行IO操作,任務隊列中的等待任務就會減少,更合理的利用了資源.
java并發(fā)編程實戰(zhàn)中有推薦:線程數(shù) = CPU核心數(shù) *(1+平均等待時間/平均工作時間),我們可以通過這個式子計算出一個合理的線程數(shù)量,同時也可以根據(jù)進行壓測、監(jiān)控jvm的線程情況等方式,確定線程數(shù),更合理的利用資源.
總結以上特點可以得出以下幾點:
線程的平均工作時間所占比例越高,就需要越少線程.
線程的平均等待時間所占比例越高,就需要越多的線程
針對不同的程序,進行對應的實際測試就可以獲得更合適的選擇.
如何正確關閉線程池
首先有5種在ThreadPoolExecutor中涉及的關閉線程的方法,我們挨個來分析。
void shutdown()
它可以安全的關閉一個線程池,調(diào)用shutdown()方法后,線程池不會立刻關閉,而是等執(zhí)行完正在執(zhí)行的任務和隊列中等待的任務后才徹底關閉,而且調(diào)用shutdown()方法后,如果還有新的任務繼續(xù)到來,那么線程池會根據(jù)拒絕策略直接拒絕后面來的新任務.
boolean isShutdown()
這個方法可以返回ture或者false來判斷是否已經(jīng)開始了關閉工作,也就是是否執(zhí)行了shutdown或者shutdownNow方法,調(diào)用isShutdown()方法后如果返回true,并不代表線程池已經(jīng)徹底關閉了,僅僅代表開始了關閉流程,仍然可能有線程正在執(zhí)行任務,隊列里也可能有任務等待被執(zhí)行.
boolean isTerminated()
這個方法可以檢測是否真正關閉了,不僅代表線程池是否已經(jīng)關閉,同時也代表線程池中的所有任務是否已經(jīng)都執(zhí)行完畢,比如已經(jīng)調(diào)用了shutdown()方法,但是有一個線程正在執(zhí)行任務,則此時調(diào)用isShutdown方法返回true,而調(diào)用isTerminated方法便返回false,,因為線程池中還有任務再執(zhí)行,線程池沒有真正關閉,直到所有線程都執(zhí)行完畢,任務都執(zhí)行完畢,再調(diào)用isTermainted就返回ture.
boolean awaitTermination(long timeout,TimeUnit unit),throws IntereuptedException
awaitTermination并不是用來關閉線程池的,而是用來判斷線程池狀態(tài)的,參數(shù)需要傳入一個時間,如果我們設置10秒鐘,那么會有以下幾種情況:
等待期間,線程池已經(jīng)關閉且所有提交的任務都執(zhí)行完畢,那么方法就返回ture,相當于線程池真正關閉了.
等待時間超時后,第一種情況未發(fā)生,那么方法返回false.
等待時間中,執(zhí)行任務的線程被中斷了,方法會拋出InterruptedException異常.
所以綜上可以看出,調(diào)用 awaitTermination 方法后當前線程會嘗試等待一段指定的時間,如果在等待時間內(nèi),線程池已關閉并且內(nèi)部的任務都執(zhí)行完畢了,也就是說線程池真正“終結”了,那么方法就返回 true,否則超時返回 fasle,我們則可以根據(jù) awaitTermination() 返回的布爾值來判斷下一步應該執(zhí)行的操作。
List<Runnable> shutdownNow()
調(diào)用shutdownNow()方法后,首先會給所有線程池中的線程發(fā)送interrupt中斷信號,嘗試中斷這些任務的執(zhí)行,然后就任務隊列中在等待被執(zhí)行的任務轉(zhuǎn)移到一個List中并返回,我們可以再根據(jù)List做一些操作,shutdownNow() 的源碼如下所示:
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//轉(zhuǎn)換線程運行狀態(tài)
advanceRunState(STOP);
//讓每一個已經(jīng)啟動的線程都中斷,如果被中斷的線程對于中斷信號不理不睬
//那么依然有可能導致任務不會停止
interruptWorkers();
//將隊列中任務放入tasks集合中,并返回.
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
線程池復用原理
線程池復用原理
線程池可以把線程和任務進行解耦,線程歸線程,任務歸任務,擺脫了之前通過 Thread 創(chuàng)建線程時的一個線程必須對應一個任務的限制。在線程池中,同一個線程可以從 BlockingQueue 中不斷提取新任務來執(zhí)行,其核心原理在于線程池對 Thread 進行了封裝,并不是每次執(zhí)行任務都會調(diào)用 Thread.start() 來創(chuàng)建新線程,而是讓每個線程去執(zhí)行一個“循環(huán)任務”,在這個“循環(huán)任務”中,不停地檢查是否還有任務等待被執(zhí)行,如果有則直接去執(zhí)行這個任務,也就是調(diào)用任務的 run 方法,把 run 方法當作和普通方法一樣的地位去調(diào)用,相當于把每個任務的 run() 方法串聯(lián)了起來,所以線程數(shù)量并不增加。其中execute代碼如下:
public void execute(Runnable command) {
//如果傳入的Runnable的空,就拋出異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**
* 當前線程數(shù)是否小于核心線程數(shù),如果小于核心線程數(shù)就調(diào)用 addWorker()
* 方法增加一個 Worker,這里的 Worker 就可以理解為一個線程
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
我們先來分析上面部分代碼,我們先分析下面這一段代碼:
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
我們主要看addWorK(comond,tue)方法,addWorker 方法的主要作用是在線程池中創(chuàng)建一個線程并執(zhí)行第一個參數(shù)傳入的任務,它的第二個參數(shù)是個布爾值,如果布爾值傳入 true 代表增加線程時判斷當前線程是否少于 corePoolSize,小于則增加新線程,大于等于則不增加;同理,如果傳入 false 代表增加線程時判斷當前線程是否少于 maxPoolSize,小于則增加新線程,大于等于則不增加,所以這里的布爾值的含義是以核心線程數(shù)為界限還是以最大線程數(shù)為界限進行是否新增線程的判斷。addWorker() 方法如果返回 true 代表添加成功,如果返回 false 代表添加失敗。
接下來我們看下面這部分代碼
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
//執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
如果代碼執(zhí)行到這里,說明當前線程數(shù)大于或等于核心線程數(shù)或者 addWorker 失敗了,那么就需要通過 if (isRunning(c) && workQueue.offer(command)) 檢查線程池狀態(tài)是否為 Running,如果線程池狀態(tài)是 Running 就把任務放入任務隊列中,也就是 workQueue.offer(command)。如果線程池已經(jīng)不處于 Running 狀態(tài),說明線程池被關閉,那么就移除剛剛添加到任務隊列中的任務,并執(zhí)行拒絕策略。
接下來我們上面這部分代碼的else分支邏輯:
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
能進入這個 else 說明前面判斷到線程池狀態(tài)為 Running,那么當任務被添加進來之后就需要防止沒有可執(zhí)行線程的情況發(fā)生(比如之前的線程被回收了或意外終止了),所以此時如果檢查當前線程數(shù)為 0,也就是 workerCountOf**(recheck) == 0,那就執(zhí)行 addWorker() 方法新建線程。
接著我們再看最后一部分代碼:
else if (!addWorker(command, false))
reject(command);
執(zhí)行到這里,說明線程池不是 Running 狀態(tài)或線程數(shù)大于或等于核心線程數(shù)并且任務隊列已經(jīng)滿了,所以此時需要添加新線程,直到線程數(shù)達到“最大線程數(shù)”,所以此時就會再次調(diào)用 addWorker 方法并將第二個參數(shù)傳入 false,傳入 false 代表增加線程時判斷當前線程數(shù)是否少于 maxPoolSize,小于則增加新線程,大于等于則不增加,也就是以 maxPoolSize 為上限創(chuàng)建新的 worker;addWorker 方法如果返回 true 代表添加成功,如果返回 false 代表任務添加失敗,說明當前線程數(shù)已經(jīng)達到 maxPoolSize,然后執(zhí)行拒絕策略 reject 方法。如果執(zhí)行到這里線程池的狀態(tài)不是 Running,那么 addWorker 會失敗并返回 false,所以也會執(zhí)行拒絕策略 reject 方法。
所以看到這里我們需要著重分析addWorker()方法,這里的 Worker 可以理解為是對 Thread 的包裝,Worker 內(nèi)部有一個 Thread 對象,它正是最終真正執(zhí)行任務的線程,所以一個 Worker 就對應線程池中的一個線程,addWorker 就代表增加線程。我們看部分addWorker內(nèi)的方法:
boolean workerStarted = false;
boolean workerAdded = false;
//worker是內(nèi)部類實現(xiàn)了接口Runnable,封裝了Thread
Worker w = null;
try {
//獲取隊列第一個任務
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//集合,包含池中的所有工作線程。只有當持有主鎖時才能訪問。
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//調(diào)用線程的start方法。
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
通過上圖中的注釋我們可以看出,addWork方法實際上是調(diào)用自己封裝的線程的start方法來啟動線程,我們繼續(xù)看worker內(nèi)部類的run方法是如何實現(xiàn)的:
public void run() {
runWorker(this);
}
?
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//獲取第一個要執(zhí)行的任務,先進先出
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//實現(xiàn)線程復用的邏輯主要在一個不停循環(huán)的 while 循環(huán)體中
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//直接調(diào)用task的run方法,而不是新建線程
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
從上圖中我們可以看到內(nèi)部類worker的run()方法實際上是調(diào)用runWorker(this)方法,實現(xiàn)線程復用的邏輯主要是在一個不同的循環(huán)體while中進行,所以在runWorker(this)方法中主要做了兩件事:
通過取 Worker 的 firstTask 或者通過 getTask 方法從 workQueue 中獲取待執(zhí)行的任務。
直接調(diào)用 task 的 run 方法來執(zhí)行具體的任務(而不是新建線程,調(diào)用線程的start()方法)。
好了,本篇文章主要分析了線程池的基本概念和核心原理,也是作者對線程池學習的各方面的總結,基本上看完本篇文章能應對很多線程池的相關面試以及日常開發(fā)需求,如果有什么不足或者錯誤的地方希望讀者能給出建議!