【Java并發(fā)編程】—–“J.U.C”:Executor體系結(jié)構(gòu)

前言

在以前使用線程池的時候,都是簡單的調(diào)用API。對于線程池體系結(jié)構(gòu)原理都沒有深究,直到最近在看看Netty的線程池源碼時發(fā)現(xiàn)其都是在JDK線程池的基礎(chǔ)上做了進(jìn)一步封裝,因此個人覺得應(yīng)該好好深入學(xué)習(xí)一下JDK本身的線程池,一來可以看看Doug Lea大神的設(shè)計,二來對于后續(xù)深入分析Netty的線程模型也有比較大的幫助。
本文所包含的內(nèi)容主要由以上兩方面(如有分析不對,請拍磚):

  • 使用線程池的理由
  • JDK線程池的體系結(jié)構(gòu)

1.為什么使用線程池

子曰:“利用有限的資源處理無限的任務(wù)”。

眾所周知,創(chuàng)建線程的開銷相比于創(chuàng)建進(jìn)程要小很多。然而當(dāng)我們創(chuàng)建一個線程在執(zhí)行任務(wù)的過程中,不僅要消耗CPU的時間和內(nèi)存資源,同時線程其自身對應(yīng)的VM棧、本地方法棧、程序計數(shù)器也同樣會占據(jù)內(nèi)存空間,因此線程的創(chuàng)建與銷毀依舊還是一件“奢侈”的事情。因此,如果我們?yōu)槊恳粋€執(zhí)行的任務(wù)都單獨(dú)的創(chuàng)建一個線程來執(zhí)行,可能會創(chuàng)建執(zhí)行任務(wù)的時間要遠(yuǎn)遠(yuǎn)小于創(chuàng)建線程所花費(fèi)時間,很顯然這是一種“浪費(fèi)的表現(xiàn)”。其次如果我們沒有對線程的數(shù)據(jù)量加以限制,那么無限制的創(chuàng)建勢必會引發(fā)堆內(nèi)存溢出,從而引發(fā)OOM(OutOfMemory)。

由于上述各種原因,線程池出現(xiàn)了,它可以對創(chuàng)建的線程進(jìn)行復(fù)用,從而可以減少線程和銷毀線程的時間;其次,由于使用了線程池之后,線程的管理將不在由用戶所處理,它可以很好控制線程的數(shù)量,從而預(yù)防OOM的發(fā)生。

2.JDK線程池的體系結(jié)構(gòu)

線程池的繼承體系.png

上圖標(biāo)紅的3個接口,是構(gòu)成JDK線程池體系的核心接口,了解這些核心組成接口無論對于我們在使用以及后面的源碼分析都有很大幫助,因此下面我也將針對這3個接口進(jìn)行詳細(xì)的分析。

2.1 Executor(解耦利器)

在Executor接口中只定義了一個方法:

 /*
    該方法的目的就是用于執(zhí)行提交的給定任務(wù),而任務(wù)可能不是立刻執(zhí)行。 
     底層具體執(zhí)行執(zhí)行的線程可能是新創(chuàng)建的線程、或者線程池中已經(jīng)存在的線程,甚至可以是當(dāng)前調(diào)用線程,這一切都取決于具體的實(shí)現(xiàn)。
     如果Executor無法接受提交的任務(wù),則拋出RejectedExecutionException。
     如果提交的任務(wù)是null,則拋出NullPointerException
 */
    void execute(Runnable command);

定義Executor的目的是為了完成任務(wù)的提交與任務(wù)的執(zhí)行之間的解耦,對用戶屏蔽底層線程的實(shí)現(xiàn)與調(diào)度細(xì)節(jié),這是一種典型命令設(shè)計模式的應(yīng)用。

2.2 ExecutorService(可關(guān)閉的線程池)

ExecutorService在Executor的基礎(chǔ)上,提供了更加豐富的功能。

