Java線程池 - ThreadPoolExecutor示例

為什么需要Java中的線程池?答案通常是,當(dāng)您在Java中開發(fā)一個(gè)簡(jiǎn)單的并發(fā)應(yīng)用程序時(shí),您創(chuàng)建一些Runnable的對(duì)象,然后創(chuàng)建相應(yīng)的線程對(duì)象來(lái)執(zhí)行它們。在Java中創(chuàng)建線程是一項(xiàng)昂貴的操作。如果每次執(zhí)行任務(wù)時(shí)都開始創(chuàng)建新的線程實(shí)例,那么應(yīng)用程序的性能肯定會(huì)下降。

1. 線程池在java中是如何工作的

線程池是預(yù)初始化線程的集合。通常集合的大小是固定的,但不是強(qiáng)制的。它有助于使用相同線程執(zhí)行N個(gè)任務(wù)。如果線程任務(wù)比線程多,那么任務(wù)需要在類似隊(duì)列的結(jié)構(gòu)中等待(FIFO -先進(jìn)先出)。

當(dāng)任何線程完成它的執(zhí)行時(shí),它可以從隊(duì)列中獲取一個(gè)新任務(wù)并執(zhí)行它。當(dāng)所有任務(wù)完成時(shí),線程保持活動(dòng)狀態(tài)并等待線程池中的更多任務(wù)。

Thread Pool

監(jiān)視程序一直監(jiān)視隊(duì)列(通常是阻塞隊(duì)列BlockingQueue)以執(zhí)行任何新任務(wù)。一旦任務(wù)出現(xiàn),線程就會(huì)再次開始接收并執(zhí)行任務(wù)。

2. ThreadPoolExecutor

自Java 5以來(lái),Java并發(fā)API提供了一個(gè)機(jī)制執(zhí)行器框架。類似的接口為Executor接口、其子接口ExecutorService和實(shí)現(xiàn)這兩個(gè)接口的ThreadPoolExecutor類。

ThreadPoolExecutor分離任務(wù)的創(chuàng)建和執(zhí)行。使用ThreadPoolExecutor,您只需要實(shí)現(xiàn)可運(yùn)行對(duì)象并將它們發(fā)送到執(zhí)行程序。它負(fù)責(zé)它們的執(zhí)行、實(shí)例化和使用必要的線程運(yùn)行。

不僅如此,使用線程池還提高了性能。當(dāng)您將任務(wù)發(fā)送給執(zhí)行程序時(shí),它會(huì)嘗試使用一個(gè)線程池的線程來(lái)執(zhí)行此任務(wù),以避免線程的連續(xù)生成。

3.如何創(chuàng)建ThreadPoolExecutor

我們可以使用java.util.concurrent.Executors接口中的預(yù)先構(gòu)建的方法創(chuàng)建以下5種類型的線程池執(zhí)行器。

  1. 固定線程池執(zhí)行程序——?jiǎng)?chuàng)建一個(gè)線程池,該線程池重用固定數(shù)量的線程來(lái)執(zhí)行任意數(shù)量的任務(wù)。如果在所有線程都處于活動(dòng)狀態(tài)時(shí)提交了其他任務(wù),它們將在隊(duì)列中等待,直到線程可用。它最適合在現(xiàn)實(shí)生活中使用。
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
  2. 緩存的線程池執(zhí)行程序——?jiǎng)?chuàng)建一個(gè)線程池,該線程池根據(jù)需要?jiǎng)?chuàng)建新線程,但在可用時(shí)將重用以前構(gòu)造的線程。如果任務(wù)長(zhǎng)時(shí)間運(yùn)行,則不要使用此線程池。如果線程的數(shù)量超出了系統(tǒng)所能處理的范圍,則會(huì)導(dǎo)致系統(tǒng)崩潰。
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
  3. 調(diào)度線程池執(zhí)行程序——?jiǎng)?chuàng)建一個(gè)線程池,該線程池可以調(diào)度命令在給定延遲后運(yùn)行或定期執(zhí)行。
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newScheduledThreadPool(10);
  4. 單線程池執(zhí)行程序-創(chuàng)建單線程來(lái)執(zhí)行所有任務(wù)。當(dāng)您只有一個(gè)任務(wù)要執(zhí)行時(shí)就可以使用它。
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newSingleThreadExecutor();
  5. 工作竊取線程池執(zhí)行程序-創(chuàng)建一個(gè)線程池,該線程池維護(hù)足夠的線程來(lái)支持給定的并行級(jí)別。這里的并行級(jí)別是指在多處理器機(jī)器上,在單點(diǎn)時(shí)間內(nèi)執(zhí)行給定任務(wù)所使用的線程的最大數(shù)量。
    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newWorkStealingPool(4);

