Fork/Join框架學(xué)習(xí)

什么是Fork/Join框架

Fork/Join框架是一組允許程序員利用多核處理器支持的并行執(zhí)行的API。它使用了“分而治之”策略:把非常大的問題分成更小的部分,反過來,小部分又可以進(jìn)一步分成更小的部分,遞歸地直到一個(gè)部分可以直接解決。這被叫做“fork”。

然后所有部件在多個(gè)處理核心上并行執(zhí)行。每個(gè)部分的結(jié)果被“join”在一起以產(chǎn)生最終結(jié)果。因此,框架的名稱是“Fork/Join”。

下面的為代碼展示了分治策略如何與Fork/Join框架一起工作:

if (problemSize < threshold)
    solve problem directly
else {
    break problem into subproblems
    recursively solve each problem
    combine the results
}

Fork/Join框架在JDk7中被加入,并在JDK8中進(jìn)行了改進(jìn)。它用了Java語言中的幾個(gè)新特性,包括并行的Stream API和排序。

Fork/Join框架簡化了并行程序的原因有:

  • 它簡化了線程的創(chuàng)建,在框架中線程是自動(dòng)被創(chuàng)建和管理。
  • 它自動(dòng)使用多個(gè)處理器,因此程序可以擴(kuò)展到使用可用處理器。

由于支持真正的并行執(zhí)行,F(xiàn)ork/Join框架可以顯著減少計(jì)算時(shí)間,并提高解決圖像處理、視頻處理、大數(shù)據(jù)處理等非常大問題的性能。

關(guān)于Fork/Join框架的一個(gè)有趣的地方是:它使用工作竊取算法來平衡線程之間的負(fù)載:如果一個(gè)工作線程沒有事情要做,它可以從其他仍然忙碌的線程竊取任務(wù)。

理解Fork/Join框架API

Fork/Join框架在java.util.concurrent包下被實(shí)現(xiàn)。它的核心有4個(gè)類:

  • ForkJoinTask<V>: 這是一個(gè)抽象任務(wù)類,并且運(yùn)行在ForkJoinPool中。
  • ForkJoinPool:這是一個(gè)線程池管理并運(yùn)行眾多ForkJoinTask任務(wù)。
  • RecursiveAction: ForkJoinTask的子類,這個(gè)類沒有返回值。
  • RecursiveTask<V>: ForkJoinTask的子類,有返回值。

