并發(fā)編程之ForkJoin框架原理分析

點(diǎn)贊再看,養(yǎng)成習(xí)慣,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章。

前言

前面我們介紹了線程池框架(ExecutorService)的兩個具體實(shí)現(xiàn):

線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟蕉鄠€任務(wù)上。Java7 又提供了的一個用于并行執(zhí)行的任務(wù)的框架 Fork/Join ,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認(rèn)識Fork/Join 框架。

任務(wù)性質(zhì)類型

CPU密集型(CPU bound)

CPU密集型也叫計算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對于CPU要好很好多,此時,系統(tǒng)運(yùn)作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內(nèi)存),I/O在很短的時間就可以完成,而CPU還有許多運(yùn)算要處理,CPU Loading很高。

在多重程序系統(tǒng)中,大部分時間用來做計算、邏輯判斷等CPU動作的程序稱之 CPU bound。例如一個計算圓周率至小數(shù)點(diǎn)一千位以下的程序,在執(zhí)行的過程當(dāng)中絕大部分時間在用三角函數(shù)和開根號的計算,便是屬于CPU bound的程序。

CPU bound的程序一般而言CPU占用率相當(dāng)高。這可能是因為任務(wù)本身不太需要訪問I/O設(shè)備,也可能是因為程序是多線程實(shí)現(xiàn)因此屏蔽了等待I/O的時間。

線程數(shù)一般設(shè)置為:線程數(shù) = CPU核數(shù) + 1(現(xiàn)代CPU支持超線程)

IO密集型(I/O bound)

I/O密集型指的是系統(tǒng)的CPU性能相對硬盤、內(nèi)存要好很多,此時,系統(tǒng)運(yùn)作,大部分的狀況是 CPU 在等 I/O(硬盤/內(nèi)存)的讀/寫操作,此時 CPU Loading 并不高。

I/O bound的程序一般在達(dá)到性能極限時,CPU占用率仍然較低。這可能是因為任務(wù)本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。

線程數(shù)一般設(shè)置為:線程數(shù) = ((線程等待時間 + 線程CPU時間) / 線程CPU時間) * CPU數(shù)目

CPU密集型 VS I/O密集型

我們可以把任務(wù)分為計算密集型和I/O密集型。

計算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進(jìn)行高清解碼等等,全靠CPU的運(yùn)算能力。這種計算密集型任務(wù)雖然也可以用多任務(wù)完成,但是任務(wù)越多,花在任務(wù)切換的時間就越多,CPU執(zhí)行任務(wù)的效率就越低,所以,要最高效地利用CPU,計算密集型任務(wù)同時進(jìn)行的數(shù)量應(yīng)當(dāng)?shù)扔贑PU的核心數(shù)。

計算密集型任務(wù)由于主要消耗CPU資源,因此,代碼運(yùn)行效率至關(guān)重要。Python這樣的腳本語言運(yùn)行效率很低,完全不適合計算密集型任務(wù)。對于計算密集型任務(wù),最好用C語言編寫。

第二種任務(wù)的類型是I/O密集型,涉及到網(wǎng)絡(luò)、磁盤I/O的任務(wù)都是I/O密集型任務(wù),這類任務(wù)的特點(diǎn)是CPU消耗很少,任務(wù)的大部分時間都在等待I/O操作完成(因為I/O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)。對于I/O密集型任務(wù),任務(wù)越多,CPU效率越高,但也有一個限度。常見的大部分任務(wù)都是I/O密集型任務(wù),比如Web應(yīng)用。

I/O密集型任務(wù)執(zhí)行期間,99%的時間都花在I/O上,花在CPU上的時間很少,因此,用運(yùn)行速度極快的C語言替換用Python這樣運(yùn)行速度極低的腳本語言,完全無法提升運(yùn)行效率。對于I/O密集型任務(wù),最合適的語言就是開發(fā)效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。

什么是 Fork/Join 框架?

Fork/Join 框架是 Java7 提供了的一個用于并行執(zhí)行的任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。

Fork 就是把一個大任務(wù)切分為若干個子任務(wù)并行的執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。比如計算 1+2+......+10000,可以分割成10個子任務(wù),每個子任務(wù)對1000個數(shù)進(jìn)行求和,最終匯總這10個子任務(wù)的結(jié)果。如下圖所示:


Fork/Join的特性

  1. ForkJoinPool 不是為了替代 ExecutorService,而是它的補(bǔ)充,在某些應(yīng)用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
  2. ForkJoinPool 主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù),例如 quick sort 等;
  3. ForkJoinPool 最適合的是計算密集型的任務(wù),如果存在 I/O、線程間同步、sleep() 等會造成線程長時間阻塞的情況時,最好配合 MangedBlocker。

關(guān)于“分而治之”的算法,可以查看《分治、回溯的實(shí)現(xiàn)和特性》

工作竊取算法

工作竊?。╳ork-stealing)算法 是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。

我們需要做一個比較大的任務(wù),我們可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨(dú)的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng),比如A線程負(fù)責(zé)處理A隊列里的任務(wù)。

但是有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計算,并減少了線程間的競爭,其缺點(diǎn)是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列。

image.jpeg

  1. ForkJoinPool 的每個工作線程都維護(hù)著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(wù)(ForkJoinTask)。
  2. 每個工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因為調(diào)用了 fork())時,會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務(wù)來執(zhí)行。
  3. 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(wù)(或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊列),竊取的任務(wù)位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務(wù)時,使用的是 FIFO 方式。
  4. 在遇到 join() 時,如果需要 join 的任務(wù)尚未完成,則會先處理其他任務(wù),并等待其完成。
  5. 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時,進(jìn)入休眠。

Fork/Join的使用

使用場景示例

定義fork/join任務(wù),如下示例,隨機(jī)生成2000w條數(shù)據(jù)在數(shù)組當(dāng)中,然后求和_

package com.niuh.forkjoin.recursivetask;

import java.util.concurrent.RecursiveTask;

/**
 * RecursiveTask 并行計算,同步有返回值
 * ForkJoin框架處理的任務(wù)基本都能使用遞歸處理,比如求斐波那契數(shù)列等,但遞歸算法的缺陷是:
 * 一只會只用單線程處理,
 * 二是遞歸次數(shù)過多時會導(dǎo)致堆棧溢出;
 * ForkJoin解決了這兩個問題,使用多線程并發(fā)處理,充分利用計算資源來提高效率,同時避免堆棧溢出發(fā)生。
 * 當(dāng)然像求斐波那契數(shù)列這種小問題直接使用線性算法搞定可能更簡單,實(shí)際應(yīng)用中完全沒必要使用ForkJoin框架,
 * 所以ForkJoin是核彈,是用來對付大家伙的,比如超大數(shù)組排序。
 * 最佳應(yīng)用場景:多核、多內(nèi)存、可以分割計算再合并的計算密集型任務(wù)
 */
