ForkJoin分析

一、ForkJoin

ForkJoin是由JDK1.7后提供多線并發(fā)處理框架。ForkJoin的框架的基本思想是分而治之。什么是分而治之?分而治之就是將一個(gè)復(fù)雜的計(jì)算,按照設(shè)定的閾值進(jìn)行分解成多個(gè)計(jì)算,然后將各個(gè)計(jì)算結(jié)果進(jìn)行匯總。相應(yīng)的ForkJoin將復(fù)雜的計(jì)算當(dāng)做一個(gè)任務(wù)。而分解的多個(gè)計(jì)算則是當(dāng)做一個(gè)子任務(wù)。


image.png

二、ForkJoin的使用

  • 一、 創(chuàng)建Task
    使用ForkJoin框架,需要?jiǎng)?chuàng)建一個(gè)ForkJoin的任務(wù),而ForkJoinTask是一個(gè)抽象類,我們不需要去繼承ForkJoinTask進(jìn)行使用。因?yàn)镕orkJoin框架為我們提供了RecursiveAction和RecursiveTask。我們只需要繼承ForkJoin為我們提供的抽象類的其中一個(gè)并且實(shí)現(xiàn)compute方法。
private static class SumTask extends RecursiveTask<Integer> {

        private  int threshold ;
        private static final int segmentation = 10;

        private int[] src;

        private int fromIndex;
        private int toIndex;

        public SumTask(int formIndex,int toIndex,int[] src){
            this.fromIndex = formIndex;
            this.toIndex = toIndex;
            this.src = src;
            this.threshold = src.length/segmentation;
        }

        @Override
        protected Integer compute() {
            if((toIndex - fromIndex)<threshold ){
                int count = 0;
                System.out.println(" from index = "+fromIndex
                        +" toIndex="+toIndex);
                for(int i = fromIndex;i<=toIndex;i++){
                  count+=src[i];
                }
                return count;
            }else{
                int mid = (fromIndex+toIndex)/2;
                SumTask left =  new SumTask(fromIndex,mid,src);
                SumTask right = new SumTask(mid+1,toIndex,src);
                invokeAll(left,right);
                return left.join()+right.join();
            }
        }
    }
  • 二、使用ForkJoinPool進(jìn)行執(zhí)行
    task要通過(guò)ForkJoinPool來(lái)執(zhí)行,分割的子任務(wù)也會(huì)添加到當(dāng)前工作線程的雙端隊(duì)列中,
    進(jìn)入隊(duì)列的頭部。當(dāng)一個(gè)工作線程中沒(méi)有任務(wù)時(shí),會(huì)從其他工作線程的隊(duì)列尾部獲取一個(gè)任務(wù)(工作竊取)。
 public static void main(String[] args) {
            int[]  array = MakeArray.createIntArray();
        ForkJoinPool forkJoinPool= new ForkJoinPool();
        SumTask sumTask  = new SumTask(0,array.length-1,array);

        long start = System.currentTimeMillis();

        forkJoinPool.invoke(sumTask);
        System.out.println("The count is "+sumTask.join()
                +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }

三、RecursiveTask和RecursiveAction區(qū)別

  • RecursiveTask
    通過(guò)源碼的查看我們可以發(fā)現(xiàn)RecursiveTask在進(jìn)行exec之后會(huì)使用一個(gè)result的變量進(jìn)行接受返回的結(jié)果。而result返回結(jié)果類型是通過(guò)泛型進(jìn)行傳入。也就是說(shuō)RecursiveTask執(zhí)行后是有返回結(jié)果。
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
    private static final long serialVersionUID = 5232453952276485270L;

    /**
     * The result of the computation.
     */
    V result;

    /**
     * The main computation performed by this task.
     * @return the result of the computation
     */
    protected abstract V compute();

    public final V getRawResult() {
        return result;
    }

    protected final void setRawResult(V value) {
        result = value;
    }

    /**
     * Implements execution conventions for RecursiveTask.
     */
    protected final boolean exec() {
        result = compute();
        return true;
    }

}
  • RecursiveAction
    RecursiveAction在exec后是不會(huì)保存返回結(jié)果,因此RecursiveAction與RecursiveTask區(qū)別在與RecursiveTask是有返回結(jié)果而RecursiveAction是沒(méi)有返回結(jié)果。
public abstract class RecursiveAction extends ForkJoinTask<Void> {
    private static final long serialVersionUID = 5232453952276485070L;

    /**
     * The main computation performed by this task.
     */
    protected abstract void compute();

    /**
     * Always returns {@code null}.
     *
     * @return {@code null} always
     */
    public final Void getRawResult() { return null; }

    /**
     * Requires null completion value.
     */
    protected final void setRawResult(Void mustBeNull) { }

    /**
     * Implements execution conventions for RecursiveActions.
     */
    protected final boolean exec() {
        compute();
        return true;
    }

}

ForkJoinTask是RecursiveAction與RecursiveTask的父類, ForkJoinTask中使用了模板模式進(jìn)行設(shè)計(jì)
,將ForkJoinTask的執(zhí)行相關(guān)的代碼進(jìn)行隱藏,通過(guò)提供抽象類暴露用戶的實(shí)際業(yè)務(wù)處理。