基本上,我們解決問題的代碼是在RecursiveAction或者RecursiveTask中進(jìn)行的,然后將任務(wù)提交由ForkJoinPool`執(zhí)行,F(xiàn)orkJoinPool處理從線程管理到多核處理器的利用等各種事務(wù)。

我們先來理解一下這些類中的關(guān)鍵方法。

ForkJoinTask<V>

這是一個(gè)運(yùn)行在ForkJoinPool中的抽象的任務(wù)類。類型V指定了任務(wù)的返回結(jié)果。ForkJoinTask是一個(gè)類似線程的實(shí)體,它表示任務(wù)的輕量級抽象,而不是實(shí)際的執(zhí)行線程。該機(jī)制允許由ForkJoinPool中的少量實(shí)際線程管理大量任務(wù)。其關(guān)鍵方法是:

  • final ForkJoinTask<V> fork()
  • final V join()
  • final V invoke()

fork()方法提交并執(zhí)行異步任務(wù),該方法返回ForkJoinTask并且調(diào)用線程繼續(xù)運(yùn)行。

join()方法等待任務(wù)直到返回結(jié)果。

invoke()方法是組合了fork()join(),它開始一個(gè)任務(wù)并等待結(jié)束返回結(jié)果。

此外,ForkJoinTask中還提供了用于一次調(diào)用多個(gè)任務(wù)的兩個(gè)靜態(tài)方法

  • static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2) :執(zhí)行兩個(gè)任務(wù)
  • static void invokeAll(ForkJoinTask<?>… taskList):執(zhí)行任務(wù)集合

RecursiveAction

這是一個(gè)遞歸的ForkJoinTask子類,不返回結(jié)果。Recursive意思是任務(wù)可以通過分治策略分成自己的子任務(wù)(在下面的下一節(jié)中,您將看到如何劃分代碼示例)。

我們必須重寫compute()方法,并將計(jì)算代碼寫在其中:

protected abstract void compute();

RecursiveTask<V>

RecursiveAction一樣,但是RecursiveTask有返回結(jié)果,結(jié)果類型由V指定。我們?nèi)匀恍枰貙?code>compute()方法:

protected abstract V compute();

ForkJoinPool

這是Fork/Join框架的核心類。它負(fù)責(zé)線程的管理和ForkJoinTask的執(zhí)行,為了執(zhí)行ForkJoinTask,首先需要獲取到ForkJoinPool的實(shí)例。

有兩種構(gòu)造器方式可以獲取ForkJoinPool的實(shí)例,第一種使用構(gòu)造器創(chuàng)建:

  • ForkJoinPool(): 使用默認(rèn)的構(gòu)造器創(chuàng)建實(shí)例,該構(gòu)造器創(chuàng)建出的池與系統(tǒng)中可用的處理器數(shù)量相等。
  • ForkJoinPool(int parallelism):該構(gòu)造器指定處理器數(shù)量,創(chuàng)建具有自定義并行度級別的池,該級別的并行度必須大于0,且不超過可用處理器的實(shí)際數(shù)量。

并行性的級別決定了可以并發(fā)執(zhí)行的線程的數(shù)量。換句話說,它決定了可以同時(shí)執(zhí)行的任務(wù)的數(shù)量——但不能超過處理器的數(shù)量。

但是,這并不限制池可以管理的任務(wù)的數(shù)量。ForkJoinPool可以管理比其并行級別多得多的任務(wù)。

獲取ForkJoinPool實(shí)例的第二種方法是使用以下ForkJoinPool的靜態(tài)方法獲取公共池實(shí)例:

public static ForkJoinPool commonPool();

這種方式創(chuàng)建的池不受shutdown()或者shutdownNow()方法的影響,但是他會在System.exit()時(shí)會自動(dòng)中止。任何依賴異步任務(wù)處理的程序在主體程序中止前都應(yīng)該調(diào)用awaitQuiescence()方法。該方式是靜態(tài)的,可以自動(dòng)被使用。

ForkJoinPool中執(zhí)行ForkJoinTasks

在創(chuàng)建好ForkJoinPool實(shí)例之后,可以使用下面的方法執(zhí)行任務(wù):

  • <T>T invoke(ForkJoinTask<T> task):執(zhí)行指定任務(wù)并返回結(jié)果,該方法是異步的,調(diào)用的線程會一直等待直到該方法返回結(jié)果,對于RecursiveAction任務(wù)來說,參數(shù)類型是Void.
  • void execute(ForkJoinTask<?> task):異步執(zhí)行指定的任務(wù),調(diào)用的線程一直等待知道任務(wù)完成才會繼續(xù)執(zhí)行。

另外,也可以通過ForkJoinTask自己擁有的方法fork()invoke()執(zhí)行任務(wù)。在這種情況下,如果任務(wù)還沒在ForkJoinPool中運(yùn)行,那么commonPool()將會自動(dòng)被使用。

值得注意的一點(diǎn)是:ForkJoinPool使用的是守護(hù)線程,當(dāng)所有的用戶線程被終止是它也會被終止,這意味著可以不必顯示的關(guān)閉ForkPoolJoin(雖然這樣也可以)。如果是common pool的情況下,調(diào)用shutdown沒有任何效果,應(yīng)為這個(gè)池總是可用的。

好了,現(xiàn)在來看看一些例子。

案例

使用RecursiveAction

這里例子中,看一下如果使用Fork/Join框架去執(zhí)行一個(gè)沒有返回值的任務(wù)。

假設(shè)要對一個(gè)很大的數(shù)字?jǐn)?shù)組進(jìn)行變換,為了簡單簡單起見,轉(zhuǎn)換只需要將數(shù)組中的每個(gè)元素乘以指定的數(shù)字。下面的代碼用于轉(zhuǎn)換任務(wù):

import java.util.concurrent.*;
 
public class ArrayTransform extends RecursiveAction {
    int[] array;
    int number;
    int threshold = 100_000;
    int start;
    int end;
 
    public ArrayTransform(int[] array, int number, int start, int end) {
        this.array = array;
        this.number = number;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected void compute() {
        if (end - start < threshold) {
            computeDirectly();
        } else {
            int middle = (end + start) / 2;
 
            ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
            ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
 
            invokeAll(subTask1, subTask2);
        }
    }
 
    protected void computeDirectly() {
        for (int i = start; i < end; i++) {
            array[i] = array[i] * number;
        }
    }
}

可以看到,這是一個(gè)RecursiveAction的子類,我們重寫了compute()方法。

數(shù)組和數(shù)字從它的構(gòu)造函數(shù)傳遞。參數(shù)start和end指定要處理的數(shù)組中的元素的范圍。如果數(shù)組的大小大于閾值,這有助于將數(shù)組拆分為子數(shù)組,否則直接對整個(gè)數(shù)組執(zhí)行計(jì)算。

觀察else中的代碼片段:

protected void compute() {
    if (end - start < threshold) {
        computeDirectly();
    } else {
        int middle = (end + start) / 2;
 
        ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
        ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
 
        invokeAll(subTask1, subTask2);
    }
}

這里,將數(shù)組分成兩個(gè)部分,并分別創(chuàng)建他們的子任務(wù),反過來,子任務(wù)也可以遞歸的進(jìn)一步劃分為更小的子任務(wù),直到其大小小于直接調(diào)用computeDirectly();方法的的閾值。

然后,在main函數(shù)中創(chuàng)建ForkJoinPool執(zhí)行任務(wù):

ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);

或者使用common pool執(zhí)行任務(wù):

ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();

這里是全部的測試程序:

import java.util.*;
import java.util.concurrent.*;
 
public class ForkJoinRecursiveActionTest {
    static final int SIZE = 10_000_000;
    static int[] array = randomArray();
 
    public static void main(String[] args) {
 
        int number = 9;
 
        System.out.println("數(shù)組中的初始元素: ");
        print();
 
        ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(mainTask);
 
        System.out.println("并行計(jì)算之后的元素:");
        print();
    }
 
    static int[] randomArray() {
        int[] array = new int[SIZE];
        Random random = new Random();
 
        for (int i = 0; i < SIZE; i++) {
            array[i] = random.nextInt(100);
        }
 
        return array;
    }
 
    static void print() {
        for (int i = 0; i < 10; i++) {
            System.out.print(array[i] + ", ");
        }
        System.out.println();
    }
}

如您所見,使用隨機(jī)生成的1,000萬個(gè)元素?cái)?shù)組進(jìn)行測試。由于數(shù)組太大,我們在計(jì)算前后只打印前10個(gè)元素,看效果如何:

數(shù)組中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行計(jì)算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,

使用RecursiveTask

這個(gè)例子中,展示了如何使用帶有返回值的任務(wù),下面的任務(wù)計(jì)算在一個(gè)大數(shù)組中出現(xiàn)偶數(shù)的次數(shù):

import java.util.concurrent.*;
 
public class ArrayCounter extends RecursiveTask<Integer> {
    int[] array;
    int threshold = 100_000;
    int start;
    int end;
 
    public ArrayCounter(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
 
    protected Integer compute() {
        if (end - start < threshold) {
            return computeDirectly();
        } else {
            int middle = (end + start) / 2;
 
            ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
            ArrayCounter subTask2 = new ArrayCounter(array, middle, end);
 
            invokeAll(subTask1, subTask2);
 
 
            return subTask1.join() + subTask2.join();
        }
    }
 
    protected Integer computeDirectly() {
        Integer count = 0;
 
        for (int i = start; i < end; i++) {
            if (array[i] % 2 == 0) {
                count++;
            }
        }
 
        return count;
    }
}

如你所見,這個(gè)類是RecursiveTask的子類并且重寫了compute()方法,并且返回了一個(gè)整型的結(jié)果。

這里還使用了join()方法去合并子任務(wù)的結(jié)果:

return subTask1.join() + subTask2.join();

測試程序就和RecursiveAction的一樣:

import java.util.*;
import java.util.concurrent.*;
 
public class ForkJoinRecursiveTaskTest {
    static final int SIZE = 10_000_000;
    static int[] array = randomArray();
 
    public static void main(String[] args) {
 
        ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
        ForkJoinPool pool = new ForkJoinPool();
        Integer evenNumberCount = pool.invoke(mainTask);
 
        System.out.println("偶數(shù)的個(gè)數(shù): " + evenNumberCount);
    }
 
    static int[] randomArray() {
        int[] array = new int[SIZE];
        Random random = new Random();
 
        for (int i = 0; i < SIZE; i++) {
            array[i] = random.nextInt(100);
        }
 
        return array;
    }
 
}

運(yùn)行程序就會看到如下的結(jié)果:

偶數(shù)的個(gè)數(shù): 5000045

并行性試驗(yàn)

這個(gè)例子展示并行性的級別如何影響計(jì)算時(shí)間:

ArrayCounter類讓閾值可以通過構(gòu)造器傳入:

import java.util.concurrent.*;
 
public class ArrayCounter extends RecursiveTask<Integer> {
    int[] array;
    int threshold;
    int start;
    int end;
 
    public ArrayCounter(int[] array, int start, int end, int threshold) {
        this.array = array;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }
 
    protected Integer compute() {
        if (end - start < threshold) {
            return computeDirectly();
        } else {
            int middle = (end + start) / 2;
 
            ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
            ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);
 
            invokeAll(subTask1, subTask2);
 
 
            return subTask1.join() + subTask2.join();
        }
    }
 
    protected Integer computeDirectly() {
        Integer count = 0;
 
        for (int i = start; i < end; i++) {
            if (array[i] % 2 == 0) {
                count++;
            }
        }
 
        return count;
    }
}

測試程序?qū)⒉⑿卸燃墑e和閾值作為參數(shù)傳遞:

import java.util.*;
import java.util.concurrent.*;
 
public class ParallelismTest {
    static final int SIZE = 10_000_000;
 
    static int[] array = randomArray();
 
    public static void main(String[] args) {
        int threshold = Integer.parseInt(args[0]);
        int parallelism = Integer.parseInt(args[1]);
 
        long startTime = System.currentTimeMillis();
 
        ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Integer evenNumberCount = pool.invoke(mainTask);
 
        long endTime = System.currentTimeMillis();
 
        System.out.println("偶數(shù)的個(gè)數(shù): " + evenNumberCount);
 
        long time = (endTime - startTime);
        System.out.println("執(zhí)行時(shí)間: " + time + " ms");
    }
 
    static int[] randomArray() {
        int[] array = new int[SIZE];
        Random random = new Random();
 
        for (int i = 0; i < SIZE; i++) {
            array[i] = random.nextInt(100);
        }
 
        return array;
    }
 
}

該程序允許您使用不同的并行度和閾值輕松測試性能。注意,它在最后打印執(zhí)行時(shí)間。嘗試用不同的參數(shù)多次運(yùn)行這個(gè)程序,并觀察執(zhí)行時(shí)間。

結(jié)論

  • Fork/Join框架的設(shè)計(jì)簡化了java語言的并行程序
  • ForkJoinPoolFork/Join框架的核心,它允許多個(gè)ForkJoinTask請求由少量實(shí)際線程執(zhí)行,每個(gè)線程運(yùn)行在單獨(dú)的處理核心上
  • 既可以通過構(gòu)造器也可以通過靜態(tài)方法common pool去獲取ForkJoinPool的實(shí)例
  • ForkJoinTask是一個(gè)抽象類,它表示的任務(wù)比普通線程更輕。通過覆蓋其compute()方法實(shí)現(xiàn)計(jì)算邏輯
  • RecursiveAction是一個(gè)沒有返回值的ForkJoinTask
  • RecursiveTask是一個(gè)有返回值的ForkJoinTask
  • ForkJoinPool與其它池的不同之處在于,它使用了工作竊取算法,該算法允許一個(gè)線程完成了可以做的事情,從仍然繁忙的其他線程竊取任務(wù)
  • ForkJoinPool中的線程是守護(hù)線程,不必顯式地關(guān)閉池
  • 執(zhí)行一個(gè)ForkJoinTask既可以通過調(diào)用它自己的invoke()fork()方法,也可以提交任務(wù)給ForkJoinPool并調(diào)用它的invoke()或者execute()方法
  • 直接使用ForkJoinTask自身的方法執(zhí)行任務(wù),如果它還沒運(yùn)行在ForkJoinPool中那么將運(yùn)行在common pool
  • ForkJoinTask中使用join()方法,可以合并子任務(wù)的結(jié)果
  • invoke()方法會等待子任務(wù)完成,但是execute()方法不會
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容