class LongSum extends RecursiveTask<Long> {
    //任務(wù)拆分的最小閥值
    static final int SEQUENTIAL_THRESHOLD = 1000;
    static final long NPS = (1000L * 1000 * 1000);
    static final boolean extraWork = true; // change to add more than just a sum


    int low;
    int high;
    int[] array;

    LongSum(int[] arr, int lo, int hi) {
        array = arr;
        low = lo;
        high = hi;
    }

    /**
     * fork()方法:將任務(wù)放入隊列并安排異步執(zhí)行,一個任務(wù)應(yīng)該只調(diào)用一次fork()函數(shù),除非已經(jīng)執(zhí)行完畢并重新初始化。
     * tryUnfork()方法:嘗試把任務(wù)從隊列中拿出單獨(dú)處理,但不一定成功。
     * join()方法:等待計算完成并返回計算結(jié)果。
     * isCompletedAbnormally()方法:用于判斷任務(wù)計算是否發(fā)生異常。
     */
    protected Long compute() {

        if (high - low <= SEQUENTIAL_THRESHOLD) {
            long sum = 0;
            for (int i = low; i < high; ++i) {
                sum += array[i];
            }
            return sum;

        } else {
            int mid = low + (high - low) / 2;
            LongSum left = new LongSum(array, low, mid);
            LongSum right = new LongSum(array, mid, high);
            left.fork();
            right.fork();
            long rightAns = right.join();
            long leftAns = left.join();
            return leftAns + rightAns;
        }
    }
}

執(zhí)行fork/join任務(wù)

package com.niuh.forkjoin.recursivetask;

import com.niuh.forkjoin.utils.Utils;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class LongSumMain {
    //獲取邏輯處理器數(shù)量
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    /**
     * for time conversion
     */
    static final long NPS = (1000L * 1000 * 1000);

    static long calcSum;

    static final boolean reportSteals = true;

    public static void main(String[] args) throws Exception {
        int[] array = Utils.buildRandomIntArray(2000000);
        System.out.println("cpu-num:" + NCPU);
        //單線程下計算數(shù)組數(shù)據(jù)總和
        long start = System.currentTimeMillis();
        calcSum = seqSum(array);
        System.out.println("seq sum=" + calcSum);
        System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        //采用fork/join方式將數(shù)組求和任務(wù)進(jìn)行拆分執(zhí)行,最后合并結(jié)果
        LongSum ls = new LongSum(array, 0, array.length);
        ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數(shù)
        ForkJoinTask<Long> task = fjp.submit(ls);

        System.out.println("forkjoin sum=" + task.get());
        System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
        if (task.isCompletedAbnormally()) {
            System.out.println(task.getException());
        }

        fjp.shutdown();

    }


    static long seqSum(int[] array) {
        long sum = 0;
        for (int i = 0; i < array.length; ++i) {
            sum += array[i];
        }
        return sum;
    }
}

Fork/Join框架原理

Fork/Join 其實(shí)就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實(shí)現(xiàn)其三個抽象子類)為任務(wù)、ForkJoinWorkerThread作為執(zhí)行任務(wù)的具體線程實(shí)體這三者構(gòu)成的任務(wù)調(diào)度機(jī)制。

ForkJoinWorkerThread

ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對線程的調(diào)度執(zhí)行做任何更改。


ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創(chuàng)建出來之后都被設(shè)置成為了
守護(hù)線程
,由它來執(zhí)行ForkJoinTasks。該類主要為了維護(hù)創(chuàng)建線程實(shí)例時通過ForkJoinPool為其創(chuàng)建的任務(wù)隊列,與其他兩個線程池整個線程池只有一個任務(wù)隊列不同,F(xiàn)orkJoinPool管理的所有工作線程都擁有自己的工作隊列,為了實(shí)現(xiàn)任務(wù)竊取機(jī)制,該隊列被設(shè)計成一個雙端隊列,而ForkJoinWorkerThread的首要任務(wù)就是執(zhí)行自己的這個雙端任務(wù)隊列中的任務(wù),其次是竊取其他線程的工作隊列,以下是其代碼片段:

public class ForkJoinWorkerThread extends Thread {
    // 這個線程工作的ForkJoinPool池
    final ForkJoinPool pool;    
    // 這個線程擁有的工作竊取機(jī)制的工作隊列
    final ForkJoinPool.WorkQueue workQueue; 

    //創(chuàng)建在給定ForkJoinPool池中執(zhí)行的ForkJoinWorkerThread。
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        //向ForkJoinPool執(zhí)行池注冊當(dāng)前工作線程,F(xiàn)orkJoinPool為其分配一個工作隊列
        this.workQueue = pool.registerWorker(this); 
    }

    //該工作線程的執(zhí)行內(nèi)容就是執(zhí)行工作隊列中的任務(wù)
    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                pool.runWorker(workQueue); //執(zhí)行工作隊列中的任務(wù)
            } catch (Throwable ex) {
                exception = ex; //記錄異常
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception); //撤銷工作
                }
            }
        }
    }

    .....
}

ForkJoinTask

ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。


ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個 ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork()join() 操作的機(jī)制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join框架提供類以下幾個子類:

  • RecursiveAction:用于沒有返回結(jié)果的任務(wù)。(比如寫數(shù)據(jù)到磁盤,然后就退出。一個 RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨(dú)立的線程或者 CPU 執(zhí)行。我們可以通過繼承來實(shí)現(xiàn)一個 RecusiveAction)
  • RescursiveTask:用于有返回結(jié)果的任務(wù)。(可以將自己的工作分割為若干更小任務(wù),并將這些子任務(wù)的執(zhí)行合并到一個集體結(jié)果。可以有幾個水平的分割和合并)
  • CountedCompleter :在任務(wù)完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)。

常量介紹

ForkJoinTask 有一個int類型的status字段:

  • 其高16位存儲任務(wù)執(zhí)行狀態(tài)例如NORMAL、CANCELLED或EXCEPTIONAL
  • 低16位預(yù)留用于用戶自定義的標(biāo)記。

