
點(diǎn)贊再看,養(yǎng)成習(xí)慣,搜一搜【一角錢技術(shù)】關(guān)注更多原創(chuàng)技術(shù)文章。本文 GitHub org_hejianhui/JavaStudy 已收錄,有我的系列文章。
前言
前面我們介紹了線程池框架(ExecutorService)的兩個具體實(shí)現(xiàn):
線程池為線程生命周期的開銷和資源不足問題提供了解決方案。通過對多個任務(wù)重用線程,線程創(chuàng)建的開銷被分?jǐn)偟蕉鄠€任務(wù)上。Java7 又提供了的一個用于并行執(zhí)行的任務(wù)的框架 Fork/Join ,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。在介紹Fork/Join 框架之前我們先了解幾個概念:CPU密集型、IO密集型,再逐步深入去認(rèn)識Fork/Join 框架。
任務(wù)性質(zhì)類型
CPU密集型(CPU bound)
CPU密集型也叫計算密集型,指的是系統(tǒng)的硬盤、內(nèi)存性能相對于CPU要好很好多,此時,系統(tǒng)運(yùn)作大部分的狀況是 CPU Loading 100%,CPU要讀/寫 I/O(硬盤/內(nèi)存),I/O在很短的時間就可以完成,而CPU還有許多運(yùn)算要處理,CPU Loading很高。
在多重程序系統(tǒng)中,大部分時間用來做計算、邏輯判斷等CPU動作的程序稱之 CPU bound。例如一個計算圓周率至小數(shù)點(diǎn)一千位以下的程序,在執(zhí)行的過程當(dāng)中絕大部分時間在用三角函數(shù)和開根號的計算,便是屬于CPU bound的程序。
CPU bound的程序一般而言CPU占用率相當(dāng)高。這可能是因為任務(wù)本身不太需要訪問I/O設(shè)備,也可能是因為程序是多線程實(shí)現(xiàn)因此屏蔽了等待I/O的時間。
線程數(shù)一般設(shè)置為:線程數(shù) = CPU核數(shù) + 1(現(xiàn)代CPU支持超線程)
IO密集型(I/O bound)
I/O密集型指的是系統(tǒng)的CPU性能相對硬盤、內(nèi)存要好很多,此時,系統(tǒng)運(yùn)作,大部分的狀況是 CPU 在等 I/O(硬盤/內(nèi)存)的讀/寫操作,此時 CPU Loading 并不高。
I/O bound的程序一般在達(dá)到性能極限時,CPU占用率仍然較低。這可能是因為任務(wù)本身需要大量I/O操作,而 pipeline 做的不是很好,沒有充分利用處理器能力。
線程數(shù)一般設(shè)置為:線程數(shù) = ((線程等待時間 + 線程CPU時間) / 線程CPU時間) * CPU數(shù)目
CPU密集型 VS I/O密集型
我們可以把任務(wù)分為計算密集型和I/O密集型。
計算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計算,消耗CPU資源,比如計算圓周率、對視頻進(jìn)行高清解碼等等,全靠CPU的運(yùn)算能力。這種計算密集型任務(wù)雖然也可以用多任務(wù)完成,但是任務(wù)越多,花在任務(wù)切換的時間就越多,CPU執(zhí)行任務(wù)的效率就越低,所以,要最高效地利用CPU,計算密集型任務(wù)同時進(jìn)行的數(shù)量應(yīng)當(dāng)?shù)扔贑PU的核心數(shù)。
計算密集型任務(wù)由于主要消耗CPU資源,因此,代碼運(yùn)行效率至關(guān)重要。Python這樣的腳本語言運(yùn)行效率很低,完全不適合計算密集型任務(wù)。對于計算密集型任務(wù),最好用C語言編寫。
第二種任務(wù)的類型是I/O密集型,涉及到網(wǎng)絡(luò)、磁盤I/O的任務(wù)都是I/O密集型任務(wù),這類任務(wù)的特點(diǎn)是CPU消耗很少,任務(wù)的大部分時間都在等待I/O操作完成(因為I/O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)。對于I/O密集型任務(wù),任務(wù)越多,CPU效率越高,但也有一個限度。常見的大部分任務(wù)都是I/O密集型任務(wù),比如Web應(yīng)用。
I/O密集型任務(wù)執(zhí)行期間,99%的時間都花在I/O上,花在CPU上的時間很少,因此,用運(yùn)行速度極快的C語言替換用Python這樣運(yùn)行速度極低的腳本語言,完全無法提升運(yùn)行效率。對于I/O密集型任務(wù),最合適的語言就是開發(fā)效率最高(代碼量最少)的語言,腳本語言是首選,C語言最差。
什么是 Fork/Join 框架?
Fork/Join 框架是 Java7 提供了的一個用于并行執(zhí)行的任務(wù)的框架,是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。
Fork 就是把一個大任務(wù)切分為若干個子任務(wù)并行的執(zhí)行,Join 就是合并這些子任務(wù)的執(zhí)行結(jié)果,最后得到這個大任務(wù)的結(jié)果。比如計算 1+2+......+10000,可以分割成10個子任務(wù),每個子任務(wù)對1000個數(shù)進(jìn)行求和,最終匯總這10個子任務(wù)的結(jié)果。如下圖所示:

Fork/Join的特性:
- ForkJoinPool 不是為了替代 ExecutorService,而是它的補(bǔ)充,在某些應(yīng)用場景下性能比 ExecutorService 更好。(見 Java Tip: When to use ForkJoinPool vs ExecutorService )
- ForkJoinPool 主要用于實(shí)現(xiàn)“分而治之”的算法,特別是分治之后遞歸調(diào)用的函數(shù),例如 quick sort 等;
- ForkJoinPool 最適合的是計算密集型的任務(wù),如果存在 I/O、線程間同步、sleep() 等會造成線程長時間阻塞的情況時,最好配合 MangedBlocker。
關(guān)于“分而治之”的算法,可以查看《分治、回溯的實(shí)現(xiàn)和特性》
工作竊取算法
工作竊?。╳ork-stealing)算法 是指某個線程從其他隊列里竊取任務(wù)來執(zhí)行。
我們需要做一個比較大的任務(wù),我們可以把這個任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競爭,于是把這些子任務(wù)分別放到不同的隊列里,并為每個隊列創(chuàng)建一個單獨(dú)的線程來執(zhí)行隊列里的任務(wù),線程和隊列一一對應(yīng),比如A線程負(fù)責(zé)處理A隊列里的任務(wù)。
但是有的線程會先把自己隊列里的任務(wù)干完,而其他線程對應(yīng)的隊列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務(wù)來執(zhí)行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競爭,通常會使用雙端隊列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計算,并減少了線程間的競爭,其缺點(diǎn)是在某些情況下還是存在競爭,比如雙端隊列里只有一個任務(wù)時。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個線程和多個雙端隊列。

