什么是Fork/Join框架
Fork/Join框架是一組允許程序員利用多核處理器支持的并行執(zhí)行的API。它使用了“分而治之”策略:把非常大的問題分成更小的部分,反過來,小部分又可以進(jìn)一步分成更小的部分,遞歸地直到一個(gè)部分可以直接解決。這被叫做“fork”。
然后所有部件在多個(gè)處理核心上并行執(zhí)行。每個(gè)部分的結(jié)果被“join”在一起以產(chǎn)生最終結(jié)果。因此,框架的名稱是“Fork/Join”。
下面的為代碼展示了分治策略如何與Fork/Join框架一起工作:
if (problemSize < threshold)
solve problem directly
else {
break problem into subproblems
recursively solve each problem
combine the results
}
Fork/Join框架在JDk7中被加入,并在JDK8中進(jìn)行了改進(jìn)。它用了Java語言中的幾個(gè)新特性,包括并行的Stream API和排序。
Fork/Join框架簡化了并行程序的原因有:
- 它簡化了線程的創(chuàng)建,在框架中線程是自動(dòng)被創(chuàng)建和管理。
- 它自動(dòng)使用多個(gè)處理器,因此程序可以擴(kuò)展到使用可用處理器。
由于支持真正的并行執(zhí)行,F(xiàn)ork/Join框架可以顯著減少計(jì)算時(shí)間,并提高解決圖像處理、視頻處理、大數(shù)據(jù)處理等非常大問題的性能。
關(guān)于Fork/Join框架的一個(gè)有趣的地方是:它使用工作竊取算法來平衡線程之間的負(fù)載:如果一個(gè)工作線程沒有事情要做,它可以從其他仍然忙碌的線程竊取任務(wù)。
理解Fork/Join框架API
Fork/Join框架在java.util.concurrent包下被實(shí)現(xiàn)。它的核心有4個(gè)類:
-
ForkJoinTask<V>: 這是一個(gè)抽象任務(wù)類,并且運(yùn)行在
ForkJoinPool中。 -
ForkJoinPool:這是一個(gè)線程池管理并運(yùn)行眾多
ForkJoinTask任務(wù)。 -
RecursiveAction:
ForkJoinTask的子類,這個(gè)類沒有返回值。 -
RecursiveTask<V>:
ForkJoinTask的子類,有返回值。
基本上,我們解決問題的代碼是在RecursiveAction或者RecursiveTask中進(jìn)行的,然后將任務(wù)提交由ForkJoinPool`執(zhí)行,F(xiàn)orkJoinPool處理從線程管理到多核處理器的利用等各種事務(wù)。
我們先來理解一下這些類中的關(guān)鍵方法。
ForkJoinTask<V>
這是一個(gè)運(yùn)行在ForkJoinPool中的抽象的任務(wù)類。類型V指定了任務(wù)的返回結(jié)果。ForkJoinTask是一個(gè)類似線程的實(shí)體,它表示任務(wù)的輕量級抽象,而不是實(shí)際的執(zhí)行線程。該機(jī)制允許由ForkJoinPool中的少量實(shí)際線程管理大量任務(wù)。其關(guān)鍵方法是:
- final ForkJoinTask<V> fork()
- final V join()
- final V invoke()
fork()方法提交并執(zhí)行異步任務(wù),該方法返回ForkJoinTask并且調(diào)用線程繼續(xù)運(yùn)行。
join()方法等待任務(wù)直到返回結(jié)果。
invoke()方法是組合了fork()和join(),它開始一個(gè)任務(wù)并等待結(jié)束返回結(jié)果。
此外,ForkJoinTask中還提供了用于一次調(diào)用多個(gè)任務(wù)的兩個(gè)靜態(tài)方法
- static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2) :執(zhí)行兩個(gè)任務(wù)
- static void invokeAll(ForkJoinTask<?>… taskList):執(zhí)行任務(wù)集合
RecursiveAction
這是一個(gè)遞歸的ForkJoinTask子類,不返回結(jié)果。Recursive意思是任務(wù)可以通過分治策略分成自己的子任務(wù)(在下面的下一節(jié)中,您將看到如何劃分代碼示例)。
我們必須重寫compute()方法,并將計(jì)算代碼寫在其中:
protected abstract void compute();
RecursiveTask<V>
和RecursiveAction一樣,但是RecursiveTask有返回結(jié)果,結(jié)果類型由V指定。我們?nèi)匀恍枰貙?code>compute()方法:
protected abstract V compute();
ForkJoinPool
這是Fork/Join框架的核心類。它負(fù)責(zé)線程的管理和ForkJoinTask的執(zhí)行,為了執(zhí)行ForkJoinTask,首先需要獲取到ForkJoinPool的實(shí)例。
有兩種構(gòu)造器方式可以獲取ForkJoinPool的實(shí)例,第一種使用構(gòu)造器創(chuàng)建:
- ForkJoinPool(): 使用默認(rèn)的構(gòu)造器創(chuàng)建實(shí)例,該構(gòu)造器創(chuàng)建出的池與系統(tǒng)中可用的處理器數(shù)量相等。
- ForkJoinPool(int parallelism):該構(gòu)造器指定處理器數(shù)量,創(chuàng)建具有自定義并行度級別的池,該級別的并行度必須大于0,且不超過可用處理器的實(shí)際數(shù)量。
并行性的級別決定了可以并發(fā)執(zhí)行的線程的數(shù)量。換句話說,它決定了可以同時(shí)執(zhí)行的任務(wù)的數(shù)量——但不能超過處理器的數(shù)量。
但是,這并不限制池可以管理的任務(wù)的數(shù)量。ForkJoinPool可以管理比其并行級別多得多的任務(wù)。
獲取ForkJoinPool實(shí)例的第二種方法是使用以下ForkJoinPool的靜態(tài)方法獲取公共池實(shí)例:
public static ForkJoinPool commonPool();
這種方式創(chuàng)建的池不受shutdown()或者shutdownNow()方法的影響,但是他會在System.exit()時(shí)會自動(dòng)中止。任何依賴異步任務(wù)處理的程序在主體程序中止前都應(yīng)該調(diào)用awaitQuiescence()方法。該方式是靜態(tài)的,可以自動(dòng)被使用。
在ForkJoinPool中執(zhí)行ForkJoinTasks
在創(chuàng)建好ForkJoinPool實(shí)例之后,可以使用下面的方法執(zhí)行任務(wù):
- <T>T invoke(ForkJoinTask<T> task):執(zhí)行指定任務(wù)并返回結(jié)果,該方法是異步的,調(diào)用的線程會一直等待直到該方法返回結(jié)果,對于RecursiveAction任務(wù)來說,參數(shù)類型是Void.
- void execute(ForkJoinTask<?> task):異步執(zhí)行指定的任務(wù),調(diào)用的線程一直等待知道任務(wù)完成才會繼續(xù)執(zhí)行。
另外,也可以通過ForkJoinTask自己擁有的方法fork()和invoke()執(zhí)行任務(wù)。在這種情況下,如果任務(wù)還沒在ForkJoinPool中運(yùn)行,那么commonPool()將會自動(dòng)被使用。
值得注意的一點(diǎn)是:ForkJoinPool使用的是守護(hù)線程,當(dāng)所有的用戶線程被終止是它也會被終止,這意味著可以不必顯示的關(guān)閉ForkPoolJoin(雖然這樣也可以)。如果是common pool的情況下,調(diào)用shutdown沒有任何效果,應(yīng)為這個(gè)池總是可用的。
好了,現(xiàn)在來看看一些例子。
案例
使用RecursiveAction
這里例子中,看一下如果使用Fork/Join框架去執(zhí)行一個(gè)沒有返回值的任務(wù)。
假設(shè)要對一個(gè)很大的數(shù)字?jǐn)?shù)組進(jìn)行變換,為了簡單簡單起見,轉(zhuǎn)換只需要將數(shù)組中的每個(gè)元素乘以指定的數(shù)字。下面的代碼用于轉(zhuǎn)換任務(wù):
import java.util.concurrent.*;
public class ArrayTransform extends RecursiveAction {
int[] array;
int number;
int threshold = 100_000;
int start;
int end;
public ArrayTransform(int[] array, int number, int start, int end) {
this.array = array;
this.number = number;
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
invokeAll(subTask1, subTask2);
}
}
protected void computeDirectly() {
for (int i = start; i < end; i++) {
array[i] = array[i] * number;
}
}
}
可以看到,這是一個(gè)RecursiveAction的子類,我們重寫了compute()方法。
數(shù)組和數(shù)字從它的構(gòu)造函數(shù)傳遞。參數(shù)start和end指定要處理的數(shù)組中的元素的范圍。如果數(shù)組的大小大于閾值,這有助于將數(shù)組拆分為子數(shù)組,否則直接對整個(gè)數(shù)組執(zhí)行計(jì)算。
觀察else中的代碼片段:
protected void compute() {
if (end - start < threshold) {
computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
invokeAll(subTask1, subTask2);
}
}
這里,將數(shù)組分成兩個(gè)部分,并分別創(chuàng)建他們的子任務(wù),反過來,子任務(wù)也可以遞歸的進(jìn)一步劃分為更小的子任務(wù),直到其大小小于直接調(diào)用computeDirectly();方法的的閾值。
然后,在main函數(shù)中創(chuàng)建ForkJoinPool執(zhí)行任務(wù):
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
或者使用common pool執(zhí)行任務(wù):
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();
這里是全部的測試程序:
import java.util.*;
import java.util.concurrent.*;
public class ForkJoinRecursiveActionTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
int number = 9;
System.out.println("數(shù)組中的初始元素: ");
print();
ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);
System.out.println("并行計(jì)算之后的元素:");
print();
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
static void print() {
for (int i = 0; i < 10; i++) {
System.out.print(array[i] + ", ");
}
System.out.println();
}
}
如您所見,使用隨機(jī)生成的1,000萬個(gè)元素?cái)?shù)組進(jìn)行測試。由于數(shù)組太大,我們在計(jì)算前后只打印前10個(gè)元素,看效果如何:
數(shù)組中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行計(jì)算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,
使用RecursiveTask
這個(gè)例子中,展示了如何使用帶有返回值的任務(wù),下面的任務(wù)計(jì)算在一個(gè)大數(shù)組中出現(xiàn)偶數(shù)的次數(shù):
import java.util.concurrent.*;
public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold = 100_000;
int start;
int end;
public ArrayCounter(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end);
invokeAll(subTask1, subTask2);
return subTask1.join() + subTask2.join();
}
}
protected Integer computeDirectly() {
Integer count = 0;
for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}
return count;
}
}
如你所見,這個(gè)類是RecursiveTask的子類并且重寫了compute()方法,并且返回了一個(gè)整型的結(jié)果。
這里還使用了join()方法去合并子任務(wù)的結(jié)果:
return subTask1.join() + subTask2.join();
測試程序就和RecursiveAction的一樣:
import java.util.*;
import java.util.concurrent.*;
public class ForkJoinRecursiveTaskTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
Integer evenNumberCount = pool.invoke(mainTask);
System.out.println("偶數(shù)的個(gè)數(shù): " + evenNumberCount);
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
}
運(yùn)行程序就會看到如下的結(jié)果:
偶數(shù)的個(gè)數(shù): 5000045
并行性試驗(yàn)
這個(gè)例子展示并行性的級別如何影響計(jì)算時(shí)間:
ArrayCounter類讓閾值可以通過構(gòu)造器傳入:
import java.util.concurrent.*;
public class ArrayCounter extends RecursiveTask<Integer> {
int[] array;
int threshold;
int start;
int end;
public ArrayCounter(int[] array, int start, int end, int threshold) {
this.array = array;
this.start = start;
this.end = end;
this.threshold = threshold;
}
protected Integer compute() {
if (end - start < threshold) {
return computeDirectly();
} else {
int middle = (end + start) / 2;
ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);
invokeAll(subTask1, subTask2);
return subTask1.join() + subTask2.join();
}
}
protected Integer computeDirectly() {
Integer count = 0;
for (int i = start; i < end; i++) {
if (array[i] % 2 == 0) {
count++;
}
}
return count;
}
}
測試程序?qū)⒉⑿卸燃墑e和閾值作為參數(shù)傳遞:
import java.util.*;
import java.util.concurrent.*;
public class ParallelismTest {
static final int SIZE = 10_000_000;
static int[] array = randomArray();
public static void main(String[] args) {
int threshold = Integer.parseInt(args[0]);
int parallelism = Integer.parseInt(args[1]);
long startTime = System.currentTimeMillis();
ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
ForkJoinPool pool = new ForkJoinPool(parallelism);
Integer evenNumberCount = pool.invoke(mainTask);
long endTime = System.currentTimeMillis();
System.out.println("偶數(shù)的個(gè)數(shù): " + evenNumberCount);
long time = (endTime - startTime);
System.out.println("執(zhí)行時(shí)間: " + time + " ms");
}
static int[] randomArray() {
int[] array = new int[SIZE];
Random random = new Random();
for (int i = 0; i < SIZE; i++) {
array[i] = random.nextInt(100);
}
return array;
}
}
該程序允許您使用不同的并行度和閾值輕松測試性能。注意,它在最后打印執(zhí)行時(shí)間。嘗試用不同的參數(shù)多次運(yùn)行這個(gè)程序,并觀察執(zhí)行時(shí)間。
結(jié)論
- Fork/Join框架的設(shè)計(jì)簡化了java語言的并行程序
-
ForkJoinPool是Fork/Join框架的核心,它允許多個(gè)ForkJoinTask請求由少量實(shí)際線程執(zhí)行,每個(gè)線程運(yùn)行在單獨(dú)的處理核心上 - 既可以通過構(gòu)造器也可以通過靜態(tài)方法common pool去獲取ForkJoinPool的實(shí)例
- ForkJoinTask是一個(gè)抽象類,它表示的任務(wù)比普通線程更輕。通過覆蓋其compute()方法實(shí)現(xiàn)計(jì)算邏輯
- RecursiveAction是一個(gè)沒有返回值的ForkJoinTask
- RecursiveTask是一個(gè)有返回值的ForkJoinTask
- ForkJoinPool與其它池的不同之處在于,它使用了工作竊取算法,該算法允許一個(gè)線程完成了可以做的事情,從仍然繁忙的其他線程竊取任務(wù)
- ForkJoinPool中的線程是守護(hù)線程,不必顯式地關(guān)閉池
- 執(zhí)行一個(gè)ForkJoinTask既可以通過調(diào)用它自己的
invoke()或fork()方法,也可以提交任務(wù)給ForkJoinPool并調(diào)用它的invoke()或者execute()方法 - 直接使用ForkJoinTask自身的方法執(zhí)行任務(wù),如果它還沒運(yùn)行在
ForkJoinPool中那么將運(yùn)行在common pool中 - 在
ForkJoinTask中使用join()方法,可以合并子任務(wù)的結(jié)果 -
invoke()方法會等待子任務(wù)完成,但是execute()方法不會