并發(fā)編程系列之什么是ForkJoin框架?

并發(fā)編程系列之什么是ForkJoin框架?

1、什么是ForkJoin框架

ForkJoin框架是java的JUC包里提供的,用于處理一些比較繁重的任務(wù),會將這個大任務(wù)分為多個小任務(wù),多個小任務(wù)處理完成后會將結(jié)果匯總給Result,體現(xiàn)的是一種“分而治之”的思想。第一步,拆分fork任務(wù),將大任務(wù)分為多個小任務(wù);第二步,歸并join,會將小任務(wù)的處理結(jié)果進行歸并為一個結(jié)果。


在這里插入圖片描述

2、ForkJoinTask

ForkJoinTaskForkJoin框架的提供的任務(wù)API,ForkJoinTask是一個抽象類,有兩個主要的實現(xiàn)類,RecursiveTaskRecursiveAction,其中RecursiveTaskRecursiveAction的主要區(qū)別是,RecursiveAction沒有返回值,而RecursiveTask是有返回值的

在這里插入圖片描述

3、ForkJoinPool

ForkJoinPool類是forkjoin框架的線程池實現(xiàn),基于ExecutorService接口。這個線程池是jdk1.7才加入的,用于管理線程,執(zhí)行forkjoin的任務(wù)。對于線程池的使用,我們使用ThreadPoolExecutor比較多,可以在idea里看一下uml類圖,可以看出ForkJoinPool和ThreadPoolExecutor實現(xiàn)差不多的。


在這里插入圖片描述
ForkJoinPool()
ForkJoinPool(int parallelism)
ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,
 UncaughtExceptionHandler handler, boolean asyncMode)

幾個重要的參數(shù):

  • parallelism:并行度,并行執(zhí)行線程,可用指定,也可以不指定,不指定的情況,是根據(jù)cpu核數(shù)創(chuàng)建可用的線程
  • ForkJoinWorkerThreadFactory:創(chuàng)建線程的工廠實現(xiàn)
  • UncaughtExceptionHandler :因為未知異常中斷的回調(diào)處理
  • asyncMode:是否異步,默認(rèn)情況是false

使用時候,可以直接創(chuàng)建ForkJoinPool,可以不傳參,不傳參的情況,默認(rèn)指定的線程并行數(shù)為Runtime.getRunTime().availableProcessors();,表示根據(jù)cpu核數(shù)創(chuàng)建可用線程數(shù)

ForkJoinPool forkJoinPool = new ForkJoinPool();
ArraySortTask task = new ArraySortTask(array , 0 , size);
forkJoinPool.submit(task);
task.get();

也是可用傳參,對并行度進行指定的public ForkJoinPool(int parallelism), parallelism并行度,并行執(zhí)行幾個線程

將forkjoin任務(wù)加入到FrokJoinPool線程池有幾種方式

  • execute():調(diào)用其 fork 方法在多個線程之間拆分工作。
  • invoke():在ForkJoinPool線程池上調(diào)用invoke方法
  • submit():返回一個Future對象,F(xiàn)uture可以進行監(jiān)控,任務(wù)完成后返回結(jié)果

4、打印斐波那契數(shù)列

ForkJoin框架可以用于一些遞歸遍歷的場景,對于斐波那契數(shù)列,你可以比較熟悉,因為在面試中有時候經(jīng)常問到,斐波那契數(shù)列的特點就是最后一項的結(jié)果等于前面兩項的和

package com.example.concurrent.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * <pre>
 *      斐波那契數(shù)列
 * </pre>
 * <p>
 * <pre>
 * @author nicky.ma
 * 修改記錄
 *    修改后版本:     修改人:  修改日期: 2021/10/12 16:22  修改內(nèi)容:
 * </pre>
 */
public class Fibonacci extends RecursiveTask<Integer>{

    private int n;

    public Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        f2.fork();
        return f1.join() + f2.join();
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool pool = new ForkJoinPool();
        for (int i = 0; i< 10; i++) {
            ForkJoinTask task = pool.submit(new Fibonacci(i));
            System.out.println(task.get());
        }
    }
}

5、ForkJoin歸并排序

面試題:快速實現(xiàn)對一個長度百萬的數(shù)組的排序

難點:可以使用歸并排序,多線程如何組織實現(xiàn)歸并排序

package com.example.concurrent.forkjoin;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

/**
 * <pre>
 *      大數(shù)組排序
 * </pre>
 * <p>
 * <pre>
 * @author mazq
 * 修改記錄
 *    修改后版本:     修改人:  修改日期: 2021/10/12 17:04  修改內(nèi)容:
 * </pre>
 */
public class ArraySortTask extends RecursiveAction{

    final long[] array; final int lo, hi;
    ArraySortTask(long[] array, int lo, int hi) {
        this.array = array; this.lo = lo; this.hi = hi;
    }
    ArraySortTask(long[] array) { this(array, 0, array.length); }

    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD)
            // 少于閥值,使用Arrays.sort 快排
            sortSequentially(lo, hi);
        else {
            /* 歸并排序 */
            // 取中間值
            int mid = (lo + hi) >>> 1;
            // 拆分任務(wù)
            invokeAll(new ArraySortTask(array, lo, mid),
                    new ArraySortTask(array, mid, hi));
            // 歸并結(jié)果
            merge(lo, mid, hi);
        }
    }

    // implementation details follow:
    static final int THRESHOLD = 1000;
    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }

    void merge(int lo, int mid, int hi) {
        long[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++)
            array[j] = (k == hi || buf[i] < array[k]) ?
                    buf[i++] : array[k++];
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        int size = 10_000;
        long[] array = new long[size];
        Random random = new Random();
        for (int i = 0; i< size; i++) {
            array[i] = random.nextInt();
        }

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ArraySortTask task = new ArraySortTask(array , 0 , size);
        forkJoinPool.submit(task);
        task.get();

        for (long a : array) {
            System.out.println(a);
        }
    }
}

參考資料

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容