JDK線程池

線程池

類繼承關(guān)系,方法太多就不列出來了。

thread_pool_uml.jpg

Executors

Executors扮演線程池工廠的角色,ThreadPoolExecutor就代表一個(gè)線程池。Executors提供了各種類型的線程池,主要有以下這些方法:

public static ExecutorService newFixedThreadPool(int nThreads);
public static ExecutorService newSingleThreadExecutor();
public static ExecutorService newCachedThreadPool();
public static ScheduledExecutorService newSingleThreadScheduledExecutor();
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);

newFixedThreadPool(int nThreads)

返回固定線程數(shù)量的線程池。該線程池中的線程數(shù)量始終不變,當(dāng)有新任務(wù)提交時(shí),若有空閑線程,則立即執(zhí)行,否則存進(jìn)任務(wù)隊(duì)列,待有線程空閑時(shí)再執(zhí)行。

newSingleThreadExecutor()

返回一個(gè)只有一個(gè)線程的線程池,若多余一個(gè)任務(wù)被提交到該線程池,保存到任務(wù)隊(duì)列,按FIFO的順序執(zhí)行。

newCachedThreadPool()

返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程數(shù)量的線程池,線程池線程數(shù)量不確定,如果新任務(wù)沒有空閑線程,則會(huì)創(chuàng)建新的線程處理任務(wù),所有線程執(zhí)行完之后,會(huì)丟回線程池復(fù)用。

newSingleThreadScheduledExecutor()

返回一個(gè)ScheduledExecutorService對(duì)象,線程池大小為1。具有scheduled調(diào)度功能,如在入定的延遲或者周期性的執(zhí)行某個(gè)任務(wù)。

newScheduledThreadPool(int corePoolSize)

返回一個(gè)ScheduledExecutorService對(duì)象,線程數(shù)量可以指定。

ScheduledExecutorService

ScheduledExecutorService的scheduleAtFiexRate()和scheduleWithFixedDelay()都可以周期性的調(diào)度任務(wù),如果任務(wù)遇到異常,會(huì)停止調(diào)度,他們的調(diào)度區(qū)別如下:

scheduled_rate_delay.jpg
對(duì)于fixRate而言,如果任務(wù)執(zhí)行的時(shí)間大于等待的周期,那么會(huì)在任務(wù)執(zhí)行完之后馬上再次調(diào)度,不會(huì)出現(xiàn)任務(wù)重疊的情況!
public static void main(String[] args) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(10);
        Runnable runnable = () -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(System.currentTimeMillis() / 1000);
        };
        //兩秒輸出一次,調(diào)度周期為2秒
        //exec.scheduleAtFixedRate(runnable,0,2, TimeUnit.SECONDS);
        //三秒輸出一次,調(diào)度周期為2秒,線程執(zhí)行1秒
        exec.scheduleWithFixedDelay(runnable,0,2, TimeUnit.SECONDS);
    }

ThreadPoolExecutor

Executors工廠對(duì)與ThreadPoolExecutor的創(chuàng)建都是基于其構(gòu)造方法,來看一下核心的構(gòu)造方法。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
corePoolSize: 指定線程池中的線程數(shù)量;
maxinumPoolSize: 指定線程池中的最大線程數(shù)量;
keepAliveTime: 當(dāng)線程池線程數(shù)量超過corePoolSize時(shí),多余的線程的存活時(shí)間;
unit: keepaliveTime的時(shí)間單位;
workQueue: 任務(wù)隊(duì)列,被提交單尚未被執(zhí)行的任務(wù);
threadFactory: 線程工廠,用于創(chuàng)建線程,一般用默認(rèn)的就行;
handler: 拒絕策略,當(dāng)任務(wù)太多來不及處理時(shí),如何拒絕任務(wù);

workqueue

workQueue是一個(gè)BlockingQueue接口的對(duì)象,在ThreadPoolExecutor的構(gòu)造函數(shù)中可使用一下幾種:

直接提交的隊(duì)列: 該功能由SynchronousQueue對(duì)象提供。SynchronousQueue是一種特殊的BlockingQueue,沒有容量,每一次插入操作都要等待一個(gè)相應(yīng)的刪除操作,同理每一次刪除操作都要等待相應(yīng)插入操作。如果使用SynchronousQueue,提交的任務(wù)不會(huì)被真實(shí)的保存,而總是將新任務(wù)提交給線程執(zhí)行。如果沒有空閑的線程,則嘗試創(chuàng)建,如果線程數(shù)量已達(dá)最大值,則執(zhí)行拒絕策略,因此使用SynchronousQueue,通常要設(shè)置很大的maxinumPoolsize值,否則很容易執(zhí)行拒絕策略。