4. ThreadPoolExecutor示例

4.1. 創(chuàng)建任務(wù)

讓我們創(chuàng)建一個(gè)任務(wù),每次都需要隨機(jī)時(shí)間來(lái)完成它。

public class Task implements Runnable {
    private String name;

    public Task(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void run() {
        try {
            Long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + name);
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

4.2. 使用線程池執(zhí)行器執(zhí)行任務(wù)

給定的程序創(chuàng)建5個(gè)任務(wù)并提交到executor隊(duì)列。Executor使用兩個(gè)線程執(zhí)行所有任務(wù)。

public class ThreadPoolExample {
    public static void main(String[] args) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);

        for (int i = 1; i <= 5; i++) {
            Task task = new Task("Task " + i);
            System.out.println("Created : " + task.getName());

            executor.execute(task);
        }
        executor.shutdown();
    }
}

Program output:

Created : Task 1
Created : Task 2
Created : Task 3
Created : Task 4
Created : Task 5
Executing : Task 1
Executing : Task 2
Executing : Task 3
Executing : Task 4
Executing : Task 5

5. ScheduledThreadPoolExecutor

當(dāng)您必須只執(zhí)行一次惟一任務(wù)時(shí),固定線程池或緩存線程池是很好的選擇。當(dāng)您需要執(zhí)行一個(gè)任務(wù),重復(fù)N次,或者N次固定次數(shù),或者在固定延遲之后無(wú)限次,您應(yīng)該使用ScheduledThreadPoolExecutor。

ScheduledThreadPoolExecutor提供了4種方法,它們提供不同的功能以重復(fù)的方式執(zhí)行任務(wù)。

  1. ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行在給定延遲之后啟用的任務(wù)。
  2. ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行在給定延遲之后啟用的ScheduledFuture。
  3. ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行一個(gè)周期性動(dòng)作,該動(dòng)作在給定的初始延遲之后首先啟用,然后在給定的延遲期間啟用。如果此任務(wù)的任何執(zhí)行花費(fèi)的時(shí)間超過(guò)其周期,則后續(xù)執(zhí)行可能會(huì)延遲開始,但不會(huì)并發(fā)執(zhí)行。
  4. ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)——?jiǎng)?chuàng)建并執(zhí)行一個(gè)周期性動(dòng)作,該動(dòng)作在給定的初始延遲之后首先啟用,然后在給定的延遲期間啟用。無(wú)論一個(gè)長(zhǎng)時(shí)間運(yùn)行的任務(wù)花費(fèi)多少時(shí)間,兩次執(zhí)行之間都會(huì)有一個(gè)固定的延遲時(shí)間間隔。

5.1. ScheduledThreadPoolExecutor示例

public class ScheduledThreadPoolExecutorExample {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);

        Task task = new Task("Repeat Task");
        System.out.println("Created : " + task.getName());

        executor.scheduleWithFixedDelay(task, 2, 2, TimeUnit.SECONDS);
    }
}
public class Task implements Runnable {
    private String name;