任務(wù)未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小于0的值,這幾個值也是按大小順序的:0(初始狀態(tài)) > NORMAL > CANCELLED > EXCEPTIONAL.

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {

    /** 該任務(wù)的執(zhí)行狀態(tài) */
    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

    // 異常哈希表

    //被任務(wù)拋出的異常數(shù)組,為了報告給調(diào)用者。因為異常很少見,所以我們不直接將它們保存在task對象中,而是使用弱引用數(shù)組。注意,取消異常不會出現(xiàn)在數(shù)組,而是記錄在statue字段中
    //注意這些都是 static 類屬性,所有的ForkJoinTask共用的。
    private static final ExceptionNode[] exceptionTable;        //異常哈希鏈表數(shù)組
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應(yīng)的異常節(jié)點(diǎn)對象的引用隊列

    /**
    * 固定容量的exceptionTable.
    */
    private static final int EXCEPTION_MAP_CAPACITY = 32;


    //異常數(shù)組的鍵值對節(jié)點(diǎn)。
    //該哈希鏈表數(shù)組使用線程id進(jìn)行比較,該數(shù)組具有固定的容量,因為它只維護(hù)任務(wù)異常足夠長,以便參與者訪問它們,所以在持續(xù)的時間內(nèi)不應(yīng)該變得非常大。但是,由于我們不知道最后一個joiner何時完成,我們必須使用弱引用并刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變?yōu)閕sQuiescent時都會調(diào)用helpExpungeStaleExceptions
    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        ExceptionNode next;
        final long thrower;  // 拋出異常的線程id
        final int hashCode;  // 在弱引用消失之前存儲hashCode
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
            super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會將該節(jié)點(diǎn)加入隊列exceptionTableRefQueue
            this.ex = ex;
            this.next = next;
            this.thrower = Thread.currentThread().getId();
            this.hashCode = System.identityHashCode(task);
        }
    }

    .................
}

除了status記錄任務(wù)的執(zhí)行狀態(tài)之外,其他字段主要是為了對任務(wù)執(zhí)行的異常的處理,F(xiàn)orkJoinTask采用了哈希數(shù)組 + 鏈表的數(shù)據(jù)結(jié)構(gòu)(JDK8以前的HashMap實(shí)現(xiàn)方法)存放所有(因為這些字段是static)的ForkJoinTask任務(wù)的執(zhí)行異常。

fork 方法(安排任務(wù)異步執(zhí)行)

fork() 做的工作只有一件事,既是把任務(wù)推入當(dāng)前工作線程的工作隊列里(安排任務(wù)異步執(zhí)行)??梢詤⒖匆韵碌脑创a:

public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

該方法其實(shí)就是將任務(wù)通過push方法加入到當(dāng)前工作線程的工作隊列或者提交隊列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務(wù)),等待被線程池調(diào)度執(zhí)行,這是一個非阻塞的立即返回方法。

這里需要知道,F(xiàn)orkJoinPool線程池通過哈希數(shù)組+雙端隊列的方式將所有的工作線程擁有的任務(wù)隊列和從外部提交的任務(wù)分別映射到哈希數(shù)組的不同槽位上。

join 方法(等待執(zhí)行結(jié)果)

join() 的工作則復(fù)雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。

  1. 檢查調(diào)用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當(dāng)前線程,等待任務(wù)完成。如果是,則不阻塞。
  2. 查看任務(wù)的完成狀態(tài),如果已經(jīng)完成,直接返回結(jié)果。
  3. 如果任務(wù)尚未完成,但處于自己的工作隊列內(nèi),則完成它。
  4. 如果任務(wù)已經(jīng)被其他的工作線程偷走,則竊取這個小偷的工作隊列內(nèi)的任務(wù)(以 FIFO 方式),執(zhí)行,以期幫助它早日完成欲 join 的任務(wù)。
  5. 如果偷走任務(wù)的小偷也已經(jīng)把自己的任務(wù)全部做完,正在等待需要 join 的任務(wù)時,則找到小偷的小偷,幫助它完成它的任務(wù)。
  6. 遞歸地執(zhí)行第5步。

將上述流程畫成序列圖的話就是這個樣子:


源代碼如下:

//當(dāng)計算完成時返回計算結(jié)果。此方法與get()的不同之處在于,異常完成會導(dǎo)致RuntimeException或Error,而不是ExecutionException,調(diào)用線程被中斷不會通過拋出InterruptedException導(dǎo)致方法突然返回。
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s); //非正常結(jié)束,拋出相關(guān)的異常堆棧信息
    return getRawResult(); //正常結(jié)束,返回結(jié)果
}

//等待任務(wù)執(zhí)行結(jié)束并返回其狀態(tài)status,該方法實(shí)現(xiàn)了join, get, quietlyJoin. 直接處理已經(jīng)完成的,外部等待和unfork+exec的情況,其它情況轉(zhuǎn)發(fā)到ForkJoinPool.awaitJoin
//如果 status < 0 則返回s;
//否則,若不是ForkJoinWorkerThread ,則等待 externalAwaitDone() 返回
//否則,若 (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 則 返回s;
//否則,返回 wt.pool.awaitJoin(w, this, 0L)
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :  //status為負(fù)數(shù)表示任務(wù)已經(jīng)執(zhí)行結(jié)束,直接返回status。
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :        //調(diào)用pool的執(zhí)行邏輯,并等待返回執(zhí)行結(jié)果狀態(tài)
        wt.pool.awaitJoin(w, this, 0L) :        //調(diào)用pool的等待機(jī)制
        externalAwaitDone();        //不是ForkJoinWorkerThread,
}

//拋出與給定狀態(tài)關(guān)聯(lián)的異常(如果有),被取消是CancellationException。
private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}

public abstract V getRawResult();

//返回給定任務(wù)的執(zhí)行異常(如果有的話),為了提供準(zhǔn)確的異常堆棧信息,若異常不是由當(dāng)前線程拋出的,將嘗試以記錄的異常為原因創(chuàng)建一個與拋出異常類型相同的新異常。
//如果沒有那樣的構(gòu)造方法將嘗試使用無參的構(gòu)造函數(shù),并通過設(shè)置initCause方法以達(dá)到同樣的效果,盡管它可能包含誤導(dǎo)的堆棧跟蹤信息。
private Throwable getThrowableException() {
    if ((status & DONE_MASK) != EXCEPTIONAL)
        return null;

    //1. 通過當(dāng)前任務(wù)對象的哈希值到哈希鏈表數(shù)組中找到相應(yīng)的異常節(jié)點(diǎn)
    int h = System.identityHashCode(this); //當(dāng)前任務(wù)的hash值
    ExceptionNode e;
    final ReentrantLock lock = exceptionTableLock;
    lock.lock(); //加鎖
    try {
        expungeStaleExceptions(); //清理被GC回收的任務(wù)的異常節(jié)點(diǎn)
        ExceptionNode[] t = exceptionTable;
        e = t[h & (t.length - 1)]; //通過取模對應(yīng)得索引獲取哈希數(shù)組槽位中得節(jié)點(diǎn)
        while (e != null && e.get() != this)
            e = e.next;        //遍歷找到當(dāng)前任務(wù)對應(yīng)的異常節(jié)點(diǎn)
    } finally {
        lock.unlock();
    }
    Throwable ex;
    if (e == null || (ex = e.ex) == null) //表示沒有出現(xiàn)任何異常
        return null;
    if (e.thrower != Thread.currentThread().getId()) { //有異常但是不是由當(dāng)前線程拋出的
        Class<? extends Throwable> ec = ex.getClass();
        try {
            Constructor<?> noArgCtor = null;
            Constructor<?>[] cs = ec.getConstructors();// public ctors only
            //通過反射找到構(gòu)造方法,并構(gòu)造新異常
            for (int i = 0; i < cs.length; ++i) {
                Constructor<?> c = cs[i];
                Class<?>[] ps = c.getParameterTypes();
                if (ps.length == 0)
                    noArgCtor = c; //記錄下無參構(gòu)造方法,以備沒有找到期望的構(gòu)造方法時使用
                else if (ps.length == 1 && ps[0] == Throwable.class) {
                    Throwable wx = (Throwable)c.newInstance(ex); //發(fā)現(xiàn)了我們期望的Throwable類型的參數(shù)的構(gòu)造方法
                    return (wx == null) ? ex : wx;
                }
            }
            if (noArgCtor != null) { //沒有找到期望的構(gòu)造方法,只能通過無參構(gòu)造方法創(chuàng)建新異常
                Throwable wx = (Throwable)(noArgCtor.newInstance());
                if (wx != null) {
                    wx.initCause(ex); //將原始異常設(shè)置進(jìn)去
                    return wx;
                }
            }
        } catch (Exception ignore) {
        }
    }
    return ex;
}



//清除哈希鏈表數(shù)組中已經(jīng)被GC回收掉的任務(wù)的異常節(jié)點(diǎn)。從exceptionTableRefQueue節(jié)點(diǎn)引用隊列中獲取異常節(jié)點(diǎn)并移除哈希鏈表數(shù)組中得對應(yīng)節(jié)點(diǎn)
private static void expungeStaleExceptions() {
    for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
        if (x instanceof ExceptionNode) {
            int hashCode = ((ExceptionNode)x).hashCode; //節(jié)點(diǎn)hash
            ExceptionNode[] t = exceptionTable;
            int i = hashCode & (t.length - 1); //取模得到哈希表索引
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            while (e != null) {
                ExceptionNode next = e.next;
                if (e == x) { //找到了目標(biāo)節(jié)點(diǎn)
                    if (pred == null)
                        t[i] = next;
                    else
                        pred.next = next;
                    break;
                }
                pred = e; //往后遍歷鏈表
                e = next;
            }
        }
    }
}


//竊取任務(wù)的主要執(zhí)行方法,除非已經(jīng)完成了,否則調(diào)用exec()并記錄完成時的狀態(tài)。
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) { //任務(wù)還未完成
        try {
            completed = exec(); 調(diào)用exec()并記錄完成時的狀態(tài)。
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex); //記錄異常并返回相關(guān)狀態(tài),并喚醒通過join等待此任務(wù)的線程。
        }
        if (completed)
            s = setCompletion(NORMAL); //更新狀態(tài)為正常結(jié)束,并喚醒通過join等待此任務(wù)的線程。
    }
    return s;
}

//立即執(zhí)行此任務(wù)的基本操作。返回true表示該任務(wù)已經(jīng)正常完成,否則返回false表示此任務(wù)不一定完成(或不知道是否完成)。
//此方法還可能拋出(未捕獲的)異常,以指示異常退出。此方法旨在支持?jǐn)U展,一般不應(yīng)以其他方式調(diào)用。
protected abstract boolean exec();

//等待未完成的非ForkJoinWorkerThread線程提交的任務(wù)執(zhí)行結(jié)束,并返回任務(wù)狀態(tài)status
private int externalAwaitDone() {

    //若是CountedCompleter任務(wù),等待ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) 返回
    //否則,若ForkJoinPool.common.tryExternalUnpush(this),返回 doExec() 結(jié)果;
    //否則,返回0
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this, 0) :                             //輔助完成外部提交的CountedCompleter任務(wù)
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);    //輔助完成外部提交的非CountedCompleter任務(wù)
    if (s >= 0 && (s = status) >= 0) { //表示任務(wù)還沒結(jié)束,需要阻塞等待。
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //標(biāo)記有線程需要被喚醒
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            wait(0L); //任務(wù)還沒結(jié)束,無限期阻塞直到被喚醒
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll(); //已經(jīng)結(jié)束了喚醒所有阻塞的線程
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt(); //恢復(fù)中斷標(biāo)識
    }
    return s;
}


//記錄異常,更新status狀態(tài),喚醒所有等待線程
private int setExceptionalCompletion(Throwable ex) {
    int s = recordExceptionalCompletion(ex);
    if ((s & DONE_MASK) == EXCEPTIONAL)
        internalPropagateException(ex); //調(diào)用鉤子函數(shù)傳播異常
    return s;
}

/**
 * 對任務(wù)異常結(jié)束的異常傳播支持的鉤子函數(shù)
 */
void internalPropagateException(Throwable ex) {
}

//記錄異常并設(shè)置狀態(tài)status
final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        int h = System.identityHashCode(this); //哈希值
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();    //加鎖
        try {
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e == null) { //遍歷完了都沒找到,說明哈希鏈表數(shù)組中不存在該任務(wù)對于的異常節(jié)點(diǎn)
                    t[i] = new ExceptionNode(this, ex, t[i]); //創(chuàng)建一個異常節(jié)點(diǎn)用頭插法插入哈希鏈表數(shù)組
                    break;
                }
                if (e.get() == this) // 哈希鏈表數(shù)組中已經(jīng)存在相應(yīng)的異常節(jié)點(diǎn),退出
                    break;
            }
        } finally {
            lock.unlock();
        }
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}

//標(biāo)記任務(wù)完成標(biāo)志,并喚醒通過join等待此任務(wù)的線程。
private int setCompletion(int completion) {
    for (int s;;) {
        if ((s = status) < 0)
            return s;
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { //更新狀態(tài)
            if ((s >>> 16) != 0)
                synchronized (this) { notifyAll(); } //喚醒所有等待線程
            return completion;
        }
    }
}

