java和spring都提供了線程池的框架
java提供的是Executors;
spring提供的是ThreadPoolTaskExecutor;
一、基本使用
Executors提供了4個(gè)線程池,
- FixedThreadPool
- SingleThreadExecutor
- CachedThreadPool
- ScheduledThreadPool
FixedThreadPool-固定線程池
public class FixPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
for (int i =0;i<10;i++){
executorService.execute(()->{
Thread wt = Thread.currentThread();
String format = String.format("當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
System.out.println(format);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
executorService.shutdown();
}
}
效果如下:

可以看到5個(gè)線程在接任務(wù),接完5個(gè)任務(wù)之后就停止了1秒,完成之后繼續(xù)接任務(wù);
SingleThreadExecutor-單一線程池,線程數(shù)為1的固定線程池
為什么要?jiǎng)?chuàng)造這么一個(gè)線程池出來(lái)呢?
因?yàn)橛行r(shí)候需要用到線程池的隊(duì)列任務(wù)機(jī)制,又不想多線程并發(fā)。此時(shí)就需要用單一線程池了。
以下兩種寫法完全一樣的效果
ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(1);
CachedThreadPool-緩存線程池,線程數(shù)為無(wú)窮大的一個(gè)線程池
當(dāng)有空閑線程的時(shí)候就讓空閑線程去做任務(wù);
當(dāng)沒空閑線程的時(shí)候就新建一個(gè)線程去任務(wù);
public class CashPool {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i =0;;i++){
executorService.execute(()->{
Thread wt = Thread.currentThread();
String format = String.format("緩存線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
System.out.println(format);
try {
double d = Math.random();
int sleep = (int)(d*5000);
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread.sleep(100);
}
}
}
效果如下:

由于任務(wù)耗時(shí)不確定,所以線程池會(huì)動(dòng)態(tài)根據(jù)情況去判斷是否創(chuàng)建新的線程;
ScheduledThreadPool-調(diào)度線程池,線程會(huì)根據(jù)一定的時(shí)間規(guī)律去消化任務(wù)
分別有3個(gè)
- schedule(固定延時(shí)才執(zhí)行任務(wù))
- scheduleAtFixedRate(一定的間隔執(zhí)行一次任務(wù),執(zhí)行時(shí)長(zhǎng)不影響間隔時(shí)間)
- scheduleWithFixedDelay(一定的間隔執(zhí)行一次任務(wù),從任務(wù)執(zhí)行接觸才開始計(jì)算,執(zhí)行時(shí)長(zhǎng)影響間隔時(shí)間)
public class ScheduledPool {
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
String begin = String.format("調(diào)度線程池-begin,當(dāng)前時(shí)間:%s", LocalDateTime.now());
System.out.println(begin);
// schedule(pool);
scheduleAtFixedRate(pool);
// scheduleWithFixedDelay(pool);
}
private static void scheduleAtFixedRate(ScheduledExecutorService pool){
pool.scheduleAtFixedRate(()->{
Thread wt = Thread.currentThread();
String format = String.format("scheduleAtFixedRate-調(diào)度線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
System.out.println(format);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
},1,1, TimeUnit.SECONDS);
}
private static void scheduleWithFixedDelay(ScheduledExecutorService pool){
pool.scheduleWithFixedDelay(()->{
Thread wt = Thread.currentThread();
String format = String.format("scheduleWithFixedDelay-調(diào)度線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
System.out.println(format);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
},1,1, TimeUnit.SECONDS);
}
private static void schedule(ScheduledExecutorService pool){
for (int i =0;i<10;i++){
pool.schedule(()->{
Thread wt = Thread.currentThread();
String format = String.format("調(diào)度線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
System.out.println(format);
},5, TimeUnit.SECONDS);
}
pool.shutdown();
}
}
二、原理講解
上面介紹的4個(gè)線程池工具,都是基于一個(gè)類ThreadPoolExecutor
ThreadPoolExecutor有幾個(gè)重要的參數(shù)
- corePoolSize,核心線程數(shù)
- maximumPoolSize,最大線程數(shù)
- keepAliveTime,線程空閑時(shí)間,和4組合使用
- unit
- workQueue,工作隊(duì)列,是各個(gè)工具的機(jī)制策略的核心
- threadFactory ,生成線程的工廠,可以代理run方法,還有給thread命名
- handler,拒絕策略,當(dāng)任務(wù)太多的時(shí)候會(huì)執(zhí)行的方法,框架有4個(gè)實(shí)現(xiàn)好的策略,可以自己寫自己的策略。
對(duì)于fixPool線程池,corePoolSize=maximumPoolSize=n,keepAliveTime=0,workQueue=LinkedBlockingQueue,threadFactory和handler都是默認(rèn)的。

對(duì)于cashPool線程池,corePoolSize=0,maximumPoolSize=2^32,keepAliveTime=60s,workQueue=SynchronousQueue,threadFactory和handler都是默認(rèn)的。

我們先看一看execute方法

其中ctl參數(shù)是一個(gè)核心參數(shù),保存著線程池的運(yùn)行狀態(tài)和線程數(shù)量,通過workerCountOf()獲取當(dāng)前的工作線程數(shù)量。
execute整個(gè)過程分成3個(gè)部分。
- 當(dāng)工作線程數(shù)少于核心線程數(shù),創(chuàng)建一個(gè)工作線程去消化任務(wù)。
- 當(dāng)線程池在運(yùn)行狀態(tài)而且能把任務(wù)放到隊(duì)列,則接受任務(wù),調(diào)用addWorker讓線程去消化隊(duì)列的任務(wù)。
- 讓線程獲取消化任務(wù)失敗,拒絕任務(wù)。

核心一 workQueue.offer
對(duì)于fixPool,由于workQueue是LinkedBlockingQueue,所以offer方法基本會(huì)返回true。
對(duì)于cashpool,workQueue是SynchronousQueue,如果沒有消費(fèi)者在take,則會(huì)立馬返回false,然后立馬新建一個(gè)線程。
核心二 getTask
每個(gè)線程都被包裝成worker對(duì)象,worker對(duì)象會(huì)執(zhí)行自己的runWorker方法,方法在死循環(huán)不停得調(diào)用getTask方法去消化任務(wù)。

getTask里面最核心的是take和poll方法,這個(gè)是跟你傳入的隊(duì)列特性有關(guān)。

對(duì)于spring提供的ThreadPoolTaskExecutor,其實(shí)也是對(duì)ThreadPoolExecutor的一個(gè)封裝。
具體看initializeExecutor方法

在執(zhí)行execute方法的時(shí)候,也是執(zhí)行ThreadPoolExecutor的execute方法。
結(jié)論,線程池的核心是ThreadPoolExecutor類,根據(jù)傳入的workQueue的offer、poll、take方法特性的不同而誕生緩存線程池,固定線程池,調(diào)度線程等各種線程策略。
github地址:https://github.com/hd-eujian/threadpool.git
碼云地址: https://gitee.com/guoeryyj/threadpool.git