Java線程池基礎(chǔ)詳解

為什么要使用線程池

  1. 反復(fù)創(chuàng)建線程開銷大
  2. 過多的線程會(huì)占用太多內(nèi)存

線程池的好處

  1. 加快響應(yīng)速度
  2. 合理利用CPU和內(nèi)存
  3. 統(tǒng)一管理

線程池適合應(yīng)用的場合

  1. 服務(wù)器接受到大量請(qǐng)求時(shí),使用線程池技術(shù)是非常合適的,它可以大大減少線程的創(chuàng)建和銷毀次數(shù),提高服務(wù)器的工作效率
  2. 在開發(fā)中,如果需要?jiǎng)?chuàng)建5哥以上的線程,那么就可以使用線程池來管理

線程池構(gòu)造函數(shù)的參數(shù)

參數(shù)名 類型 含義
corePoolSize int 核心線程數(shù)
maximumPoolSize int 最大線程數(shù)
keepAliveTime long 保持存活時(shí)間
workQueue BlockingQueue 任務(wù)存儲(chǔ)隊(duì)列
threadFactory ThreadFactory 當(dāng)線程池需要新的線程的時(shí)候,會(huì)使用threadFactory來生成新的線程
Handler RejectedExecutionHandler 由于線程池?zé)o法接受你所提交的任務(wù)的拒絕策略
corePoolSize

指的是核心線程數(shù),線程池在完成初始化后,默認(rèn)情況下,線程池中并沒有任何線程,線程池會(huì)等待有任務(wù)到來時(shí),再創(chuàng)建新線程去執(zhí)行任務(wù)

maximumPoolSize

線程池有可能會(huì)在核心線程數(shù)的基礎(chǔ)上,額外增加一些線程,但是這些新增加的線程數(shù)有一個(gè)上限,這就是maximumPoolSize

keepAliveTime

如果線程池當(dāng)前的線程數(shù)多于corePoolSize,那么如果多余的線程空閑時(shí)間超過keepAliveTime,它們就會(huì)被終止

ThreadFactory

新的線程都是由ThreadFactory創(chuàng)建的,默認(rèn)使用Executors.defaultThreadFactory,創(chuàng)建出來的線程都在同一個(gè)線程組,擁有同樣的NORM_PRIORITY優(yōu)先級(jí)并且都不是守護(hù)線程。如果自己指定ThreadFactory,那么就可以改變線程名、線程組、優(yōu)先級(jí)、是否是守護(hù)線程等。通常使用默認(rèn)的ThreadFactory就可以了。

workQueue

有3種最常見的隊(duì)列類型
1.直接交接:SynchronousQueue
2.無界隊(duì)列:LinkedBlockingQueue
3.有界隊(duì)列:ArrayBlockingQueue

線程池添加線程規(guī)則

  1. 如果線程數(shù)小于corePoolSize,即使其他工作線程處于空閑狀態(tài),也會(huì)創(chuàng)建一個(gè)新線程來運(yùn)行新任務(wù)
  2. 如果線程數(shù)等于(或大于)corePoolSize但少于maximumPoolSize,則將任務(wù)放入隊(duì)列
  3. 如果隊(duì)列已滿,并且線程數(shù)小于maximumPoolSize,則創(chuàng)建一個(gè)新線程來運(yùn)行任務(wù)
  4. 如果隊(duì)列已滿,并且線程數(shù)大于或等于maximumPoolSize,則拒絕該任務(wù)
線程池添加線程規(guī)則

增減線程的特點(diǎn)

  1. 通過設(shè)置corePoolSize和maximumPoolSize相同,就可以創(chuàng)建固定大小的線程池;
  2. 線程池希望保持較少的線程數(shù),并且只有在負(fù)載變得很大時(shí)才增加它;
  3. 通過設(shè)置maximumPoolSize為很高的值(例如Integer.MAX_VALUE),可以允許線程池容納任意數(shù)量的并發(fā)任務(wù);
  4. 是只有在隊(duì)列填滿時(shí)才創(chuàng)建多于corePoolSize的線程,所以如果你使用的是無界隊(duì)列(例如LinkedBlockingQueue),那么線程數(shù)就不會(huì)超過corePoolSize。

線程池應(yīng)該手動(dòng)創(chuàng)建還是自動(dòng)創(chuàng)建

newFixedThreadPool:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
  • 通過源碼可以看出,newFixedThreadPool 使用的是 LinkedBlockingQueue,由于 LinkedBlockingQueue 是沒有容量上限的,所以當(dāng)請(qǐng)求數(shù)越來越多,并且無法及時(shí)處理完畢的時(shí)候,也就是請(qǐng)求堆積的時(shí)候,會(huì)容易造成占用大量的內(nèi)存,可能會(huì)導(dǎo)致OOM

newSingleThreadExecutor:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • 通過源碼可以看出,這里和上面的 newFixedThreadPool 的原理基本一樣,只不過是把線程數(shù)直接設(shè)置成了1,所以這也會(huì)導(dǎo)致同樣的問題,也就是當(dāng)請(qǐng)求堆積的時(shí)候,可能會(huì)占用大量的內(nèi)存。

newCachedThreadPool:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • 可緩存線程池
  • 特點(diǎn):無界限線程池,具有自動(dòng)回收多余線程的功能(默認(rèn)時(shí)間是60秒)
  • 弊端:在于第二個(gè)參數(shù) maximumPoolSize 被設(shè)置為了Integer.MAX_VALUE,這可能會(huì)創(chuàng)建數(shù)量非常多的線程,甚至導(dǎo)致OOM。

newScheduledThreadPool:

  • 支持定時(shí)及周期性任務(wù)執(zhí)行的線程池
正確的創(chuàng)建線程池的方法