有界任務(wù)隊(duì)列(FIFO): 游街任務(wù)隊(duì)列可以用ArrayBlockQueue實(shí)現(xiàn),看到array很容易知道,這是個(gè)定值的,因此我們使用這個(gè)隊(duì)列的時(shí)候需要指定最大容量,有了最大容量,才有界。有新的任務(wù)執(zhí)行時(shí),如果線程池的實(shí)際線程數(shù)小于corePoolSize,則會(huì)優(yōu)先創(chuàng)建新的線程,若大于corePoolSize,則添加到等待隊(duì)列。如果隊(duì)列已滿,則在總線程數(shù)不大于maxinumPoolSize的前提下創(chuàng)建新的線程執(zhí)行任務(wù)。若大于maxinumPoolSize,則執(zhí)行拒絕策略。可見有界隊(duì)列僅在任務(wù)隊(duì)列滿時(shí),才可能將線程數(shù)提升到corePoolSize以上,除非系統(tǒng)非常繁忙,否則可以確核心線程數(shù)維持在corPoolSize

無界任務(wù)隊(duì)列(FIFO): 無界任務(wù)隊(duì)列可以通過LinckedBlockingQueue類實(shí)現(xiàn)。與有界隊(duì)列相比,除非系統(tǒng)資源耗盡,否則無界的隊(duì)列村存在任務(wù)如對(duì)失敗的情況。

優(yōu)先任務(wù)隊(duì)列: 優(yōu)先任務(wù)隊(duì)列是帶有優(yōu)先級(jí)的隊(duì)列,它通過PriorityBlockingQueue實(shí)現(xiàn),可以指定任務(wù)執(zhí)行的先后順序。

看看ThreadPoolExecutor線程池的核心調(diào)度代碼:

public void execute(Runnable command) {
    if (command == null)
            throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

可以直接從代碼和注釋中看出執(zhí)行邏輯。

workerCountOf獲取當(dāng)前線程池的線程總數(shù)。當(dāng)總數(shù)小于corePoolsize時(shí),會(huì)將任務(wù)通過addWorker直接將任務(wù)提交給線程池的方式直接調(diào)度執(zhí)行。否則通過workerQueue.offer進(jìn)入等待隊(duì)列,如果進(jìn)入等待隊(duì)列失敗,則再次嘗試(可能在之前的檢查中又有線程退出了,存在空閑線程)將任務(wù)直接提交給線程池,如果當(dāng)前線程數(shù)已經(jīng)達(dá)到maxinumPoolSize,則執(zhí)行拒絕策略。

thread_pool_exec.jpg

拒絕策略

當(dāng)任務(wù)數(shù)量超過系統(tǒng)實(shí)際承載能力時(shí),我們需要根據(jù)某種策略來拒絕新來的任務(wù)。JDK提供了四種拒絕策略:

AbortPolicy: 直接拋出異常,阻止系統(tǒng)正常工作;

CallerRunsPolicy: 只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。顯然這樣不會(huì)真的丟棄任務(wù),但是任務(wù)提交線程的性能極有可能會(huì)急劇下降;

DiscardOldestPolicy: 丟棄最老的一個(gè)請(qǐng)求,也就是即將被執(zhí)行的一個(gè)任務(wù)(FIFO),并嘗試再次提交當(dāng)前任務(wù)。

DiscardPolicy: 默認(rèn)丟棄無法處理的任務(wù),不予任何處理。如果允許任務(wù)丟失,這可能是很好的策略。

public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            try {
                Thread.sleep(1000);
                System.out.println("thread-" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        RejectedExecutionHandler reject = (r, executor) -> System.out.println(r.toString() + ": discard");
        ExecutorService exec = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), Executors.defaultThreadFactory(), reject);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            exec.execute(task);
            Thread.sleep(10);
        }
    }
從上面的demo中,程序很快就會(huì)執(zhí)行拒絕策略,可以自行測(cè)試。自定義的拒絕策略,什么事都沒有做,只是輸出了一段xx:discard的信息。

ThreadFactory

該接口只有一個(gè)方法,用來創(chuàng)建線程。當(dāng)線程池需要新建線程的時(shí)候,就會(huì)調(diào)用這個(gè)方法。

Thread newThread(Runnable r);

自定義線程可以幫助我們實(shí)現(xiàn)很多功能,例如可以跟中線程池在核實(shí)創(chuàng)建了多少線程,也可以自定義線程的名稱,組,以及優(yōu)先級(jí)等信息。

public static void main(String[] args) throws InterruptedException {
    Runnable task = () -> {
        try {
            Thread.sleep(1000);
            System.out.println("thread-" + Thread.currentThread().getId() + ":" + System.currentTimeMillis());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };
    ThreadFactory fac = (runnable) -> {
        Thread t = new Thread(runnable);
        //打印出創(chuàng)建線程信息,設(shè)置成守護(hù)線程,當(dāng)主線程退出時(shí)強(qiáng)制銷毀
        System.out.println("create Thread...");
        t.setDaemon(true);
        return t;
    };
    ExecutorService exec = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), fac);
    for (int i = 0; i < 10; i++) {
        exec.execute(task);
    }
    Thread.sleep(2000);
}

線程池ThreadPoolExecutor的擴(kuò)展

