我們先看 Java 開發(fā)手冊上說的:

我們可以看一下源碼:

這里的 ThreadPoolExecutor 的構(gòu)造函數(shù)如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory and rejected execution handler.
* It may be more convenient to use one of the {@link Executors} factory
* methods instead of this general purpose constructor.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
參數(shù)說明:

RejectedExecutionHandler
其中,RejectedExecutionHandler(拒絕策略)指的是當(dāng)阻塞隊(duì)列滿了之后,線程數(shù)量也達(dá)到最大值,無法再接受新任務(wù)的時(shí)候,可以根據(jù)飽和策略對新任務(wù)作出相應(yīng)的處理。原生JDK線程池提供了4種飽和策略:
AbortPolicy:直接拋出異常。
CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy:不處理,丟棄掉
除此之外,我們還可以自定義飽和策略滿足業(yè)務(wù)場景的需求,比如:
public class LogPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
// 持久化不能處理的任務(wù)
insertToDB(r);
}
}
}
以上是ThreadPoolExecutor構(gòu)造函數(shù)的參數(shù)詳細(xì)解析和作用。
類圖結(jié)構(gòu):

Executors的創(chuàng)建線程池的方法,創(chuàng)建出來的線程池都實(shí)現(xiàn)了ExecutorService接口。常用方法有以下幾個(gè):
newFiexedThreadPool(int Threads):創(chuàng)建固定數(shù)目線程的線程池。
newCachedThreadPool():創(chuàng)建一個(gè)可緩存的線程池,調(diào)用execute 將重用以前構(gòu)造的線程(如果線程可用)。如果沒有可用的線程,則創(chuàng)建一個(gè)新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
newSingleThreadExecutor()創(chuàng)建一個(gè)單線程化的Executor。
newScheduledThreadPool(int corePoolSize) 創(chuàng)建一個(gè)支持定時(shí)及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類。
類看起來功能還是比較強(qiáng)大的,又用到了工廠模式、又有比較強(qiáng)的擴(kuò)展性,重要的是用起來還比較方便,如:
ExecutorService executor = Executors.newFixedThreadPool(nThreads) ;
即可創(chuàng)建一個(gè)固定大小的線程池。
執(zhí)行原理
線程池執(zhí)行器將會(huì)根據(jù)corePoolSize和maximumPoolSize自動(dòng)地調(diào)整線程池大小。
當(dāng)在execute(Runnable)方法中提交新任務(wù)并且少于corePoolSize線程正在運(yùn)行時(shí),即使其他工作線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來處理該請求。 如果有多于corePoolSize但小于maximumPoolSize線程正在運(yùn)行,則僅當(dāng)隊(duì)列已滿時(shí)才會(huì)創(chuàng)建新線程。 通過設(shè)置corePoolSize和maximumPoolSize相同,您可以創(chuàng)建一個(gè)固定大小的線程池。 通過將maximumPoolSize設(shè)置為基本上無界的值,例如Integer.MAX_VALUE,您可以允許池容納任意數(shù)量的并發(fā)任務(wù)。 通常,核心和最大池大小僅在構(gòu)建時(shí)設(shè)置,但也可以使用setCorePoolSize和setMaximumPoolSize進(jìn)行動(dòng)態(tài)更改。
這段話詳細(xì)了描述了線程池對任務(wù)的處理流程,這里用個(gè)圖總結(jié)一下