get 方法(獲取異步任務(wù)結(jié)果)

既然ForkJoinTask也是Future的子類,那么Future最重要的獲取異步任務(wù)結(jié)果的get方法也必然要實(shí)現(xiàn):

//如果需要,等待計算完成,然后檢索其結(jié)果。
public final V get() throws InterruptedException, ExecutionException {
    int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : //是ForkJoinWorkerThread,執(zhí)行doJoin
            externalInterruptibleAwaitDone();    //執(zhí)行externalInterruptibleAwaitDone
    Throwable ex;
    if ((s &= DONE_MASK) == CANCELLED)
        throw new CancellationException();    //被取消的拋出CancellationException
    if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
        throw new ExecutionException(ex);    //執(zhí)行中出現(xiàn)異常的拋出相應(yīng)的異常
    return getRawResult();                    //返回正常結(jié)果
}

//阻塞非ForkJoinWorkerThread線程,直到完成或中斷。
private int externalInterruptibleAwaitDone() throws InterruptedException {
    int s;
    if (Thread.interrupted())
        throw new InterruptedException();
    if ((s = status) >= 0 &&
        (s = ((this instanceof CountedCompleter) ?
              ForkJoinPool.common.externalHelpComplete(
                  (CountedCompleter<?>)this, 0) :
              ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
              0)) >= 0) { //根據(jù)不同的任務(wù)類型 返回執(zhí)行或暫時等待被執(zhí)行的狀態(tài)
        while ((s = status) >= 0) { //需要阻塞等待
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0)
                        wait(0L);     //阻塞等待
                    else
                        notifyAll(); //喚醒所有等待線程
                }
            }
        }
    }
    return s;
}

get方法也是通過實(shí)現(xiàn)join方法的doJoin方法實(shí)現(xiàn)的,不同的是,調(diào)用get方法的線程如果被中斷的話,get方法會立即拋出InterruptedException異常,而join方法則不會;另外任務(wù)異常完成的的相關(guān)異常,get方法會將相關(guān)異常都封裝成ExecutionException異常,而join方法則是原樣拋出相關(guān)的異常不會被封裝成ExecutionException異常。get方法采用的wait/notifyAll這種線程通信機(jī)制來實(shí)現(xiàn)阻塞與喚醒。另外還有超時版本的get方法也類似,由此可見get支持可中斷和/或定時等待完成。

invoke 方法(立即執(zhí)行任務(wù),并等待返回結(jié)果)

//開始執(zhí)行此任務(wù),如果需要等待其完成,并返回其結(jié)果,如果底層執(zhí)行此任務(wù)時出現(xiàn)異常,則拋出相應(yīng)的(未捕獲的)RuntimeException或Error。
public final V invoke() {
    int s;
    if ((s = doInvoke() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

// invoke, quietlyInvoke的實(shí)現(xiàn)
private int doInvoke() {
    int s; Thread t; ForkJoinWorkerThread wt;
    return (s = doExec()) < 0 ? s :      //執(zhí)行此任務(wù),完成返回其status
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //若未完成或需要等待就根據(jù)不同任務(wù)類型執(zhí)行不同的等待邏輯
        (wt = (ForkJoinWorkerThread)t).pool.
        awaitJoin(wt.workQueue, this, 0L) :
        externalAwaitDone();
}

invoke的實(shí)現(xiàn)會利用當(dāng)前調(diào)用invoke的線程立即執(zhí)行exec方法,當(dāng)然如果exec方法的實(shí)現(xiàn)使用了fork/join,其還是會利用ForkJoinPool線程池的遞歸調(diào)度執(zhí)行策略,等待子任務(wù)執(zhí)行完成,一步步的合并成最終的任務(wù)結(jié)果,并返回。值得注意的是,該方法不會因為線程被中斷而立即返回,而必須在等到任務(wù)執(zhí)行有了結(jié)果之后才會對中斷狀態(tài)進(jìn)行補(bǔ)償。

invokeAll 方法(批量執(zhí)行任務(wù),并等待它們執(zhí)行結(jié)束)

//執(zhí)行兩個任務(wù)
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    int s1, s2;
    t2.fork(); //t2任務(wù)交給線程池調(diào)度執(zhí)行
    if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) //t1任務(wù)立即由當(dāng)前線程執(zhí)行
        t1.reportException(s1);         //若t1異常結(jié)束,則拋出異常,包括被取消的CancellationException
    if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) //等待t2執(zhí)行結(jié)束
        t2.reportException(s2);            //若t2異常結(jié)束,則拋出異常,包括被取消的CancellationException
}

//執(zhí)行任務(wù)數(shù)組
public static void invokeAll(ForkJoinTask<?>... tasks) {
    Throwable ex = null;
    int last = tasks.length - 1;
    for (int i = last; i >= 0; --i) {
        ForkJoinTask<?> t = tasks[i];
        if (t == null) {
            if (ex == null) //都不能為null
                ex = new NullPointerException();
        }
        else if (i != 0)
            t.fork(); //除了第一個任務(wù)都交給線程池調(diào)度執(zhí)行
        else if (t.doInvoke() < NORMAL && ex == null) //由當(dāng)前線程執(zhí)行第一個任務(wù)
            ex = t.getException();  //記錄第一個任務(wù)的異常
    }
    for (int i = 1; i <= last; ++i) {
        ForkJoinTask<?> t = tasks[i];
        if (t != null) {
            if (ex != null) //第一個任務(wù)異常結(jié)束,取消其他所有任務(wù)
                t.cancel(false);
            else if (t.doJoin() < NORMAL) //有任務(wù)異常結(jié)束,記錄異常
                ex = t.getException();
        }
    }
    if (ex != null)
        rethrow(ex);  //若有任務(wù)異常結(jié)束,拋出數(shù)組最前面那個異常結(jié)束的任務(wù)的異常
}

//批量執(zhí)行任務(wù),返回每個任務(wù)對應(yīng)的ForkJoinTask實(shí)例,
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
    if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
        invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()])); //將任務(wù)封裝成ForkJoinTask,調(diào)用上面那個方法實(shí)現(xiàn)
        return tasks;
    }
    //下面的邏輯與上面那個invokeAll也是一樣的。
    @SuppressWarnings("unchecked")
    List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks;
    Throwable ex = null;
    int last = ts.size() - 1;
    for (int i = last; i >= 0; --i) {
        ForkJoinTask<?> t = ts.get(i);
        if (t == null) {
            if (ex == null)
                ex = new NullPointerException();
        }
        else if (i != 0)
            t.fork();
        else if (t.doInvoke() < NORMAL && ex == null)
            ex = t.getException();
    }
    for (int i = 1; i <= last; ++i) {
        ForkJoinTask<?> t = ts.get(i);
        if (t != null) {
            if (ex != null)
                t.cancel(false);
            else if (t.doJoin() < NORMAL)
                ex = t.getException();
        }
    }
    if (ex != null)
        rethrow(ex);
    return tasks;
}

