- 初始化
// 初始化線程池的狀態(tài)和當(dāng)前線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程池的狀態(tài)和數(shù)量分別由一個(gè)32位的整形前3位和后29位表示
// 這個(gè)COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// COUNT_MASK二進(jìn)制數(shù)值:0001 1111 1111 1111 1111 1111 1111 1111
// 29個(gè)1,基本上是用來(lái)做位運(yùn)算計(jì)算線程數(shù)量的
// 還有一點(diǎn)就是,這個(gè)數(shù)代表最大線程數(shù),因?yàn)?9個(gè)位置都是1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;
// 下面的5個(gè)狀態(tài),只需要關(guān)注二進(jìn)制的前3位即可
// RUNNING = 1110 0000 0000 0000 0000 0000 0000 0000
// -1的話,需要轉(zhuǎn)換成補(bǔ)碼才能向右移29位
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN = 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP = 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// TIDYING = 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED = 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// 計(jì)算c這個(gè)數(shù)值和1110 0000 0000 0000 0000 0000 0000 0000相與的結(jié)果,其實(shí)就是計(jì)算線程池的狀態(tài)
private static int runStateOf(int c) { return c & ~COUNT_MASK; }
// 計(jì)算當(dāng)前線程池的線程數(shù)量;c與0001 1111 1111 1111 1111 1111 1111 1111相與
private static int workerCountOf(int c) { return c & COUNT_MASK; }
// rs為線程狀態(tài),wc是線程數(shù)量;一般使用這個(gè)方法計(jì)算ctl值,也就是狀態(tài)和數(shù)量共同值
private static int ctlOf(int rs, int wc) { return rs | wc; }
線程池對(duì)象初始化時(shí)執(zhí)行ctlOf(RUNNING,0)返回一個(gè)int值作為ctl的初始化值。RUNNING是前3位全部為1,與0執(zhí)行位運(yùn)算|,這個(gè)0就是初始化的時(shí)候是0個(gè)線程,得到的結(jié)果如下。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }

image.png
- 添加任務(wù)
public void execute(Runnable command) {
// 添加的任務(wù)不能為空
if (command == null)
throw new NullPointerException();
// ctl是一個(gè)32位的整形值,前3為代表線程池的狀態(tài),后29位代表當(dāng)前線程池中線程的數(shù)量
int c = ctl.get();
// workerCountOf(c)這個(gè)方法是通過(guò)位運(yùn)算計(jì)算當(dāng)前線程池線程數(shù)量;當(dāng)線程數(shù)量小于核心線程數(shù),那么執(zhí)行添加工作線程的操作
if (workerCountOf(c) < corePoolSize) {
// 創(chuàng)建新的工作線程;如果添加成功,直接返回;
if (addWorker(command, true))
return;
// 添加工作線程不成功,有可能是當(dāng)前核心線程數(shù)已經(jīng)滿了;
//在多線程執(zhí)行任務(wù)的時(shí)候,有可能判斷的時(shí)候確實(shí)沒(méi)有達(dá)到核心線程數(shù),但是當(dāng)真正添加的時(shí)候,前面已經(jīng)有任務(wù)添加達(dá)到了核心線程數(shù)了
c = ctl.get();
}
// 這個(gè)時(shí)候,核心線程數(shù)已經(jīng)滿了;
// 如果線程池還在運(yùn)行狀態(tài),那么就把這個(gè)任務(wù)添加到隊(duì)列中去
if (isRunning(c) && workQueue.offer(command)) {
// 重新獲取線程池狀態(tài)
int recheck = ctl.get();
// 再確定一下線程池是否處于運(yùn)行狀態(tài)
// 如果線程池此刻沒(méi)有處于運(yùn)行狀態(tài),那么就把之前隊(duì)列中的任務(wù)推出來(lái)
if (! isRunning(recheck) && remove(command))
// 把推出來(lái)的任務(wù)執(zhí)行拒絕策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 上面如果添加到隊(duì)列中也失敗了,那么就創(chuàng)建非核心線程
else if (!addWorker(command, false))
// 如果添加非核心線程失敗,那么執(zhí)行拒絕策略
reject(command);
}

image.png
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 開(kāi)啟自旋;先獲取線程池狀態(tài)及數(shù)量
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
// 如果c=[SHUTDOWN | STOP | TIDYING | TERMINATED]其中之一,并且c = [STOP | TIDYING | TERMINATED] 或者 任務(wù)不為空 或者等待隊(duì)列為空,就返回false
//翻譯成人話,就是
// 1.如果線程池的狀態(tài)處于[STOP | TIDYING | TERMINATED]中,那么就直接創(chuàng)建失敗
// 2.如果線程池的狀態(tài)處于SHUTDOWN,并且任務(wù)不為空的話,那么也創(chuàng)建失敗。
// 3.如果線程池的狀態(tài)處于SHUTDOWN,并且等待隊(duì)列為空,同樣創(chuàng)建失敗。
// 上述條件下,滿足其一,都會(huì)創(chuàng)建新線程失敗,走向拒絕策略。
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
// 走到這里,說(shuō)明此刻線程池的狀態(tài)處于RUNNING
for (;;) {
// 1.如果是創(chuàng)建核心線程,那么就比較一下當(dāng)前線程數(shù)量是否大于等于核心線程數(shù)
// 2.如果是創(chuàng)建非核心線程,那么比較一下當(dāng)前線程數(shù)是否大于等于最大線程數(shù)
// 如果上述條件確實(shí)滿足,說(shuō)明就不能創(chuàng)建工作線程了,那就要?jiǎng)?chuàng)建失敗。
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
// 此時(shí)需要通過(guò)CAS的方式實(shí)現(xiàn)線程數(shù)+1,這個(gè)操作必須滿足原子性。
if (compareAndIncrementWorkerCount(c))
// 如果自增成功,說(shuō)明拿到了創(chuàng)建工作線程的通行證,那么就直接跳出雙重循環(huán),進(jìn)入下一關(guān)去真正地創(chuàng)建工作線程。
break retry;
//沒(méi)有拿到通行證的話,就好好地自旋
// 重新讀取狀態(tài)
c = ctl.get(); // Re-read ctl
// 如果當(dāng)前狀態(tài)處于[SHUTDOWN | STOP | TIDYING | TERMINATED]中,那么最終會(huì)返回false,創(chuàng)建工作線程失敗
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// 否則重新內(nèi)部自旋直至滿足以下情況
// 1.線程數(shù)量超過(guò)限制,創(chuàng)建失敗。
// 2.自增成功,拿到通行證,進(jìn)入下一步創(chuàng)建工作線程
// 3.線程池狀態(tài)發(fā)生變化,最終創(chuàng)建失敗。
}
}
// 有通行證的才能執(zhí)行到這里
// 下面新創(chuàng)建的工作線程是否開(kāi)始運(yùn)行的標(biāo)識(shí)
boolean workerStarted = false;
// 下面新創(chuàng)建的工作線程是否被添加到工作線程集合中
boolean workerAdded = false;
Worker w = null;
try {
// 創(chuàng)建一個(gè)工作線程;
// 一個(gè)work綁定一個(gè)線程,創(chuàng)建work的同時(shí),會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的線程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 獲取一個(gè)重入鎖
final ReentrantLock mainLock = this.mainLock;
// 上鎖;
//如果看不明白的話,就把這兩行代碼替換成synchronized (this)
mainLock.lock();
// 解鎖前的邏輯都不存在線程安全的問(wèn)題
try {
// 重新檢查線程池狀態(tài)
int c = ctl.get();
// 1.如果線程池狀態(tài)位RUNNUING
// 2.如果線程池狀態(tài)處于[RUNNING | SHUTDOWN],并且任務(wù)為null的情況
// 那么都可以把當(dāng)前工作線程放進(jìn)工作線程集合中
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
// 確保剛剛創(chuàng)建出來(lái)的線程沒(méi)有還沒(méi)有運(yùn)行。其實(shí)就是沒(méi)有執(zhí)行start()方法。這種情況基本不存在。
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把工作線程放進(jìn)集合中。
// 這個(gè)集合是HashSet,在獲取重入鎖的情況下不存在線程安全問(wèn)題。
workers.add(w);
// 計(jì)算當(dāng)前線程集合中的數(shù)量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//修改是否添加進(jìn)集合的標(biāo)識(shí)
workerAdded = true;
}
} finally {
// 解鎖
mainLock.unlock();
}
// 如果已經(jīng)添加進(jìn)集合中,那么開(kāi)啟線程
if (workerAdded) {
t.start();
// 設(shè)置工作啟動(dòng)標(biāo)識(shí)
workerStarted = true;
}
}
} finally {
// 如果工作線程沒(méi)有啟動(dòng),那么
if (! workerStarted)
// 1.從集合中移除
// 2.線程數(shù)減1
// 3.嘗試tryTerminate()終止操作,不一定會(huì)終止
addWorkerFailed(w);
}
// 返回新創(chuàng)建的工作線程是否啟動(dòng)的結(jié)果
// 工作線程成功啟動(dòng)也證明成功地創(chuàng)建新的工作線程。
return workerStarted;
}
// 1.如果線程池的狀態(tài)處于[STOP | TIDYING | TERMINATED]中,那么就直接創(chuàng)建失敗
// 2.如果線程池的狀態(tài)處于SHUTDOWN,并且任務(wù)不為空的話,那么也創(chuàng)建失敗。
// 3.如果線程池的狀態(tài)處于SHUTDOWN,并且等待隊(duì)列為空,同樣創(chuàng)建失敗。
// 上述條件下,滿足其一,都會(huì)創(chuàng)建新線程失敗,走向拒絕策略。
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
上述代碼需要仔細(xì)解釋一下。
- 首先如果線程池的狀態(tài)處于[STOP | TIDYING | TERMINATED]中,是不能再創(chuàng)建新的線程了,也不會(huì)接受新的任務(wù),直接走向拒絕策略
- 其次如果是處于SHUTDOWN狀態(tài)的話,如果等待隊(duì)列不為空的話,可以接受創(chuàng)建新的工作線程來(lái)幫忙,但是不能接受新的任務(wù)。
- 最后,如果處于SHUTDOWN狀態(tài),并且等待隊(duì)列中沒(méi)有任務(wù)的話,那么不接受創(chuàng)建新的線程,更不接受新的任務(wù)。
所以SHUTDOWN屬于軟關(guān)閉狀態(tài)。
// 1.如果是創(chuàng)建核心線程,那么就比較一下當(dāng)前線程數(shù)量是否大于等于核心線程數(shù)
// 2.如果是創(chuàng)建非核心線程,那么比較一下當(dāng)前線程數(shù)是否大于等于最大線程數(shù)
// 如果上述條件確實(shí)滿足,說(shuō)明就不能創(chuàng)建工作線程了,那就要?jiǎng)?chuàng)建失敗。
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
其實(shí)addWorker(Runnable firstTask, boolean core)這個(gè)core的作用就是用來(lái)確定是否是創(chuàng)建核心線程,根據(jù)這個(gè)值來(lái)間接判斷對(duì)應(yīng)的線程數(shù)量是否超標(biāo)了。并且在后續(xù)創(chuàng)建工作線程的代碼中并沒(méi)有起到作用,所以可以判斷,核心線程與非核心線程并沒(méi)有本質(zhì)上的差別,在創(chuàng)建工作線程時(shí)并不會(huì)標(biāo)記某個(gè)線程是核心線程。
// 1.如果線程池狀態(tài)位RUNNUING
// 2.如果線程池狀態(tài)處于[RUNNING | SHUTDOWN],并且任務(wù)為null的情況
// 那么都可以把當(dāng)前工作線程放進(jìn)工作線程集合中
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null))
上面的代碼強(qiáng)調(diào)兩點(diǎn):
- RUNNING狀態(tài)可以創(chuàng)建工作線程和處理任務(wù)
-
SHUTDOWN狀態(tài),等待隊(duì)列還有任務(wù)的話,只允許創(chuàng)建新的線程幫忙處理剩下的任務(wù),不能添加新任務(wù)。
image.png
execute()方法基本分析完畢,下一篇分析Work這個(gè)類。