使用 Executors 創(chuàng)建四種類型的線程池
newCachedThreadPool是Executors工廠類的一個(gè)靜態(tài)函數(shù),用來創(chuàng)建一個(gè)可以無限擴(kuò)大的線程池。
而Executors工廠類一共可以創(chuàng)建四種類型的線程池,通過Executors.newXXX即可創(chuàng)建。下面就分別都介紹一下。
1. FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads){
return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

- 它是一種固定大小的線程池;
- corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads;
- keepAliveTime為0,意味著一旦有多余的空閑線程,就會(huì)被立即停止掉;但這里keepAliveTime無效;
- 阻塞隊(duì)列采用了LinkedBlockingQueue,它是一個(gè)無界隊(duì)列;
- 由于阻塞隊(duì)列是一個(gè)無界隊(duì)列,因此永遠(yuǎn)不可能拒絕任務(wù);
- 由于采用了無界隊(duì)列,實(shí)際線程數(shù)量將永遠(yuǎn)維持在nThreads,因此maximumPoolSize和keepAliveTime將無效。
2. CachedThreadPool
public static ExecutorService newCachedThreadPool(){
return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}

- 它是一個(gè)可以無限擴(kuò)大的線程池;
- 它比較適合處理執(zhí)行時(shí)間比較小的任務(wù);
- corePoolSize為0,maximumPoolSize為無限大,意味著線程數(shù)量可以無限大;
- keepAliveTime為60S,意味著線程空閑時(shí)間超過60S就會(huì)被殺死;
- 采用SynchronousQueue裝等待的任務(wù),這個(gè)阻塞隊(duì)列沒有存儲(chǔ)空間,這意味著只要有請求到來,就必須要找到一條工作線程處理他,如果當(dāng)前沒有空閑的線程,那么就會(huì)再創(chuàng)建一條新的線程。
3. SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){
return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

- 它只會(huì)創(chuàng)建一條工作線程處理任務(wù);
- 采用的阻塞隊(duì)列為LinkedBlockingQueue;
4. ScheduledThreadPool
它用來處理延時(shí)任務(wù)或定時(shí)任務(wù)。

- 它接收SchduledFutureTask類型的任務(wù),有兩種提交任務(wù)的方式:
- scheduledAtFixedRate
- scheduledWithFixedDelay
- SchduledFutureTask接收的參數(shù):
- time:任務(wù)開始的時(shí)間
- sequenceNumber:任務(wù)的序號(hào)
- period:任務(wù)執(zhí)行的時(shí)間間隔
- 它采用DelayQueue存儲(chǔ)等待的任務(wù)
- DelayQueue內(nèi)部封裝了一個(gè)PriorityQueue,它會(huì)根據(jù)time的先后時(shí)間排序,若time相同則根據(jù)sequenceNumber排序;
- DelayQueue也是一個(gè)無界隊(duì)列;
- 工作線程的執(zhí)行過程:
- 工作線程會(huì)從DelayQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
- 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayQueue
Executors存在什么問題
在阿里巴巴Java開發(fā)手冊中提到,使用Executors創(chuàng)建線程池可能會(huì)導(dǎo)致OOM(OutOfMemory ,內(nèi)存溢出),但是并沒有說明為什么,那么接下來我們就來看一下到底為什么不允許使用Executors?
我們先來一個(gè)簡單的例子,模擬一下使用Executors導(dǎo)致OOM的情況。
/**
* @author Hollis
*/
public class ExecutorsDemo {
private static ExecutorService executor = Executors.newFixedThreadPool(15);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executor.execute(new SubThread());
}
}
}
class SubThread implements Runnable {
@Override
public void run() {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
//do nothing
}
}
}
通過指定JVM參數(shù):-Xmx8m -Xms8m 運(yùn)行以上代碼,會(huì)拋出OOM:
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
以上代碼指出,ExecutorsDemo.java的第16行,就是代碼中的executor.execute(new SubThread());。
Executors為什么存在缺陷
通過上面的例子,我們知道了Executors創(chuàng)建的線程池存在OOM的風(fēng)險(xiǎn),那么到底是什么原因?qū)е碌哪??我們需要深入Executors的源碼來分析一下。
其實(shí),在上面的報(bào)錯(cuò)信息中,我們是可以看出蛛絲馬跡的,在以上的代碼中其實(shí)已經(jīng)說了,真正的導(dǎo)致OOM的其實(shí)是LinkedBlockingQueue.offer方法。
Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
如果讀者翻看代碼的話,也可以發(fā)現(xiàn),其實(shí)底層確實(shí)是通過LinkedBlockingQueue實(shí)現(xiàn)的:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
如果讀者對Java中的阻塞隊(duì)列有所了解的話,看到這里或許就能夠明白原因了。
Java中的BlockingQueue主要有兩種實(shí)現(xiàn),分別是ArrayBlockingQueue 和 LinkedBlockingQueue。
ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,必須設(shè)置容量。
LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,容量可以選擇進(jìn)行設(shè)置,不設(shè)置的話,將是一個(gè)無邊界的阻塞隊(duì)列,最大長度為Integer.MAX_VALUE。
這里的問題就出在:不設(shè)置的話,將是一個(gè)無邊界的阻塞隊(duì)列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設(shè)置LinkedBlockingQueue的容量的話,其默認(rèn)容量將會(huì)是Integer.MAX_VALUE。
而newFixedThreadPool中創(chuàng)建LinkedBlockingQueue時(shí),并未指定容量。此時(shí),LinkedBlockingQueue就是一個(gè)無邊界隊(duì)列,對于一個(gè)無邊界隊(duì)列來說,是可以不斷的向隊(duì)列中加入任務(wù)的,這種情況下就有可能因?yàn)槿蝿?wù)過多而導(dǎo)致內(nèi)存溢出問題。
上面提到的問題主要體現(xiàn)在newFixedThreadPool和newSingleThreadExecutor兩個(gè)工廠方法上,并不是說newCachedThreadPool和newScheduledThreadPool這兩個(gè)方法就安全了,這兩種方式創(chuàng)建的最大線程數(shù)可能是Integer.MAX_VALUE,而創(chuàng)建這么多線程,必然就有可能導(dǎo)致OOM。
創(chuàng)建線程池的正確姿勢
避免使用Executors創(chuàng)建線程池,主要是避免使用其中的默認(rèn)實(shí)現(xiàn),那么我們可以自己直接調(diào)用ThreadPoolExecutor的構(gòu)造函數(shù)來自己創(chuàng)建線程池。在創(chuàng)建的同時(shí),給BlockQueue指定容量就可以了。
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
這種情況下,一旦提交的線程數(shù)超過當(dāng)前可用線程數(shù)時(shí),就會(huì)拋出java.util.concurrent.RejectedExecutionException,這是因?yàn)楫?dāng)前線程池使用的隊(duì)列是有邊界隊(duì)列,隊(duì)列已經(jīng)滿了便無法繼續(xù)處理新的請求。但是異常(Exception)總比發(fā)生錯(cuò)誤(Error)要好。
除了自己定義ThreadPoolExecutor外。還有其他方法。這個(gè)時(shí)候第一時(shí)間就應(yīng)該想到開源類庫,如apache和guava等。
作者推薦使用guava提供的ThreadFactoryBuilder來創(chuàng)建線程池。
public class ExecutorsDemo {
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
pool.execute(new SubThread());
}
}
}
通過上述方式創(chuàng)建線程時(shí),不僅可以避免OOM的問題,還可以自定義線程名稱,更加方便的出錯(cuò)的時(shí)候溯源。
參考資料
https://www.zhihu.com/question/23212914
https://www.zhihu.com/question/23212914/answer/245992718
http://m.itdecent.cn/p/c41e942bcd64
http://m.itdecent.cn/p/5c688d14188a
Kotlin開發(fā)者社區(qū)

