Java中的線程池在各種開(kāi)源庫(kù)中頻頻出現(xiàn),是程序員必須掌握的一個(gè)知識(shí)點(diǎn),今日終于下定決心學(xué)習(xí)一下其源代碼。
首先大致說(shuō)一下線程池的工作原理:
- 如果Worker線程的數(shù)量小于核心池的容量corePoolSize,那么新建一個(gè)Worker線程,并加入到線程池
- 如果Worker線程的數(shù)量大于等于corePoolSize,且等待隊(duì)列workerQueue沒(méi)有滿的情況,那么會(huì)將該任務(wù)放入到等待隊(duì)列中
- 如果Worker線程數(shù)大于等于corePoolSize,但小于最大線程容量maxPoolSize,而且等待隊(duì)里已經(jīng)滿了,那么會(huì)新建一個(gè)Worker線程放入到線程池中
- 如果Worker線程數(shù)大于等于maxPoolSize,那么會(huì)執(zhí)行rejecet拒絕策略
相信上面的工作原理,大家從很多地方都看到過(guò),接下來(lái)我們就一步一步按照這個(gè)思路來(lái)看看線程池是如何實(shí)現(xiàn)這種思路的。
一、基礎(chǔ)開(kāi)篇
在進(jìn)行學(xué)習(xí)時(shí),這里現(xiàn)需要了解幾個(gè)重要的成員,提前理解這些成員在工作中的作用很有幫助。
1. 狀態(tài)變量
下面定義的一系列變量用來(lái)表明當(dāng)前線程池所處于的狀態(tài)。
private static final int COUNT_BITS = Integer.SIZE - 3; // 其值為29
private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 高三位為000,剩余位為1
private static final int RUNNING = -1 << COUNT_BITS; // 高三位:111,剩余位為0
private static final int SHUTDOWN = 0 << COUNT_BITS; // 高三位:000,剩余位為0
private static final int STOP = 1 << COUNT_BITS; // 高三位:001,剩余位為0
private static final int TIDYING = 2 << COUNT_BITS; // 高三位:010,剩余位為0
private static final int TERMINATED = 3 << COUNT_BITS; // 高三位:011,剩余位為0
// 從c中解析處高三位
private static int runStateOf(int c) { return c & ~CAPACITY; } // 得到c的高三位
private static int workerCountOf(int c) { return c & CAPACITY; } // 得到c的低29位
private static int ctlOf(int rs, int wc) { return rs | wc; } // rs代表runState、wc代表workCount,該方法將他們包裝成ctl
關(guān)于線程池的不同狀態(tài),有如下幾個(gè)說(shuō)明需要了解:
- RUNNING:該狀態(tài)下線程池可以接受新的task任務(wù),也是唯一一種可接受新的task的狀態(tài)
- SHUTDOWN:該狀態(tài)下線程池不會(huì)接受新的task任務(wù),但是會(huì)等待正在進(jìn)行中的以及workerQueue中處于等待狀態(tài)的任務(wù)完成(注意:這是是指SHUTDOWN狀態(tài),并不是調(diào)用相關(guān)的方法),調(diào)用shutdown()方法會(huì)進(jìn)入該狀態(tài)
- STOP:該狀態(tài)下線程池既不會(huì)接受新的task任務(wù),會(huì)中斷所有Worker線程,也會(huì)舍棄處于workerQueue中的、等待狀態(tài)的任務(wù);調(diào)用shutdownNow()方法會(huì)進(jìn)入該狀態(tài)
- TIDYING:同上;當(dāng)workerCount為0、等待隊(duì)列為空時(shí),執(zhí)行tryTerminate方法中會(huì)變成此狀態(tài)
- TERMINATED:同上;當(dāng)執(zhí)行tryTerminate方法過(guò)程中調(diào)用了terminated方法會(huì)變?yōu)榇藸顟B(tài)
關(guān)于上面的結(jié)論,我們?cè)诜治鲈创a的過(guò)程中一一進(jìn)行說(shuō)明。
需要說(shuō)明的是,一個(gè)int的前三位被用來(lái)區(qū)分線程池不同的狀態(tài),而后面的29位用來(lái)保存worker線程的數(shù)量,注意,因?yàn)橛脕?lái)表示worker線程數(shù)量的bit位只有29位了,所以線程池可以持有worker線程理論上最大的數(shù)量就是CAPACITY。而后面的三個(gè)方法就很明顯可以看出其功能分別為:從一個(gè)int值中解析出當(dāng)前的state狀態(tài)、從一個(gè)int值中解析出當(dāng)前worker線程的數(shù)量、將一個(gè)表示state的int和一個(gè)表示worker數(shù)量的int合成一個(gè)int。
說(shuō)了這么多,沒(méi)錯(cuò),在線程池中就定義了這么一個(gè)int值一上面所說(shuō)的形式保存了state和worker數(shù)量:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
而且可以發(fā)現(xiàn),其初始化就是調(diào)用的ctlOf()方法,其初始狀態(tài)下state為RUNNING、worker線程數(shù)量為0,很符合場(chǎng)景。
2. 關(guān)鍵成員
上面一直提到worker線程,這里有必要介紹一下,實(shí)際上Worker是一個(gè)內(nèi)部類,并不是一個(gè)線程。說(shuō)來(lái)慚愧,直到現(xiàn)在才明白了線程與Runnable的區(qū)別。線程就是一個(gè)線程,而Runnable(不如說(shuō)是Task)就是一個(gè)“任務(wù)”,這個(gè)任務(wù)我們通常是將它放到線程中去執(zhí)行,從而營(yíng)造一種“異步”的錯(cuò)覺(jué);說(shuō)白了,線程就是程序向進(jìn)程申請(qǐng)的一塊資源,他可以用這個(gè)線程去做很多很多的、不同的任務(wù),二者并不能用一個(gè)等號(hào)相連。在線程池中,二者這種分離的關(guān)系非常明顯。
在線程池中,Worker的定義如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
首先,Worker實(shí)現(xiàn)了兩個(gè)接口:AbstractQueuedSynchronizer、Runnable。這就意味著,Worker有兩種角色,首先由于AQS這個(gè)接口,Worker自身是一把鎖,其可以實(shí)現(xiàn)加解鎖的功能(Java中的可重入鎖、讀寫鎖等都是這個(gè)接口的實(shí)現(xiàn)類);其次,Worker也是一個(gè)Runnable的實(shí)現(xiàn)類,這就說(shuō)其可以作為一個(gè)參數(shù)傳遞給Thread的構(gòu)造函數(shù),從而將其run方法作為Thread的run方法實(shí)現(xiàn)。
更關(guān)鍵的是,Worker有一個(gè)Thread成員,沒(méi)錯(cuò),之所以叫他Worker線程就是因?yàn)榫€程池中的線程就是封裝在了一個(gè)個(gè)Worker中,在線程池自始至終,Worker和線程的數(shù)量是一一對(duì)應(yīng)的,所以在理解上可以將二者畫上等號(hào)??梢钥吹?,在Worker的構(gòu)造函數(shù)中,thread在初始化的時(shí)候傳遞的就是Worker本身,上面剛說(shuō)過(guò),Worker是Runnable的實(shí)現(xiàn)類,所以這個(gè)thread的run方法實(shí)現(xiàn)就是Worker的方法實(shí)現(xiàn),所以后面可以看到,啟動(dòng)一個(gè)Worker只需要簡(jiǎn)單的調(diào)用該Worker內(nèi)部的thread.start()方法即可,而Worker的run方法實(shí)現(xiàn)中的runWorker方法蘊(yùn)含了線程池復(fù)用線程的奧秘。
3. 構(gòu)造函數(shù)
ThreadPoolExecutor有多種構(gòu)造函數(shù),但是歸根結(jié)底是調(diào)用的如下的:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中的threadFactory和handler分別可以不顯式指定,采用默認(rèn)值,threadFactory的默認(rèn)值為:Executors.defaultThreadFactory(),該成員用來(lái)創(chuàng)建Worker內(nèi)部的thread,如果你留心的話會(huì)發(fā)現(xiàn)上面在介紹Worker時(shí),其構(gòu)造函數(shù)中對(duì)thread的初始化就是使用的threadFactory實(shí)現(xiàn)的;handler的默認(rèn)值為:new AbortPolicy(),該類的拒絕策略是直接拋棄對(duì)應(yīng)的task,不做任何處理。
二、工作方法
線程池有一系列public方法供我們使用,接下來(lái)就一步一步分析。
1. execute方法
線程池的execute方法用來(lái)接收一個(gè)Runnable對(duì)象,之后針對(duì)當(dāng)前線程池的狀態(tài)對(duì)Runnable對(duì)象采取不同的處理:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 注意:這里第一個(gè)參數(shù)為null
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
// 綜上可以發(fā)現(xiàn),如果線程池的狀態(tài)runState >= SHUTDOWN;那么其總會(huì)執(zhí)行拒絕策略
// 因此只有RUNNING狀態(tài)才可以addWorker
}
接下來(lái)我們就來(lái)分析一下這個(gè)方法中所蘊(yùn)含著的、我們開(kāi)篇就說(shuō)到的線程池的工作原理。首先要說(shuō)明的一點(diǎn),該方法開(kāi)始就對(duì)command進(jìn)行了非空校驗(yàn),所以后面的command都是非空的,這一點(diǎn)對(duì)于addWorker()方法執(zhí)行成功與否有一定影響。
原理一:如果Worker線程的數(shù)量小于核心池的容量corePoolSize,那么新建一個(gè)Worker線程,并加入到線程池
可以發(fā)現(xiàn)第一個(gè)if語(yǔ)句:
if (workerCountOf(c) < corePoolSize) {
/**
* add失敗原因有兩個(gè):
* 1. 由于其他線程的并發(fā)操作,突然使得workCount到達(dá)了corePoolSize
* 2. 由于其他線程的并發(fā)操作,突然使得線程池的state >= SHUTDOWN
*/
if (addWorker(command, true))
return;
c = ctl.get();
}
從ctl中得到當(dāng)前的worker線程的數(shù)量,判斷如果此時(shí)小于corePoolSize,那么會(huì)直接進(jìn)行addWorker新建一個(gè)線程并且直接承載該command任務(wù)。
注意:這里只是說(shuō)明線程池的工作思想,大家不要拘泥于execute每一步的結(jié)果。因?yàn)椋谂袛鄔orker線程數(shù)量小于corePoolSize后,執(zhí)行的addWorker是有失敗的可能的,即返回false。但是后面你就會(huì)明白,除了其他地方并發(fā)調(diào)用了線程池的shutdown()方法,導(dǎo)致線程池不再接收新的task而拒絕添加worker之外;另外的情況是其他地方也是并發(fā)調(diào)用了addWorker方法并成功添加了,導(dǎo)致此時(shí)worker線程數(shù)量大于等于corePoolSize了,從而使得本次addWorker調(diào)用失敗。但是,那個(gè)并發(fā)調(diào)用addWorker成功的場(chǎng)景同樣是workerCount < corePoolSize,與線程池的原理一并不矛盾。
(關(guān)于addWorker失敗的情況我們下文會(huì)分析)
原理二:如果Worker線程的數(shù)量大于等于corePoolSize,且等待隊(duì)列workerQueue沒(méi)有滿的情況,那么會(huì)將該任務(wù)放入到等待隊(duì)列中
這塊兒的內(nèi)容位于第二個(gè)if語(yǔ)句,首先,能夠執(zhí)行到這里的前提是第一個(gè)if語(yǔ)句條件不成立,或者第一個(gè)if語(yǔ)句addWorker失敗,返回了false;不管是哪種條件,都蘊(yùn)含著當(dāng)前線程池的狀態(tài)說(shuō)明了:workerCount >= corePoolSize:
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
首先能看到首先對(duì)線程池的狀態(tài)進(jìn)行判斷,如果不是處于RUNNING狀態(tài),那么會(huì)拒絕接受新的任務(wù),這里正說(shuō)明了:只有RUNNING狀態(tài)的線程池才可以接受添加新的worker、接受新的task。如果處于RUNNING狀態(tài),那么會(huì)首先將其加入任務(wù)隊(duì)列。注意,該if代碼塊中,雖然有采取addWorker的情況,但是可以發(fā)現(xiàn)該addWorker在調(diào)用的時(shí)候,傳遞的Runnable參數(shù)為null,并不是command,所以線程池的原理二中沒(méi)說(shuō)添加一個(gè)worker線程是準(zhǔn)確的、成立的。
讀者可能會(huì)對(duì)這個(gè)代碼塊里面的處理感覺(jué)有些迷惑,接下來(lái)我們就來(lái)詳細(xì)分析一下。首先來(lái)看第一個(gè)if語(yǔ)句:
if (! isRunning(recheck) && remove(command))
reject(command);
首先解釋一下為何還要recheck一下線程池的狀態(tài),同原理一的時(shí)候一樣,就怕其他地方并發(fā)調(diào)用了線程池的shutdown()方法,所以這里的if語(yǔ)句第一個(gè)判斷就是判斷是否還是RUNNING狀態(tài),如果此時(shí)線程池不是RUNNING了,那么if的第二個(gè)條件負(fù)責(zé)將已經(jīng)加入到等待隊(duì)列中的任務(wù)移除,這也符合處于SHUTDOWN狀態(tài)的線程池拒絕接受新的任務(wù)的設(shè)定。
好了,如果線程池不處于RUNNING狀態(tài),且成功的將已經(jīng)添加到等待隊(duì)列中的任務(wù)吐了出來(lái),那么就可以執(zhí)行reject策略了。再來(lái)看看else if的處理:
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
在這里我們主要分析兩個(gè)點(diǎn):
- 為何要調(diào)用addWorker創(chuàng)建新的worker線程?
- 為何參數(shù)是null而不是command?(關(guān)于這個(gè)問(wèn)題首先肯定一個(gè)原因是任務(wù)已經(jīng)被添加到等待隊(duì)列中了,如果在傳遞給addWorker,如果成功的話那么該task會(huì)被執(zhí)行兩次)
首先回答第一個(gè)疑惑,我們回想一下上面的if語(yǔ)句,何時(shí)回進(jìn)入到elseif的判斷。答案是有以下幾種情況:
- isRunning方法返回true,線程池處于RUNNING狀態(tài),可以接受新的task,這樣一來(lái)第一個(gè)判斷結(jié)果就導(dǎo)致整個(gè)if條件直接就返回false了,所以第二個(gè)判斷沒(méi)有必要執(zhí)行
- isRunning方法返回false,線程池處于其他狀態(tài),拒絕接受新的task,這樣一來(lái)就有必要執(zhí)行第二個(gè)判斷了,但是不幸的是第二個(gè)也返回false,也就是將task從等待隊(duì)列中移除失敗了
對(duì)于第一種情況,我們冷靜地想一想,當(dāng)前線程池處于RUNNING,但是workerCount >= corePoolSize,為什么還要有workerCount==0這么一個(gè)判斷啊?答案就是,線程池的corePoolSize允許被設(shè)置為0?。∷?,如果corePoolSize真的被設(shè)置為了0,又是首次執(zhí)行execute方法,那么當(dāng)前workerCount就真的為0,所以需要添加一個(gè)worker來(lái)處理這個(gè)task。那樣就會(huì)產(chǎn)生了問(wèn)題二,為啥不將addWorker的第一個(gè)參數(shù)設(shè)置為command,而是null?答案還在于并發(fā)操作的問(wèn)題,在進(jìn)行完if判斷之后,可能由于并發(fā)操作,線程池突然不是RUNNING狀態(tài)了,這就等價(jià)于情況二了,我們接下來(lái)分析。
對(duì)于情況二產(chǎn)生一種后果,那就是:由于線程不處于RUNNING狀態(tài),本應(yīng)該拒絕接受新的task,所以該從等待隊(duì)列中移除的task沒(méi)有被成功移除。對(duì)于runState > SHUTDOWN的情況我們不用擔(dān)心,線程池在調(diào)用相應(yīng)的改變線程池狀態(tài)的方法時(shí)會(huì)處理掉等待隊(duì)列中的task。唯有SHUTDOWN狀態(tài)是個(gè)特殊點(diǎn),如果您記憶好的話,應(yīng)該會(huì)記得上面在介紹線程池的狀態(tài)的時(shí)候,對(duì)于SHUTDOWN狀態(tài),雖然不會(huì)接受新的task,但是他會(huì)等待隊(duì)列中待處理的任務(wù)都處理完畢,沒(méi)錯(cuò),這里既然有一個(gè)沒(méi)有被移除的task,如果你是SHUTDOWN狀態(tài)的話需要處理掉該task。如果此時(shí)workerCount不為0的話,那么該task只要等待就好,早晚會(huì)輪到他;但是如果此時(shí)workerCount為0的話,那么就需要調(diào)用addWorker來(lái)專門添加一個(gè)worker線程來(lái)處理這個(gè)漏網(wǎng)之魚了。
接下來(lái)就來(lái)回答第二個(gè)問(wèn)題,為啥addWorker的時(shí)候Runnable不傳遞command而是null?答案就在addWorker方法的實(shí)現(xiàn)中,在方法實(shí)現(xiàn)中會(huì)有判斷:如果當(dāng)前是SHUTDOWN狀態(tài),那么只有worker的firstTask為null才可以將worker線程添加到線程池中,才能執(zhí)行該方法鏈:worker.thread.run()->worker.runWorker()->getTask().run();從而完成這個(gè)漏網(wǎng)之魚。
不僅感嘆作者思維之縝密?。?/p>
原理三:如果Worker線程數(shù)大于等于corePoolSize,但小于最大線程容量maxPoolSize,而且等待隊(duì)里已經(jīng)滿了,那么會(huì)新建一個(gè)Worker線程放入到線程池中
該原理蘊(yùn)含在最后一個(gè)elseif語(yǔ)句中,首先同樣說(shuō)一下,能夠執(zhí)行到這里的隱含條件:“線程池狀態(tài)>=SHUTDOWN || workerQueue.offer()返回false,即等待隊(duì)列已滿”:
/**
* 首先,能夠執(zhí)行到這里的條件是:
* 1. 線程池的狀態(tài) >= SHUTDOWN
* 2. 線程池的狀態(tài)為RUNNING,但是workQueue的offer方法為false,即隊(duì)列已滿
*
* 其次,else if返回true的條件,也即最終執(zhí)行拒絕策略的情況:
* 1. 線程池的狀態(tài) >= SHUTDOWN
* 2. workerCount >= maxPoolSize
*
* 結(jié)合上面兩塊分析,可以發(fā)現(xiàn)如果workQueue是滿的導(dǎo)致第二個(gè)if中offer方法失敗
* 但是workerCount < maxPoolSize的情況下,addWorker方法會(huì)成功增加一個(gè)Worker
* 線程來(lái)處理任務(wù)
*/
else if (!addWorker(command, false))
reject(command);
可以很直觀的看到,addWorker方法的副作用就是新創(chuàng)建一個(gè)Worker線程并且承載當(dāng)前的command任務(wù),所以原理三也是正確的。
實(shí)際上原理三和原理一是一模一樣的,addWorker第二個(gè)參數(shù)為true說(shuō)明將workerCount與corePoolSize進(jìn)行比較,為false說(shuō)明與maxPoolSize進(jìn)行比較;同樣addWorker失敗的場(chǎng)景也是:并發(fā)導(dǎo)致線程池的狀態(tài)>=SHUTDOWN或者并發(fā)導(dǎo)致workerCount>=maxPoolSize;但這些都是execute步驟的具體結(jié)果,同原理一一樣,這里也同樣不與原理三的思想相違背。
原理四:如果Worker線程數(shù)大于等于maxPoolSize,那么會(huì)執(zhí)行rejecet拒絕策略
原理四同樣蘊(yùn)含在原理三中,正是原理三elseif條件失敗的結(jié)果,執(zhí)行reject策略。失敗的結(jié)果有兩種:workerCount>=maxPoolSize或者當(dāng)前線程池的狀態(tài)>=SHUTDOWN。
2. addWorker方法
addWorker方法在執(zhí)行execute的時(shí)候在一些場(chǎng)景使用到了,該方法的作用就是創(chuàng)建一個(gè)Worker線程,并且交給線程池維護(hù),返回true說(shuō)明操作成功:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
該方法的代碼很長(zhǎng),但是可以分成兩個(gè)部分:
- retry代表的雙重自旋鎖檢查:該部分的作用是檢查線程池的ctl的值,看看是否支持創(chuàng)建先的Worker線程;但是這一步只是查看是否允許,并沒(méi)有真的創(chuàng)建Worker線程,更別說(shuō)交給線程池維護(hù)。
- 剩下的就是新建Worker線程,并且添加到線程池中,最后啟動(dòng)Worker線程。
retry雙重鎖檢查
retry是一個(gè)嵌套的自旋代碼,外層自旋負(fù)責(zé)檢查線程池的狀態(tài)runState,內(nèi)層循環(huán)負(fù)責(zé)檢查workerCount是否滿足限制、并且能夠在被允許的情況下更新線程池的ctl的值。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
首先我們來(lái)看看,外層自旋是如何通過(guò)線程池的runState來(lái)進(jìn)行判斷的:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
...
}
這個(gè)if條件寫的太過(guò)抽象,我們將它等價(jià)的轉(zhuǎn)化一下:
(rs > SHUTDOWN || (rs == SHUTDOWN && firstTask != null) || rs == SHUTDOWN && workQueue.isEmpty())
很明顯,如果當(dāng)前線程池的狀態(tài)為上面的三個(gè)中的一個(gè),那么直接會(huì)返回false表示addWorker失敗,接下來(lái)我們就挨個(gè)分析這三個(gè)條件。
第一個(gè)是rs>SHUTDOWN:這正是第一部分介紹的線程池的狀態(tài)runState>SHUTDOWN都拒絕接受新的task的來(lái)源。
第二個(gè)是(rs == SHUTDOWN && firstTask != null),初看大家可能會(huì)覺(jué)得有些摸不著頭腦,但是結(jié)合我們上面分析execute方法時(shí)的場(chǎng)景就會(huì)明白了。在上面我們說(shuō)過(guò)了,SHUTDOWN狀態(tài)的線程池按理說(shuō)是不會(huì)接受新的task的,而通過(guò)addWorker傳遞的task就表明線程池準(zhǔn)備接受該task,所以一旦addWorker最終返回true了,那么就真的表明SHUTDOWN狀態(tài)下的線程池允許接受新的task;這顯然就矛盾了,所以就直接返回false,并不會(huì)往下執(zhí)行給你機(jī)會(huì)。
注意:對(duì)于execute方法傳遞的Runnable任務(wù),該任務(wù)的歸宿有三種:被rejecet策略處理;放入等待隊(duì)列中等待worker處理,雖然這樣會(huì)出現(xiàn)被空閑的Worker線程立刻拿去處理,但是我們還是將它當(dāng)做是先進(jìn)入等待隊(duì)列中等待;被一個(gè)新的Worker線程承載直接處理
第三個(gè)就是線程池處于SHUTDOWN狀態(tài),同時(shí)當(dāng)前等待隊(duì)列為空,此時(shí)就更不允許addWorker成功了,直接返回false。
看完了外層的自旋鎖檢查,接下來(lái)就看看內(nèi)層的自旋鎖是干什么的:
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
內(nèi)層自旋鎖的檢查很簡(jiǎn)單,就是看不同情況下當(dāng)前worker線程的數(shù)量是否達(dá)到了上限,值得注意的是最后的一個(gè)if代碼,這里再次檢查就是為了怕別的地方并發(fā)操作更改了線程池的runState。
經(jīng)過(guò)了這么一趟折騰,只是知道了線程池可以準(zhǔn)許添加新的worker線程,并且已經(jīng)修改好了線程池的ctl字段,并沒(méi)有一些實(shí)質(zhì)性的進(jìn)展;就好比蓋房子之前要向有關(guān)部門辦理很多證明一樣,此時(shí)還未開(kāi)工。
新建Worker并交給線程池維護(hù)
上面的兩個(gè)自旋鎖進(jìn)行新建Worker的申請(qǐng),接下來(lái)就是新建Worker線程并且交給線程池處理的代碼了:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
注意,可以看到該方法最終返回的是workerStarted,這就是說(shuō)明雖然兩重自旋鎖獲取了線程池的準(zhǔn)許,但是并不能保證一定能成功,原因還是因?yàn)槠渌胤讲l(fā)調(diào)用了線程池的某些方法導(dǎo)致線程池的狀態(tài)發(fā)生了改變;注意,由于上面的雙自旋檢查中通過(guò)CAS操作更改了線程池的workerCount,也就是本次addWorker操作已經(jīng)預(yù)定了一個(gè)Worker的位置,所以不會(huì)存在其他地方并發(fā)操作導(dǎo)致workerCount的數(shù)量突然超出了限制,從而因此本次addWorker的失敗的情況。因此,這里的情況一般是由于其他地方調(diào)用了shutdown、shutdownNow等方法改變了線程池的runState。
上述代碼可以簡(jiǎn)化成下面的偽代碼:
新建Worker;
lock();
if(符合條件):
將Worker放入線程池;
設(shè)置標(biāo)記為true;
unlock();
if(Worker放入線程池成功):
啟動(dòng)Worker;
if(Worker放入線程池失敗):
恢復(fù)線程池的狀態(tài)ctl中的workerCount
實(shí)際上根據(jù)上面的偽代碼就很容易明白了,首先這里著重給出了加鎖和解鎖兩個(gè)動(dòng)作,這很重要,因?yàn)?strong>想要將Worker線程放入線程池中,需要檢查線程池的runState狀態(tài),此時(shí)需要獲取線程池的全局鎖,而能夠改變線程池的runState的方法都需要獲取線程池的全局鎖,所以這里加鎖就能保證在將Worker線程添加入線程池的過(guò)程中不會(huì)發(fā)生線程池runState的突變。
其次,這里的if條件為:
(rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null))
第一個(gè)線程池的runState狀態(tài)為RUNNING很顯然符合線程池的原理;而第二個(gè)條件就很有趣了,正是我們?cè)诜治鰁xecute的原理二的時(shí)候說(shuō)到的,此時(shí)線程池的狀態(tài)為SHUTDOWN,但是此時(shí)addWorker調(diào)用時(shí)傳遞的Runnable為null,這保證了雖然會(huì)新建一個(gè)Worker線程,但是這個(gè)線程并沒(méi)有直接承載task任務(wù),而是用來(lái)處理等待隊(duì)列中哪些已經(jīng)存在的、處于等待狀態(tài)的task,并不違背SHUTDOWN狀態(tài)下的線程池不允許接受新task的原則。
注意當(dāng)添加Worker失敗之后,finally代碼塊中會(huì)執(zhí)行一個(gè)方法addWorkerFailed():
private void addWorkerFailed(Worker w) {
// 該方法也需要獲取全局鎖
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
這里的decrementWorkerCount方法將雙自旋檢查中對(duì)workerCount的增1操作回滾了。關(guān)于該方法的其他代碼這里先不做分析。
Worker線程的啟動(dòng)
第一部分介紹Worker這個(gè)類的時(shí)候提到過(guò),Worker實(shí)現(xiàn)了Runnable接口,其被作為一個(gè)參數(shù)傳遞給了ThreadFactory的newThread方法,所以其run方法就是thread的run方法的實(shí)現(xiàn),因此這里啟動(dòng)Worker的代碼就是簡(jiǎn)單的一句t.start():
if (workerAdded) {
t.start();
workerStarted = true;
}
3. runWorker方法
runWorker方法是線程池中Worker線程的工作方法,其不停地從等待隊(duì)列中取出任務(wù)處理:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
很顯眼,Worker線程不停地從等待隊(duì)列中取task就是通過(guò)一個(gè)自旋實(shí)現(xiàn)的;值得注意的是,在自旋代碼之前有一個(gè)w.unlock(),因此,在沒(méi)有任務(wù)執(zhí)行期間,一個(gè)Worker線程是允許被中斷的;而由于得到一個(gè)任務(wù)之后,worker就加鎖了,所以在一個(gè)Worker執(zhí)行任務(wù)期間是無(wú)法被中斷的。
while循環(huán)中的getTask就是不停的從等待隊(duì)列中取task,如果隊(duì)列為空的話,那么該Worker線程會(huì)阻塞在getTask()處,并且成為了一個(gè)空閑的Worker線程。
第一個(gè)if語(yǔ)句負(fù)責(zé)在線程池的runState>=STOP的時(shí)候中斷Worker內(nèi)部的線程。
需要說(shuō)的一句話是,上面有兩個(gè)方法beforeExecute和afterExecute,線程池并沒(méi)有實(shí)現(xiàn)這兩個(gè)方法,開(kāi)發(fā)人員可以根據(jù)需要實(shí)現(xiàn)這兩個(gè)方法做一些處理。
而如果該方法能從while循環(huán)中跳出去,說(shuō)明getTask()方法返回了null,在后面分析getTask()方法時(shí)會(huì)知道,返回null說(shuō)明該線程空閑太多時(shí)間,需要被清除掉了。而負(fù)責(zé)清除Worker線程的方法就是finally塊中的processWorkerExit方法,該方法負(fù)責(zé)將參數(shù)中的Worker線程從線程池的容器中移除。
4. getTask方法
該方法負(fù)責(zé)從等待隊(duì)列中取出一個(gè)task交給空閑的Worker線程處理,其返回null的情況有以下幾種:
- runState >= STOP:和剛開(kāi)始介紹線程池狀態(tài)時(shí)說(shuō)道的相一致,該狀態(tài)下的線程池不會(huì)等待隊(duì)列中待處理的task
- runState == SHUTDOWN && workerQueue.isEmpty():由于SHUTDOWN下線程池不會(huì)接受新的task,而此時(shí)隊(duì)列為空,所以空閑的Worker永遠(yuǎn)不會(huì)有task處理了,直接返回null
- workerCount > maxPoolSize:符合線程池的原則,超出最大容量的空閑線程需要被處理(注意要保證此時(shí)最少有一個(gè)Worker線程能夠處理task或者等待隊(duì)列里面沒(méi)有task了)
- Worker線程空閑時(shí)間超時(shí)(注意要保證此時(shí)最少有一個(gè)Worker線程能夠處理task或者等待隊(duì)列里面沒(méi)有task了):符合線程池的機(jī)制
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
首先可以看出,這里又是一個(gè)無(wú)限循環(huán)。但是值得注意的一點(diǎn)是:getTask會(huì)阻塞并不是因?yàn)檫@個(gè)無(wú)限循環(huán),而是workQueue的take方法;對(duì)于需要考慮超時(shí)的情況下workQueue.poll返回null緊接著就會(huì)導(dǎo)致getTask返回null從而超時(shí)的空閑Worker會(huì)被回收;但是對(duì)于那些沒(méi)有超時(shí)限制的Worker,當(dāng)隊(duì)列為空時(shí)他們會(huì)阻塞在workQueue.take()方法處。
對(duì)于大部分情況的代碼很簡(jiǎn)單,唯一需要分析的只有對(duì)于空閑超時(shí)的處理,其中的timed變量的作用是表示當(dāng)前是否需要處理空閑超時(shí)的問(wèn)題,如果timed為false,那么即使出現(xiàn)了空閑超時(shí)的Worker線程那么也不會(huì)去理會(huì)。而timed為true的情況有兩種:
- allowCoreThreadTimeOut為true,只要這個(gè)為true,不管什么線程都要進(jìn)行超時(shí)監(jiān)控
- wc > corePoolSize:該情況下如果allowCoreThreadTimeOut為false的話,會(huì)保證非核心線程會(huì)被超時(shí)回收
分析了超時(shí)的情況,再來(lái)看看超時(shí)是怎么產(chǎn)生的。
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
如果沒(méi)能return成功,說(shuō)明r為null,說(shuō)明超時(shí)了。
5. shutdown方法
上面很多地方都要進(jìn)行線程狀態(tài)runState的判斷,而SHUTDOWN狀態(tài)又是一個(gè)很特殊的狀態(tài),我們這就看看線程池關(guān)閉時(shí)的動(dòng)作有哪些:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
可以看到,調(diào)用shutdown方法需要獲取全局鎖,因此addWorker中往線程池中添加Worker線程的步驟不可能與shutdown并發(fā),所以保證了二者的操作是安全的。
其中chechShutdownAccess用來(lái)檢查各個(gè)Worker線程的中thread是否是可訪問(wèn)的。
而advanceRunState方法使用自旋鎖+CAS的方式將runState設(shè)置為SHUTDOWN。
interruptIdleWorkers方法負(fù)責(zé)將空閑的Worker線程中的thread執(zhí)行interrupt操作,有意思的是該方法內(nèi)部對(duì)Worker線程是否是空閑狀態(tài)的判斷為執(zhí)行worker的tryLock()方法,上面在分析runWorker方法是提到,對(duì)于有task需要執(zhí)行的Worker線程,在執(zhí)行task之前Worker會(huì)進(jìn)行加鎖操作,因此可以通過(guò)tryLock()方法的返回值是true判斷Worker處于工作狀態(tài)。
onShutdown方法是一個(gè)空方法,留給開(kāi)發(fā)者根據(jù)需求自己實(shí)現(xiàn)。
回收空閑線程的原理
shutdown方法會(huì)調(diào)用interruptIdleWorkers方法來(lái)回收空閑線程:
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 對(duì)于處于工作狀態(tài)的Worker線程,tryLock會(huì)失敗
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
代碼符合我們上面的描述,但是問(wèn)題來(lái)了,這是怎么回收的呢?因?yàn)樯鲜龃a只是將所有空閑的Worker的線程執(zhí)行了interrupt方法,只是將其中斷標(biāo)記為設(shè)置為了true,并不影響線程的運(yùn)行啊?
答案就在getTask()這個(gè)方法中,如下代碼會(huì)拋出中斷異常,導(dǎo)致在getTask的自旋鎖中重新開(kāi)始進(jìn)行判斷:
try {
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
還記得getTask方法返回null的場(chǎng)景中一個(gè)是runState為SHUTDOWN而且此時(shí)等待隊(duì)列中沒(méi)有task,沒(méi)錯(cuò),由于執(zhí)行了shutdown方法,此時(shí)線程池的狀態(tài)已經(jīng)變?yōu)榱薙HUTDOWN,而空閑Worker之所以空閑,不就是因?yàn)榇藭r(shí)等待隊(duì)列為空嗎,所以getTask返回null的條件直接成立,而在runWorker中如果getTask返回的是null,那么會(huì)執(zhí)行processWorkerExit方法將Worker移除,繞了一圈shutdown方法是通過(guò)getTask返回null來(lái)觸發(fā)processWorkerExit方法移除空閑線程的。
6. shutdownNow
該方法就比較狠了,就連等待隊(duì)列中的任務(wù)也會(huì)被舍棄。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
// 這個(gè)方法比較狠:
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
該方法的大致流程與shutdown方法一樣,只不過(guò)這里是將線程池的runState設(shè)置為STOP。
中斷所有線程的原理
最大的不同就在于這里執(zhí)行的是interruptWorkers方法,該方法負(fù)責(zé)將所有的Worker中斷,即使你是active的,這樣符合我們?cè)陂_(kāi)篇中介紹線程池不同runState含義時(shí)的描述。
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
由于涉及到中斷工作狀態(tài)的Worker,所以該方法內(nèi)部調(diào)用的是worker的interruptIfStarted方法,而不是調(diào)用thread.interrupt()方法。
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
可見(jiàn),結(jié)合interruptWorkers和interruptIfStarted方法實(shí)現(xiàn)將所有Worker線程執(zhí)行interrupt操作。
而shutdownNow方法中斷所有線程的原理也是利用getTask方法返回null的特點(diǎn),只不過(guò)這里所有的Worker線程都被中斷了,而且此時(shí)線程池的runState為STOP,所以直接滿足getTask返回null的第一個(gè)條件,所以回收所有Worker的原理為:“Worker線程被中斷-getTask中等待隊(duì)列取出任務(wù)的代碼發(fā)生中斷異常-getTask自旋檢查時(shí)發(fā)現(xiàn)runState為STOP直接返回null-觸發(fā)runWorker中的processWorkerExit方法移除Worker線程”。
而最后的drainQueue()方法負(fù)責(zé)將等待隊(duì)列中等待的task取出,并最終返回給開(kāi)發(fā)者。
7. Worker的回收
上面講過(guò),回收Worker是使用的processWorkerExit方法,而在我們的場(chǎng)景中,通常是runWorker方法中的getTask方法返回null時(shí)最終會(huì)回調(diào)processWorkerExit方法,此時(shí)傳遞的第二個(gè)參數(shù)為false:
final void runWorker(Worker w) {
...
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
...
}
completedAbruptly = false;
} finally {
// 用來(lái)將沒(méi)有剩余價(jià)值的worker從線程池中移除
processWorkerExit(w, completedAbruptly);
}
}
接下來(lái)我們就看一看processWorkerExit方法是如何回收Worker線程的:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
可見(jiàn),該方法首先做的事情是將該Worker線程從線程池中移除,之后調(diào)用了tryTerminate()方法,最后判斷是否需要返回;注意,這里并沒(méi)有維護(hù)WorkerCount的值的處理,是因?yàn)樵撎幚碓趃etTask方法中return null之前做了。值得提醒的就是最后這一段判斷:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
...
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
不管是調(diào)用shutdown方法還是日常維護(hù)Worker線程的數(shù)量,這里的min參數(shù)在一些情況下會(huì)保證Worker線程的數(shù)量在corePoolSize。這在平常的場(chǎng)景是必要的,但是如果我們調(diào)用了shutdown方法,那么就會(huì)產(chǎn)生一些問(wèn)題,當(dāng)隊(duì)列中只剩下一個(gè)task時(shí),還有兩個(gè)Worker線程去getTask請(qǐng)求,都到達(dá)了等待隊(duì)列的take方法處,此時(shí)會(huì)有一個(gè)一直阻塞到這里,那么也就導(dǎo)致被阻塞的Worker永遠(yuǎn)阻塞,同時(shí)也無(wú)法被回收了。
沒(méi)錯(cuò),針對(duì)該問(wèn)題,解決的方案就在tryTerminate方法中:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
首先這是一個(gè)無(wú)限循環(huán),然后我們看看開(kāi)始的兩個(gè)if語(yǔ)句是如何判斷終止terminate的:
- 線程池的runState為RUNNING或者大于等于TIDYING
- 線程池的runState為SHUTDOWN且等待隊(duì)列不為空
- workerCount不為0,此時(shí)嘗試中斷一個(gè)空閑的Worker線程,之后return()
該方法正是通過(guò)不停的中斷一個(gè)空閑的Worker線程來(lái)解決上面的阻塞在隊(duì)列的take方法的Worker線程無(wú)法回收的問(wèn)題的。
接下來(lái)我們來(lái)看看后面的一堆代碼的工作:注意,代碼能夠執(zhí)行到這里,隱含了上述三個(gè)條件都為false的情況,此時(shí)線程池的狀態(tài)為:runState是SHUTDOWN、等待隊(duì)列為空、workerCount為0,這種狀態(tài)明顯線程池沒(méi)有任何任務(wù)、也不會(huì)有新的任務(wù)、同時(shí)Worker線程都被回收了,終于到了結(jié)束線程池的時(shí)候了?。?/p>
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
第一個(gè)if就是更改線程池的ctl狀態(tài),然后執(zhí)行terminated方法,最后更改線程池的狀態(tài)為TERMINATED表明線程池已經(jīng)結(jié)束了。terminated方法是一個(gè)空方法,留給開(kāi)發(fā)者自己根據(jù)需求實(shí)現(xiàn)。
至此,線程池的生命算是結(jié)束了。