【Java 并發(fā)編程】Java 創(chuàng)建線程池的正確姿勢: Executors 和 ThreadPoolExecutor 詳解

我們先看 Java 開發(fā)手冊上說的:

我們可以看一下源碼:


這里的 ThreadPoolExecutor 的構(gòu)造函數(shù)如下:

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

參數(shù)說明:

RejectedExecutionHandler

其中,RejectedExecutionHandler(拒絕策略)指的是當(dāng)阻塞隊(duì)列滿了之后,線程數(shù)量也達(dá)到最大值,無法再接受新任務(wù)的時(shí)候,可以根據(jù)飽和策略對新任務(wù)作出相應(yīng)的處理。原生JDK線程池提供了4種飽和策略:

AbortPolicy:直接拋出異常。
CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy:不處理,丟棄掉

除此之外,我們還可以自定義飽和策略滿足業(yè)務(wù)場景的需求,比如:

public class LogPolicy implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            // 持久化不能處理的任務(wù)
            insertToDB(r);
        }
    }
}

以上是ThreadPoolExecutor構(gòu)造函數(shù)的參數(shù)詳細(xì)解析和作用。

類圖結(jié)構(gòu):

Executors的創(chuàng)建線程池的方法,創(chuàng)建出來的線程池都實(shí)現(xiàn)了ExecutorService接口。常用方法有以下幾個(gè):

newFiexedThreadPool(int Threads):創(chuàng)建固定數(shù)目線程的線程池。

newCachedThreadPool():創(chuàng)建一個(gè)可緩存的線程池,調(diào)用execute 將重用以前構(gòu)造的線程(如果線程可用)。如果沒有可用的線程,則創(chuàng)建一個(gè)新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。

newSingleThreadExecutor()創(chuàng)建一個(gè)單線程化的Executor。

newScheduledThreadPool(int corePoolSize) 創(chuàng)建一個(gè)支持定時(shí)及周期性的任務(wù)執(zhí)行的線程池,多數(shù)情況下可用來替代Timer類。

類看起來功能還是比較強(qiáng)大的,又用到了工廠模式、又有比較強(qiáng)的擴(kuò)展性,重要的是用起來還比較方便,如:

ExecutorService executor = Executors.newFixedThreadPool(nThreads) ;

即可創(chuàng)建一個(gè)固定大小的線程池。

執(zhí)行原理

線程池執(zhí)行器將會(huì)根據(jù)corePoolSize和maximumPoolSize自動(dòng)地調(diào)整線程池大小。

當(dāng)在execute(Runnable)方法中提交新任務(wù)并且少于corePoolSize線程正在運(yùn)行時(shí),即使其他工作線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來處理該請求。 如果有多于corePoolSize但小于maximumPoolSize線程正在運(yùn)行,則僅當(dāng)隊(duì)列已滿時(shí)才會(huì)創(chuàng)建新線程。 通過設(shè)置corePoolSize和maximumPoolSize相同,您可以創(chuàng)建一個(gè)固定大小的線程池。 通過將maximumPoolSize設(shè)置為基本上無界的值,例如Integer.MAX_VALUE,您可以允許池容納任意數(shù)量的并發(fā)任務(wù)。 通常,核心和最大池大小僅在構(gòu)建時(shí)設(shè)置,但也可以使用setCorePoolSize和setMaximumPoolSize進(jìn)行動(dòng)態(tài)更改。

這段話詳細(xì)了描述了線程池對任務(wù)的處理流程,這里用個(gè)圖總結(jié)一下

使用 Executors 創(chuàng)建四種類型的線程池

newCachedThreadPool是Executors工廠類的一個(gè)靜態(tài)函數(shù),用來創(chuàng)建一個(gè)可以無限擴(kuò)大的線程池。

而Executors工廠類一共可以創(chuàng)建四種類型的線程池,通過Executors.newXXX即可創(chuàng)建。下面就分別都介紹一下。

1. FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads){
    return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 它是一種固定大小的線程池;
  • corePoolSize和maximunPoolSize都為用戶設(shè)定的線程數(shù)量nThreads;
  • keepAliveTime為0,意味著一旦有多余的空閑線程,就會(huì)被立即停止掉;但這里keepAliveTime無效;
  • 阻塞隊(duì)列采用了LinkedBlockingQueue,它是一個(gè)無界隊(duì)列;
  • 由于阻塞隊(duì)列是一個(gè)無界隊(duì)列,因此永遠(yuǎn)不可能拒絕任務(wù);
  • 由于采用了無界隊(duì)列,實(shí)際線程數(shù)量將永遠(yuǎn)維持在nThreads,因此maximumPoolSize和keepAliveTime將無效。

2. CachedThreadPool

public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUnit.MILLISECONDS,new SynchronousQueue<Runnable>());
}
  • 它是一個(gè)可以無限擴(kuò)大的線程池;
  • 它比較適合處理執(zhí)行時(shí)間比較小的任務(wù);
  • corePoolSize為0,maximumPoolSize為無限大,意味著線程數(shù)量可以無限大;
  • keepAliveTime為60S,意味著線程空閑時(shí)間超過60S就會(huì)被殺死;
  • 采用SynchronousQueue裝等待的任務(wù),這個(gè)阻塞隊(duì)列沒有存儲(chǔ)空間,這意味著只要有請求到來,就必須要找到一條工作線程處理他,如果當(dāng)前沒有空閑的線程,那么就會(huì)再創(chuàng)建一條新的線程。

3. SingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(){
    return new ThreadPoolExecutor(1,1,0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
  • 它只會(huì)創(chuàng)建一條工作線程處理任務(wù);
  • 采用的阻塞隊(duì)列為LinkedBlockingQueue;

4. ScheduledThreadPool

它用來處理延時(shí)任務(wù)或定時(shí)任務(wù)。

  • 它接收SchduledFutureTask類型的任務(wù),有兩種提交任務(wù)的方式:
  1. scheduledAtFixedRate
  2. scheduledWithFixedDelay
  • SchduledFutureTask接收的參數(shù):
  1. time:任務(wù)開始的時(shí)間
  2. sequenceNumber:任務(wù)的序號(hào)
  3. period:任務(wù)執(zhí)行的時(shí)間間隔
  • 它采用DelayQueue存儲(chǔ)等待的任務(wù)
  • DelayQueue內(nèi)部封裝了一個(gè)PriorityQueue,它會(huì)根據(jù)time的先后時(shí)間排序,若time相同則根據(jù)sequenceNumber排序;
  • DelayQueue也是一個(gè)無界隊(duì)列;
  • 工作線程的執(zhí)行過程:
  • 工作線程會(huì)從DelayQueue取已經(jīng)到期的任務(wù)去執(zhí)行;
  • 執(zhí)行結(jié)束后重新設(shè)置任務(wù)的到期時(shí)間,再次放回DelayQueue

Executors存在什么問題

在阿里巴巴Java開發(fā)手冊中提到,使用Executors創(chuàng)建線程池可能會(huì)導(dǎo)致OOM(OutOfMemory ,內(nèi)存溢出),但是并沒有說明為什么,那么接下來我們就來看一下到底為什么不允許使用Executors?

我們先來一個(gè)簡單的例子,模擬一下使用Executors導(dǎo)致OOM的情況。

/**
 * @author Hollis
 */
public class ExecutorsDemo {
    private static ExecutorService executor = Executors.newFixedThreadPool(15);
    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executor.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            //do nothing
        }
    }
}

通過指定JVM參數(shù):-Xmx8m -Xms8m 運(yùn)行以上代碼,會(huì)拋出OOM:

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)
以上代碼指出,ExecutorsDemo.java的第16行,就是代碼中的executor.execute(new SubThread());。

Executors為什么存在缺陷

通過上面的例子,我們知道了Executors創(chuàng)建的線程池存在OOM的風(fēng)險(xiǎn),那么到底是什么原因?qū)е碌哪??我們需要深入Executors的源碼來分析一下。

其實(shí),在上面的報(bào)錯(cuò)信息中,我們是可以看出蛛絲馬跡的,在以上的代碼中其實(shí)已經(jīng)說了,真正的導(dǎo)致OOM的其實(shí)是LinkedBlockingQueue.offer方法。

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:416)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
    at com.hollis.ExecutorsDemo.main(ExecutorsDemo.java:16)

如果讀者翻看代碼的話,也可以發(fā)現(xiàn),其實(shí)底層確實(shí)是通過LinkedBlockingQueue實(shí)現(xiàn)的:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

如果讀者對Java中的阻塞隊(duì)列有所了解的話,看到這里或許就能夠明白原因了。

Java中的BlockingQueue主要有兩種實(shí)現(xiàn),分別是ArrayBlockingQueue 和 LinkedBlockingQueue。

ArrayBlockingQueue是一個(gè)用數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,必須設(shè)置容量。

LinkedBlockingQueue是一個(gè)用鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列,容量可以選擇進(jìn)行設(shè)置,不設(shè)置的話,將是一個(gè)無邊界的阻塞隊(duì)列,最大長度為Integer.MAX_VALUE。

這里的問題就出在:不設(shè)置的話,將是一個(gè)無邊界的阻塞隊(duì)列,最大長度為Integer.MAX_VALUE。也就是說,如果我們不設(shè)置LinkedBlockingQueue的容量的話,其默認(rèn)容量將會(huì)是Integer.MAX_VALUE。