(1). 線程池的關(guān)閉
在ExecutorService中,分別提供了shutdown()和shutdownNow()方法用于線程池的關(guān)閉。

    /*
      拒絕之后提交的任務(wù)
      但是在線程池的銷毀前,會讓之前提交的任務(wù)繼續(xù)執(zhí)行。
    */
    void shutdown();
    
    
    /*
      拒絕之后提交的任務(wù),并且會停止正在等待執(zhí)行的任務(wù)執(zhí)行
      還將停止當(dāng)前正在執(zhí)行的任務(wù)
      注意:該方法返回的任務(wù)列表是正在等待執(zhí)行的任務(wù)列表
    */    
     List<Runnable> shutdownNow();
     

如果線程池一旦處于Termination的狀態(tài),那么線程池中將不存在任何正在執(zhí)行的任務(wù)、等待執(zhí)行的任務(wù),同時任務(wù)也不能再被提交。

(2). 提交帶返回值的任務(wù)

    /*
      提交帶返回值的任務(wù)(Callable),返回一個Future對象,
      通過Future可以取消正在執(zhí)行的任務(wù)或者等待任務(wù)的結(jié)束
    */
    <T> Future<T> submit(Callable<T> task);
    
    /*
        提交Runnable任務(wù)執(zhí)行,返回Future。
        一旦任務(wù)結(jié)束,Future的get方法會返回null,
        否則將一直處于等待狀態(tài)
    */
    <T> Future<T> submit(Runnable task);
    
    
    /*
      提交Runnable任務(wù)執(zhí)行,并且指定返回值。
      注意:由于Runnable任務(wù)是沒有返回值的,JDK的線程池
      實(shí)現(xiàn)類在底層會將Runnable統(tǒng)一的將通過一個適配器轉(zhuǎn)成
      Callable來進(jìn)行執(zhí)行。因此傳入的參數(shù)result,就是Future
      的get方法所得到的結(jié)果。
    */
    <T> Future<T> submit(Runnable task, T result);

(3). 批量提交任務(wù)

    /*
      批量執(zhí)行一組任務(wù)集合,當(dāng)所有的任務(wù)全部執(zhí)行完畢之后才返回。
      返回的結(jié)果為一個存放Future的List,其中每一個元素都保存
      了其對應(yīng)任務(wù)的執(zhí)行結(jié)果和狀態(tài)。Future的isDone()方法都是返回true
      特別注意:任務(wù)的結(jié)束可能有兩種情況,一是正常執(zhí)行完畢,或者
      由于在執(zhí)行的過程中拋出了異常而結(jié)束。
      如果拋出異常,只有在返回的Future調(diào)用get方法是才會拋出異常
      否則,線程池會將異常隱藏。
      (執(zhí)行下面的TestBulkExecuteTask,默認(rèn)情況下程序會正常退出)
    
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
 
    /*
     批量執(zhí)行一組任務(wù),并且顯示等待任務(wù)的結(jié)束。
     當(dāng)所有任務(wù)執(zhí)行完畢或者超時返回。
     Future的isDone()方法都是返回true。
     如果一旦超時返回,那么未執(zhí)行完的任務(wù)都會被取消。
     如果任務(wù)因?yàn)槿∠Y(jié)束,在調(diào)用Future的get方法時會引發(fā)CancellationException。
    */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    /*
      執(zhí)行給定的任務(wù)集合,返回任務(wù)中成功執(zhí)行完成的一個任務(wù)的返回值。 
      比如有3個任務(wù),第一個任務(wù)由于異常而結(jié)束,此時不會返回,
      而是等待其他兩個任務(wù)中的某一個成功執(zhí)行完成才返回。(參考TestInvokeAny)
      如果所有的任務(wù)都由于異常而結(jié)束,此時會拋出異常。
      無論是成功返回還是由于異常而返回,都不會取消正在執(zhí)行的任務(wù)    
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
      
    /*
       執(zhí)行給定的任務(wù)集合,并且指定在給定的時間內(nèi)等待獲取任務(wù)中成功執(zhí)行完成的一個任務(wù)的返回值。
       一旦超時,依然還沒有一個任務(wù)成功結(jié)束,則拋出TimeoutException。
       但是此時不會取消正在執(zhí)行的任務(wù)。  
    
    */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;        

package concurrency.threadpool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 提交批量任務(wù)到線程池執(zhí)行
 */
public class TestBulkExecuteTask {