    public Task(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void run() {
        try {
            Long duration = (long) (Math.random() * 10);
            System.out.println("Executing : " + name + ", Current Seconds : " + new Date());
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

輸出結(jié)果為:

Created : Repeat Task
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:05 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:13 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:17 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:22 CST 2019
Executing : Repeat Task, Current Seconds : Mon Mar 04 16:07:27 CST 2019

6. java中的自定義線程池實(shí)現(xiàn)

盡管Java通過(guò)Executor框架具有非常健壯的線程池功能。如果沒(méi)有executor,就不應(yīng)該創(chuàng)建自己的自定義線程池。我強(qiáng)烈反對(duì)這種企圖。然而,如果您想創(chuàng)建它來(lái)學(xué)習(xí),下面給出的是Java中的線程池實(shí)現(xiàn)。

@SuppressWarnings("unused")
public class CustomThreadPool {
    // 線程池大小
    private final int poolSize;

    // 線程池內(nèi)部是一個(gè)數(shù)組
    private final WorkerThread[] workers;

    // 先進(jìn)先出隊(duì)列
    private final LinkedBlockingQueue<Runnable> queue;

    public CustomThreadPool(int poolSize) {
        this.poolSize = poolSize;
        queue = new LinkedBlockingQueue<Runnable>();
        workers = new WorkerThread[poolSize];

        for (int i = 0; i < poolSize; i++) {
            workers[i] = new WorkerThread();
            workers[i].start();
        }
    }

    public void execute(Runnable task) {
        synchronized (queue) {
            queue.add(task);
            queue.notify();
        }
    }

    private class WorkerThread extends Thread {
        public void run() {
            Runnable task;

            while (true) {
                synchronized (queue) {
                    while (queue.isEmpty()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            System.out.println("An error occurred while queue is waiting: " + e.getMessage());
                        }
                    }
                    task = queue.poll();
                }

                try {
                    task.run();
                } catch (RuntimeException e) {
                    System.out.println("Thread pool is interrupted due to an issue: " + e.getMessage());
                }
            }
        }
    }

    public void shutdown() {
        System.out.println("Shutting down thread pool");
        for (int i = 0; i < poolSize; i++) {
            workers[i] = null;
        }
    }
}

我們使用ThreadPoolExecutor執(zhí)行同樣的任務(wù)

public class CustomThreadPoolExample {
    public static void main(String[] args) {
        CustomThreadPool customThreadPool = new CustomThreadPool(2);

        for (int i = 1; i <= 5; i++) {
            Task task = new Task("Task " + i);
            System.out.println("Created : " + task.getName());

            customThreadPool.execute(task);
        }
    }
}

程序輸出為:

Created : Task 1
Created : Task 2
Created : Task 3
Created : Task 4
Created : Task 5
Executing : Task 1, Current Seconds : Mon Mar 04 16:24:53 CST 2019
Executing : Task 2, Current Seconds : Mon Mar 04 16:24:53 CST 2019
Executing : Task 3, Current Seconds : Mon Mar 04 16:24:58 CST 2019
Executing : Task 4, Current Seconds : Mon Mar 04 16:25:00 CST 2019
Executing : Task 5, Current Seconds : Mon Mar 04 16:25:01 CST 2019

上面是非常原始的線程池實(shí)現(xiàn),有很多改進(jìn)。但是,與其完善上面的代碼,不如專注于學(xué)習(xí)Java executor框架。

還要注意,不正確的池或隊(duì)列處理也會(huì)導(dǎo)致死鎖或資源抖動(dòng)。Java社區(qū)對(duì)Executor框架進(jìn)行了良好的測(cè)試,您當(dāng)然可以避免這些問(wèn)題。

7. 總結(jié)

  1. ThreadPoolExecutor類有四個(gè)不同的構(gòu)造函數(shù),但是由于它們的復(fù)雜性,Java并發(fā)API提供了Executors類來(lái)構(gòu)造executor和其他相關(guān)對(duì)象。盡管我們可以直接使用其構(gòu)造函數(shù)之一創(chuàng)建ThreadPoolExecutor,但建議使用Executors類。
  2. 我們?cè)谏厦鎰?chuàng)建的緩存線程池,如果需要執(zhí)行新任務(wù),它會(huì)創(chuàng)建新線程;如果已經(jīng)完成了正在運(yùn)行的任務(wù)(這些任務(wù)現(xiàn)在是可用的)的執(zhí)行,它會(huì)重用現(xiàn)有的線程。但是,緩存的線程池有一個(gè)缺點(diǎn),即新任務(wù)的線程總是處于不穩(wěn)定狀態(tài),因此,如果您向這個(gè)執(zhí)行程序發(fā)送太多的任務(wù),您可能會(huì)使系統(tǒng)過(guò)載。這可以通過(guò)使用固定線程池來(lái)解決,我們將在下一教程中學(xué)習(xí)這個(gè)問(wèn)題。
  3. ThreadPoolExecutor類和一般的執(zhí)行器的一個(gè)關(guān)鍵方面是,必須顯式地結(jié)束它。如果不這樣做,執(zhí)行程序?qū)⒗^續(xù)執(zhí)行,程序?qū)⒉粫?huì)結(jié)束。如果執(zhí)行程序沒(méi)有要執(zhí)行的任務(wù),它將繼續(xù)等待新任務(wù),并且不會(huì)結(jié)束執(zhí)行。Java應(yīng)用程序在其所有非守護(hù)進(jìn)程線程完成執(zhí)行之前不會(huì)結(jié)束,因此,如果不終止執(zhí)行程序,應(yīng)用程序?qū)⒂肋h(yuǎn)不會(huì)結(jié)束。
  4. 要向執(zhí)行程序表明想要完成它,可以使用ThreadPoolExecutor類的shutdown()方法。當(dāng)執(zhí)行程序完成所有掛起任務(wù)的執(zhí)行時(shí),它就完成了執(zhí)行。在您調(diào)用shutdown()方法之后,如果您嘗試將另一個(gè)任務(wù)發(fā)送給執(zhí)行程序,它將被拒絕,執(zhí)行程序?qū)伋?code>RejectedExecutionException異常。
  5. ThreadPoolExecutor類提供了許多方法來(lái)獲取關(guān)于其狀態(tài)的信息。在示例中,我們使用getPoolSize()、getActiveCount()getCompletedTaskCount()方法來(lái)獲取關(guān)于池大小、線程數(shù)和執(zhí)行程序完成的任務(wù)數(shù)的信息。您還可以使用getLargestPoolSize()方法,該方法每次返回池中已經(jīng)存在的最大線程數(shù)。
  6. ThreadPoolExecutor類還提供了與執(zhí)行器的終結(jié)相關(guān)的其他方法。這些方法有:
  • shutdownNow():該方法立即關(guān)閉執(zhí)行程序。它不執(zhí)行掛起的任務(wù)。它返回一個(gè)包含所有這些掛起任務(wù)的列表。調(diào)用此方法時(shí)正在運(yùn)行的任務(wù)將繼續(xù)執(zhí)行,但該方法不會(huì)等待它們的結(jié)束。
  • isTerminated():如果您調(diào)用了shutdown()shutdownNow()方法,并且執(zhí)行程序完成了關(guān)閉該方法的過(guò)程,則該方法返回true。
  • isShutdown():如果調(diào)用了executor的shutdown()方法,則該方法返回true。
  • awaitTermination(long timeout,TimeUnitunit):該方法阻塞調(diào)用線程,直到執(zhí)行程序的任務(wù)結(jié)束或超時(shí)發(fā)生。TimeUnit類是一個(gè)枚舉,它具有以下常量:DAYSHOURS,MICROSECONDS等。

參考資料

Java Thread Pool – ThreadPoolExecutor Example

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容