線程池
類繼承關(guān)系,方法太多就不列出來了。

Executors
Executors扮演線程池工廠的角色,ThreadPoolExecutor就代表一個(gè)線程池。Executors提供了各種類型的線程池,主要有以下這些方法:
public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newCachedThreadPool();
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
newFixedThreadPool(int nThreads)
返回固定線程數(shù)量的線程池。該線程池中的線程數(shù)量始終不變,當(dāng)有新任務(wù)提交時(shí),若有空閑線程,則立即執(zhí)行,否則存進(jìn)任務(wù)隊(duì)列,待有線程空閑時(shí)再執(zhí)行。
newSingleThreadExecutor()
返回一個(gè)只有一個(gè)線程的線程池,若多余一個(gè)任務(wù)被提交到該線程池,保存到任務(wù)隊(duì)列,按FIFO的順序執(zhí)行。
newCachedThreadPool()
返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)量的線程池,線程池線程數(shù)量不確定,如果新任務(wù)沒有空閑線程,則會(huì)創(chuàng)建新的線程處理任務(wù),所有線程執(zhí)行完之后,會(huì)丟回線程池復(fù)用。
newSingleThreadScheduledExecutor()
返回一個(gè)ScheduledExecutorService對(duì)象,線程池大小為1。具有scheduled調(diào)度功能,如在入定的延遲或者周期性的執(zhí)行某個(gè)任務(wù)。
newScheduledThreadPool(int corePoolSize)
返回一個(gè)ScheduledExecutorService對(duì)象,線程數(shù)量可以指定。
ScheduledExecutorService
ScheduledExecutorService的scheduleAtFiexRate()和scheduleWithFixedDelay()都可以周期性的調(diào)度任務(wù),如果任務(wù)遇到異常,會(huì)停止調(diào)度,他們的調(diào)度區(qū)別如下:

對(duì)于fixRate而言,如果任務(wù)執(zhí)行的時(shí)間大于等待的周期,那么會(huì)在任務(wù)執(zhí)行完之后馬上再次調(diào)度,不會(huì)出現(xiàn)任務(wù)重疊的情況!
public static void main(String[] args) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(10);
Runnable runnable = () -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(System.currentTimeMillis() / 1000);
};
//兩秒輸出一次,調(diào)度周期為2秒
//exec.scheduleAtFixedRate(runnable,0,2, TimeUnit.SECONDS);
//三秒輸出一次,調(diào)度周期為2秒,線程執(zhí)行1秒
exec.scheduleWithFixedDelay(runnable,0,2, TimeUnit.SECONDS);
}
ThreadPoolExecutor
Executors工廠對(duì)與ThreadPoolExecutor的創(chuàng)建都是基于其構(gòu)造方法,來看一下核心的構(gòu)造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
corePoolSize: 指定線程池中的線程數(shù)量;
maxinumPoolSize: 指定線程池中的最大線程數(shù)量;
keepAliveTime: 當(dāng)線程池線程數(shù)量超過corePoolSize時(shí),多余的線程的存活時(shí)間;
unit: keepaliveTime的時(shí)間單位;
workQueue: 任務(wù)隊(duì)列,被提交單尚未被執(zhí)行的任務(wù);
threadFactory: 線程工廠,用于創(chuàng)建線程,一般用默認(rèn)的就行;
handler: 拒絕策略,當(dāng)任務(wù)太多來不及處理時(shí),如何拒絕任務(wù);
workqueue
workQueue是一個(gè)BlockingQueue接口的對(duì)象,在ThreadPoolExecutor的構(gòu)造函數(shù)中可使用一下幾種:
直接提交的隊(duì)列: 該功能由SynchronousQueue對(duì)象提供。SynchronousQueue是一種特殊的BlockingQueue,沒有容量,每一次插入操作都要等待一個(gè)相應(yīng)的刪除操作,同理每一次刪除操作都要等待相應(yīng)插入操作。如果使用SynchronousQueue,提交的任務(wù)不會(huì)被真實(shí)的保存,而總是將新任務(wù)提交給線程執(zhí)行。如果沒有空閑的線程,則嘗試創(chuàng)建,如果線程數(shù)量已達(dá)最大值,則執(zhí)行拒絕策略,因此使用SynchronousQueue,通常要設(shè)置很大的maxinumPoolsize值,否則很容易執(zhí)行拒絕策略。
有界任務(wù)隊(duì)列(FIFO): 游街任務(wù)隊(duì)列可以用ArrayBlockQueue實(shí)現(xiàn),看到array很容易知道,這是個(gè)定值的,因此我們使用這個(gè)隊(duì)列的時(shí)候需要指定最大容量,有了最大容量,才有界。有新的任務(wù)執(zhí)行時(shí),如果線程池的實(shí)際線程數(shù)小于corePoolSize,則會(huì)優(yōu)先創(chuàng)建新的線程,若大于corePoolSize,則添加到等待隊(duì)列。如果隊(duì)列已滿,則在總線程數(shù)不大于maxinumPoolSize的前提下創(chuàng)建新的線程執(zhí)行任務(wù)。若大于maxinumPoolSize,則執(zhí)行拒絕策略。可見有界隊(duì)列僅在任務(wù)隊(duì)列滿時(shí),才可能將線程數(shù)提升到corePoolSize以上,除非系統(tǒng)非常繁忙,否則可以確核心線程數(shù)維持在corPoolSize
無界任務(wù)隊(duì)列(FIFO): 無界任務(wù)隊(duì)列可以通過LinckedBlockingQueue類實(shí)現(xiàn)。與有界隊(duì)列相比,除非系統(tǒng)資源耗盡,否則無界的隊(duì)列村存在任務(wù)如對(duì)失敗的情況。
優(yōu)先任務(wù)隊(duì)列: 優(yōu)先任務(wù)隊(duì)列是帶有優(yōu)先級(jí)的隊(duì)列,它通過PriorityBlockingQueue實(shí)現(xiàn),可以指定任務(wù)執(zhí)行的先后順序。
看看ThreadPoolExecutor線程池的核心調(diào)度代碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以直接從代碼和注釋中看出執(zhí)行邏輯。
workerCountOf獲取當(dāng)前線程池的線程總數(shù)。當(dāng)總數(shù)小于corePoolsize時(shí),會(huì)將任務(wù)通過addWorker直接將任務(wù)提交給線程池的方式直接調(diào)度執(zhí)行。否則通過workerQueue.offer進(jìn)入等待隊(duì)列,如果進(jìn)入等待隊(duì)列失敗,則再次嘗試(可能在之前的檢查中又有線程退出了,存在空閑線程)將任務(wù)直接提交給線程池,如果當(dāng)前線程數(shù)已經(jīng)達(dá)到maxinumPoolSize,則執(zhí)行拒絕策略。