    public static void main(String[] args) throws Exception {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        List<Future<Integer>> futures = threadPool.invokeAll(Arrays.asList(
                new Task1(), new Task2(),new Task1()));
        
        /*
         * 批量執(zhí)行Callable任務(wù),當(dāng)任務(wù)由于異常而結(jié)束,線程池并不會將異常拋出
         * 只有當(dāng)通過Future獲取任務(wù)的返回結(jié)果時才會真正拋出異常
         */
    //      for (Future<Integer> future : futures) {
    //          System.out.println(future.get());
    //      }
        
        threadPool.shutdown();
    }

    static class Task1 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+" execute!");
            return (int) (Math.random() * 1000);
        }
    }

    static class Task2 implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            //拋出的異常會被線程池隱藏
            throw new RuntimeException("執(zhí)行出錯");
        }
    }
}

通過上面的例子可以看到,在提交Callable任務(wù)時,我們應(yīng)該使用get方法去檢測任務(wù)是否正常執(zhí)行,否則即使任務(wù)由于異常而終止,我們也不得而知。

package concurrency.threadpool;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestInvokeAny {

    public static void main(String[] args) throws Exception {

        ExecutorService threadPool = Executors.newFixedThreadPool(5);
        //如果3個任務(wù)都是拋出異常的,此時線程池會拋出異常
        Integer value = threadPool.invokeAny(
                Arrays.asList(new Task1(), new Task1(),new Task2()));
        System.out.println(value);
        threadPool.shutdown();
    }

    static class Task1 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName()+" execute!");
            return (int) (Math.random() * 1000);
        }
    }

    static class Task2 implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            // 拋出的異常會被線程池隱藏
            throw new RuntimeException("執(zhí)行出錯");
        }
    }
}

2.2 ScheduledExecutorService(可調(diào)度的線程池)

ScheduledExecutorService是一個可以對任務(wù)進(jìn)行調(diào)度的ExecutorService,它可以對任務(wù)延遲執(zhí)行或者周期性地去執(zhí)行任務(wù)。ScheduledExecutorService中所定義的方法都返回一個ScheduledFuture對象,它可以判斷當(dāng)前任務(wù)是否已經(jīng)執(zhí)行完畢或者取消任務(wù)的執(zhí)行。

    /*
     在給定的延遲時候后執(zhí)行一個不帶返回值的任務(wù),該任務(wù)只會被執(zhí)行一次。
     如果傳入的時間小于等于0(參考下面的說明1,分析了JDK的線程池對傳入的延遲時間的觸發(fā)),那么任務(wù)會立即觸發(fā)執(zhí)行。
     由于傳入的是Runnable接口,因此ScheduleFuture的get方法返回的結(jié)果是null。
    */
     public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    
    /*
     在給定的延遲時候后執(zhí)行一個帶返回值的任務(wù),該任務(wù)同樣也只會被執(zhí)行一次。     
    ScheduleFuture的get方法返回的結(jié)果是將是Callable執(zhí)行返回的結(jié)果。
    */
     public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
    
     /*
      創(chuàng)建任務(wù)并且以固定頻率周期性地執(zhí)行。
      初始任務(wù)的開始執(zhí)行是在給定的initialDelay之后;
      而后續(xù)第1個任務(wù)的開始于initialDelay+2*period,
      第2個任務(wù)開始于initialDelay+2*period,以此類推...
      如果一個任務(wù)的執(zhí)行時間大于給定的執(zhí)行周期,那么會導(dǎo)致其后面的一個任務(wù)執(zhí)行時間被延后。如果發(fā)生這種情況,后面的任務(wù)會在當(dāng)前任務(wù)執(zhí)行完畢后立即執(zhí)行,JDK的線程池不會這樣延遲而新創(chuàng)建一個線程讓后面的任務(wù)與當(dāng)前任務(wù)并發(fā)執(zhí)行。
      參考(TestScheduleAtFixedRate和TestscheduleAtFixedRate2)
      如果任務(wù)在執(zhí)行中的由于異常而停止,那么后續(xù)的任務(wù)也都不會再執(zhí)行
      (因此對于調(diào)度的任務(wù),我們在編寫程序時必須確保異常能夠得到正確處理,從而避免因此異常而導(dǎo)致整個調(diào)度任務(wù)的終結(jié)),
      否則如果想要停止定時任務(wù)的話,只能通過ScheduledFuture來取消任務(wù)或者關(guān)閉線程池。
     */
     public ScheduledFuture<?> scheduleAtFixedRate( command,long initialDelay,long period,TimeUnit unit);


     /*
      創(chuàng)建任務(wù)并且以固定的延遲時間周期性地執(zhí)行。    
      初始任務(wù)的開始執(zhí)行是在給定的initialDelay之后;
      而后續(xù)的任務(wù)都將在上一個任務(wù)執(zhí)行完畢后,
      再經(jīng)過delay,然后開始執(zhí)行。
      對于該方法,如果在調(diào)度的過程中發(fā)生異常,那么也將導(dǎo)致后續(xù)任務(wù)的終結(jié)。
      (參考TestScheduledWithFixedDelay)
     */
     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);