批量任務(wù)的執(zhí)行其實(shí)現(xiàn)都是排在前面的任務(wù)(只有兩個參數(shù)是,第一個參數(shù)就是排在前面的任務(wù),是數(shù)組或者隊列時,索引越小的就是排在越前面的)由當(dāng)前線程執(zhí)行,后面的任務(wù)交給線程池調(diào)度執(zhí)行,如果有多個任務(wù)都出現(xiàn)異常,只會拋出排在最前面那個任務(wù)的異常。

quietlyInvoke、quietlyJoin 方法(不需要執(zhí)行結(jié)果的invoke和join)

public final void quietlyJoin() {
    doJoin();
}

public final void quietlyInvoke() {
    doInvoke();
}

quietlyInvoke(),quietlyJoin()這兩個方法就僅僅了是調(diào)用了doInvoke和doJoin,然后就沒有然后了,它們就是不關(guān)心執(zhí)行結(jié)果版本的invoke和Join,當(dāng)然異常結(jié)束的也不會將異常拋出來,當(dāng)執(zhí)行一組任務(wù)并且需要將結(jié)果或異常的處理延遲到全部任務(wù)完成時,這可能很有用。

cancel 方法 (嘗試取消任務(wù)的執(zhí)行)

public boolean cancel(boolean mayInterruptIfRunning) {
    return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}

其主要通過setCompletion標(biāo)記尚未完成的任務(wù)的狀態(tài)為CANCELLED,并喚醒通過join等待此任務(wù)的線程。已經(jīng)執(zhí)行完成的任務(wù)無法被取消,返回true表示取消成功。注意該方法傳入的mayInterruptIfRunning并沒有使用,因此,F(xiàn)orkJoinTask不支持在取消任務(wù)時中斷已經(jīng)開始執(zhí)行的任務(wù),當(dāng)然ForkJoinTask的子類可以重寫實(shí)現(xiàn)。

tryUnfork 方法(取消fork,即從任務(wù)隊列中移除任務(wù))

//取消任務(wù)的執(zhí)行計劃。如果此任務(wù)是當(dāng)前線程最近才剛剛通過fork安排執(zhí)行,并且尚未在另一個線程中開始執(zhí)行,則此方法通常會成功,但也不是100%保證會成功。
public boolean tryUnfork() {
    Thread t;
    return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :    //針對ForkJoinWorkerThread的取消邏輯
            ForkJoinPool.common.tryExternalUnpush(this));            //針對外部提交任務(wù)的取消邏輯
}

tryUnfork嘗試將該任務(wù)從任務(wù)隊列中彈出,彈出之后線程池自然不會再調(diào)度該任務(wù)。該方法的實(shí)現(xiàn)只會在任務(wù)剛剛被推入任務(wù)隊列,并且還處于任務(wù)隊列的棧頂時才可能會成功,否則100%失敗。

reinitialize 方法(重新初始化該任務(wù))

public void reinitialize() {
    if ((status & DONE_MASK) == EXCEPTIONAL) //有異常
        clearExceptionalCompletion(); //從哈希鏈表數(shù)組中移除當(dāng)前任務(wù)的異常節(jié)點(diǎn),并將status重置為0
    else
        status = 0;
}

如果任務(wù)異常結(jié)束,會從異常哈希表中清除該任務(wù)的異常記錄,該方法僅僅是將任務(wù)狀態(tài)status重置為0,使得該任務(wù)可以被重新執(zhí)行。

isDone、isCompletedNormally、isCancelled、isCompletedAbnormally 方法(任務(wù)的完成狀態(tài)查詢)

任務(wù)的執(zhí)行狀態(tài)可以在多個級別上查詢:

  1. 如果任務(wù)以任何方式完成(包括任務(wù)在未執(zhí)行的情況下被取消),則isDone為true。
  2. 如果任務(wù)在沒有取消或沒有遇到異常的情況下完成,則 isCompletedNormally 為true。
  3. 如果任務(wù)被取消(在這種情況下getException方法返回一個CancellationException),則 isCancelled 為true。
  4. 如果任務(wù)被取消或遇到異常,則isCompletedAbnormally異常為true,在這種情況下,getException將返回遇到的異?;騤ava.util.concurrent.CancellationException。

ForkJoinTask 在執(zhí)行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法來檢查任務(wù)是否已經(jīng)拋出異?;蛞呀?jīng)被取消了,并且可以通過 ForkJoinTask 的 getException 方法獲取異常。示例如下:

if(task.isCompletedAbnormally()){
   System.out.println(task.getException());
}

getException 方法返回 Throwable 對象,如果任務(wù)被取消了則返回CancellationException。如果任務(wù)沒有完成或者沒有拋出異常則返回 null。

為Runnable和Callable提供的adapt方法

adapt方法主要是為了兼容傳統(tǒng)的Runnable和Callable任務(wù),通過adapt方法可以將它們封裝成ForkJoinTask任務(wù),當(dāng)將 ForkJoinTask 與其他類型的任務(wù)混合執(zhí)行時,可以使用這些方法。

其他一些方法

getPool可以返回執(zhí)行該任務(wù)的線程所在的線程池實(shí)例,inForkJonPool可以判定當(dāng)前任務(wù)是否是由ForkJoinWorkerThread線程提交的,一般來說這意味著當(dāng)前任務(wù)是內(nèi)部拆分之后的子任務(wù)。

getQueuedTaskCount方法返回已經(jīng)通過fork安排給當(dāng)前工作線程執(zhí)行,但還沒有被執(zhí)行的任務(wù)數(shù)量,該值是一個瞬間值。因為工作線程調(diào)度執(zhí)行的任務(wù)通過fork提交的任務(wù)還是進(jìn)入的該工作線程的任務(wù)隊列,因此可以通過該任務(wù)得知該值。

其它一些方法:

//可能會在承載當(dāng)前任務(wù)的執(zhí)行池處于靜默(空閑)狀態(tài)時執(zhí)行任務(wù)。這個方法可能在有很多任務(wù)都通過fork被安排執(zhí)行,但是一個顯示的join調(diào)用都沒有,直到它們都被執(zhí)行完的設(shè)計中使用。
//其實(shí)就是如果有一批任務(wù)被安排執(zhí)行,并且不知道它們什么時候結(jié)束,如果希望在這些任務(wù)都執(zhí)行結(jié)束之后再安排一個任務(wù),就可以使用helpQuiesce。
public static void helpQuiesce() {
    Thread t;
    //根據(jù)執(zhí)行線程的不同類型,調(diào)用不同的靜默執(zhí)行邏輯
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
        ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
        wt.pool.helpQuiescePool(wt.workQueue);
    }
    else
        ForkJoinPool.quiesceCommonPool();
}

//返回被當(dāng)前工作線程持有的任務(wù)數(shù)a比其它可能竊取其任務(wù)的其它工作線程持有的任務(wù)數(shù)b多多少的估計值,就是 a - b 的差值。若當(dāng)前工作線程不是在ForkJoinPool中,則返回0
//通常該值被恒定在一個很小的值3,若超過這個閾值,則就在本地處理。
public static int getSurplusQueuedTaskCount() {
    return ForkJoinPool.getSurplusQueuedTaskCount();
}

//獲取但不移除(即不取消執(zhí)行計劃)安排給當(dāng)前線程的可能即將被執(zhí)行的下一個任務(wù)。但不能保證該任務(wù)將在接下來實(shí)際被立即執(zhí)行。該方法可能在即使任務(wù)存在但因為競爭而不可訪問而返回null
//該方法主要是為了支持?jǐn)U展,否則可能不會被使用。
protected static ForkJoinTask<?> peekNextLocalTask() {
    Thread t; ForkJoinPool.WorkQueue q;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        q = ((ForkJoinWorkerThread)t).workQueue;
    else
        q = ForkJoinPool.commonSubmitterQueue();
    return (q == null) ? null : q.peek();
}

//獲取并且移除(即取消執(zhí)行)安排給當(dāng)前線程的可能即將被執(zhí)行的下一個任務(wù)。
//該方法主要是為了支持?jǐn)U展,否則可能不會被使用。
protected static ForkJoinTask<?> pollNextLocalTask() {
    Thread t;
    return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
        null;
}

//如果當(dāng)前線程被ForkJoinPool運(yùn)行,獲取并且移除(即取消執(zhí)行)當(dāng)前線程即將可能執(zhí)行的下一個任務(wù)。該任務(wù)可能是從其它線程中竊取來的。
//返回nulll并不一定意味著此任務(wù)正在操作的ForkJoinPool處于靜止?fàn)顟B(tài)。該方法主要是為了支持?jǐn)U展,否則可能不會被使用。
protected static ForkJoinTask<?> pollTask() {
    Thread t; ForkJoinWorkerThread wt;
    return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
        null;
}

小結(jié)

通常ForkJoinTask只適用于非循環(huán)依賴的純函數(shù)的計算或孤立對象的操作,否則,執(zhí)行可能會遇到某種形式的死鎖,因為任務(wù)循環(huán)地等待彼此。但是,這個框架支持其他方法和技術(shù)(例如使用Phaser、helpQuiesce和complete),這些方法和技術(shù)可用于構(gòu)造解決這種依賴任務(wù)的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標(biāo)記一個short類型的值,并使用getForkJoinTaskTag進(jìn)行檢查。ForkJoinTask實(shí)現(xiàn)沒有將這些受保護(hù)的方法或標(biāo)記用于任何目的,但是它們可以用于構(gòu)造專門的子類,由此可以使用提供的方法來避免重新訪問已經(jīng)處理過的節(jié)點(diǎn)/任務(wù)。

ForkJoinTask應(yīng)該執(zhí)行相對較少的計算,并且應(yīng)該避免不確定的循環(huán)。大任務(wù)應(yīng)該被分解成更小的子任務(wù),通常通過遞歸分解。如果任務(wù)太大,那么并行性就不能提高吞吐量。如果太小,那么內(nèi)存和內(nèi)部任務(wù)維護(hù)開銷可能會超過處理開銷。

ForkJoinTask是可序列化的,這使它們能夠在諸如遠(yuǎn)程執(zhí)行框架之類的擴(kuò)展中使用。只在執(zhí)行之前或之后序列化任務(wù)才是明智的,而不是在執(zhí)行期間。

ForkJoinPool

ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護(hù)的雙端隊列中,進(jìn)入隊列的頭部。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時,它會隨機(jī)從其他工作線程的隊列的尾部獲取一個任務(wù)。

常量介紹

ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量

// Constants shared across ForkJoinPool and WorkQueue

// 限定參數(shù)
static final int SMASK = 0xffff;        //  低位掩碼,也是最大索引位
static final int MAX_CAP = 0x7fff;      //  工作線程最大容量
static final int EVENMASK = 0xfffe;     //  偶數(shù)低位掩碼
static final int SQMASK = 0x007e;       //  workQueues 數(shù)組最多64個槽位

// ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
static final int SCANNING = 1;          // 標(biāo)記是否正在運(yùn)行任務(wù)
static final int INACTIVE = 1 << 31;    // 失活狀態(tài)  負(fù)數(shù)
static final int SS_SEQ = 1 << 16;      // 版本戳,防止ABA問題

// ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
static final int MODE_MASK = 0xffff << 16;  // 模式掩碼
static final int LIFO_QUEUE = 0;            // LIFO隊列
static final int FIFO_QUEUE = 1 << 16;      // FIFO隊列
static final int SHARED_QUEUE = 1 << 31;    // 共享模式隊列,負(fù)數(shù) ForkJoinPool 中的相關(guān)常量和實(shí)例字段:

ForkJoinPool 中的相關(guān)常量和實(shí)例字段

// 低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;

// 活躍線程數(shù)
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼

// 工作線程數(shù)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15);  // 創(chuàng)建工作線程標(biāo)志

// 池狀態(tài)
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;

// 實(shí)例字段
volatile long ctl;                   // 主控制參數(shù)
volatile int runState;               // 運(yùn)行狀態(tài)鎖
final int config;                    // 并行度|模式
int indexSeed;                       // 用于生成工作線程索引
volatile WorkQueue[] workQueues;     // 主對象注冊信息,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh;  // 每個工作線程的異常信息
final String workerNamePrefix;       // 用于創(chuàng)建工作線程的名稱
volatile AtomicLong stealCounter;    // 偷取任務(wù)總數(shù),也可作為同步監(jiān)視器