根據(jù)不同的業(yè)務(wù)場景,選擇合適的方式,最后是我們自己手動(dòng)創(chuàng)建線程池,自己設(shè)置線程池參數(shù)。

線程池里的線程數(shù)量設(shè)定為多少比較合適

  1. CPU密集型(加密、計(jì)算hash等):最佳線程數(shù)為CPU核心數(shù)的 1-2 倍左右
  2. 耗時(shí)IO型(讀寫數(shù)據(jù)庫、文件、網(wǎng)絡(luò)讀寫等):最佳線程數(shù)一般會(huì)大于CPU核心數(shù)很多倍,以JVM線程監(jiān)控顯示繁忙情況為依據(jù),保證線程空閑可以銜接上,參考 Brain Goetz 推薦的計(jì)算方法:
    線程數(shù)=CPU核心數(shù)x(1+平均等待時(shí)間/平均工作時(shí)間)

停止線程池的正確方法

  1. shutdown
    shutdown 并不是立即粗暴的結(jié)束線程,線程池仍然會(huì)繼續(xù)執(zhí)行已創(chuàng)建的任務(wù),但是不會(huì)接收新的任務(wù)了。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutDownDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        // 1500毫秒后此處會(huì)拋出異常 RejectedExecutionException
        executorService.execute(new ShutDownTask());
    }
    
}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代碼在1.5秒后執(zhí)行 executorService.shutdown(); 之后,再執(zhí)行
executorService.execute(new ShutDownTask()); 則會(huì)報(bào)錯(cuò)拋出異常 RejectedExecutionException

  1. isShutdown
    返回true或false告訴我們線程是否已經(jīng)停止
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        System.out.println(executorService.isShutdown());  // 打印false
        executorService.shutdown();
        System.out.println(executorService.isShutdown());  // 打印true

        System.out.println(executorService.isTerminated());

    }
  1. isTerminated
    返回true或false告訴我們線程是否已經(jīng)完全停止
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();
        // 打印false,因?yàn)榫€程還沒有完全執(zhí)行完
        System.out.println(executorService.isTerminated());
    }
  1. awaitTermination
    awaitTermination 有三種情況會(huì)返回,沒返回之前都是阻塞
    第一種情況:所有任務(wù)都執(zhí)行完畢了
    第二種情況:等待的時(shí)間到了
    第三種情況:等待的過程中被打斷了,會(huì)拋出 InterruptedException
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
        // 會(huì)打印false,因?yàn)?秒鐘不夠線程全部執(zhí)行完
        System.out.println(b);
    }
  1. shutdownNow
    正在執(zhí)行任務(wù)的線程繼續(xù)執(zhí)行,等待隊(duì)列中的線程直接結(jié)束并返回
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutDownDemo {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new ShutDownTask());
        }
        // 等待1.5秒
        Thread.sleep(1500);
        // 這里會(huì)返回已經(jīng)放到線程池隊(duì)列中還沒有執(zhí)行的Runnable
        List<Runnable> runnables = executorService.shutdownNow();
    }

}

class ShutDownTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
            System.out.println(Thread.currentThread().getName());
        } catch (InterruptedException e) {
            // 接收被中斷信號(hào)
            System.out.println(Thread.currentThread().getName() + "被中斷了!");
        }
    }
}

executorService.shutdownNow();會(huì)返回已經(jīng)放到線程池隊(duì)列中還沒有執(zhí)行的Runnable
運(yùn)行結(jié)果:

...
...
...
pool-1-thread-8
pool-1-thread-7
pool-1-thread-6
pool-1-thread-10
pool-1-thread-1
pool-1-thread-3
pool-1-thread-1被中斷了!
pool-1-thread-4
pool-1-thread-2
pool-1-thread-7被中斷了!
pool-1-thread-8被中斷了!
pool-1-thread-10被中斷了!
pool-1-thread-6被中斷了!
pool-1-thread-9被中斷了!
pool-1-thread-5被中斷了!

Process finished with exit code 0

線程池任務(wù)太多,怎么拒絕

  • 拒絕時(shí)機(jī)

  1. 當(dāng)Executor關(guān)閉時(shí),提交新任務(wù)會(huì)被拒絕
  2. 以及當(dāng)Executor對(duì)最大線程和工作隊(duì)列容量使用有限邊界并且已經(jīng)飽和時(shí)
  • 拒絕策略

  1. AbortPolicy,直接拋出一個(gè)異常
  2. DiscardPolicy,悄悄的把任務(wù)丟棄,沒有通知
  3. DiscardOldestPolicy,把隊(duì)列中最老的那個(gè)任務(wù)丟棄,新任務(wù)加進(jìn)來
  4. CallerRunsPolicy,誰提交的任務(wù)誰去執(zhí)行(比如說主線程給線程池提交了一個(gè)任務(wù),但是線程池已經(jīng)飽和無法再執(zhí)行了,這時(shí)則會(huì)讓提交任務(wù)的主線程去執(zhí)行這個(gè)任務(wù))

線程池鉤子函數(shù)

import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 演示每個(gè)任務(wù)執(zhí)行前后放鉤子函數(shù)
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;
    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被執(zhí)行了");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }

        Thread.sleep(2000);
        // 等待2秒,執(zhí)行暫停方法
        pauseableThreadPool.pause();
        System.out.println("線程池被暫停了!");
        Thread.sleep(2000);
        // 再等待2秒,執(zhí)行恢復(fù)方法
        pauseableThreadPool.resume();
        System.out.println("線程池被恢復(fù)了!");
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 暫停線程
     */
    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 恢復(fù)線程
     */
    private void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
}

運(yùn)行結(jié)果:


鉤子函數(shù)執(zhí)行效果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容