- ForkJoinPool 的每個工作線程都維護(hù)著一個工作隊列(WorkQueue),這是一個雙端隊列(Deque),里面存放的對象是任務(wù)(ForkJoinTask)。
- 每個工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因為調(diào)用了 fork())時,會放入工作隊列的隊尾,并且工作線程在處理自己的工作隊列時,使用的是 LIFO 方式,也就是說每次從隊尾取出任務(wù)來執(zhí)行。
- 每個工作線程在處理自己的工作隊列同時,會嘗試竊取一個任務(wù)(或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊列),竊取的任務(wù)位于其他線程的工作隊列的隊首,也就是說工作線程在竊取其他工作線程的任務(wù)時,使用的是 FIFO 方式。
- 在遇到 join() 時,如果需要 join 的任務(wù)尚未完成,則會先處理其他任務(wù),并等待其完成。
- 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時,進(jìn)入休眠。
Fork/Join的使用
使用場景示例
定義fork/join任務(wù),如下示例,隨機(jī)生成2000w條數(shù)據(jù)在數(shù)組當(dāng)中,然后求和_
package com.niuh.forkjoin.recursivetask;
import java.util.concurrent.RecursiveTask;
/**
* RecursiveTask 并行計算,同步有返回值
* ForkJoin框架處理的任務(wù)基本都能使用遞歸處理,比如求斐波那契數(shù)列等,但遞歸算法的缺陷是:
* 一只會只用單線程處理,
* 二是遞歸次數(shù)過多時會導(dǎo)致堆棧溢出;
* ForkJoin解決了這兩個問題,使用多線程并發(fā)處理,充分利用計算資源來提高效率,同時避免堆棧溢出發(fā)生。
* 當(dāng)然像求斐波那契數(shù)列這種小問題直接使用線性算法搞定可能更簡單,實(shí)際應(yīng)用中完全沒必要使用ForkJoin框架,
* 所以ForkJoin是核彈,是用來對付大家伙的,比如超大數(shù)組排序。
* 最佳應(yīng)用場景:多核、多內(nèi)存、可以分割計算再合并的計算密集型任務(wù)
*/
class LongSum extends RecursiveTask<Long> {
//任務(wù)拆分的最小閥值
static final int SEQUENTIAL_THRESHOLD = 1000;
static final long NPS = (1000L * 1000 * 1000);
static final boolean extraWork = true; // change to add more than just a sum
int low;
int high;
int[] array;
LongSum(int[] arr, int lo, int hi) {
array = arr;
low = lo;
high = hi;
}
/**
* fork()方法:將任務(wù)放入隊列并安排異步執(zhí)行,一個任務(wù)應(yīng)該只調(diào)用一次fork()函數(shù),除非已經(jīng)執(zhí)行完畢并重新初始化。
* tryUnfork()方法:嘗試把任務(wù)從隊列中拿出單獨(dú)處理,但不一定成功。
* join()方法:等待計算完成并返回計算結(jié)果。
* isCompletedAbnormally()方法:用于判斷任務(wù)計算是否發(fā)生異常。
*/
protected Long compute() {
if (high - low <= SEQUENTIAL_THRESHOLD) {
long sum = 0;
for (int i = low; i < high; ++i) {
sum += array[i];
}
return sum;
} else {
int mid = low + (high - low) / 2;
LongSum left = new LongSum(array, low, mid);
LongSum right = new LongSum(array, mid, high);
left.fork();
right.fork();
long rightAns = right.join();
long leftAns = left.join();
return leftAns + rightAns;
}
}
}
執(zhí)行fork/join任務(wù)
package com.niuh.forkjoin.recursivetask;
import com.niuh.forkjoin.utils.Utils;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
public class LongSumMain {
//獲取邏輯處理器數(shù)量
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* for time conversion
*/
static final long NPS = (1000L * 1000 * 1000);
static long calcSum;
static final boolean reportSteals = true;
public static void main(String[] args) throws Exception {
int[] array = Utils.buildRandomIntArray(2000000);
System.out.println("cpu-num:" + NCPU);
//單線程下計算數(shù)組數(shù)據(jù)總和
long start = System.currentTimeMillis();
calcSum = seqSum(array);
System.out.println("seq sum=" + calcSum);
System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
start = System.currentTimeMillis();
//采用fork/join方式將數(shù)組求和任務(wù)進(jìn)行拆分執(zhí)行,最后合并結(jié)果
LongSum ls = new LongSum(array, 0, array.length);
ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的線程數(shù)
ForkJoinTask<Long> task = fjp.submit(ls);
System.out.println("forkjoin sum=" + task.get());
System.out.println("singgle thread sort:->" + (System.currentTimeMillis() - start));
if (task.isCompletedAbnormally()) {
System.out.println(task.getException());
}
fjp.shutdown();
}
static long seqSum(int[] array) {
long sum = 0;
for (int i = 0; i < array.length; ++i) {
sum += array[i];
}
return sum;
}
}
Fork/Join框架原理
Fork/Join 其實(shí)就是指由ForkJoinPool作為線程池、ForkJoinTask(通常實(shí)現(xiàn)其三個抽象子類)為任務(wù)、ForkJoinWorkerThread作為執(zhí)行任務(wù)的具體線程實(shí)體這三者構(gòu)成的任務(wù)調(diào)度機(jī)制。

ForkJoinWorkerThread
ForkJoinWorkerThread 直接繼承了Thread,但是僅僅是為了增加一些額外的功能,并沒有對線程的調(diào)度執(zhí)行做任何更改。

ForkJoinWorkerThread 是被ForkJoinPool管理的工作線程,在創(chuàng)建出來之后都被設(shè)置成為了守護(hù)線程,由它來執(zhí)行ForkJoinTasks。該類主要為了維護(hù)創(chuàng)建線程實(shí)例時通過ForkJoinPool為其創(chuàng)建的任務(wù)隊列,與其他兩個線程池整個線程池只有一個任務(wù)隊列不同,F(xiàn)orkJoinPool管理的所有工作線程都擁有自己的工作隊列,為了實(shí)現(xiàn)任務(wù)竊取機(jī)制,該隊列被設(shè)計成一個雙端隊列,而ForkJoinWorkerThread的首要任務(wù)就是執(zhí)行自己的這個雙端任務(wù)隊列中的任務(wù),其次是竊取其他線程的工作隊列,以下是其代碼片段:
public class ForkJoinWorkerThread extends Thread {
// 這個線程工作的ForkJoinPool池
final ForkJoinPool pool;
// 這個線程擁有的工作竊取機(jī)制的工作隊列
final ForkJoinPool.WorkQueue workQueue;
//創(chuàng)建在給定ForkJoinPool池中執(zhí)行的ForkJoinWorkerThread。
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;
//向ForkJoinPool執(zhí)行池注冊當(dāng)前工作線程,F(xiàn)orkJoinPool為其分配一個工作隊列
this.workQueue = pool.registerWorker(this);
}
//該工作線程的執(zhí)行內(nèi)容就是執(zhí)行工作隊列中的任務(wù)
public void run() {
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart();
pool.runWorker(workQueue); //執(zhí)行工作隊列中的任務(wù)
} catch (Throwable ex) {
exception = ex; //記錄異常
} finally {
try {
onTermination(exception);
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception); //撤銷工作
}
}
}
}
.....
}
ForkJoinTask
ForkJoinTask :與FutureTask一樣, ForkJoinTask也是Future的子類,不過它是一個抽象類。

