線程池基本使用和ThreadPoolExecutor核心原理講解

java和spring都提供了線程池的框架

java提供的是Executors;
spring提供的是ThreadPoolTaskExecutor;

一、基本使用

Executors提供了4個(gè)線程池,

  1. FixedThreadPool
  2. SingleThreadExecutor
  3. CachedThreadPool
  4. ScheduledThreadPool

FixedThreadPool-固定線程池

public class FixPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i =0;i<10;i++){
            executorService.execute(()->{
                Thread wt = Thread.currentThread();

                String format = String.format("當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
                System.out.println(format);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

效果如下:


可以看到5個(gè)線程在接任務(wù),接完5個(gè)任務(wù)之后就停止了1秒,完成之后繼續(xù)接任務(wù);

SingleThreadExecutor-單一線程池,線程數(shù)為1的固定線程池

為什么要?jiǎng)?chuàng)造這么一個(gè)線程池出來(lái)呢?
因?yàn)橛行r(shí)候需要用到線程池的隊(duì)列任務(wù)機(jī)制,又不想多線程并發(fā)。此時(shí)就需要用單一線程池了。
以下兩種寫法完全一樣的效果

ExecutorService executorService = Executors.newSingleThreadExecutor();
ExecutorService executorService = Executors.newFixedThreadPool(1);

CachedThreadPool-緩存線程池,線程數(shù)為無(wú)窮大的一個(gè)線程池

當(dāng)有空閑線程的時(shí)候就讓空閑線程去做任務(wù);
當(dāng)沒空閑線程的時(shí)候就新建一個(gè)線程去任務(wù);

public class CashPool {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i =0;;i++){
            executorService.execute(()->{
                Thread wt = Thread.currentThread();

                String format = String.format("緩存線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
                System.out.println(format);
                try {
                    double d = Math.random();
                    int sleep = (int)(d*5000);
                    Thread.sleep(sleep);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(100);
        }
    }
}

效果如下:



由于任務(wù)耗時(shí)不確定,所以線程池會(huì)動(dòng)態(tài)根據(jù)情況去判斷是否創(chuàng)建新的線程;

ScheduledThreadPool-調(diào)度線程池,線程會(huì)根據(jù)一定的時(shí)間規(guī)律去消化任務(wù)

分別有3個(gè)

  1. schedule(固定延時(shí)才執(zhí)行任務(wù))
  2. scheduleAtFixedRate(一定的間隔執(zhí)行一次任務(wù),執(zhí)行時(shí)長(zhǎng)不影響間隔時(shí)間)
  3. scheduleWithFixedDelay(一定的間隔執(zhí)行一次任務(wù),從任務(wù)執(zhí)行接觸才開始計(jì)算,執(zhí)行時(shí)長(zhǎng)影響間隔時(shí)間)
public class ScheduledPool {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);
        String begin = String.format("調(diào)度線程池-begin,當(dāng)前時(shí)間:%s",  LocalDateTime.now());
        System.out.println(begin);

//        schedule(pool);
        scheduleAtFixedRate(pool);
//        scheduleWithFixedDelay(pool);
    }

    private static void scheduleAtFixedRate(ScheduledExecutorService pool){
        pool.scheduleAtFixedRate(()->{
            Thread wt = Thread.currentThread();
            String format = String.format("scheduleAtFixedRate-調(diào)度線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
            System.out.println(format);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },1,1, TimeUnit.SECONDS);
    }


    private static void scheduleWithFixedDelay(ScheduledExecutorService pool){
        pool.scheduleWithFixedDelay(()->{
            Thread wt = Thread.currentThread();
            String format = String.format("scheduleWithFixedDelay-調(diào)度線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
            System.out.println(format);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },1,1, TimeUnit.SECONDS);
    }
    private static void schedule(ScheduledExecutorService pool){
        for (int i =0;i<10;i++){
            pool.schedule(()->{
                Thread wt = Thread.currentThread();
                String format = String.format("調(diào)度線程池,當(dāng)前線程名稱:%s,當(dāng)前時(shí)間:%s", wt.getName(), LocalDateTime.now());
                System.out.println(format);
            },5, TimeUnit.SECONDS);
        }
        pool.shutdown();
    }
}

二、原理講解

上面介紹的4個(gè)線程池工具,都是基于一個(gè)類ThreadPoolExecutor
ThreadPoolExecutor有幾個(gè)重要的參數(shù)

  1. corePoolSize,核心線程數(shù)
  2. maximumPoolSize,最大線程數(shù)
  3. keepAliveTime,線程空閑時(shí)間,和4組合使用
  4. unit
  5. workQueue,工作隊(duì)列,是各個(gè)工具的機(jī)制策略的核心
  6. threadFactory ,生成線程的工廠,可以代理run方法,還有給thread命名
  7. handler,拒絕策略,當(dāng)任務(wù)太多的時(shí)候會(huì)執(zhí)行的方法,框架有4個(gè)實(shí)現(xiàn)好的策略,可以自己寫自己的策略。

對(duì)于fixPool線程池,corePoolSize=maximumPoolSize=n,keepAliveTime=0,workQueue=LinkedBlockingQueue,threadFactory和handler都是默認(rèn)的。


對(duì)于cashPool線程池,corePoolSize=0,maximumPoolSize=2^32,keepAliveTime=60s,workQueue=SynchronousQueue,threadFactory和handler都是默認(rèn)的。


我們先看一看execute方法


其中ctl參數(shù)是一個(gè)核心參數(shù),保存著線程池的運(yùn)行狀態(tài)和線程數(shù)量,通過workerCountOf()獲取當(dāng)前的工作線程數(shù)量。
execute整個(gè)過程分成3個(gè)部分。

  1. 當(dāng)工作線程數(shù)少于核心線程數(shù),創(chuàng)建一個(gè)工作線程去消化任務(wù)。
  2. 當(dāng)線程池在運(yùn)行狀態(tài)而且能把任務(wù)放到隊(duì)列,則接受任務(wù),調(diào)用addWorker讓線程去消化隊(duì)列的任務(wù)。
  3. 讓線程獲取消化任務(wù)失敗,拒絕任務(wù)。

核心一 workQueue.offer

對(duì)于fixPool,由于workQueue是LinkedBlockingQueue,所以offer方法基本會(huì)返回true。
對(duì)于cashpool,workQueue是SynchronousQueue,如果沒有消費(fèi)者在take,則會(huì)立馬返回false,然后立馬新建一個(gè)線程。

核心二 getTask

每個(gè)線程都被包裝成worker對(duì)象,worker對(duì)象會(huì)執(zhí)行自己的runWorker方法,方法在死循環(huán)不停得調(diào)用getTask方法去消化任務(wù)。



getTask里面最核心的是take和poll方法,這個(gè)是跟你傳入的隊(duì)列特性有關(guān)。


對(duì)于spring提供的ThreadPoolTaskExecutor,其實(shí)也是對(duì)ThreadPoolExecutor的一個(gè)封裝。
具體看initializeExecutor方法



在執(zhí)行execute方法的時(shí)候,也是執(zhí)行ThreadPoolExecutor的execute方法。

結(jié)論,線程池的核心是ThreadPoolExecutor類,根據(jù)傳入的workQueue的offer、poll、take方法特性的不同而誕生緩存線程池,固定線程池,調(diào)度線程等各種線程策略。

github地址:https://github.com/hd-eujian/threadpool.git
碼云地址: https://gitee.com/guoeryyj/threadpool.git

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容