拒絕策略
當(dāng)任務(wù)數(shù)量超過系統(tǒng)實(shí)際承載能力時(shí),我們需要根據(jù)某種策略來拒絕新來的任務(wù)。JDK提供了四種拒絕策略:
AbortPolicy: 直接拋出異常,阻止系統(tǒng)正常工作;
CallerRunsPolicy: 只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣不會(huì)真的丟棄任務(wù),但是任務(wù)提交線程的性能極有可能會(huì)急劇下降;
DiscardOldestPolicy: 丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的一個(gè)任務(wù)(FIFO),并嘗試再次提交當(dāng)前任務(wù)。
DiscardPolicy: 默認(rèn)丟棄無法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,這可能是很好的策略。
public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
try {
Thread.sleep(1000);
System.out.println("thread-" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
RejectedExecutionHandler reject = (r, executor) -> System.out.println(r.toString() + ": discard");
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), reject);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
exec.execute(task);
Thread.sleep(10);
}
}
從上面的demo中,程序很快就會(huì)執(zhí)行拒絕策略,可以自行測(cè)試。自定義的拒絕策略,什么事都沒有做,只是輸出了一段xx:discard的信息。
ThreadFactory
該接口只有一個(gè)方法,用來創(chuàng)建線程。當(dāng)線程池需要新建線程的時(shí)候,就會(huì)調(diào)用這個(gè)方法。
Thread newThread(Runnable r);
自定義線程可以幫助我們實(shí)現(xiàn)很多功能,例如可以跟中線程池在核實(shí)創(chuàng)建了多少線程,也可以自定義線程的名稱,組,以及優(yōu)先級(jí)等信息。
public static void main(String[] args) throws InterruptedException {
Runnable task = () -> {
try {
Thread.sleep(1000);
System.out.println("thread-" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ThreadFactory fac = (runnable) -> {
Thread t = new Thread(runnable);
//打印出創(chuàng)建線程信息,設(shè)置成守護(hù)線程,當(dāng)主線程退出時(shí)強(qiáng)制銷毀
System.out.println("create Thread...");
t.setDaemon(true);
return t;
};
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), fac);
for (int i = 0; i < 10; i++) {
exec.execute(task);
}
Thread.sleep(2000);
}
線程池ThreadPoolExecutor的擴(kuò)展
ThreadPoolExecutor是一個(gè)可以擴(kuò)展的線程池。它提供了beforeExecute()、afterExecute()和terminated()三個(gè)接口對(duì)線程池進(jìn)行控制。這三個(gè)方法的調(diào)用在ThreadPoolExecutor.Worker.runWorker()方法中。
final void runWorker(Worker w) {
...
try {
while (task != null || (task = getTask()) != null) {
...
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
}
...
finally {
afterExecute(task, thrown);
}
}
...
} finally {
//對(duì)terminated()的調(diào)用是從這里進(jìn)入的
processWorkerExit(w, completedAbruptly);
}
}
線程池的擴(kuò)展demo
public static class MyTask implements Runnable {
String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("正在執(zhí)行:ThreadID = " + Thread.currentThread().getId() + ": Task name = " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("準(zhǔn)備執(zhí)行:ThreadID = " + Thread.currentThread().getId() + ": Task name = " + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("執(zhí)行完成:ThreadID = " + Thread.currentThread().getId() + ": Task name = " + ((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("線程池退出");
}
};
for (int i = 0; i < 5; i++) {
exec.execute(new MyTask("MyTask-" + i));
Thread.sleep(10);
}
exec.shutdown();
}
任務(wù)提交完成后,調(diào)用shutdown關(guān)閉線程池。這是個(gè)比較安全的方法,如果當(dāng)前有線程正在執(zhí)行,該方法并不會(huì)暴力的終止所有任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完成后再關(guān)閉線程池,但它并不會(huì)等待所有線程執(zhí)行完成后再返回。因此可以簡單地理解成只是發(fā)送了一個(gè)關(guān)閉信號(hào)而已。再執(zhí)行shutdown方法后,這個(gè)線程池就不能在接受其他新的任務(wù)了。
ForkJoinPool
采用“分而治之”的思想,將大的任務(wù)分解成小的任務(wù),分階段處理,再將這些結(jié)果進(jìn)行合成。在JDK中,給出了一個(gè)ForkJoinPool線程池,對(duì)于fork()方法并不急著開啟線程,而是提交給ForkJoinPool線程池進(jìn)行處理。絕大多數(shù)情況下一個(gè)線程是需要處理多個(gè)邏輯任務(wù)的,因此每個(gè)線程必然擁有一個(gè)任務(wù)隊(duì)列。當(dāng)線程A已經(jīng)把自己的任務(wù)執(zhí)行完成之后,而線程B還有一大堆任務(wù)等著處理,此時(shí)線程A就會(huì)幫助線程B,從線程B的任務(wù)隊(duì)列中拿任務(wù)過來處理。當(dāng)線程試圖幫助別人時(shí),總是從任務(wù)隊(duì)列的底部開始拿數(shù)據(jù),而線程執(zhí)行自己的任務(wù)是則是從頂部開始拿,這種行為有利于避免數(shù)據(jù)競(jìng)爭。使用fork()后系統(tǒng)多了一個(gè)執(zhí)行分支(線程),所以需要等到這個(gè)分支執(zhí)行完畢,才有可能得到最終結(jié)果,因此join()就表示等待。這個(gè)join()和普通線程的join()不一樣哦。


// ForkJoin求和demo
public class ForkJoinCountTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 10000;
private long start;
private long end;
public ForkJoinCountTask(long start, long end) {
this.start = start;
this.end = end;
}
protected Long compute() {
long sum = 0;
// 如果計(jì)算的個(gè)數(shù)沒有超過閾值,直接開始計(jì)算
if ((end - start) <= THRESHOLD) {
for (long i = start; i <= end; i++) {
sum += i;
}
} else {
// 超過了閾值,則分成100個(gè)小任務(wù)
long forkCount = (end - start) / THRESHOLD;
if (THRESHOLD * forkCount < end) {
forkCount++;
}
ArrayList<ForkJoinCountTask> tasks = new ArrayList<ForkJoinCountTask>();
long pos = start;
for (int i = 0; i < forkCount; i++) {
long lastOne = pos + THRESHOLD;
lastOne = lastOne > end ? end : lastOne;
ForkJoinCountTask task = new ForkJoinCountTask(pos, lastOne);
pos += THRESHOLD + 1;
tasks.add(task);
task.fork();
}
// 對(duì)小任務(wù)的結(jié)果進(jìn)行求和
for (ForkJoinCountTask t : tasks) {
sum += t.join();
}
}
// 返回部分結(jié)果
return sum;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinCountTask t = new ForkJoinCountTask(0, 20000);
ForkJoinTask<Long> fjt = pool.submit(t);
try {
long res = fjt.get();
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
over ...