/** 靜態(tài)初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啟動或殺死線程的方法調(diào)用者的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態(tài)pool
static final ForkJoinPool common;
//并行度,對應(yīng)內(nèi)部common池
static final int commonParallelism;
//備用線程數(shù),在tryCompensate中使用
private static int commonMaxSpares;
//創(chuàng)建workerNamePrefix(工作線程名稱前綴)時的序號
private static int poolNumberSequence;
//線程阻塞等待新的任務(wù)的超時值(以納秒為單位),默認(rèn)2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閑超時時間,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L;  // 20ms
//默認(rèn)備用線程數(shù)
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數(shù),用在在awaitRunStateLock和awaitWork中
private static final int SPINS  = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;

ForkJoinPool 的內(nèi)部狀態(tài)都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:

  • AC: 正在運(yùn)行工作線程數(shù)減去目標(biāo)并行度,高16位
  • TC: 總工作線程數(shù)減去目標(biāo)并行度,中高16位
  • SS: 棧頂?shù)却€程的版本計數(shù)和狀態(tài),中低16位
  • ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位

ForkJoinPool.WorkQueue 中的相關(guān)屬性:

//初始隊列容量,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

// 實(shí)例字段
volatile int scanState;    // Woker狀態(tài), <0: inactive; odd:scanning
int stackPred;             // 記錄前一個棧頂?shù)腸tl
int nsteals;               // 偷取任務(wù)數(shù)
int hint;                  // 記錄偷取者索引,初始為隨機(jī)索引
int config;                // 池索引和模式
volatile int qlock;        // 1: locked, < 0: terminate; else 0
volatile int base;         // 下一個poll操作的索引(棧底/隊列頭)
int top;                   // 一個push操作的索引(棧頂/隊列尾)
ForkJoinTask<?>[] array;   // 任務(wù)數(shù)組
final ForkJoinPool pool;   // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當(dāng)前工作隊列的工作線程,共享模式下為null
volatile Thread parker;    // 調(diào)用park阻塞期間為owner,其他情況為null
volatile ForkJoinTask<?> currentJoin;  // 記錄被join過來的任務(wù)
volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務(wù)

內(nèi)部數(shù)據(jù)結(jié)構(gòu)

ForkJoinPool采用了哈希數(shù)組 + 雙端隊列的方式存放任務(wù),但這里的任務(wù)分為兩類:

  • 一類是通過execute、submit 提交的外部任務(wù)
  • 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務(wù)

ForkJoinPool并沒有把這兩種任務(wù)混在一個任務(wù)隊列中,對于外部任務(wù),會利用Thread內(nèi)部的隨機(jī)probe值映射到哈希數(shù)組的偶數(shù)槽位中的提交隊列中,這種提交隊列是一種數(shù)組實(shí)現(xiàn)的雙端隊列稱之為Submission Queue,專門存放外部提交的任務(wù)。

對于ForkJoinWorkerThread工作線程,每一個工作線程都分配了一個工作隊列,這也是一個雙端隊列,稱之為Work Queue,這種隊列都會被映射到哈希數(shù)組的奇數(shù)槽位,每一個工作線程fork/join分解的任務(wù)都會被添加到自己擁有的那個工作隊列中。

在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數(shù)組,其元素就是內(nèi)部類WorkQueue實(shí)現(xiàn)的基于數(shù)組的雙端隊列。該哈希數(shù)組的長度為2的冪,并且支持?jǐn)U容。如下就是該哈希數(shù)組的示意結(jié)構(gòu)圖:


如圖,提交隊列位于哈希數(shù)組workQueue的奇數(shù)索引槽位,工作線程的工作隊列位于偶數(shù)槽位。

  • 默認(rèn)情況下,asyncMode為false時:
    • 因此工作線程把工作隊列當(dāng)著棧一樣使用(后進(jìn)先出),將分解的子任務(wù)推入工作隊列的top端,取任務(wù)的時候也從top端取(凡是雙端隊列都會有兩個分別指向隊列兩端的指針,這里就是圖上畫出的base和top);
    • 而當(dāng)某些工作線程的任務(wù)為空的時候,就會從其他隊列(不限于workQueue,也會是提交隊列)竊取(steal)任務(wù),如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個任務(wù),竊取任務(wù)的時候采用的是先進(jìn)先出FIFO的策略(即從base端竊取任務(wù)),這樣不但可以避免在取任務(wù)的時候與擁有其隊列的工作線程發(fā)生沖突,從而減小競爭,還可以輔助其完成比較大的任務(wù)。
  • asyncMode為true的話,擁有該工作隊列的工作線程將按照先進(jìn)先出的策略從base端取任務(wù),這一般只用于不需要返回結(jié)果的任務(wù),或者事件消息傳遞框架。

ForkJoinPool構(gòu)造函數(shù)

其完整構(gòu)造方法如下

private ForkJoinPool(int parallelism,
                     ForkJoinWorkerThreadFactory factory,
                     UncaughtExceptionHandler handler,
                     int mode,
                     String workerNamePrefix) {
    this.workerNamePrefix = workerNamePrefix;
    this.factory = factory;
    this.ueh = handler;
    this.config = (parallelism & SMASK) | mode;
    long np = (long)(-parallelism); // offset ctl counts
    this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

重要參數(shù)解釋

  1. parallelism:并行度( the parallelism level),默認(rèn)情況下跟我們機(jī)器的cpu個數(shù)保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機(jī)器運(yùn)行時可用的CPU個數(shù)。
  2. factory:創(chuàng)建新線程的工廠( the factory for creating new threads)。默認(rèn)情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
  3. handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執(zhí)行任務(wù)時由于某些無法預(yù)料到的錯誤而導(dǎo)致任務(wù)線程中斷時進(jìn)行一些處理,默認(rèn)情況為null。
  4. asyncMode:這個參數(shù)要注意,在ForkJoinPool中,每一個工作線程都有一個獨(dú)立的任務(wù)隊列

asyncMode表示工作線程內(nèi)的任務(wù)隊列是采用何種方式進(jìn)行調(diào)度,可以是先進(jìn)先出FIFO,也可以是后進(jìn)先出LIFO。如果為true,則線程池中的工作線程則使用先進(jìn)先出方式進(jìn)行任務(wù)調(diào)度,默認(rèn)情況下是false。

ForkJoinPool.submit 方法

public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    //提交到工作隊列
    externalPush(task);
    return task;
}

ForkJoinPool 自身擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務(wù),而這些工作隊列被稱為 submitting queue 。
submit() 和 fork() 其實(shí)沒有本質(zhì)區(qū)別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當(dāng)其中的任務(wù)被一個工作線程成功竊取時,就意味著提交的任務(wù)真正開始進(jìn)入執(zhí)行階段。

相關(guān)文章

PS:以上代碼提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

文章持續(xù)更新,可以公眾號搜一搜「 一角錢技術(shù) 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容