專注分享 Java、 Kotlin、Spring/Spring Boot、MySQL、redis、neo4j、NoSQL、Android、JavaScript、React、Node、函數(shù)式編程、編程思想、"高可用,高性能,高實(shí)時(shí)"大型分布式系統(tǒng)架構(gòu)設(shè)計(jì)主題。
High availability, high performance, high real-time large-scale distributed system architecture design。
分布式框架:Zookeeper、分布式中間件框架等
分布式存儲(chǔ):GridFS、FastDFS、TFS、MemCache、redis等
分布式數(shù)據(jù)庫:Cobar、tddl、Amoeba、Mycat
云計(jì)算、大數(shù)據(jù)、AI算法
虛擬化、云原生技術(shù)
分布式計(jì)算框架:MapReduce、Hadoop、Storm、Flink等
分布式通信機(jī)制:Dubbo、RPC調(diào)用、共享遠(yuǎn)程數(shù)據(jù)、消息隊(duì)列等
消息隊(duì)列MQ:Kafka、MetaQ,RocketMQ
怎樣打造高可用系統(tǒng):基于硬件、軟件中間件、系統(tǒng)架構(gòu)等一些典型方案的實(shí)現(xiàn):HAProxy、基于Corosync+Pacemaker的高可用集群套件中間件系統(tǒng)
Mycat架構(gòu)分布式演進(jìn)
大數(shù)據(jù)Join背后的難題:數(shù)據(jù)、網(wǎng)絡(luò)、內(nèi)存和計(jì)算能力的矛盾和調(diào)和
Java分布式系統(tǒng)中的高性能難題:AIO,NIO,Netty還是自己開發(fā)框架?
高性能事件派發(fā)機(jī)制:線程池模型、Disruptor模型等等。。。
合抱之木,生于毫末;九層之臺(tái),起于壘土;千里之行,始于足下。不積跬步,無以至千里;不積小流,無以成江河。