線程池源碼分析-execute()方法

  • 初始化
// 初始化線程池的狀態(tài)和當(dāng)前線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程池的狀態(tài)和數(shù)量分別由一個(gè)32位的整形前3位和后29位表示
// 這個(gè)COUNT_BITS=29
private static final int COUNT_BITS = Integer.SIZE - 3;
// COUNT_MASK二進(jìn)制數(shù)值:0001 1111 1111 1111 1111 1111 1111 1111
// 29個(gè)1,基本上是用來(lái)做位運(yùn)算計(jì)算線程數(shù)量的
// 還有一點(diǎn)就是,這個(gè)數(shù)代表最大線程數(shù),因?yàn)?9個(gè)位置都是1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

// 下面的5個(gè)狀態(tài),只需要關(guān)注二進(jìn)制的前3位即可

// RUNNING = 1110 0000 0000 0000 0000 0000 0000 0000
// -1的話,需要轉(zhuǎn)換成補(bǔ)碼才能向右移29位
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN = 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP = 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING = 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED = 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

// 計(jì)算c這個(gè)數(shù)值和1110 0000 0000 0000 0000 0000 0000 0000相與的結(jié)果,其實(shí)就是計(jì)算線程池的狀態(tài)
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// 計(jì)算當(dāng)前線程池的線程數(shù)量;c與0001 1111 1111 1111 1111 1111 1111 1111相與
private static int workerCountOf(int c)  { return c & COUNT_MASK; }
// rs為線程狀態(tài),wc是線程數(shù)量;一般使用這個(gè)方法計(jì)算ctl值,也就是狀態(tài)和數(shù)量共同值
private static int ctlOf(int rs, int wc) { return rs | wc; }

線程池對(duì)象初始化時(shí)執(zhí)行ctlOf(RUNNING,0)返回一個(gè)int值作為ctl的初始化值。RUNNING是前3位全部為1,與0執(zhí)行位運(yùn)算|,這個(gè)0就是初始化的時(shí)候是0個(gè)線程,得到的結(jié)果如下。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int ctlOf(int rs, int wc) { return rs | wc; }
image.png
  • 添加任務(wù)
public void execute(Runnable command) {
    // 添加的任務(wù)不能為空
    if (command == null)
        throw new NullPointerException();
    // ctl是一個(gè)32位的整形值,前3為代表線程池的狀態(tài),后29位代表當(dāng)前線程池中線程的數(shù)量
    int c = ctl.get();
    // workerCountOf(c)這個(gè)方法是通過(guò)位運(yùn)算計(jì)算當(dāng)前線程池線程數(shù)量;當(dāng)線程數(shù)量小于核心線程數(shù),那么執(zhí)行添加工作線程的操作
    if (workerCountOf(c) < corePoolSize) {
    // 創(chuàng)建新的工作線程;如果添加成功,直接返回;
        if (addWorker(command, true))
            return;
        // 添加工作線程不成功,有可能是當(dāng)前核心線程數(shù)已經(jīng)滿了;
        //在多線程執(zhí)行任務(wù)的時(shí)候,有可能判斷的時(shí)候確實(shí)沒(méi)有達(dá)到核心線程數(shù),但是當(dāng)真正添加的時(shí)候,前面已經(jīng)有任務(wù)添加達(dá)到了核心線程數(shù)了
        c = ctl.get();
    }
    // 這個(gè)時(shí)候,核心線程數(shù)已經(jīng)滿了;
    // 如果線程池還在運(yùn)行狀態(tài),那么就把這個(gè)任務(wù)添加到隊(duì)列中去
    if (isRunning(c) && workQueue.offer(command)) {
    // 重新獲取線程池狀態(tài)
        int recheck = ctl.get();
    // 再確定一下線程池是否處于運(yùn)行狀態(tài)
    // 如果線程池此刻沒(méi)有處于運(yùn)行狀態(tài),那么就把之前隊(duì)列中的任務(wù)推出來(lái)
        if (! isRunning(recheck) && remove(command))
        // 把推出來(lái)的任務(wù)執(zhí)行拒絕策略
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 上面如果添加到隊(duì)列中也失敗了,那么就創(chuàng)建非核心線程
    else if (!addWorker(command, false))
        // 如果添加非核心線程失敗,那么執(zhí)行拒絕策略
        reject(command);
}
image.png
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        // 開(kāi)啟自旋;先獲取線程池狀態(tài)及數(shù)量
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            // 如果c=[SHUTDOWN | STOP | TIDYING | TERMINATED]其中之一,并且c = [STOP | TIDYING | TERMINATED] 或者 任務(wù)不為空 或者等待隊(duì)列為空,就返回false
            //翻譯成人話,就是
            // 1.如果線程池的狀態(tài)處于[STOP | TIDYING | TERMINATED]中,那么就直接創(chuàng)建失敗
            // 2.如果線程池的狀態(tài)處于SHUTDOWN,并且任務(wù)不為空的話,那么也創(chuàng)建失敗。
            // 3.如果線程池的狀態(tài)處于SHUTDOWN,并且等待隊(duì)列為空,同樣創(chuàng)建失敗。
            // 上述條件下,滿足其一,都會(huì)創(chuàng)建新線程失敗,走向拒絕策略。
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;
            // 走到這里,說(shuō)明此刻線程池的狀態(tài)處于RUNNING
            for (;;) {
            // 1.如果是創(chuàng)建核心線程,那么就比較一下當(dāng)前線程數(shù)量是否大于等于核心線程數(shù)
            // 2.如果是創(chuàng)建非核心線程,那么比較一下當(dāng)前線程數(shù)是否大于等于最大線程數(shù)
            // 如果上述條件確實(shí)滿足,說(shuō)明就不能創(chuàng)建工作線程了,那就要?jiǎng)?chuàng)建失敗。
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                // 此時(shí)需要通過(guò)CAS的方式實(shí)現(xiàn)線程數(shù)+1,這個(gè)操作必須滿足原子性。
                if (compareAndIncrementWorkerCount(c))
                // 如果自增成功,說(shuō)明拿到了創(chuàng)建工作線程的通行證,那么就直接跳出雙重循環(huán),進(jìn)入下一關(guān)去真正地創(chuàng)建工作線程。
                    break retry;
                //沒(méi)有拿到通行證的話,就好好地自旋
                // 重新讀取狀態(tài)
                c = ctl.get();  // Re-read ctl
                // 如果當(dāng)前狀態(tài)處于[SHUTDOWN | STOP | TIDYING | TERMINATED]中,那么最終會(huì)返回false,創(chuàng)建工作線程失敗
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // 否則重新內(nèi)部自旋直至滿足以下情況
                // 1.線程數(shù)量超過(guò)限制,創(chuàng)建失敗。
                // 2.自增成功,拿到通行證,進(jìn)入下一步創(chuàng)建工作線程
                // 3.線程池狀態(tài)發(fā)生變化,最終創(chuàng)建失敗。
            }
        }
// 有通行證的才能執(zhí)行到這里
        // 下面新創(chuàng)建的工作線程是否開(kāi)始運(yùn)行的標(biāo)識(shí)
        boolean workerStarted = false;
        // 下面新創(chuàng)建的工作線程是否被添加到工作線程集合中
        boolean workerAdded = false;
        Worker w = null;
        try {
        // 創(chuàng)建一個(gè)工作線程;
        // 一個(gè)work綁定一個(gè)線程,創(chuàng)建work的同時(shí),會(huì)創(chuàng)建一個(gè)對(duì)應(yīng)的線程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
            // 獲取一個(gè)重入鎖
                final ReentrantLock mainLock = this.mainLock;
                // 上鎖;
                //如果看不明白的話,就把這兩行代碼替換成synchronized (this)
                mainLock.lock();
                // 解鎖前的邏輯都不存在線程安全的問(wèn)題
                try {
                    // 重新檢查線程池狀態(tài)
                    int c = ctl.get();
                    // 1.如果線程池狀態(tài)位RUNNUING
                    // 2.如果線程池狀態(tài)處于[RUNNING | SHUTDOWN],并且任務(wù)為null的情況
                    // 那么都可以把當(dāng)前工作線程放進(jìn)工作線程集合中
                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        // 確保剛剛創(chuàng)建出來(lái)的線程沒(méi)有還沒(méi)有運(yùn)行。其實(shí)就是沒(méi)有執(zhí)行start()方法。這種情況基本不存在。
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 把工作線程放進(jìn)集合中。
                        // 這個(gè)集合是HashSet,在獲取重入鎖的情況下不存在線程安全問(wèn)題。
                        workers.add(w);
                        // 計(jì)算當(dāng)前線程集合中的數(shù)量
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        //修改是否添加進(jìn)集合的標(biāo)識(shí)
                        workerAdded = true;
                    }
                } finally {
                // 解鎖
                    mainLock.unlock();
                }
                // 如果已經(jīng)添加進(jìn)集合中,那么開(kāi)啟線程
                if (workerAdded) {
                    t.start();
                    // 設(shè)置工作啟動(dòng)標(biāo)識(shí)
                    workerStarted = true;
                }
            }
        } finally {
        // 如果工作線程沒(méi)有啟動(dòng),那么
            if (! workerStarted)
            // 1.從集合中移除
            // 2.線程數(shù)減1
            // 3.嘗試tryTerminate()終止操作,不一定會(huì)終止
                addWorkerFailed(w);
        }
        // 返回新創(chuàng)建的工作線程是否啟動(dòng)的結(jié)果
        // 工作線程成功啟動(dòng)也證明成功地創(chuàng)建新的工作線程。
        return workerStarted;
    }