ThreadPoolExecutor是一個(gè)可以擴(kuò)展的線程池。它提供了beforeExecute()、afterExecute()和terminated()三個(gè)接口對(duì)線程池進(jìn)行控制。這三個(gè)方法的調(diào)用在ThreadPoolExecutor.Worker.runWorker()方法中。

final void runWorker(Worker w) {
    ...
    try {
        while (task != null || (task = getTask()) != null) {
            ...
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                }
                ...
                finally {
                    afterExecute(task, thrown);
                }
            } 
            ...
    } finally {
        //對(duì)terminated()的調(diào)用是從這里進(jìn)入的
        processWorkerExit(w, completedAbruptly);
    }
}

線程池的擴(kuò)展demo

public static class MyTask implements Runnable {

    String name;

    public MyTask(String name) {
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("正在執(zhí)行:ThreadID = " + Thread.currentThread().getId() + ": Task name = " + name);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = new ThreadPoolExecutor(5, 5, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5)) {
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            System.out.println("準(zhǔn)備執(zhí)行:ThreadID = " + Thread.currentThread().getId() + ": Task name = " + ((MyTask) r).name);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            System.out.println("執(zhí)行完成:ThreadID = " + Thread.currentThread().getId() + ": Task name = " + ((MyTask) r).name);
        }

        @Override
        protected void terminated() {
            System.out.println("線程池退出");
        }
    };

    for (int i = 0; i < 5; i++) {
        exec.execute(new MyTask("MyTask-" + i));
        Thread.sleep(10);
    }
    exec.shutdown();
}

任務(wù)提交完成后,調(diào)用shutdown關(guān)閉線程池。這是個(gè)比較安全的方法,如果當(dāng)前有線程正在執(zhí)行,該方法并不會(huì)暴力的終止所有任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完成后再關(guān)閉線程池,但它并不會(huì)等待所有線程執(zhí)行完成后再返回。因此可以簡單地理解成只是發(fā)送了一個(gè)關(guān)閉信號(hào)而已。再執(zhí)行shutdown方法后,這個(gè)線程池就不能在接受其他新的任務(wù)了。

ForkJoinPool

采用“分而治之”的思想,將大的任務(wù)分解成小的任務(wù),分階段處理,再將這些結(jié)果進(jìn)行合成。在JDK中,給出了一個(gè)ForkJoinPool線程池,對(duì)于fork()方法并不急著開啟線程,而是提交給ForkJoinPool線程池進(jìn)行處理。絕大多數(shù)情況下一個(gè)線程是需要處理多個(gè)邏輯任務(wù)的,因此每個(gè)線程必然擁有一個(gè)任務(wù)隊(duì)列。當(dāng)線程A已經(jīng)把自己的任務(wù)執(zhí)行完成之后,而線程B還有一大堆任務(wù)等著處理,此時(shí)線程A就會(huì)幫助線程B,從線程B的任務(wù)隊(duì)列中拿任務(wù)過來處理。當(dāng)線程試圖幫助別人時(shí),總是從任務(wù)隊(duì)列的底部開始拿數(shù)據(jù),而線程執(zhí)行自己的任務(wù)是則是從頂部開始拿,這種行為有利于避免數(shù)據(jù)競(jìng)爭。使用fork()后系統(tǒng)多了一個(gè)執(zhí)行分支(線程),所以需要等到這個(gè)分支執(zhí)行完畢,才有可能得到最終結(jié)果,因此join()就表示等待。這個(gè)join()和普通線程的join()不一樣哦

fork_join.jpg
fork_join_1.jpg
// ForkJoin求和demo
public class ForkJoinCountTask extends RecursiveTask<Long> {

    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public ForkJoinCountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    protected Long compute() {
        long sum = 0;
        // 如果計(jì)算的個(gè)數(shù)沒有超過閾值,直接開始計(jì)算
        if ((end - start) <= THRESHOLD) {
            for (long i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 超過了閾值,則分成100個(gè)小任務(wù)
            long forkCount = (end - start) / THRESHOLD;
            if (THRESHOLD * forkCount < end) {
                forkCount++;
            }
            ArrayList<ForkJoinCountTask> tasks = new ArrayList<ForkJoinCountTask>();
            long pos = start;
            for (int i = 0; i < forkCount; i++) {
                long lastOne = pos + THRESHOLD;
                lastOne = lastOne > end ? end : lastOne;
                ForkJoinCountTask task = new ForkJoinCountTask(pos, lastOne);
                pos += THRESHOLD + 1;
                tasks.add(task);
                task.fork();
            }
            // 對(duì)小任務(wù)的結(jié)果進(jìn)行求和
            for (ForkJoinCountTask t : tasks) {
                sum += t.join();
            }
        }
        // 返回部分結(jié)果
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        ForkJoinCountTask t = new ForkJoinCountTask(0, 20000);
        ForkJoinTask<Long> fjt = pool.submit(t);
        try {
            long res = fjt.get();
            System.out.println(res);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

over ...

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容