而newFixedThreadPool中創(chuàng)建LinkedBlockingQueue時(shí),并未指定容量。此時(shí),LinkedBlockingQueue就是一個(gè)無邊界隊(duì)列,對于一個(gè)無邊界隊(duì)列來說,是可以不斷的向隊(duì)列中加入任務(wù)的,這種情況下就有可能因?yàn)槿蝿?wù)過多而導(dǎo)致內(nèi)存溢出問題。

上面提到的問題主要體現(xiàn)在newFixedThreadPool和newSingleThreadExecutor兩個(gè)工廠方法上,并不是說newCachedThreadPool和newScheduledThreadPool這兩個(gè)方法就安全了,這兩種方式創(chuàng)建的最大線程數(shù)可能是Integer.MAX_VALUE,而創(chuàng)建這么多線程,必然就有可能導(dǎo)致OOM。

創(chuàng)建線程池的正確姿勢

避免使用Executors創(chuàng)建線程池,主要是避免使用其中的默認(rèn)實(shí)現(xiàn),那么我們可以自己直接調(diào)用ThreadPoolExecutor的構(gòu)造函數(shù)來自己創(chuàng)建線程池。在創(chuàng)建的同時(shí),給BlockQueue指定容量就可以了。

private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
        60L, TimeUnit.SECONDS,
        new ArrayBlockingQueue(10));

這種情況下,一旦提交的線程數(shù)超過當(dāng)前可用線程數(shù)時(shí),就會(huì)拋出java.util.concurrent.RejectedExecutionException,這是因?yàn)楫?dāng)前線程池使用的隊(duì)列是有邊界隊(duì)列,隊(duì)列已經(jīng)滿了便無法繼續(xù)處理新的請求。但是異常(Exception)總比發(fā)生錯(cuò)誤(Error)要好。

除了自己定義ThreadPoolExecutor外。還有其他方法。這個(gè)時(shí)候第一時(shí)間就應(yīng)該想到開源類庫,如apache和guava等。

作者推薦使用guava提供的ThreadFactoryBuilder來創(chuàng)建線程池。

public class ExecutorsDemo {

    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();

    private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    public static void main(String[] args) {

        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            pool.execute(new SubThread());
        }
    }
}

通過上述方式創(chuàng)建線程時(shí),不僅可以避免OOM的問題,還可以自定義線程名稱,更加方便的出錯(cuò)的時(shí)候溯源。

參考資料

https://www.zhihu.com/question/23212914
https://www.zhihu.com/question/23212914/answer/245992718
http://m.itdecent.cn/p/c41e942bcd64
http://m.itdecent.cn/p/5c688d14188a


Kotlin開發(fā)者社區(qū)

專注分享 Java、 Kotlin、Spring/Spring Boot、MySQL、redis、neo4j、NoSQL、Android、JavaScript、React、Node、函數(shù)式編程、編程思想、"高可用,高性能,高實(shí)時(shí)"大型分布式系統(tǒng)架構(gòu)設(shè)計(jì)主題。

High availability, high performance, high real-time large-scale distributed system architecture design。

分布式框架:Zookeeper、分布式中間件框架等
分布式存儲(chǔ):GridFS、FastDFS、TFS、MemCache、redis等
分布式數(shù)據(jù)庫:Cobar、tddl、Amoeba、Mycat
云計(jì)算、大數(shù)據(jù)、AI算法
虛擬化、云原生技術(shù)
分布式計(jì)算框架:MapReduce、Hadoop、Storm、Flink等
分布式通信機(jī)制:Dubbo、RPC調(diào)用、共享遠(yuǎn)程數(shù)據(jù)、消息隊(duì)列等
消息隊(duì)列MQ:Kafka、MetaQ,RocketMQ
怎樣打造高可用系統(tǒng):基于硬件、軟件中間件、系統(tǒng)架構(gòu)等一些典型方案的實(shí)現(xiàn):HAProxy、基于Corosync+Pacemaker的高可用集群套件中間件系統(tǒng)
Mycat架構(gòu)分布式演進(jìn)
大數(shù)據(jù)Join背后的難題:數(shù)據(jù)、網(wǎng)絡(luò)、內(nèi)存和計(jì)算能力的矛盾和調(diào)和
Java分布式系統(tǒng)中的高性能難題:AIO,NIO,Netty還是自己開發(fā)框架?
高性能事件派發(fā)機(jī)制:線程池模型、Disruptor模型等等。。。

合抱之木,生于毫末;九層之臺(tái),起于壘土;千里之行,始于足下。不積跬步,無以至千里;不積小流,無以成江河。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容