ForkJoinTask :我們要使用 ForkJoin 框架,必須首先創(chuàng)建一個 ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行 fork() 和 join() 操作的機(jī)制,通常情況下我們不需要直接繼承 ForkJoinTask 類,而只需要繼承它的子類,F(xiàn)ork/Join框架提供類以下幾個子類:
- RecursiveAction:用于沒有返回結(jié)果的任務(wù)。(比如寫數(shù)據(jù)到磁盤,然后就退出。一個 RecursiveAvtion 可以把直接的工作分割成更小的幾塊,這樣它們可以由獨(dú)立的線程或者 CPU 執(zhí)行。我們可以通過繼承來實(shí)現(xiàn)一個 RecusiveAction)
- RescursiveTask:用于有返回結(jié)果的任務(wù)。(可以將自己的工作分割為若干更小任務(wù),并將這些子任務(wù)的執(zhí)行合并到一個集體結(jié)果。可以有幾個水平的分割和合并)
- CountedCompleter :在任務(wù)完成執(zhí)行后會觸發(fā)執(zhí)行一個自定義的鉤子函數(shù)。
常量介紹
ForkJoinTask 有一個int類型的status字段:
- 其高16位存儲任務(wù)執(zhí)行狀態(tài)例如NORMAL、CANCELLED或EXCEPTIONAL
- 低16位預(yù)留用于用戶自定義的標(biāo)記。
任務(wù)未完成之前status大于等于0,完成之后就是NORMAL、CANCELLED或EXCEPTIONAL這幾個小于0的值,這幾個值也是按大小順序的:0(初始狀態(tài)) > NORMAL > CANCELLED > EXCEPTIONAL.
public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/** 該任務(wù)的執(zhí)行狀態(tài) */
volatile int status; // accessed directly by pool and workers
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative
static final int CANCELLED = 0xc0000000; // must be < NORMAL
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED
static final int SIGNAL = 0x00010000; // must be >= 1 << 16
static final int SMASK = 0x0000ffff; // short bits for tags
// 異常哈希表
//被任務(wù)拋出的異常數(shù)組,為了報告給調(diào)用者。因為異常很少見,所以我們不直接將它們保存在task對象中,而是使用弱引用數(shù)組。注意,取消異常不會出現(xiàn)在數(shù)組,而是記錄在statue字段中
//注意這些都是 static 類屬性,所有的ForkJoinTask共用的。
private static final ExceptionNode[] exceptionTable; //異常哈希鏈表數(shù)組
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue<Object> exceptionTableRefQueue; //在ForkJoinTask被GC回收之后,相應(yīng)的異常節(jié)點(diǎn)對象的引用隊列
/**
* 固定容量的exceptionTable.
*/
private static final int EXCEPTION_MAP_CAPACITY = 32;
//異常數(shù)組的鍵值對節(jié)點(diǎn)。
//該哈希鏈表數(shù)組使用線程id進(jìn)行比較,該數(shù)組具有固定的容量,因為它只維護(hù)任務(wù)異常足夠長,以便參與者訪問它們,所以在持續(xù)的時間內(nèi)不應(yīng)該變得非常大。但是,由于我們不知道最后一個joiner何時完成,我們必須使用弱引用并刪除它們。我們對每個操作都這樣做(因此完全鎖定)。此外,任何ForkJoinPool池中的一些線程在其池變?yōu)閕sQuiescent時都會調(diào)用helpExpungeStaleExceptions
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
final Throwable ex;
ExceptionNode next;
final long thrower; // 拋出異常的線程id
final int hashCode; // 在弱引用消失之前存儲hashCode
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue); //在ForkJoinTask被GC回收之后,會將該節(jié)點(diǎn)加入隊列exceptionTableRefQueue
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
this.hashCode = System.identityHashCode(task);
}
}
.................
}
除了status記錄任務(wù)的執(zhí)行狀態(tài)之外,其他字段主要是為了對任務(wù)執(zhí)行的異常的處理,F(xiàn)orkJoinTask采用了哈希數(shù)組 + 鏈表的數(shù)據(jù)結(jié)構(gòu)(JDK8以前的HashMap實(shí)現(xiàn)方法)存放所有(因為這些字段是static)的ForkJoinTask任務(wù)的執(zhí)行異常。
fork 方法(安排任務(wù)異步執(zhí)行)
fork() 做的工作只有一件事,既是把任務(wù)推入當(dāng)前工作線程的工作隊列里(安排任務(wù)異步執(zhí)行)??梢詤⒖匆韵碌脑创a:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
該方法其實(shí)就是將任務(wù)通過push方法加入到當(dāng)前工作線程的工作隊列或者提交隊列(外部非ForkJoinWorkerThread線程通過submit、execute方法提交的任務(wù)),等待被線程池調(diào)度執(zhí)行,這是一個非阻塞的立即返回方法。
這里需要知道,F(xiàn)orkJoinPool線程池通過哈希數(shù)組+雙端隊列的方式將所有的工作線程擁有的任務(wù)隊列和從外部提交的任務(wù)分別映射到哈希數(shù)組的不同槽位上。
join 方法(等待執(zhí)行結(jié)果)
join() 的工作則復(fù)雜得多,也是 join() 可以使得線程免于被阻塞的原因——不像同名的 Thread.join()。
- 檢查調(diào)用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當(dāng)前線程,等待任務(wù)完成。如果是,則不阻塞。
- 查看任務(wù)的完成狀態(tài),如果已經(jīng)完成,直接返回結(jié)果。
- 如果任務(wù)尚未完成,但處于自己的工作隊列內(nèi),則完成它。
- 如果任務(wù)已經(jīng)被其他的工作線程偷走,則竊取這個小偷的工作隊列內(nèi)的任務(wù)(以 FIFO 方式),執(zhí)行,以期幫助它早日完成欲 join 的任務(wù)。
- 如果偷走任務(wù)的小偷也已經(jīng)把自己的任務(wù)全部做完,正在等待需要 join 的任務(wù)時,則找到小偷的小偷,幫助它完成它的任務(wù)。
- 遞歸地執(zhí)行第5步。
將上述流程畫成序列圖的話就是這個樣子:

源代碼如下:
//當(dāng)計算完成時返回計算結(jié)果。此方法與get()的不同之處在于,異常完成會導(dǎo)致RuntimeException或Error,而不是ExecutionException,調(diào)用線程被中斷不會通過拋出InterruptedException導(dǎo)致方法突然返回。
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s); //非正常結(jié)束,拋出相關(guān)的異常堆棧信息
return getRawResult(); //正常結(jié)束,返回結(jié)果
}
//等待任務(wù)執(zhí)行結(jié)束并返回其狀態(tài)status,該方法實(shí)現(xiàn)了join, get, quietlyJoin. 直接處理已經(jīng)完成的,外部等待和unfork+exec的情況,其它情況轉(zhuǎn)發(fā)到ForkJoinPool.awaitJoin
//如果 status < 0 則返回s;
//否則,若不是ForkJoinWorkerThread ,則等待 externalAwaitDone() 返回
//否則,若 (w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 則 返回s;
//否則,返回 wt.pool.awaitJoin(w, this, 0L)
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s : //status為負(fù)數(shù)表示任務(wù)已經(jīng)執(zhí)行結(jié)束,直接返回status。
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s : //調(diào)用pool的執(zhí)行邏輯,并等待返回執(zhí)行結(jié)果狀態(tài)
wt.pool.awaitJoin(w, this, 0L) : //調(diào)用pool的等待機(jī)制
externalAwaitDone(); //不是ForkJoinWorkerThread,
}
//拋出與給定狀態(tài)關(guān)聯(lián)的異常(如果有),被取消是CancellationException。
private void reportException(int s) {
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL)
rethrow(getThrowableException());
}
public abstract V getRawResult();
//返回給定任務(wù)的執(zhí)行異常(如果有的話),為了提供準(zhǔn)確的異常堆棧信息,若異常不是由當(dāng)前線程拋出的,將嘗試以記錄的異常為原因創(chuàng)建一個與拋出異常類型相同的新異常。
//如果沒有那樣的構(gòu)造方法將嘗試使用無參的構(gòu)造函數(shù),并通過設(shè)置initCause方法以達(dá)到同樣的效果,盡管它可能包含誤導(dǎo)的堆棧跟蹤信息。
private Throwable getThrowableException() {
if ((status & DONE_MASK) != EXCEPTIONAL)
return null;
//1. 通過當(dāng)前任務(wù)對象的哈希值到哈希鏈表數(shù)組中找到相應(yīng)的異常節(jié)點(diǎn)
int h = System.identityHashCode(this); //當(dāng)前任務(wù)的hash值
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
lock.lock(); //加鎖
try {
expungeStaleExceptions(); //清理被GC回收的任務(wù)的異常節(jié)點(diǎn)
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)]; //通過取模對應(yīng)得索引獲取哈希數(shù)組槽位中得節(jié)點(diǎn)
while (e != null && e.get() != this)
e = e.next; //遍歷找到當(dāng)前任務(wù)對應(yīng)的異常節(jié)點(diǎn)
} finally {
lock.unlock();
}
Throwable ex;
if (e == null || (ex = e.ex) == null) //表示沒有出現(xiàn)任何異常
return null;
if (e.thrower != Thread.currentThread().getId()) { //有異常但是不是由當(dāng)前線程拋出的
Class<? extends Throwable> ec = ex.getClass();
try {
Constructor<?> noArgCtor = null;
Constructor<?>[] cs = ec.getConstructors();// public ctors only
//通過反射找到構(gòu)造方法,并構(gòu)造新異常
for (int i = 0; i < cs.length; ++i) {
Constructor<?> c = cs[i];
Class<?>[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c; //記錄下無參構(gòu)造方法,以備沒有找到期望的構(gòu)造方法時使用
else if (ps.length == 1 && ps[0] == Throwable.class) {
Throwable wx = (Throwable)c.newInstance(ex); //發(fā)現(xiàn)了我們期望的Throwable類型的參數(shù)的構(gòu)造方法
return (wx == null) ? ex : wx;
}
}
if (noArgCtor != null) { //沒有找到期望的構(gòu)造方法,只能通過無參構(gòu)造方法創(chuàng)建新異常
Throwable wx = (Throwable)(noArgCtor.newInstance());
if (wx != null) {
wx.initCause(ex); //將原始異常設(shè)置進(jìn)去
return wx;
}
}
} catch (Exception ignore) {
}
}
return ex;
}
//清除哈希鏈表數(shù)組中已經(jīng)被GC回收掉的任務(wù)的異常節(jié)點(diǎn)。從exceptionTableRefQueue節(jié)點(diǎn)引用隊列中獲取異常節(jié)點(diǎn)并移除哈希鏈表數(shù)組中得對應(yīng)節(jié)點(diǎn)
private static void expungeStaleExceptions() {
for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
if (x instanceof ExceptionNode) {
int hashCode = ((ExceptionNode)x).hashCode; //節(jié)點(diǎn)hash
ExceptionNode[] t = exceptionTable;
int i = hashCode & (t.length - 1); //取模得到哈希表索引
ExceptionNode e = t[i];
ExceptionNode pred = null;
while (e != null) {
ExceptionNode next = e.next;
if (e == x) { //找到了目標(biāo)節(jié)點(diǎn)
if (pred == null)
t[i] = next;
else
pred.next = next;
break;
}
pred = e; //往后遍歷鏈表
e = next;
}
}
}
}
//竊取任務(wù)的主要執(zhí)行方法,除非已經(jīng)完成了,否則調(diào)用exec()并記錄完成時的狀態(tài)。
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) { //任務(wù)還未完成
try {
completed = exec(); 調(diào)用exec()并記錄完成時的狀態(tài)。
} catch (Throwable rex) {
return setExceptionalCompletion(rex); //記錄異常并返回相關(guān)狀態(tài),并喚醒通過join等待此任務(wù)的線程。
}
if (completed)
s = setCompletion(NORMAL); //更新狀態(tài)為正常結(jié)束,并喚醒通過join等待此任務(wù)的線程。
}
return s;
}
//立即執(zhí)行此任務(wù)的基本操作。返回true表示該任務(wù)已經(jīng)正常完成,否則返回false表示此任務(wù)不一定完成(或不知道是否完成)。
//此方法還可能拋出(未捕獲的)異常,以指示異常退出。此方法旨在支持?jǐn)U展,一般不應(yīng)以其他方式調(diào)用。
protected abstract boolean exec();
//等待未完成的非ForkJoinWorkerThread線程提交的任務(wù)執(zhí)行結(jié)束,并返回任務(wù)狀態(tài)status
private int externalAwaitDone() {
//若是CountedCompleter任務(wù),等待ForkJoinPool.common.externalHelpComplete((CountedCompleter<?>)this, 0) 返回
//否則,若ForkJoinPool.common.tryExternalUnpush(this),返回 doExec() 結(jié)果;
//否則,返回0
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) : //輔助完成外部提交的CountedCompleter任務(wù)
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); //輔助完成外部提交的非CountedCompleter任務(wù)
if (s >= 0 && (s = status) >= 0) { //表示任務(wù)還沒結(jié)束,需要阻塞等待。
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //標(biāo)記有線程需要被喚醒
synchronized (this) {
if (status >= 0) {
try {
wait(0L); //任務(wù)還沒結(jié)束,無限期阻塞直到被喚醒
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll(); //已經(jīng)結(jié)束了喚醒所有阻塞的線程
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt(); //恢復(fù)中斷標(biāo)識
}
return s;
}
//記錄異常,更新status狀態(tài),喚醒所有等待線程
private int setExceptionalCompletion(Throwable ex) {
int s = recordExceptionalCompletion(ex);
if ((s & DONE_MASK) == EXCEPTIONAL)
internalPropagateException(ex); //調(diào)用鉤子函數(shù)傳播異常
return s;
}
/**
* 對任務(wù)異常結(jié)束的異常傳播支持的鉤子函數(shù)
*/
void internalPropagateException(Throwable ex) {
}
//記錄異常并設(shè)置狀態(tài)status
final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
int h = System.identityHashCode(this); //哈希值
final ReentrantLock lock = exceptionTableLock;
lock.lock(); //加鎖
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
int i = h & (t.length - 1);
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) { //遍歷完了都沒找到,說明哈希鏈表數(shù)組中不存在該任務(wù)對于的異常節(jié)點(diǎn)
t[i] = new ExceptionNode(this, ex, t[i]); //創(chuàng)建一個異常節(jié)點(diǎn)用頭插法插入哈希鏈表數(shù)組
break;
}
if (e.get() == this) // 哈希鏈表數(shù)組中已經(jīng)存在相應(yīng)的異常節(jié)點(diǎn),退出
break;
}
} finally {
lock.unlock();
}
s = setCompletion(EXCEPTIONAL);
}
return s;
}
//標(biāo)記任務(wù)完成標(biāo)志,并喚醒通過join等待此任務(wù)的線程。
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { //更新狀態(tài)
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); } //喚醒所有等待線程
return completion;
}
}
}
get 方法(獲取異步任務(wù)結(jié)果)
既然ForkJoinTask也是Future的子類,那么Future最重要的獲取異步任務(wù)結(jié)果的get方法也必然要實(shí)現(xiàn):
//如果需要,等待計算完成,然后檢索其結(jié)果。
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? doJoin() : //是ForkJoinWorkerThread,執(zhí)行doJoin
externalInterruptibleAwaitDone(); //執(zhí)行externalInterruptibleAwaitDone
Throwable ex;
if ((s &= DONE_MASK) == CANCELLED)
throw new CancellationException(); //被取消的拋出CancellationException
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex); //執(zhí)行中出現(xiàn)異常的拋出相應(yīng)的異常
return getRawResult(); //返回正常結(jié)果
}
//阻塞非ForkJoinWorkerThread線程,直到完成或中斷。
private int externalInterruptibleAwaitDone() throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0 &&
(s = ((this instanceof CountedCompleter) ?
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
0)) >= 0) { //根據(jù)不同的任務(wù)類型 返回執(zhí)行或暫時等待被執(zhí)行的狀態(tài)
while ((s = status) >= 0) { //需要阻塞等待
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0)
wait(0L); //阻塞等待
else
notifyAll(); //喚醒所有等待線程
}
}
}
}
return s;
}
get方法也是通過實(shí)現(xiàn)join方法的doJoin方法實(shí)現(xiàn)的,不同的是,調(diào)用get方法的線程如果被中斷的話,get方法會立即拋出InterruptedException異常,而join方法則不會;另外任務(wù)異常完成的的相關(guān)異常,get方法會將相關(guān)異常都封裝成ExecutionException異常,而join方法則是原樣拋出相關(guān)的異常不會被封裝成ExecutionException異常。get方法采用的wait/notifyAll這種線程通信機(jī)制來實(shí)現(xiàn)阻塞與喚醒。另外還有超時版本的get方法也類似,由此可見get支持可中斷和/或定時等待完成。
invoke 方法(立即執(zhí)行任務(wù),并等待返回結(jié)果)
//開始執(zhí)行此任務(wù),如果需要等待其完成,并返回其結(jié)果,如果底層執(zhí)行此任務(wù)時出現(xiàn)異常,則拋出相應(yīng)的(未捕獲的)RuntimeException或Error。
public final V invoke() {
int s;
if ((s = doInvoke() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
// invoke, quietlyInvoke的實(shí)現(xiàn)
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s : //執(zhí)行此任務(wù),完成返回其status
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //若未完成或需要等待就根據(jù)不同任務(wù)類型執(zhí)行不同的等待邏輯
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
invoke的實(shí)現(xiàn)會利用當(dāng)前調(diào)用invoke的線程立即執(zhí)行exec方法,當(dāng)然如果exec方法的實(shí)現(xiàn)使用了fork/join,其還是會利用ForkJoinPool線程池的遞歸調(diào)度執(zhí)行策略,等待子任務(wù)執(zhí)行完成,一步步的合并成最終的任務(wù)結(jié)果,并返回。值得注意的是,該方法不會因為線程被中斷而立即返回,而必須在等到任務(wù)執(zhí)行有了結(jié)果之后才會對中斷狀態(tài)進(jìn)行補(bǔ)償。
invokeAll 方法(批量執(zhí)行任務(wù),并等待它們執(zhí)行結(jié)束)
//執(zhí)行兩個任務(wù)
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork(); //t2任務(wù)交給線程池調(diào)度執(zhí)行
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) //t1任務(wù)立即由當(dāng)前線程執(zhí)行
t1.reportException(s1); //若t1異常結(jié)束,則拋出異常,包括被取消的CancellationException
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) //等待t2執(zhí)行結(jié)束
t2.reportException(s2); //若t2異常結(jié)束,則拋出異常,包括被取消的CancellationException
}
//執(zhí)行任務(wù)數(shù)組
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = tasks[i];
if (t == null) {
if (ex == null) //都不能為null
ex = new NullPointerException();
}
else if (i != 0)
t.fork(); //除了第一個任務(wù)都交給線程池調(diào)度執(zhí)行
else if (t.doInvoke() < NORMAL && ex == null) //由當(dāng)前線程執(zhí)行第一個任務(wù)
ex = t.getException(); //記錄第一個任務(wù)的異常
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null) //第一個任務(wù)異常結(jié)束,取消其他所有任務(wù)
t.cancel(false);
else if (t.doJoin() < NORMAL) //有任務(wù)異常結(jié)束,記錄異常
ex = t.getException();
}
}
if (ex != null)
rethrow(ex); //若有任務(wù)異常結(jié)束,拋出數(shù)組最前面那個異常結(jié)束的任務(wù)的異常
}
//批量執(zhí)行任務(wù),返回每個任務(wù)對應(yīng)的ForkJoinTask實(shí)例,
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()])); //將任務(wù)封裝成ForkJoinTask,調(diào)用上面那個方法實(shí)現(xiàn)
return tasks;
}
//下面的邏輯與上面那個invokeAll也是一樣的。
@SuppressWarnings("unchecked")
List<? extends ForkJoinTask<?>> ts = (List<? extends ForkJoinTask<?>>) tasks;
Throwable ex = null;
int last = ts.size() - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = ts.get(i);
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL)
ex = t.getException();
}
}
if (ex != null)
rethrow(ex);
return tasks;
}
批量任務(wù)的執(zhí)行其實(shí)現(xiàn)都是排在前面的任務(wù)(只有兩個參數(shù)是,第一個參數(shù)就是排在前面的任務(wù),是數(shù)組或者隊列時,索引越小的就是排在越前面的)由當(dāng)前線程執(zhí)行,后面的任務(wù)交給線程池調(diào)度執(zhí)行,如果有多個任務(wù)都出現(xiàn)異常,只會拋出排在最前面那個任務(wù)的異常。
quietlyInvoke、quietlyJoin 方法(不需要執(zhí)行結(jié)果的invoke和join)
public final void quietlyJoin() {
doJoin();
}
public final void quietlyInvoke() {
doInvoke();
}
quietlyInvoke(),quietlyJoin()這兩個方法就僅僅了是調(diào)用了doInvoke和doJoin,然后就沒有然后了,它們就是不關(guān)心執(zhí)行結(jié)果版本的invoke和Join,當(dāng)然異常結(jié)束的也不會將異常拋出來,當(dāng)執(zhí)行一組任務(wù)并且需要將結(jié)果或異常的處理延遲到全部任務(wù)完成時,這可能很有用。
cancel 方法 (嘗試取消任務(wù)的執(zhí)行)
public boolean cancel(boolean mayInterruptIfRunning) {
return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
}
其主要通過setCompletion標(biāo)記尚未完成的任務(wù)的狀態(tài)為CANCELLED,并喚醒通過join等待此任務(wù)的線程。已經(jīng)執(zhí)行完成的任務(wù)無法被取消,返回true表示取消成功。注意該方法傳入的mayInterruptIfRunning并沒有使用,因此,F(xiàn)orkJoinTask不支持在取消任務(wù)時中斷已經(jīng)開始執(zhí)行的任務(wù),當(dāng)然ForkJoinTask的子類可以重寫實(shí)現(xiàn)。
tryUnfork 方法(取消fork,即從任務(wù)隊列中移除任務(wù))
//取消任務(wù)的執(zhí)行計劃。如果此任務(wù)是當(dāng)前線程最近才剛剛通過fork安排執(zhí)行,并且尚未在另一個線程中開始執(zhí)行,則此方法通常會成功,但也不是100%保證會成功。
public boolean tryUnfork() {
Thread t;
return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //針對ForkJoinWorkerThread的取消邏輯
ForkJoinPool.common.tryExternalUnpush(this)); //針對外部提交任務(wù)的取消邏輯
}
tryUnfork嘗試將該任務(wù)從任務(wù)隊列中彈出,彈出之后線程池自然不會再調(diào)度該任務(wù)。該方法的實(shí)現(xiàn)只會在任務(wù)剛剛被推入任務(wù)隊列,并且還處于任務(wù)隊列的棧頂時才可能會成功,否則100%失敗。
reinitialize 方法(重新初始化該任務(wù))
public void reinitialize() {
if ((status & DONE_MASK) == EXCEPTIONAL) //有異常
clearExceptionalCompletion(); //從哈希鏈表數(shù)組中移除當(dāng)前任務(wù)的異常節(jié)點(diǎn),并將status重置為0
else
status = 0;
}
如果任務(wù)異常結(jié)束,會從異常哈希表中清除該任務(wù)的異常記錄,該方法僅僅是將任務(wù)狀態(tài)status重置為0,使得該任務(wù)可以被重新執(zhí)行。
isDone、isCompletedNormally、isCancelled、isCompletedAbnormally 方法(任務(wù)的完成狀態(tài)查詢)
任務(wù)的執(zhí)行狀態(tài)可以在多個級別上查詢:
- 如果任務(wù)以任何方式完成(包括任務(wù)在未執(zhí)行的情況下被取消),則isDone為true。
- 如果任務(wù)在沒有取消或沒有遇到異常的情況下完成,則 isCompletedNormally 為true。
- 如果任務(wù)被取消(在這種情況下getException方法返回一個CancellationException),則 isCancelled 為true。
- 如果任務(wù)被取消或遇到異常,則isCompletedAbnormally異常為true,在這種情況下,getException將返回遇到的異?;騤ava.util.concurrent.CancellationException。
ForkJoinTask 在執(zhí)行的時候可能會拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法來檢查任務(wù)是否已經(jīng)拋出異?;蛞呀?jīng)被取消了,并且可以通過 ForkJoinTask 的 getException 方法獲取異常。示例如下:
if(task.isCompletedAbnormally()){
System.out.println(task.getException());
}
getException 方法返回 Throwable 對象,如果任務(wù)被取消了則返回CancellationException。如果任務(wù)沒有完成或者沒有拋出異常則返回 null。
為Runnable和Callable提供的adapt方法
adapt方法主要是為了兼容傳統(tǒng)的Runnable和Callable任務(wù),通過adapt方法可以將它們封裝成ForkJoinTask任務(wù),當(dāng)將 ForkJoinTask 與其他類型的任務(wù)混合執(zhí)行時,可以使用這些方法。
其他一些方法
getPool可以返回執(zhí)行該任務(wù)的線程所在的線程池實(shí)例,inForkJonPool可以判定當(dāng)前任務(wù)是否是由ForkJoinWorkerThread線程提交的,一般來說這意味著當(dāng)前任務(wù)是內(nèi)部拆分之后的子任務(wù)。
getQueuedTaskCount方法返回已經(jīng)通過fork安排給當(dāng)前工作線程執(zhí)行,但還沒有被執(zhí)行的任務(wù)數(shù)量,該值是一個瞬間值。因為工作線程調(diào)度執(zhí)行的任務(wù)通過fork提交的任務(wù)還是進(jìn)入的該工作線程的任務(wù)隊列,因此可以通過該任務(wù)得知該值。
其它一些方法:
//可能會在承載當(dāng)前任務(wù)的執(zhí)行池處于靜默(空閑)狀態(tài)時執(zhí)行任務(wù)。這個方法可能在有很多任務(wù)都通過fork被安排執(zhí)行,但是一個顯示的join調(diào)用都沒有,直到它們都被執(zhí)行完的設(shè)計中使用。
//其實(shí)就是如果有一批任務(wù)被安排執(zhí)行,并且不知道它們什么時候結(jié)束,如果希望在這些任務(wù)都執(zhí)行結(jié)束之后再安排一個任務(wù),就可以使用helpQuiesce。
public static void helpQuiesce() {
Thread t;
//根據(jù)執(zhí)行線程的不同類型,調(diào)用不同的靜默執(zhí)行邏輯
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
wt.pool.helpQuiescePool(wt.workQueue);
}
else
ForkJoinPool.quiesceCommonPool();
}
//返回被當(dāng)前工作線程持有的任務(wù)數(shù)a比其它可能竊取其任務(wù)的其它工作線程持有的任務(wù)數(shù)b多多少的估計值,就是 a - b 的差值。若當(dāng)前工作線程不是在ForkJoinPool中,則返回0
//通常該值被恒定在一個很小的值3,若超過這個閾值,則就在本地處理。
public static int getSurplusQueuedTaskCount() {
return ForkJoinPool.getSurplusQueuedTaskCount();
}
//獲取但不移除(即不取消執(zhí)行計劃)安排給當(dāng)前線程的可能即將被執(zhí)行的下一個任務(wù)。但不能保證該任務(wù)將在接下來實(shí)際被立即執(zhí)行。該方法可能在即使任務(wù)存在但因為競爭而不可訪問而返回null
//該方法主要是為了支持?jǐn)U展,否則可能不會被使用。
protected static ForkJoinTask<?> peekNextLocalTask() {
Thread t; ForkJoinPool.WorkQueue q;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
q = ((ForkJoinWorkerThread)t).workQueue;
else
q = ForkJoinPool.commonSubmitterQueue();
return (q == null) ? null : q.peek();
}
//獲取并且移除(即取消執(zhí)行)安排給當(dāng)前線程的可能即將被執(zhí)行的下一個任務(wù)。
//該方法主要是為了支持?jǐn)U展,否則可能不會被使用。
protected static ForkJoinTask<?> pollNextLocalTask() {
Thread t;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
null;
}
//如果當(dāng)前線程被ForkJoinPool運(yùn)行,獲取并且移除(即取消執(zhí)行)當(dāng)前線程即將可能執(zhí)行的下一個任務(wù)。該任務(wù)可能是從其它線程中竊取來的。
//返回nulll并不一定意味著此任務(wù)正在操作的ForkJoinPool處于靜止?fàn)顟B(tài)。該方法主要是為了支持?jǐn)U展,否則可能不會被使用。
protected static ForkJoinTask<?> pollTask() {
Thread t; ForkJoinWorkerThread wt;
return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
null;
}
小結(jié)
通常ForkJoinTask只適用于非循環(huán)依賴的純函數(shù)的計算或孤立對象的操作,否則,執(zhí)行可能會遇到某種形式的死鎖,因為任務(wù)循環(huán)地等待彼此。但是,這個框架支持其他方法和技術(shù)(例如使用Phaser、helpQuiesce和complete),這些方法和技術(shù)可用于構(gòu)造解決這種依賴任務(wù)的ForkJoinTask子類,為了支持這些用法,可以使用setForkJoinTaskTag或compareAndSetForkJoinTaskTag原子性地標(biāo)記一個short類型的值,并使用getForkJoinTaskTag進(jìn)行檢查。ForkJoinTask實(shí)現(xiàn)沒有將這些受保護(hù)的方法或標(biāo)記用于任何目的,但是它們可以用于構(gòu)造專門的子類,由此可以使用提供的方法來避免重新訪問已經(jīng)處理過的節(jié)點(diǎn)/任務(wù)。
ForkJoinTask應(yīng)該執(zhí)行相對較少的計算,并且應(yīng)該避免不確定的循環(huán)。大任務(wù)應(yīng)該被分解成更小的子任務(wù),通常通過遞歸分解。如果任務(wù)太大,那么并行性就不能提高吞吐量。如果太小,那么內(nèi)存和內(nèi)部任務(wù)維護(hù)開銷可能會超過處理開銷。
ForkJoinTask是可序列化的,這使它們能夠在諸如遠(yuǎn)程執(zhí)行框架之類的擴(kuò)展中使用。只在執(zhí)行之前或之后序列化任務(wù)才是明智的,而不是在執(zhí)行期間。
ForkJoinPool
ForkJoinPool:ForkJoinTask 需要通過 ForkJoinPool 來執(zhí)行,任務(wù)分割出的子任務(wù)會添加到當(dāng)前工作線程所維護(hù)的雙端隊列中,進(jìn)入隊列的頭部。當(dāng)一個工作線程的隊列里暫時沒有任務(wù)時,它會隨機(jī)從其他工作線程的隊列的尾部獲取一個任務(wù)。

常量介紹
ForkJoinPool 與 內(nèi)部類 WorkQueue 共享的一些常量
// Constants shared across ForkJoinPool and WorkQueue
// 限定參數(shù)
static final int SMASK = 0xffff; // 低位掩碼,也是最大索引位
static final int MAX_CAP = 0x7fff; // 工作線程最大容量
static final int EVENMASK = 0xfffe; // 偶數(shù)低位掩碼
static final int SQMASK = 0x007e; // workQueues 數(shù)組最多64個槽位
// ctl 子域和 WorkQueue.scanState 的掩碼和標(biāo)志位
static final int SCANNING = 1; // 標(biāo)記是否正在運(yùn)行任務(wù)
static final int INACTIVE = 1 << 31; // 失活狀態(tài) 負(fù)數(shù)
static final int SS_SEQ = 1 << 16; // 版本戳,防止ABA問題
// ForkJoinPool.config 和 WorkQueue.config 的配置信息標(biāo)記
static final int MODE_MASK = 0xffff << 16; // 模式掩碼
static final int LIFO_QUEUE = 0; // LIFO隊列
static final int FIFO_QUEUE = 1 << 16; // FIFO隊列
static final int SHARED_QUEUE = 1 << 31; // 共享模式隊列,負(fù)數(shù) ForkJoinPool 中的相關(guān)常量和實(shí)例字段:
ForkJoinPool 中的相關(guān)常量和實(shí)例字段
// 低位和高位掩碼
private static final long SP_MASK = 0xffffffffL;
private static final long UC_MASK = ~SP_MASK;
// 活躍線程數(shù)
private static final int AC_SHIFT = 48;
private static final long AC_UNIT = 0x0001L << AC_SHIFT; //活躍線程數(shù)增量
private static final long AC_MASK = 0xffffL << AC_SHIFT; //活躍線程數(shù)掩碼
// 工作線程數(shù)
private static final int TC_SHIFT = 32;
private static final long TC_UNIT = 0x0001L << TC_SHIFT; //工作線程數(shù)增量
private static final long TC_MASK = 0xffffL << TC_SHIFT; //掩碼
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // 創(chuàng)建工作線程標(biāo)志
// 池狀態(tài)
private static final int RSLOCK = 1;
private static final int RSIGNAL = 1 << 1;
private static final int STARTED = 1 << 2;
private static final int STOP = 1 << 29;
private static final int TERMINATED = 1 << 30;
private static final int SHUTDOWN = 1 << 31;
// 實(shí)例字段
volatile long ctl; // 主控制參數(shù)
volatile int runState; // 運(yùn)行狀態(tài)鎖
final int config; // 并行度|模式
int indexSeed; // 用于生成工作線程索引
volatile WorkQueue[] workQueues; // 主對象注冊信息,workQueue
final ForkJoinWorkerThreadFactory factory;// 線程工廠
final UncaughtExceptionHandler ueh; // 每個工作線程的異常信息
final String workerNamePrefix; // 用于創(chuàng)建工作線程的名稱
volatile AtomicLong stealCounter; // 偷取任務(wù)總數(shù),也可作為同步監(jiān)視器
/** 靜態(tài)初始化字段 */
//線程工廠
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;
//啟動或殺死線程的方法調(diào)用者的權(quán)限
private static final RuntimePermission modifyThreadPermission;
// 公共靜態(tài)pool
static final ForkJoinPool common;
//并行度,對應(yīng)內(nèi)部common池
static final int commonParallelism;
//備用線程數(shù),在tryCompensate中使用
private static int commonMaxSpares;
//創(chuàng)建workerNamePrefix(工作線程名稱前綴)時的序號
private static int poolNumberSequence;
//線程阻塞等待新的任務(wù)的超時值(以納秒為單位),默認(rèn)2秒
private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
//空閑超時時間,防止timer未命中
private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms
//默認(rèn)備用線程數(shù)
private static final int DEFAULT_COMMON_MAX_SPARES = 256;
//阻塞前自旋的次數(shù),用在在awaitRunStateLock和awaitWork中
private static final int SPINS = 0;
//indexSeed的增量
private static final int SEED_INCREMENT = 0x9e3779b9;
ForkJoinPool 的內(nèi)部狀態(tài)都是通過一個64位的 long 型 變量ctl來存儲,它由四個16位的子域組成:
- AC: 正在運(yùn)行工作線程數(shù)減去目標(biāo)并行度,高16位
- TC: 總工作線程數(shù)減去目標(biāo)并行度,中高16位
- SS: 棧頂?shù)却€程的版本計數(shù)和狀態(tài),中低16位
- ID: 棧頂 WorkQueue 在池中的索引(poolIndex),低16位
ForkJoinPool.WorkQueue 中的相關(guān)屬性:
//初始隊列容量,2的冪
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大隊列容量
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// 實(shí)例字段
volatile int scanState; // Woker狀態(tài), <0: inactive; odd:scanning
int stackPred; // 記錄前一個棧頂?shù)腸tl
int nsteals; // 偷取任務(wù)數(shù)
int hint; // 記錄偷取者索引,初始為隨機(jī)索引
int config; // 池索引和模式
volatile int qlock; // 1: locked, < 0: terminate; else 0
volatile int base; // 下一個poll操作的索引(棧底/隊列頭)
int top; // 一個push操作的索引(棧頂/隊列尾)
ForkJoinTask<?>[] array; // 任務(wù)數(shù)組
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // 當(dāng)前工作隊列的工作線程,共享模式下為null
volatile Thread parker; // 調(diào)用park阻塞期間為owner,其他情況為null
volatile ForkJoinTask<?> currentJoin; // 記錄被join過來的任務(wù)
volatile ForkJoinTask<?> currentSteal; // 記錄從其他工作隊列偷取過來的任務(wù)
內(nèi)部數(shù)據(jù)結(jié)構(gòu)
ForkJoinPool采用了哈希數(shù)組 + 雙端隊列的方式存放任務(wù),但這里的任務(wù)分為兩類:
- 一類是通過execute、submit 提交的外部任務(wù)
- 另一類是ForkJoinWorkerThread工作線程通過fork/join分解出來的工作任務(wù)
ForkJoinPool并沒有把這兩種任務(wù)混在一個任務(wù)隊列中,對于外部任務(wù),會利用Thread內(nèi)部的隨機(jī)probe值映射到哈希數(shù)組的偶數(shù)槽位中的提交隊列中,這種提交隊列是一種數(shù)組實(shí)現(xiàn)的雙端隊列稱之為Submission Queue,專門存放外部提交的任務(wù)。
對于ForkJoinWorkerThread工作線程,每一個工作線程都分配了一個工作隊列,這也是一個雙端隊列,稱之為Work Queue,這種隊列都會被映射到哈希數(shù)組的奇數(shù)槽位,每一個工作線程fork/join分解的任務(wù)都會被添加到自己擁有的那個工作隊列中。
在ForkJoinPool中的屬性 WorkQueue[] workQueues 就是我們所說的哈希數(shù)組,其元素就是內(nèi)部類WorkQueue實(shí)現(xiàn)的基于數(shù)組的雙端隊列。該哈希數(shù)組的長度為2的冪,并且支持?jǐn)U容。如下就是該哈希數(shù)組的示意結(jié)構(gòu)圖:

如圖,提交隊列位于哈希數(shù)組workQueue的奇數(shù)索引槽位,工作線程的工作隊列位于偶數(shù)槽位。
- 默認(rèn)情況下,asyncMode為false時:
- 因此工作線程把工作隊列當(dāng)著棧一樣使用(后進(jìn)先出),將分解的子任務(wù)推入工作隊列的top端,取任務(wù)的時候也從top端取(凡是雙端隊列都會有兩個分別指向隊列兩端的指針,這里就是圖上畫出的base和top);
- 而當(dāng)某些工作線程的任務(wù)為空的時候,就會從其他隊列(不限于workQueue,也會是提交隊列)竊取(steal)任務(wù),如圖示擁有workQueue2的工作線程從workQueue1中竊取了一個任務(wù),竊取任務(wù)的時候采用的是先進(jìn)先出FIFO的策略(即從base端竊取任務(wù)),這樣不但可以避免在取任務(wù)的時候與擁有其隊列的工作線程發(fā)生沖突,從而減小競爭,還可以輔助其完成比較大的任務(wù)。
- asyncMode為true的話,擁有該工作隊列的工作線程將按照先進(jìn)先出的策略從base端取任務(wù),這一般只用于不需要返回結(jié)果的任務(wù),或者事件消息傳遞框架。
ForkJoinPool構(gòu)造函數(shù)
其完整構(gòu)造方法如下
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
重要參數(shù)解釋
- parallelism:并行度( the parallelism level),默認(rèn)情況下跟我們機(jī)器的cpu個數(shù)保持一致,使用 Runtime.getRuntime().availableProcessors()可以得到我們機(jī)器運(yùn)行時可用的CPU個數(shù)。
- factory:創(chuàng)建新線程的工廠( the factory for creating new threads)。默認(rèn)情況下使用ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory。
- handler:線程異常情況下的處理器(Thread.UncaughtExceptionHandler handler),該處理器在線程執(zhí)行任務(wù)時由于某些無法預(yù)料到的錯誤而導(dǎo)致任務(wù)線程中斷時進(jìn)行一些處理,默認(rèn)情況為null。
- asyncMode:這個參數(shù)要注意,在ForkJoinPool中,每一個工作線程都有一個獨(dú)立的任務(wù)隊列
asyncMode表示工作線程內(nèi)的任務(wù)隊列是采用何種方式進(jìn)行調(diào)度,可以是先進(jìn)先出FIFO,也可以是后進(jìn)先出LIFO。如果為true,則線程池中的工作線程則使用先進(jìn)先出方式進(jìn)行任務(wù)調(diào)度,默認(rèn)情況下是false。
ForkJoinPool.submit 方法
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
//提交到工作隊列
externalPush(task);
return task;
}
ForkJoinPool 自身擁有工作隊列,這些工作隊列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務(wù),而這些工作隊列被稱為 submitting queue 。
submit() 和 fork() 其實(shí)沒有本質(zhì)區(qū)別,只是提交對象變成了 submitting queue 而已(還有一些同步,初始化的操作)。submitting queue 和其他 work queue 一樣,是工作線程”竊取“的對象,因此當(dāng)其中的任務(wù)被一個工作線程成功竊取時,就意味著提交的任務(wù)真正開始進(jìn)入執(zhí)行階段。
相關(guān)文章
- 《并發(fā)編程之Executor線程池原理與源碼解讀》
- 《并發(fā)編程之定時任務(wù)&定時線程池原理解析》
- 《并發(fā)編程之Future&FutureTask深入解析》
- 《并發(fā)編程之ThreadLocal深入理解》
PS:以上代碼提交在 Github :https://github.com/Niuh-Study/niuh-juc-final.git
文章持續(xù)更新,可以公眾號搜一搜「 一角錢技術(shù) 」第一時間閱讀, 本文 GitHub org_hejianhui/JavaStudy 已經(jīng)收錄,歡迎 Star。