一、ForkJoin
ForkJoin是由JDK1.7后提供多線并發(fā)處理框架。ForkJoin的框架的基本思想是分而治之。什么是分而治之?分而治之就是將一個(gè)復(fù)雜的計(jì)算,按照設(shè)定的閾值進(jìn)行分解成多個(gè)計(jì)算,然后將各個(gè)計(jì)算結(jié)果進(jìn)行匯總。相應(yīng)的ForkJoin將復(fù)雜的計(jì)算當(dāng)做一個(gè)任務(wù)。而分解的多個(gè)計(jì)算則是當(dāng)做一個(gè)子任務(wù)。

二、ForkJoin的使用
- 一、 創(chuàng)建Task
使用ForkJoin框架,需要?jiǎng)?chuàng)建一個(gè)ForkJoin的任務(wù),而ForkJoinTask是一個(gè)抽象類,我們不需要去繼承ForkJoinTask進(jìn)行使用。因?yàn)镕orkJoin框架為我們提供了RecursiveAction和RecursiveTask。我們只需要繼承ForkJoin為我們提供的抽象類的其中一個(gè)并且實(shí)現(xiàn)compute方法。
private static class SumTask extends RecursiveTask<Integer> {
private int threshold ;
private static final int segmentation = 10;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int formIndex,int toIndex,int[] src){
this.fromIndex = formIndex;
this.toIndex = toIndex;
this.src = src;
this.threshold = src.length/segmentation;
}
@Override
protected Integer compute() {
if((toIndex - fromIndex)<threshold ){
int count = 0;
System.out.println(" from index = "+fromIndex
+" toIndex="+toIndex);
for(int i = fromIndex;i<=toIndex;i++){
count+=src[i];
}
return count;
}else{
int mid = (fromIndex+toIndex)/2;
SumTask left = new SumTask(fromIndex,mid,src);
SumTask right = new SumTask(mid+1,toIndex,src);
invokeAll(left,right);
return left.join()+right.join();
}
}
}
- 二、使用ForkJoinPool進(jìn)行執(zhí)行
task要通過(guò)ForkJoinPool來(lái)執(zhí)行,分割的子任務(wù)也會(huì)添加到當(dāng)前工作線程的雙端隊(duì)列中,
進(jìn)入隊(duì)列的頭部。當(dāng)一個(gè)工作線程中沒(méi)有任務(wù)時(shí),會(huì)從其他工作線程的隊(duì)列尾部獲取一個(gè)任務(wù)(工作竊取)。
public static void main(String[] args) {
int[] array = MakeArray.createIntArray();
ForkJoinPool forkJoinPool= new ForkJoinPool();
SumTask sumTask = new SumTask(0,array.length-1,array);
long start = System.currentTimeMillis();
forkJoinPool.invoke(sumTask);
System.out.println("The count is "+sumTask.join()
+" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
三、RecursiveTask和RecursiveAction區(qū)別
- RecursiveTask
通過(guò)源碼的查看我們可以發(fā)現(xiàn)RecursiveTask在進(jìn)行exec之后會(huì)使用一個(gè)result的變量進(jìn)行接受返回的結(jié)果。而result返回結(jié)果類型是通過(guò)泛型進(jìn)行傳入。也就是說(shuō)RecursiveTask執(zhí)行后是有返回結(jié)果。
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
/**
* The result of the computation.
*/
V result;
/**
* The main computation performed by this task.
* @return the result of the computation
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}
- RecursiveAction
RecursiveAction在exec后是不會(huì)保存返回結(jié)果,因此RecursiveAction與RecursiveTask區(qū)別在與RecursiveTask是有返回結(jié)果而RecursiveAction是沒(méi)有返回結(jié)果。
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* The main computation performed by this task.
*/
protected abstract void compute();
/**
* Always returns {@code null}.
*
* @return {@code null} always
*/
public final Void getRawResult() { return null; }
/**
* Requires null completion value.
*/
protected final void setRawResult(Void mustBeNull) { }
/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}
}
ForkJoinTask是RecursiveAction與RecursiveTask的父類, ForkJoinTask中使用了模板模式進(jìn)行設(shè)計(jì)
,將ForkJoinTask的執(zhí)行相關(guān)的代碼進(jìn)行隱藏,通過(guò)提供抽象類暴露用戶的實(shí)際業(yè)務(wù)處理。
三、ForJoin注意點(diǎn)
使用ForkJoin將相同的計(jì)算任務(wù)通過(guò)多線程的進(jìn)行執(zhí)行。從而能提高數(shù)據(jù)的計(jì)算速度。在google的中的大數(shù)據(jù)處理框架mapreduce就通過(guò)類似ForkJoin的思想。通過(guò)多線程提高大數(shù)據(jù)的處理。但是我們需要注意:
- 使用這種多線程帶來(lái)的數(shù)據(jù)共享問(wèn)題,在處理結(jié)果的合并的時(shí)候如果涉及到數(shù)據(jù)共享的問(wèn)題,我們盡可能使用JDK為我們提供的并發(fā)容器。
- 在使用JVM的時(shí)候我們要考慮OOM的問(wèn)題,如果我們的任務(wù)處理時(shí)間非常耗時(shí),并且處理的數(shù)據(jù)非常大的時(shí)候。會(huì)造成OOM。
- ForkJoin也是通過(guò)多線程的方式進(jìn)行處理任務(wù)。那么我們不得不考慮是否應(yīng)該使用ForkJoin。因?yàn)楫?dāng)數(shù)據(jù)量不是特別大的時(shí)候,我們沒(méi)有必要使用ForkJoin。因?yàn)槎嗑€程會(huì)涉及到上下文的切換。所以數(shù)據(jù)量不大的時(shí)候使用串行比使用多線程快。
四、ForkJoin工作竊?。╳ork-stealing)
為什么ForkJoin會(huì)存在工作竊取呢?因?yàn)槲覀儗⑷蝿?wù)進(jìn)行分解成多個(gè)子任務(wù)的時(shí)候。每個(gè)子任務(wù)的處理時(shí)間都不一樣。例如分別有子任務(wù)A\B。如果子任務(wù)A的1ms的時(shí)候已經(jīng)執(zhí)行,子任務(wù)B還在執(zhí)行。那么如果我們子任務(wù)A的線程等待子任務(wù)B完畢后在進(jìn)行匯總,那么子任務(wù)A線程就會(huì)在浪費(fèi)執(zhí)行時(shí)間,最終的執(zhí)行時(shí)間就以最耗時(shí)的子任務(wù)為準(zhǔn)。而如果我們的子任務(wù)A執(zhí)行完畢后,處理子任務(wù)B的任務(wù),并且執(zhí)行完畢后將任務(wù)歸還給子任務(wù)B。這樣就可以提高執(zhí)行效率。而這種就是工作竊取。
五、ForkJoin排序
public class SortForkJoin {
/**
* 數(shù)組排序
*
* @param arry
* @return
*/
public static int[] sort(int[] arry) {
if (arry.length == 0) return arry;
for (int index = 0; index < arry.length - 1; index++) {
int pre_index = index;
int currentValue = arry[index + 1];
while (pre_index >= 0 && arry[pre_index] > currentValue) {
arry[pre_index + 1] = arry[pre_index];
pre_index--;
}
arry[pre_index + 1] = currentValue;
}
return arry;
}
/**
* 組合
*
* @param left
* @param right
* @return
*/
public static int[] merge(int[] left, int[] right) {
int[] result = new int[left.length + right.length];
for (int resultIndex = 0, leftIndex = 0, rightIndex = 0; resultIndex < result.length; resultIndex++) {
if (leftIndex >= left.length) {
result[resultIndex] = right[rightIndex++];
} else if (rightIndex >= right.length) {
result[resultIndex] = left[leftIndex++];
} else if (left[leftIndex] > right[rightIndex]) {
result[resultIndex] = right[rightIndex++];
} else {
result[resultIndex] = left[leftIndex++];
}
}
return result;
}
static class SortTask extends RecursiveTask<int[]> {
private int threshold;
private int start;
private int end;
private int segmentation ;
private int[] src;
public SortTask(int[] src,int start,int end,int segmentation){
this.src = src;
this.start = start;
this.end = end;
this.threshold = src.length/segmentation;
this.segmentation = segmentation;
}
@Override
protected int[] compute() {
if((end - start) <threshold){
int mid = (end-start)/2;
SortTask leftTask = new SortTask(src,start,mid,segmentation);
SortTask rightTask = new SortTask(src,mid+1,end,segmentation);
invokeAll(leftTask,rightTask);
return SortForkJoin.merge(leftTask.join(),rightTask.join());
}else{
return SortForkJoin.sort(src);
}
}
}
@Test
public void test() {
int[] array = MakeArray.createIntArray();
ForkJoinPool forkJoinPool= new ForkJoinPool();
SortTask sortTask =new SortTask(array,0,array.length-1,1000);
long start = System.currentTimeMillis();
forkJoinPool.execute(sortTask);
System.out.println(
" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
}