三、ForJoin注意點(diǎn)

使用ForkJoin將相同的計(jì)算任務(wù)通過(guò)多線程的進(jìn)行執(zhí)行。從而能提高數(shù)據(jù)的計(jì)算速度。在google的中的大數(shù)據(jù)處理框架mapreduce就通過(guò)類似ForkJoin的思想。通過(guò)多線程提高大數(shù)據(jù)的處理。但是我們需要注意:

  • 使用這種多線程帶來(lái)的數(shù)據(jù)共享問(wèn)題,在處理結(jié)果的合并的時(shí)候如果涉及到數(shù)據(jù)共享的問(wèn)題,我們盡可能使用JDK為我們提供的并發(fā)容器。
  • 在使用JVM的時(shí)候我們要考慮OOM的問(wèn)題,如果我們的任務(wù)處理時(shí)間非常耗時(shí),并且處理的數(shù)據(jù)非常大的時(shí)候。會(huì)造成OOM。
  • ForkJoin也是通過(guò)多線程的方式進(jìn)行處理任務(wù)。那么我們不得不考慮是否應(yīng)該使用ForkJoin。因?yàn)楫?dāng)數(shù)據(jù)量不是特別大的時(shí)候,我們沒(méi)有必要使用ForkJoin。因?yàn)槎嗑€程會(huì)涉及到上下文的切換。所以數(shù)據(jù)量不大的時(shí)候使用串行比使用多線程快。

四、ForkJoin工作竊?。╳ork-stealing)

為什么ForkJoin會(huì)存在工作竊取呢?因?yàn)槲覀儗⑷蝿?wù)進(jìn)行分解成多個(gè)子任務(wù)的時(shí)候。每個(gè)子任務(wù)的處理時(shí)間都不一樣。例如分別有子任務(wù)A\B。如果子任務(wù)A的1ms的時(shí)候已經(jīng)執(zhí)行,子任務(wù)B還在執(zhí)行。那么如果我們子任務(wù)A的線程等待子任務(wù)B完畢后在進(jìn)行匯總,那么子任務(wù)A線程就會(huì)在浪費(fèi)執(zhí)行時(shí)間,最終的執(zhí)行時(shí)間就以最耗時(shí)的子任務(wù)為準(zhǔn)。而如果我們的子任務(wù)A執(zhí)行完畢后,處理子任務(wù)B的任務(wù),并且執(zhí)行完畢后將任務(wù)歸還給子任務(wù)B。這樣就可以提高執(zhí)行效率。而這種就是工作竊取。

五、ForkJoin排序

public class SortForkJoin {
    /**
     * 數(shù)組排序
     *
     * @param arry
     * @return
     */
    public static int[] sort(int[] arry) {
        if (arry.length == 0) return arry;
        for (int index = 0; index < arry.length - 1; index++) {
            int pre_index = index;
            int currentValue = arry[index + 1];
            while (pre_index >= 0 && arry[pre_index] > currentValue) {
                arry[pre_index + 1] = arry[pre_index];
                pre_index--;
            }
            arry[pre_index + 1] = currentValue;
        }
        return arry;
    }

    /**
     * 組合
     *
     * @param left
     * @param right
     * @return
     */
    public static int[] merge(int[] left, int[] right) {
        int[] result = new int[left.length + right.length];
        for (int resultIndex = 0, leftIndex = 0, rightIndex = 0; resultIndex < result.length; resultIndex++) {
            if (leftIndex >= left.length) {
                result[resultIndex] = right[rightIndex++];
            } else if (rightIndex >= right.length) {
                result[resultIndex] = left[leftIndex++];
            } else if (left[leftIndex] > right[rightIndex]) {
                result[resultIndex] = right[rightIndex++];
            } else {
                result[resultIndex] = left[leftIndex++];
            }
        }
        return result;
    }


     static  class SortTask extends RecursiveTask<int[]> {
        private int threshold;
        private int start;
        private int end;
        private int segmentation ;
        private int[] src;

        public SortTask(int[] src,int start,int end,int segmentation){
            this.src = src;
            this.start = start;
            this.end = end;
            this.threshold = src.length/segmentation;
            this.segmentation = segmentation;
        }
        @Override
        protected int[] compute() {
            if((end - start) <threshold){
               int mid =  (end-start)/2;
               SortTask leftTask = new SortTask(src,start,mid,segmentation);
               SortTask rightTask = new SortTask(src,mid+1,end,segmentation);
               invokeAll(leftTask,rightTask);
               return SortForkJoin.merge(leftTask.join(),rightTask.join());
            }else{
               return  SortForkJoin.sort(src);
            }
        }
    }

    @Test
    public void test() {
        int[]  array = MakeArray.createIntArray();
        ForkJoinPool forkJoinPool= new ForkJoinPool();
        SortTask sortTask =new SortTask(array,0,array.length-1,1000);
        long start = System.currentTimeMillis();
        forkJoinPool.execute(sortTask);
        System.out.println(
                " spend time:"+(System.currentTimeMillis()-start)+"ms");
    }

}


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容