最近有個業(yè)務(wù)是批量導(dǎo)出cognos報表, 由于未開發(fā)此功能, 人工導(dǎo)出需要大量的時間消耗, 奔著珍惜時間的使命寫了一個導(dǎo)出工具類, 至此在導(dǎo)出的過程中用到了并發(fā)請求數(shù), 比如:每秒并發(fā)10次,20次等. 工作中使用的Java8并發(fā)語法, 在此之前先介紹一下Java7 Fork/Join的框架使用方式.
之前使用的此框架是一個查詢SQL時, 當(dāng)時一個SQL有28個子語句通過left join 拼接而成, 查詢速度為20s,經(jīng)??ㄋ? 最后寫成并發(fā),28個子語句,分成3批次,每次10個SQL,并行查詢,最后通過Java算法拼接成List,從20S變?yōu)?.4S左右,性能大大提升.
下面就開始今天的內(nèi)容:
簡介
從JDK1.7開始,Java提供Fork/Join框架用于并行執(zhí)行任務(wù),它的思想就是講一個大任務(wù)分割成若干小任務(wù),最終匯總每個小任務(wù)的結(jié)果得到這個大任務(wù)的結(jié)果。
這種思想和MapReduce很像(input --> split --> map --> reduce --> output)
主要有兩步:
第一、任務(wù)切分;
第二、結(jié)果合并
剛剛我介紹的SQL其實就是這樣的原理.
API 介紹
ForkJoinPool 池子
ForkJoinPool的優(yōu)勢在于,可以充分利用多cpu,多核cpu的優(yōu)勢,把一個任務(wù)拆分成多個“小任務(wù)”,把多個“小任務(wù)”放到多個處理器核心上并行執(zhí)行;當(dāng)多個“小任務(wù)”執(zhí)行完成之后,再將這些執(zhí)行結(jié)果合并起來即可。
ForkJoinTask 任務(wù)
ForkJoinTask代表運行在ForkJoinPool中的任務(wù)。
主要方法:
fork() 在當(dāng)前線程運行的線程池中安排一個異步執(zhí)行。簡單的理解就是再創(chuàng)建一個子任務(wù)。
join() 當(dāng)任務(wù)完成的時候返回計算結(jié)果。
invoke() 開始執(zhí)行任務(wù),如果必要,等待計算完成。
子類:
RecursiveAction 一個遞歸無結(jié)果的ForkJoinTask(沒有返回值)
RecursiveTask 一個遞歸有結(jié)果的ForkJoinTask(有返回值)
例子
private static final ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(100),
new ThreadFactoryBuilder().setNameFormat("Reports-%d").setDaemon(true).build(),
new ThreadPoolExecutor.AbortPolicy());
這里先創(chuàng)建了一個多線程任務(wù),意思為:
這里核心線程數(shù)5
最大線程數(shù)5
blockingQueue 最大size 100, 解釋: workQueue:一個阻塞隊列,用來存儲等待執(zhí)行的任務(wù),這個參數(shù)的選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇: ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關(guān)。
reject策略 java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy,意思是由調(diào)用線程處理該任務(wù)
另外的策略
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
方法調(diào)用,模擬一下并發(fā)執(zhí)行SQL拼接表的過程
public Page<ReportVo> getRepairTaskReport() {
List<String>ids = Lists.newArrayList("1","2","3","4","5","6","7","8","9");
//多線程查詢列數(shù)據(jù)
Set<String> setIds = new HashSet<>();
for (String id : ids) {
setIds.add(id);
}
//整理要查詢的列,以后可以做成由前端指定查詢哪些column
List<T> columns = new ArrayList<>();
columns.add("ID1相關(guān)");
columns.add("ID2相關(guān)");
...
columns.add("ID9相關(guān)");
Map<T, Future<List<Object[]>>> futureMap = new HashMap<>();
Map<T, Callable<List<Object[]>>> columnCallableMap = getColumnCallableMap(appId, staff, columns, params, queryConditionVo, departIdSet);
for (Map.Entry<T, Callable<List<Object[]>>> entry : columnCallableMap.entrySet()) {
futureMap.put(entry.getKey(), executorService.submit(entry.getValue()));
}
//合并報表
for (RepairTaskReportColumn column : columns) {
try {
//列數(shù)據(jù)
List<Object[]> columnDataList = futureMap.get(column).get();
for (Object[] objects : columnDataList) {
String departId = objects[0].toString();
String columnData = objects[1].toString();
//匹配行數(shù)據(jù)
for (String reportVo : ids) {
if (departId.equals(reportVo.getDepartId())) {
switch (column) {
case "ID1相關(guān)":
// 并行返回ID1相關(guān)數(shù)據(jù)
break;
case "ID2相關(guān)":
// 并行返回ID2相關(guān)數(shù)據(jù)
break;
...
//
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
return "最終結(jié)果";
}
/**
* 根據(jù)請求的column數(shù)據(jù),生成Callable
*
* @param columns
* @param queryConditionVo
* @return
*/
private Map<T, Callable<List<Object[]>>> getColumnCallableMap(String ID, List<T> columns, Set<String> departIdSet) {
Map<T, Callable<List<Object[]>>> columnMap = new HashMap<>();
for (T column : columns) {
switch (column) {
case "ID1相關(guān)SQL查詢":
columnMap.put("ID1相關(guān)", getSQL(ID,T,departIdSet));
break;
case "ID2相關(guān)SQL查詢":
columnMap.put("ID2相關(guān)", getSQL(ID,T,departIdSet));
break;
....
default:
break;
}
}
return columnMap;
}
private Callable<List<Object[]>> getSQL(final String ID, final T column, final Set<String> departIdSet) {
return new Callable<List<Object[]>>() {
@Override
public List<Object[]> call() throws Exception {
//todo sql query
//返回格式:object[0]為departId,object[1]為需要的數(shù)據(jù)
String sql = "select * from table xxxxx";
List<Object[]> result = dao.getResult(sql);
return result;
}
};
}
以上代碼為偽代碼,實現(xiàn)的邏輯其實很簡單.
大致邏輯如下
我有一條SQL,為N個left join 拼接而成,那么我現(xiàn)在就是吧N個left 拆分成N個小SQL,并發(fā)執(zhí)行,那么執(zhí)行時間縮短為N倍, 然后通過N個SQL查詢出的結(jié)果,通過相同的屬性 再次拼接成業(yè)務(wù)正確的數(shù)據(jù)
就是這樣的一個圖:


哈哈, 大致就是這樣,通過并發(fā)執(zhí)行任務(wù),人工點擊的8八小時縮短為10分鐘! 是不是很秀呢
下次講解JDK8中并發(fā)執(zhí)行的例子,更為簡潔
歡迎小伙伴們留言哦