
前言
多線程的軟件設(shè)計方法確實可以最大限度的發(fā)揮現(xiàn)代多核處理器的計算能力,提高生產(chǎn)系統(tǒng)的吞吐量和性能。但是,如果一個系統(tǒng)同時創(chuàng)建大量線程,線程間頻繁的切換上下文導(dǎo)致的系統(tǒng)開銷將會拖慢整個系統(tǒng)。嚴(yán)重的甚至導(dǎo)致內(nèi)存耗盡導(dǎo)致OOM異常。因此,在實際的生產(chǎn)環(huán)境中,線程的數(shù)量必須得到控制,盲目的創(chuàng)建大量新車對系統(tǒng)是有傷害的。
那么,怎么才能最大限度的利用CPU的性能,又能保持系統(tǒng)的穩(wěn)定性呢?其中有一個方法就是使用線程池。
簡而言之,在使用線程池后,創(chuàng)建線程便處理從線程池獲得空閑線程,關(guān)閉線程變成了向池子歸還線程。也就是說,提高了線程的復(fù)用。
而 JDK 在 1.5 之后為我提供了現(xiàn)成的線程池工具,我們今天就來學(xué)習(xí)看看如何使用他們。
- Executors 線程池工廠能創(chuàng)建哪些線程池
- 如何手動創(chuàng)建線程池
- 如何擴(kuò)展線程池
- 如何優(yōu)化線程池的異常信息
- 如何設(shè)計線程池中的線程數(shù)量
1. Executors 線程池工廠能創(chuàng)建哪些線程池
先來一個最簡單的線程池使用例子:
static class MyTask implements Runnable {
@Override
public void run() {
System.out
.println(System.currentTimeMillis() + ": Thread ID :" + Thread.currentThread().getId());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
MyTask myTask = new MyTask();
ExecutorService service1 = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
service1.submit(myTask);
}
service1.shutdown();
}
運(yùn)行結(jié)果:

我們創(chuàng)建了一個線程池實例,并設(shè)置默認(rèn)線程數(shù)量為5,并向線程池提交了10任務(wù),分別打印當(dāng)前毫秒時間和線程ID,從結(jié)果中,我們可以看到結(jié)果中有5個相同 id 的線程打印了毫秒時間。
這是最簡單的例子。
接下來我們講講其他的線程創(chuàng)建方式。
1. 固定線程池
ExecutorService service1 = Executors.newFixedThreadPool(5);
該方法返回一個固定線程數(shù)量的線程池。該線程池中的線程數(shù)量始終不變。當(dāng)有一個新的任務(wù)提交時,線程池中若有空閑線程,則立即執(zhí)行,若沒有,則新的任務(wù)會被暫存在一個任務(wù)隊列(默認(rèn)無界隊列 int 最大數(shù))中,待有線程空閑時,便處理在任務(wù)隊列中的任務(wù)。
2. 單例線程池
ExecutorService service3 = Executors.newSingleThreadExecutor();
該方法返回一個只有一個線程的線程池。若多余一個任務(wù)被提交到該線程池,任務(wù)會被保存在一個任務(wù)隊列(默認(rèn)無界隊列 int 最大數(shù))中,待線程空閑,按先入先出的順序執(zhí)行隊列中的任務(wù)。
3. 緩存線程池
ExecutorService service2 = Executors.newCachedThreadPool();
該方法返回一個可根據(jù)實際情況調(diào)整線程數(shù)量的線程池,線程池的線程數(shù)量不確定,但若有空閑線程可以復(fù)用,則會優(yōu)先使用可復(fù)用的線程,所有線程均在工作,如果有新的任務(wù)提交,則會創(chuàng)建新的線程處理任務(wù)。所有線程在當(dāng)前任務(wù)執(zhí)行完畢后,將返回線程池進(jìn)行復(fù)用。
4. 任務(wù)調(diào)用線程池
ExecutorService service4 = Executors.newScheduledThreadPool(2);
該方法也返回一個 ScheduledThreadPoolExecutor 對象,該線程池可以指定線程數(shù)量。
前3個線程的用法沒什么差異,關(guān)鍵是第四個,雖然線程任務(wù)調(diào)度框架很多,但是我們?nèi)匀豢梢詫W(xué)習(xí)該線程池。如何使用呢?下面來個例子:
class A {
public static void main(String[] args) {
ScheduledThreadPoolExecutor service4 = (ScheduledThreadPoolExecutor) Executors
.newScheduledThreadPool(2);
// 如果前面的任務(wù)沒有完成,則調(diào)度也不會啟動
service4.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
// 如果任務(wù)執(zhí)行時間大于間隔時間,那么就以執(zhí)行時間為準(zhǔn)(防止任務(wù)出現(xiàn)堆疊)。
Thread.sleep(10000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}// initialDelay(初始延遲) 表示第一次延時時間 ; period 表示間隔時間
}, 0, 2, TimeUnit.SECONDS);
service4.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
System.out.println(System.currentTimeMillis() / 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}// initialDelay(初始延遲) 表示延時時間;delay + 任務(wù)執(zhí)行時間 = 等于間隔時間 period
}, 0, 2, TimeUnit.SECONDS);
// 在給定時間,對任務(wù)進(jìn)行一次調(diào)度
service4.schedule(new Runnable() {
@Override
public void run() {
System.out.println("5 秒之后執(zhí)行 schedule");
}
}, 5, TimeUnit.SECONDS);
}
}
}
上面的代碼創(chuàng)建了一個 ScheduledThreadPoolExecutor 任務(wù)調(diào)度線程池,分別調(diào)用了3個方法,需要著重解釋 scheduleAtFixedRate 和 scheduleWithFixedDelay 方法,這兩個方法的作用很相似,唯一的區(qū)別就是他們執(zhí)行人物的間隔時間的計算方式,前者時間間隔算法是根據(jù)指定的 period 時間和任務(wù)執(zhí)行時間中取時間長的,后者取的是指定的 delay 時間 + 任務(wù)執(zhí)行時間。如果同學(xué)們有興趣,可以將上面的代碼跑跑看。一樣便能看出端倪。
好了,JDK 給我們封裝了創(chuàng)建線程池的 4 個方法,但是,請注意,由于這些方法高度封裝,因此,如果使用不當(dāng),出了問題將無從排查,因此,我建議,程序員應(yīng)到自己手動創(chuàng)建線程池,而手動創(chuàng)建的前提就是高度了解線程池的參數(shù)設(shè)置。那么我們就來看看如何手動創(chuàng)建線程池。
2. 如何手動創(chuàng)建線程池
下面是一個手動創(chuàng)建線程池的范本:
/**
* 默認(rèn)5條線程(默認(rèn)數(shù)量,即最少數(shù)量),
* 最大20線程(指定了線程池中的最大線程數(shù)量),
* 空閑時間0秒(當(dāng)線程池梳理超過核心數(shù)量時,多余的空閑時間的存活時間,即超過核心線程數(shù)量的空閑線程,在多長時間內(nèi),會被銷毀),
* 等待隊列長度1024,
* 線程名稱[MXR-Task-%d],方便回溯,
* 拒絕策略:當(dāng)任務(wù)隊列已滿,拋出RejectedExecutionException
* 異常。
*/
private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 20, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024)
, new ThreadFactoryBuilder().setNameFormat("My-Task-%d").build()
, new AbortPolicy()
);
我們看到,ThreadPoolExecutor 也就是線程池有 7 個參數(shù),我們一起來好好看看:
- corePoolSize 線程池中核心線程數(shù)量
- maximumPoolSize 最大線程數(shù)量
- keepAliveTime 空閑時間(當(dāng)線程池梳理超過核心數(shù)量時,多余的空閑時間的存活時間,即超過核心線程數(shù)量的空閑線程,在多長時間內(nèi),會被銷毀)
- unit 時間單位
- workQueue 當(dāng)核心線程工作已滿,需要存儲任務(wù)的隊列
- threadFactory 創(chuàng)建線程的工廠
- handler 當(dāng)隊列滿了之后的拒絕策略
前面幾個參數(shù)我們就不講了,很簡單,主要是后面幾個參數(shù),隊列,線程工廠,拒絕策略。
我們先看看隊列,線程池默認(rèn)提供了 4 個隊列。
- 無界隊列: 默認(rèn)大小 int 最大值,因此可能會耗盡系統(tǒng)內(nèi)存,引起OOM,非常危險。
- 直接提交的隊列 : 沒有容量,不會保存,直接創(chuàng)建新的線程,因此需要設(shè)置很大的線程池數(shù)。否則容易執(zhí)行拒絕策略,也很危險。
- 有界隊列:如果core滿了,則存儲在隊列中,如果core滿了且隊列滿了,則創(chuàng)建線程,直到maximumPoolSize 到了,如果隊列滿了且最大線程數(shù)已經(jīng)到了,則執(zhí)行拒絕策略。
- 優(yōu)先級隊列:按照優(yōu)先級執(zhí)行任務(wù)。也可以設(shè)置大小。
樓主在自己的項目中使用了無界隊列,但是設(shè)置了任務(wù)大小,1024。如果你的任務(wù)很多,建議分為多個線程池。不要把雞蛋放在一個籃子里。
再看看拒絕策略,什么是拒絕策略呢?當(dāng)隊列滿了,如何處理那些仍然提交的任務(wù)。JDK 默認(rèn)有4種策略。
- AbortPolicy :直接拋出異常,阻止系統(tǒng)正常工作.
- CallerRunsPolicy : 只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣做不會真的丟棄任務(wù),但是,任務(wù)提交線程的性能極有可能會急劇下降。
- DiscardOldestPolicy: 該策略將丟棄最老的一個請求,也就是即將被執(zhí)行的一個任務(wù),并嘗試再次提交當(dāng)前任務(wù).
- DiscardPolicy: 該策略默默地丟棄無法處理的任務(wù),不予任何處理,如果允許任務(wù)丟失,我覺得這是最好的方案.
當(dāng)然,如果你不滿意JDK提供的拒絕策略,可以自己實現(xiàn),只需要實現(xiàn) RejectedExecutionHandler 接口,并重寫 rejectedExecution 方法即可。
最后,線程工廠,線程池的所有線程都由線程工廠來創(chuàng)建,而默認(rèn)的線程工廠太過單一,我們看看默認(rèn)的線程工廠是如何創(chuàng)建線程的:
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
可以看到,線程名稱為 pool- + 線程池編號 + -thread- + 線程編號 。設(shè)置為非守護(hù)線程。優(yōu)先級為默認(rèn)。
如果我們想修改名稱呢?對,實現(xiàn) ThreadFactory 接口,重寫 newThread 方法即可。但是已經(jīng)有人造好輪子了, 比如我們的例子中使用的 google 的 guaua 提供的 ThreadFactoryBuilder 工廠??梢宰远x線程名稱,是否守護(hù),優(yōu)先級,異常處理等等,功能強(qiáng)大。
3. 如何擴(kuò)展線程池
那么我們能擴(kuò)展線程池的功能嗎?比如記錄線程任務(wù)的執(zhí)行時間。實際上,JDK 的線程池已經(jīng)為我們預(yù)留的接口,在線程池核心方法中,有2 個方法是空的,就是給我們預(yù)留的。還有一個線程池退出時會調(diào)用的方法。我們看看例子:
/**
* 如何擴(kuò)展線程池,重寫 beforeExecute, afterExecute, terminated 方法,這三個方法默認(rèn)是空的。
*
* 可以監(jiān)控每個線程任務(wù)執(zhí)行的開始和結(jié)束時間,或者自定義一些增強(qiáng)。
*
* 在 Worker 的 runWork 方法中,會調(diào)用這些方法
*/
public class ExtendThreadPoolDemo {
static class MyTask implements Runnable {
String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
System.out
.println("正在執(zhí)行:Thread ID:" + Thread.currentThread().getId() + ", Task Name = " + name);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()) {
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("準(zhǔn)備執(zhí)行:" + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("執(zhí)行完成: " + ((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("線程池退出");
}
};
for (int i = 0; i < 5; i++) {
MyTask myTask = new MyTask("TASK-GEYM-" + i);
es.execute(myTask);
Thread.sleep(10);
}
es.shutdown();
}
}
我們重寫了 beforeExecute 方法,也就是執(zhí)行任務(wù)之前會調(diào)用該方法,而 afterExecute 方法則是在任務(wù)執(zhí)行完畢后會調(diào)用該方法。還有一個 terminated 方法,在線程池退出時會調(diào)用該方法。執(zhí)行結(jié)果是什么呢?

可以看到,每個任務(wù)執(zhí)行前后都會調(diào)用 before 和 after 方法。相當(dāng)于執(zhí)行了一個切面。而在調(diào)用 shutdown 方法后則會調(diào)用 terminated 方法。
4. 如何優(yōu)化線程池的異常信息
如何優(yōu)化線程池的異常信息? 在說這個問題之前,我們先說一個不容易發(fā)現(xiàn)的bug:
看代碼:
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0L,
TimeUnit.MILLISECONDS, new SynchronousQueue<>());
for (int i = 0; i < 5; i++) {
executor.submit(new DivTask(100, i));
}
}
static class DivTask implements Runnable {
int a, b;
public DivTask(int a, int b) {
this.a = a;
this.b = b;
}
@Override
public void run() {
double re = a / b;
System.out.println(re);
}
}
執(zhí)行結(jié)果:

注意:只有4個結(jié)果,其中一個結(jié)果被吞沒了,并且沒有任何信息。為什么呢?如果仔細(xì)看代碼,會發(fā)現(xiàn),在進(jìn)行 100 / 0 的時候肯定會報錯的,但是卻沒有報錯信息,令人頭痛,為什么呢?實際上,如果你使用 execute 方法則會打印錯誤信息,當(dāng)你使用 submit 方法卻沒有調(diào)用它的get 方法,異常將會被吞沒,因為,如果發(fā)生了異常,異常是作為返回值返回的。
怎么辦呢?我們當(dāng)然可以使用 execute 方法,但是我們可以有另一種方式:重寫 submit 方法,樓主寫了一個例子,大家看一下:
static class TraceThreadPoolExecutor extends ThreadPoolExecutor {
public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public void execute(Runnable command) {
// super.execute(command);
super.execute(wrap(command, clientTrace(), Thread.currentThread().getName()));
}
@Override
public Future<?> submit(Runnable task) {
// return super.submit(task);
return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
}
private Exception clientTrace() {
return new Exception("Client stack trace");
}
private Runnable wrap(final Runnable task, final Exception clientStack,
String clientThreaName) {
return new Runnable() {
@Override
public void run() {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
clientStack.printStackTrace();
throw e;
}
}
};
}
}
我們重寫了 submit 方法,封裝了異常信息,如果發(fā)生了異常,將會打印堆棧信息。我們看看使用重寫后的線程池后的結(jié)果是什么?