說明1:JDK底層的實(shí)現(xiàn)會將傳入的時間轉(zhuǎn)換成納秒來進(jìn)行處理,可以查看java.util.concurrent.ScheduledThreadPoolExecutor第489行代碼

    /**
     * 返回一個延遲任務(wù)的觸發(fā)時間
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }

說明2:上面的TimeUnit是JDk 1.5所引入的枚舉,它里面定義了天、小時、分鐘、秒、毫秒、微秒、納秒這些單位以及它們之間的互相轉(zhuǎn)換的方法。

在分析了ScheduledExecutorService之后,下面我們通過代碼再來看看這些方法之間的差異。

package concurrency.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
 * 固定延遲時間地周期性調(diào)度
 */
public class TestScheduledWithFixedDelay {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1);
        System.out.println("start schedule: " + System.currentTimeMillis()
                / 1000);
        /*
         * scheduleWithFixedDelay執(zhí)行說明: 第一個任務(wù)的執(zhí)行是在給定的延遲時間后執(zhí)行,這里的延遲時間為2秒
         * 后續(xù)的任務(wù)是在上一個任務(wù)執(zhí)行完成后,經(jīng)過delay后繼續(xù)執(zhí)行(這里的delay為3秒)
         */
        ScheduledFuture<?> future = scheduledThreadPool.scheduleWithFixedDelay(
                new Task(), 2, 3, TimeUnit.SECONDS);
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println("currentTime: "
                        + (System.currentTimeMillis() / 1000) + ","
                        + Thread.currentThread().getName() + " doSomething!");
                //休息5秒
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}


TestScheduleFixDelay結(jié)果.png
package concurrency.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduleAtFixedRate {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(1);
        
        System.out.println("Start schedule: "+System.currentTimeMillis()/1000);
        //初始任務(wù)執(zhí)行延遲2秒,每隔4秒執(zhí)行下一個任務(wù)
        scheduledThreadPool.scheduleAtFixedRate(new Task(), 2, 4,
                TimeUnit.SECONDS);
    }
    
    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                System.out.println("currentTime: " + System.currentTimeMillis() /1000
                        + "," + Thread.currentThread().getName()
                        + " doSomething!");
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
}
TestScheduleFixRate.png
package concurrency.threadpool;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TestScheduleAtFixedRate2 {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors
                .newScheduledThreadPool(10);
        System.out.println("Start schedule: "+System.currentTimeMillis() /1000);
        /*
         * 任務(wù)的調(diào)度周期為4秒,但是每個任務(wù)的執(zhí)行卻是6秒,此時每個任務(wù)的執(zhí)行時間都會被延后
         * 其執(zhí)行的時間與上一個任務(wù)之間相差為6秒
         */
        scheduledThreadPool.scheduleAtFixedRate(new Task(), 2, 4,
                TimeUnit.SECONDS);
    }
    
    static class Task implements Runnable{
        @Override
        public void run() {
            try {
                System.out.println("currentTime: " + System.currentTimeMillis() / 1000
                        + "," + Thread.currentThread().getName()
                        + " doSomething!");
                //任務(wù)執(zhí)行時間為6秒
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
}

TestScheduleFixRate2.png

好了,JDK線程池的體系結(jié)構(gòu)就分析到這里,下篇內(nèi)容將會分析ThreadPoolExecutor這個線程池的具體實(shí)現(xiàn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容