為什么要使用線程池
- 反復(fù)創(chuàng)建線程開銷大
- 過多的線程會(huì)占用太多內(nèi)存
線程池的好處
- 加快響應(yīng)速度
- 合理利用CPU和內(nèi)存
- 統(tǒng)一管理
線程池適合應(yīng)用的場合
- 服務(wù)器接受到大量請(qǐng)求時(shí),使用線程池技術(shù)是非常合適的,它可以大大減少線程的創(chuàng)建和銷毀次數(shù),提高服務(wù)器的工作效率
- 在開發(fā)中,如果需要?jiǎng)?chuàng)建5哥以上的線程,那么就可以使用線程池來管理
線程池構(gòu)造函數(shù)的參數(shù)
| 參數(shù)名 | 類型 | 含義 |
|---|---|---|
| corePoolSize | int | 核心線程數(shù) |
| maximumPoolSize | int | 最大線程數(shù) |
| keepAliveTime | long | 保持存活時(shí)間 |
| workQueue | BlockingQueue | 任務(wù)存儲(chǔ)隊(duì)列 |
| threadFactory | ThreadFactory | 當(dāng)線程池需要新的線程的時(shí)候,會(huì)使用threadFactory來生成新的線程 |
| Handler | RejectedExecutionHandler | 由于線程池?zé)o法接受你所提交的任務(wù)的拒絕策略 |
corePoolSize
指的是核心線程數(shù),線程池在完成初始化后,默認(rèn)情況下,線程池中并沒有任何線程,線程池會(huì)等待有任務(wù)到來時(shí),再創(chuàng)建新線程去執(zhí)行任務(wù)
maximumPoolSize
線程池有可能會(huì)在核心線程數(shù)的基礎(chǔ)上,額外增加一些線程,但是這些新增加的線程數(shù)有一個(gè)上限,這就是maximumPoolSize
keepAliveTime
如果線程池當(dāng)前的線程數(shù)多于corePoolSize,那么如果多余的線程空閑時(shí)間超過keepAliveTime,它們就會(huì)被終止
ThreadFactory
新的線程都是由ThreadFactory創(chuàng)建的,默認(rèn)使用Executors.defaultThreadFactory,創(chuàng)建出來的線程都在同一個(gè)線程組,擁有同樣的NORM_PRIORITY優(yōu)先級(jí)并且都不是守護(hù)線程。如果自己指定ThreadFactory,那么就可以改變線程名、線程組、優(yōu)先級(jí)、是否是守護(hù)線程等。通常使用默認(rèn)的ThreadFactory就可以了。
workQueue
有3種最常見的隊(duì)列類型
1.直接交接:SynchronousQueue
2.無界隊(duì)列:LinkedBlockingQueue
3.有界隊(duì)列:ArrayBlockingQueue
線程池添加線程規(guī)則
- 如果線程數(shù)小于corePoolSize,即使其他工作線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來運(yùn)行新任務(wù)
- 如果線程數(shù)等于(或大于)corePoolSize但少于maximumPoolSize,則將任務(wù)放入隊(duì)列
- 如果隊(duì)列已滿,并且線程數(shù)小于maximumPoolSize,則創(chuàng)建一個(gè)新線程來運(yùn)行任務(wù)
- 如果隊(duì)列已滿,并且線程數(shù)大于或等于maximumPoolSize,則拒絕該任務(wù)