從結(jié)果中,我們清楚的看到了錯誤信息的原因:by zero!并且堆棧信息明確,方便排錯。優(yōu)化了默認(rèn)線程池的策略。
5. 如何設(shè)計線程池中的線程數(shù)量
線程池的大小對系統(tǒng)的性能有一定的影響,過大或者過小的線程數(shù)量都無法發(fā)揮最優(yōu)的系統(tǒng)性能,但是線程池大小的確定也不需要做的非常精確。因為只要避免極大和極小兩種情況,線程池的大小對性能的影響都不會影響太大,一般來說,確定線程池的大小需要考慮CPU數(shù)量,內(nèi)存大小等因素,在《Java Concurrency in Practice》 書中給出了一個估算線程池大小的經(jīng)驗公式:

公式還是有點(diǎn)復(fù)雜的,簡單來說,就是如果你是CPU密集型運(yùn)算,那么線程數(shù)量和CPU核心數(shù)相同就好,避免了大量無用的切換線程上下文,如果你是IO密集型的話,需要大量等待,那么線程數(shù)可以設(shè)置的多一些,比如CPU核心乘以2.
至于如何獲取 CPU 核心數(shù),Java 提供了一個方法:
Runtime.getRuntime().availableProcessors();
返回了CPU的核心數(shù)量。
總結(jié)
好了,到這里,我們已經(jīng)對如何使用線程池有了一個認(rèn)識,這里,樓主建議大家手動創(chuàng)建線程池,這樣對線程池中的各個參數(shù)可以有精準(zhǔn)的了解,在對系統(tǒng)進(jìn)行排錯或者調(diào)優(yōu)的時候有好處。比如設(shè)置核心線程數(shù)多少合適,最大線程數(shù),拒絕策略,線程工廠,隊列的大小和類型等等,也可以是G家的線程工廠自定義線程。
下一篇,我們將深入源碼,看看JDK 的線程池是如何實現(xiàn)的。因此,先熟悉線程池的使用吧!?。?/p>
good luck?。?!