// 1.如果線程池的狀態(tài)處于[STOP | TIDYING | TERMINATED]中,那么就直接創(chuàng)建失敗
            // 2.如果線程池的狀態(tài)處于SHUTDOWN,并且任務(wù)不為空的話,那么也創(chuàng)建失敗。
            // 3.如果線程池的狀態(tài)處于SHUTDOWN,并且等待隊(duì)列為空,同樣創(chuàng)建失敗。
            // 上述條件下,滿足其一,都會(huì)創(chuàng)建新線程失敗,走向拒絕策略。
if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

上述代碼需要仔細(xì)解釋一下。

  • 首先如果線程池的狀態(tài)處于[STOP | TIDYING | TERMINATED]中,是不能再創(chuàng)建新的線程了,也不會(huì)接受新的任務(wù),直接走向拒絕策略
  • 其次如果是處于SHUTDOWN狀態(tài)的話,如果等待隊(duì)列不為空的話,可以接受創(chuàng)建新的工作線程來(lái)幫忙,但是不能接受新的任務(wù)。
  • 最后,如果處于SHUTDOWN狀態(tài),并且等待隊(duì)列中沒(méi)有任務(wù)的話,那么不接受創(chuàng)建新的線程,更不接受新的任務(wù)。

所以SHUTDOWN屬于軟關(guān)閉狀態(tài)。

// 1.如果是創(chuàng)建核心線程,那么就比較一下當(dāng)前線程數(shù)量是否大于等于核心線程數(shù)
            // 2.如果是創(chuàng)建非核心線程,那么比較一下當(dāng)前線程數(shù)是否大于等于最大線程數(shù)
            // 如果上述條件確實(shí)滿足,說(shuō)明就不能創(chuàng)建工作線程了,那就要?jiǎng)?chuàng)建失敗。
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;

其實(shí)addWorker(Runnable firstTask, boolean core)這個(gè)core的作用就是用來(lái)確定是否是創(chuàng)建核心線程,根據(jù)這個(gè)值來(lái)間接判斷對(duì)應(yīng)的線程數(shù)量是否超標(biāo)了。并且在后續(xù)創(chuàng)建工作線程的代碼中并沒(méi)有起到作用,所以可以判斷,核心線程與非核心線程并沒(méi)有本質(zhì)上的差別,在創(chuàng)建工作線程時(shí)并不會(huì)標(biāo)記某個(gè)線程是核心線程。

// 1.如果線程池狀態(tài)位RUNNUING
// 2.如果線程池狀態(tài)處于[RUNNING | SHUTDOWN],并且任務(wù)為null的情況
// 那么都可以把當(dāng)前工作線程放進(jìn)工作線程集合中
if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) 

上面的代碼強(qiáng)調(diào)兩點(diǎn):

  • RUNNING狀態(tài)可以創(chuàng)建工作線程和處理任務(wù)
  • SHUTDOWN狀態(tài),等待隊(duì)列還有任務(wù)的話,只允許創(chuàng)建新的線程幫忙處理剩下的任務(wù),不能添加新任務(wù)。


    image.png

    execute()方法基本分析完畢,下一篇分析Work這個(gè)類。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容