增減線程的特點(diǎn)
- 通過設(shè)置corePoolSize和maximumPoolSize相同,就可以創(chuàng)建固定大小的線程池;
- 線程池希望保持較少的線程數(shù),并且只有在負(fù)載變得很大時(shí)才增加它;
- 通過設(shè)置maximumPoolSize為很高的值(例如Integer.MAX_VALUE),可以允許線程池容納任意數(shù)量的并發(fā)任務(wù);
- 是只有在隊(duì)列填滿時(shí)才創(chuàng)建多于corePoolSize的線程,所以如果你使用的是無界隊(duì)列(例如LinkedBlockingQueue),那么線程數(shù)就不會(huì)超過corePoolSize。
線程池應(yīng)該手動(dòng)創(chuàng)建還是自動(dòng)創(chuàng)建
newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
- 通過源碼可以看出,newFixedThreadPool 使用的是 LinkedBlockingQueue,由于 LinkedBlockingQueue 是沒有容量上限的,所以當(dāng)請(qǐng)求數(shù)越來越多,并且無法及時(shí)處理完畢的時(shí)候,也就是請(qǐng)求堆積的時(shí)候,會(huì)容易造成占用大量的內(nèi)存,可能會(huì)導(dǎo)致OOM
newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- 通過源碼可以看出,這里和上面的 newFixedThreadPool 的原理基本一樣,只不過是把線程數(shù)直接設(shè)置成了1,所以這也會(huì)導(dǎo)致同樣的問題,也就是當(dāng)請(qǐng)求堆積的時(shí)候,可能會(huì)占用大量的內(nèi)存。
newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
- 可緩存線程池
- 特點(diǎn):無界限線程池,具有自動(dòng)回收多余線程的功能(默認(rèn)時(shí)間是60秒)
- 弊端:在于第二個(gè)參數(shù) maximumPoolSize 被設(shè)置為了Integer.MAX_VALUE,這可能會(huì)創(chuàng)建數(shù)量非常多的線程,甚至導(dǎo)致OOM。
newScheduledThreadPool:
- 支持定時(shí)及周期性任務(wù)執(zhí)行的線程池
正確的創(chuàng)建線程池的方法
根據(jù)不同的業(yè)務(wù)場景,選擇合適的方式,最后是我們自己手動(dòng)創(chuàng)建線程池,自己設(shè)置線程池參數(shù)。
線程池里的線程數(shù)量設(shè)定為多少比較合適
- CPU密集型(加密、計(jì)算hash等):最佳線程數(shù)為CPU核心數(shù)的 1-2 倍左右
- 耗時(shí)IO型(讀寫數(shù)據(jù)庫、文件、網(wǎng)絡(luò)讀寫等):最佳線程數(shù)一般會(huì)大于CPU核心數(shù)很多倍,以JVM線程監(jiān)控顯示繁忙情況為依據(jù),保證線程空閑可以銜接上,參考 Brain Goetz 推薦的計(jì)算方法:
線程數(shù)=CPU核心數(shù)x(1+平均等待時(shí)間/平均工作時(shí)間)
停止線程池的正確方法
- shutdown
shutdown 并不是立即粗暴的結(jié)束線程,線程池仍然會(huì)繼續(xù)執(zhí)行已創(chuàng)建的任務(wù),但是不會(huì)接收新的任務(wù)了。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
// 1500毫秒后此處會(huì)拋出異常 RejectedExecutionException
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代碼在1.5秒后執(zhí)行 executorService.shutdown(); 之后,再執(zhí)行
executorService.execute(new ShutDownTask()); 則會(huì)報(bào)錯(cuò)拋出異常 RejectedExecutionException
- isShutdown
返回true或false告訴我們線程是否已經(jīng)停止
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown()); // 打印false
executorService.shutdown();
System.out.println(executorService.isShutdown()); // 打印true
System.out.println(executorService.isTerminated());
}
- isTerminated
返回true或false告訴我們線程是否已經(jīng)完全停止
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
// 打印false,因?yàn)榫€程還沒有完全執(zhí)行完
System.out.println(executorService.isTerminated());
}
- awaitTermination
awaitTermination 有三種情況會(huì)返回,沒返回之前都是阻塞的
第一種情況:所有任務(wù)都執(zhí)行完畢了
第二種情況:等待的時(shí)間到了
第三種情況:等待的過程中被打斷了,會(huì)拋出 InterruptedException
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
// 會(huì)打印false,因?yàn)?秒鐘不夠線程全部執(zhí)行完
System.out.println(b);
}
- shutdownNow
正在執(zhí)行任務(wù)的線程繼續(xù)執(zhí)行,等待隊(duì)列中的線程直接結(jié)束并返回
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ShutDownDemo {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
executorService.execute(new ShutDownTask());
}
// 等待1.5秒
Thread.sleep(1500);
// 這里會(huì)返回已經(jīng)放到線程池隊(duì)列中還沒有執(zhí)行的Runnable
List<Runnable> runnables = executorService.shutdownNow();
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
System.out.println(Thread.currentThread().getName());
} catch (InterruptedException e) {
// 接收被中斷信號(hào)
System.out.println(Thread.currentThread().getName() + "被中斷了!");
}
}
}
executorService.shutdownNow();會(huì)返回已經(jīng)放到線程池隊(duì)列中還沒有執(zhí)行的Runnable
運(yùn)行結(jié)果:
...
...
...
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-10
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1被中斷了!
pool-1-thread-4
pool-1-thread-2
pool-1-thread-7被中斷了!
pool-1-thread-8被中斷了!
pool-1-thread-10被中斷了!
pool-1-thread-6被中斷了!
pool-1-thread-9被中斷了!
pool-1-thread-5被中斷了!
Process finished with exit code 0
線程池任務(wù)太多,怎么拒絕
-
拒絕時(shí)機(jī)
- 當(dāng)Executor關(guān)閉時(shí),提交新任務(wù)會(huì)被拒絕
- 以及當(dāng)Executor對(duì)最大線程和工作隊(duì)列容量使用有限邊界并且已經(jīng)飽和時(shí)
-
拒絕策略
- AbortPolicy,直接拋出一個(gè)異常
- DiscardPolicy,悄悄的把任務(wù)丟棄,沒有通知
- DiscardOldestPolicy,把隊(duì)列中最老的那個(gè)任務(wù)丟棄,新任務(wù)加進(jìn)來
- CallerRunsPolicy,誰提交的任務(wù)誰去執(zhí)行(比如說主線程給線程池提交了一個(gè)任務(wù),但是線程池已經(jīng)飽和無法再執(zhí)行了,這時(shí)則會(huì)讓提交任務(wù)的主線程去執(zhí)行這個(gè)任務(wù))
線程池鉤子函數(shù)
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* 演示每個(gè)任務(wù)執(zhí)行前后放鉤子函數(shù)
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition unpaused = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被執(zhí)行了");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(2000);
// 等待2秒,執(zhí)行暫停方法
pauseableThreadPool.pause();
System.out.println("線程池被暫停了!");
Thread.sleep(2000);
// 再等待2秒,執(zhí)行恢復(fù)方法
pauseableThreadPool.resume();
System.out.println("線程池被恢復(fù)了!");
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unpaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 暫停線程
*/
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
/**
* 恢復(fù)線程
*/
private void resume() {
lock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
lock.unlock();
}
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
}
運(yùn)行結(jié)果:
