JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 五

JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 五

版本 作者 內(nèi)容
2018.7.4 chuIllusions J.U.C組件拓展

相關(guān)文章

JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 一 之 并發(fā)相關(guān)知識(shí)
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 二 之 線程安全性、安全發(fā)布對(duì)象
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 三 之 線程安全策略
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 四 之 J.U.C之AQS
JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 六 之 線程池

J.U.C 組件拓展

FutureTask

Introduction

FutureTask這個(gè)組件是J.U.C里面的,但不是AQS的子類,但是這個(gè)類對(duì)線程處理的結(jié)果很值得我們學(xué)習(xí)和在項(xiàng)目中使用。
??在Java中一般通過繼承Thread類或者實(shí)現(xiàn)Runnable接口這兩種方式來創(chuàng)建多線程,但是這兩種方式都有個(gè)缺陷,就是不能在執(zhí)行完成后獲取執(zhí)行的結(jié)果,在Java 1.5之后提供了Callable和Future接口,通過它們就可以在任務(wù)執(zhí)行完畢之后得到任務(wù)的執(zhí)行結(jié)果。

Callable 與 Runnable

Callable接口定義,運(yùn)行Callable任務(wù)可以拿到一個(gè)Future對(duì)象,表示異步計(jì)算的結(jié)果。

@FunctionalInterface
public interface Callable<V> {
    /**
     * 計(jì)算結(jié)果或失敗時(shí)扔出異常
     * @since 1.5
     * @return 計(jì)算結(jié)果
     * @throws 計(jì)算失敗扔出異常
     */
    V call() throws Exception;
}

Runnable接口定義,由于run()方法返回值為void類型,所以在執(zhí)行完任務(wù)之后無法返回任何結(jié)果。

@FunctionalInterface
public interface Runnable {
    /**
     * 當(dāng)一個(gè)對(duì)象實(shí)現(xiàn)<code>Runnable</code>接口創(chuàng)建一個(gè)線程,這個(gè)對(duì)象通過覆寫
     * run方法處理線程邏輯,并且Thread類啟動(dòng)該線程,執(zhí)行Runnable處理線程邏輯
     * @since 1.0
     */
    public abstract void run();
}

&emsp可以看到Callable是個(gè)泛型接口,泛型V就是要call()方法返回的類型。Callable接口和Runnable接口很像,都可以被另外一個(gè)線程執(zhí)行,Callable功能更強(qiáng)大些,正如前面所說的,Runnable不會(huì)返回?cái)?shù)據(jù)也不能拋出異常,而Callable可以有返回值與可以拋出異常。

Future

Future接口代表異步計(jì)算的結(jié)果,通過Future接口提供的方法可以查看異步計(jì)算是否執(zhí)行完成,或者等待執(zhí)行結(jié)果并獲取執(zhí)行結(jié)果,同時(shí)還可以取消執(zhí)行。也就是說Future就是對(duì)于具體的Runnable或者Callable任務(wù)的執(zhí)行結(jié)果進(jìn)行取消、查詢是否完成、獲取結(jié)果。通常不能從線程中獲得方法的返回值,這時(shí)Future就出場(chǎng)了,Future可以監(jiān)控目標(biāo)線程調(diào)用call()的情況??偨Y(jié)來說,Future可以得到線程任務(wù)方法的返回值。

public interface Future<V> {
    /*
     * 取消異步任務(wù)的執(zhí)行。
     * 如果異步任務(wù)已經(jīng)完成或者已經(jīng)被取消,或者由于某些原因不能取消,則會(huì)返回false;
     * 如果任務(wù)還沒有被執(zhí)行,則會(huì)返回true并且異步任務(wù)不會(huì)被執(zhí)行;
     * 如果任務(wù)已經(jīng)開始執(zhí)行了但是還沒有執(zhí)行完成:
     *      若mayInterruptIfRunning為true,則會(huì)立即中斷執(zhí)行任務(wù)的線程并返回true
     *      若mayInterruptIfRunning為false,則會(huì)返回true且不會(huì)中斷任務(wù)執(zhí)行線程
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /*
     * 判斷任務(wù)是否被取消,如果任務(wù)在結(jié)束(正常執(zhí)行結(jié)束或者執(zhí)行異常結(jié)束)前被取消則返回true,否則返回false。
     */
    boolean isCancelled();

    /*
     * 判斷任務(wù)是否已經(jīng)完成,如果完成則返回true,否則返回false。需要注意的是:任務(wù)執(zhí)行過程中發(fā)生異常、任務(wù)被取消也屬于任務(wù)已完成,也會(huì)返回true。
     */
    boolean isDone();

    /*
     * 獲取任務(wù)執(zhí)行結(jié)果:
     *      如果任務(wù)還沒完成則會(huì)阻塞等待直到任務(wù)執(zhí)行完成
     *      如果任務(wù)被取消則會(huì)拋出CancellationException異常
     *      如果任務(wù)執(zhí)行過程發(fā)生異常則會(huì)拋出ExecutionException異常
     *      如果阻塞等待過程中被中斷則會(huì)拋出InterruptedException異常
     */
    V get() throws InterruptedException, ExecutionException;

    /*
     * 帶超時(shí)時(shí)間的get()版本,如果阻塞等待過程中超時(shí)則會(huì)拋出TimeoutException異常。
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}

因?yàn)镕uture只是一個(gè)接口,所以是無法直接用來創(chuàng)建對(duì)象使用的,因此就有了下面的FutureTask

FutureTask

Future只是一個(gè)接口,不能直接用來創(chuàng)建對(duì)象,FutureTaskFuture的實(shí)現(xiàn)類。

public interface RunnableFuture<V> extends Runnable, Future<V> {}

public class FutureTask<V> implements RunnableFuture<V> {
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
}

從上面兩個(gè)類結(jié)構(gòu),可以得知FutureTask最終還是執(zhí)行Callable類型的任務(wù)。如果在FutureTask構(gòu)造函數(shù)中傳入Runnable,會(huì)轉(zhuǎn)換成Callable類型。

FutureTask實(shí)際上實(shí)現(xiàn)了RunnableFuture接口,所以它既可以作為Runnable被線程執(zhí)行,又可以作為Future得到Callable的返回值。好處:假設(shè)有個(gè)很費(fèi)時(shí)的邏輯需要計(jì)算,并且返回這個(gè)計(jì)算值,同時(shí)這個(gè)值又不是馬上需要,那么就可以使用這個(gè)組合,用另外一個(gè)線程計(jì)算返回值,而當(dāng)前線程在使用這個(gè)返回值之前,可以做其他的操作,等到需要這個(gè)返回值時(shí),才通過Future得到。

案例
@Slf4j
public class FutureExample {

    /**
     * Callable任務(wù)
     */
    static class MyCallable implements Callable<String> {

        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    public static void main(String[] args) throws Exception {
        //1.生成線程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //線程池提交Callable任務(wù),并且得到Future
        Future<String> future = executorService.submit(new MyCallable());
        log.info("do something in main");
        Thread.sleep(1000);
        //調(diào)用Future.get()時(shí),如果任務(wù)線程還未執(zhí)行完畢,則會(huì)一直阻塞在此,等待線程任務(wù)完成,然后拿到結(jié)果
        String result = future.get();
        log.info("result:{}", result);
    }
}

以上Future與以下FutureTask要實(shí)現(xiàn)的效果是一樣的。

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                log.info("do something in callable");
                Thread.sleep(5000);
                return "Done";
            }
        });

        new Thread(futureTask).start();
        log.info("do something in main");
        // 1\. 調(diào)用isDone()判斷任務(wù)是否結(jié)束
        if(!futureTask.isDone()) {
            System.out.println("Task is not done");
            try {
                //阻塞主線程一秒鐘
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        String result = futureTask.get();
        log.info("result:{}", result);
    }
}

參考深入學(xué)習(xí) FutureTask

Fork/Join

Introduction

Fork/Join框架是Java7提供了的一個(gè)用于并行執(zhí)行任務(wù)的框架, 是一個(gè)把大任務(wù)分割成若干個(gè)小任務(wù),最終匯總每個(gè)小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的框架。它的思想與MapReduce類似,從字面上理解,F(xiàn)ork即把一個(gè)大任務(wù),切割成若干個(gè)子任務(wù)并行執(zhí)行,Join即把若干個(gè)子任務(wù)結(jié)果進(jìn)行合并,最后得到大任務(wù)的結(jié)果,主要采取工作竊取算法。
??工作竊?。╳ork-stealing)算法是指某個(gè)線程從其他隊(duì)列里竊取任務(wù)來執(zhí)行。

image

假如我們需要做一個(gè)比較大的任務(wù),我們可以把這個(gè)任務(wù)分割為若干互不依賴的子任務(wù),為了減少線程間的競(jìng)爭(zhēng),于是把這些子任務(wù)分別放到不同的隊(duì)列里,并為每個(gè)隊(duì)列創(chuàng)建一個(gè)單獨(dú)的線程來執(zhí)行隊(duì)列里的任務(wù),線程和隊(duì)列一一對(duì)應(yīng),比如A線程負(fù)責(zé)處理A隊(duì)列里的任務(wù)。但是有的線程會(huì)先把自己隊(duì)列里的任務(wù)干完,而其他線程對(duì)應(yīng)的隊(duì)列里還有任務(wù)等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊(duì)列里竊取一個(gè)任務(wù)來執(zhí)行。而在這時(shí)它們會(huì)訪問同一個(gè)隊(duì)列,所以為了減少竊取任務(wù)線程和被竊取任務(wù)線程之間的競(jìng)爭(zhēng),通常會(huì)使用雙端隊(duì)列,被竊取任務(wù)線程永遠(yuǎn)從雙端隊(duì)列的頭部拿任務(wù)執(zhí)行,而竊取任務(wù)的線程永遠(yuǎn)從雙端隊(duì)列的尾部拿任務(wù)執(zhí)行。

工作竊取算法的優(yōu)點(diǎn)是充分利用線程進(jìn)行并行計(jì)算,并減少了線程間的競(jìng)爭(zhēng),其缺點(diǎn)是在某些情況下還是存在競(jìng)爭(zhēng),比如雙端隊(duì)列里只有一個(gè)任務(wù)時(shí)。并且消耗了更多的系統(tǒng)資源,比如創(chuàng)建多個(gè)線程和多個(gè)雙端隊(duì)列。

對(duì)于Fork/Join框架而言,當(dāng)一個(gè)任務(wù)正在等待它使用Join操作創(chuàng)建的子任務(wù)結(jié)束時(shí),執(zhí)行這個(gè)任務(wù)的工作線程,尋找其他并未被執(zhí)行的任務(wù),并開始執(zhí)行,通過這種方式,線程充分利用它們的運(yùn)行時(shí)間,來提高應(yīng)用程序的性能。為了實(shí)現(xiàn)這個(gè)目標(biāo),F(xiàn)ork/Join框架執(zhí)行的任務(wù)有一些局限性:

  1. 任務(wù)只能使用Fork、Join操作來作為同步機(jī)制,如果使用了其他同步機(jī)制,那他們?cè)谕讲僮鲿r(shí),工作線程則不能執(zhí)行其他任務(wù)。如:在框架的操作中,使任務(wù)進(jìn)入睡眠,那么在這個(gè)睡眠期間內(nèi),正在執(zhí)行這個(gè)任務(wù)的工作線程,將不會(huì)執(zhí)行其他任務(wù)
  2. 所執(zhí)行的任務(wù),不應(yīng)該執(zhí)行IO操作,如讀和寫數(shù)據(jù)文件
  3. 任務(wù)不能拋出檢查型異常,必須通過必要的代碼處理它們

核心是兩個(gè)類:ForkJoinTaskForkJoinPool。Pool主要負(fù)責(zé)實(shí)現(xiàn),包括上面所介紹的工作竊取算法,管理工作線程和提供關(guān)于任務(wù)的狀態(tài)以及它們的執(zhí)行信息;Task主要提供在任務(wù)中,執(zhí)行Fork與Join操作的機(jī)制。

引用[并行流與串行流 Fork/Join框架的一張圖來說明過程

image
Example

我們先來看一下Fork/Join框架的演示:

@Slf4j
//Recursive遞歸的意思,把大任務(wù)不斷的拆分成小任務(wù),即是一個(gè)遞歸拆分任務(wù)的一個(gè)過程
//RecursiveTask<T>,T表示任務(wù)的返回值
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    //設(shè)置分割的閾值
    public static final int threshold = 2;
    private int start;
    private int end;

    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    //任務(wù)
    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任務(wù)足夠小就計(jì)算任務(wù)
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            //任務(wù)足夠小的時(shí)候,直接計(jì)算,不進(jìn)行分裂計(jì)算
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務(wù)大于閾值,就分裂成兩個(gè)子任務(wù)計(jì)算
            int middle = (start + end) / 2;

            /**
             * 下面可能會(huì)產(chǎn)生遞歸操作
             */
            //繼續(xù)分裂任務(wù)
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 執(zhí)行子任務(wù)
            leftTask.fork();
            rightTask.fork();

            // 等待任務(wù)執(zhí)行結(jié)束合并其結(jié)果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任務(wù)
            sum = leftResult + rightResult;
        }
        //返回結(jié)果
        return sum;
    }

    public static void main(String[] args) {
        //生成一個(gè)池
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一個(gè)計(jì)算任務(wù),計(jì)算1+2+3+4
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //執(zhí)行一個(gè)任務(wù),將任務(wù)放入池中,并開始執(zhí)行,并用Future接收
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

通過這個(gè)例子讓我們?cè)賮磉M(jìn)一步了解ForkJoinTask,任務(wù)類繼承RecursiveTaskForkJoinTask與一般的任務(wù)的主要區(qū)別在于它需要實(shí)現(xiàn)compute()方法,在這個(gè)方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)。如果不足夠小,就必須分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用fork()方法時(shí),又會(huì)進(jìn)入compute()方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join()方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果。

Main Class

上面提到,F(xiàn)ork/Join框架中的兩個(gè)核心類ForkJoinTaskForkJoinPool,并且從上面的例子可以知道,聲明ForkJoinTask后,將其加入到ForkJoinPool中,并返回一個(gè)Future對(duì)象。

  • ForkJoinPoolForkJoinTask需要通過ForkJoinPool來執(zhí)行,任務(wù)分割出的子任務(wù)會(huì)添加到當(dāng)前工作線程所維護(hù)的雙端隊(duì)列中,進(jìn)入隊(duì)列的頭部。當(dāng)一個(gè)工作線程的隊(duì)列里暫時(shí)沒有任務(wù)時(shí),它會(huì)隨機(jī)從其他工作線程的隊(duì)列的尾部獲取一個(gè)任務(wù)。
  • ForkJoinTask:我們要使用ForkJoin框架,必須首先創(chuàng)建一個(gè)ForkJoin任務(wù)。它提供在任務(wù)中執(zhí)行fork()join()操作的機(jī)制,通常情況下我們不需要直接繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供了以下兩個(gè)子類:
    • RecursiveAction:用于沒有返回結(jié)果的任務(wù)。
    • RecursiveTask :用于有返回結(jié)果的任務(wù)。
image
Exception

ForkJoinTask在執(zhí)行的時(shí)候可能會(huì)拋出異常,但是我們沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務(wù)是否已經(jīng)拋出異?;蛞呀?jīng)被取消了,并且可以通過ForkJoinTaskgetException()方法獲取異常。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    /** ForkJoinTask運(yùn)行狀態(tài) */
    volatile int status; // 直接被ForkJoin池和工作線程訪問
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

    /**
     * @Ruturn 任務(wù)是否扔出異常或被取消
     */
    public final boolean isCompletedAbnormally() {
        return status < NORMAL;
    }

    /**
     * 如果計(jì)算扔出異常,則返回異常
     * 如果任務(wù)被取消了則返回CancellationException。如果任務(wù)沒有完成或者沒有拋出異常則返回null
     */
    public final Throwable getException() {
        int s = status & DONE_MASK;
        return ((s >= NORMAL)    ? null :
                (s == CANCELLED) ? new CancellationException() :
                getThrowableException());
    }
}

Analysis
ForkJoinPool
public class ForkJoinPool extends AbstractExecutorService {
    /**
     * ForkJoinPool,它同ThreadPoolExecutor一樣,也實(shí)現(xiàn)了Executor和ExecutorService接口。它使用了
     * 一個(gè)無限隊(duì)列來保存需要執(zhí)行的任務(wù),而線程的數(shù)量則是通過構(gòu)造函數(shù)傳入,如果沒有向構(gòu)造函數(shù)中傳入希
     * 望的線程數(shù)量,那么當(dāng)前計(jì)算機(jī)可用的CPU數(shù)量會(huì)被設(shè)置為線程數(shù)量作為默認(rèn)值。
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP,Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }
    //有多個(gè)構(gòu)造器,這里省略

    volatile WorkQueue[] workQueues;     // main registry
    static final class WorkQueue {
        final ForkJoinWorkerThread owner; // 工作線程
        ForkJoinTask<?>[] array;   // 任務(wù)

        //傳入的是ForkJoinPool與指定的一個(gè)工作線程
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

    }
}

ForkJoinPool中源碼挺強(qiáng)大的,我只抽取了重要的部分進(jìn)行分析。

  • ForkJoinPool中維護(hù)了一組WorkQueue,也就是工作隊(duì)列,工作隊(duì)列中又維護(hù)了一個(gè)工作線程ForkJoinWorkerThread與一組工作任務(wù)ForkJoinTask
  • WorkQueue是一個(gè)雙端隊(duì)列(Deque),即 Double Ended Queue ,Deque是一種具有隊(duì)列和棧的性質(zhì)的數(shù)據(jù)結(jié)構(gòu),雙端隊(duì)列中的元素可以從兩端彈出,其限定插入和刪除操作在表的兩端進(jìn)行。
  • 每個(gè)工作線程在運(yùn)行中產(chǎn)生新的任務(wù)(通常是因?yàn)檎{(diào)用了fork())時(shí),會(huì)放入工作隊(duì)列的隊(duì)尾,并且工作線程在處理自己的工作隊(duì)列時(shí),使用的是LIFO 方式,也就是說每次從隊(duì)尾取出任務(wù)來執(zhí)行。
  • 每個(gè)工作線程在處理自己的工作隊(duì)列同時(shí),會(huì)嘗試竊取一個(gè)任務(wù)(或是來自于剛剛提交到 pool 的任務(wù),或是來自于其他工作線程的工作隊(duì)列),竊取的任務(wù)位于其他線程的工作隊(duì)列的隊(duì)首,也就是說工作線程在竊取其他工作線程的任務(wù)時(shí),使用的是 FIFO 方式。
  • 在遇到 join() 時(shí),如果需要 join 的任務(wù)尚未完成,則會(huì)先處理其他任務(wù),并等待其完成。
  • 在既沒有自己的任務(wù),也沒有可以竊取的任務(wù)時(shí),進(jìn)入休眠。
public class ForkJoinPool extends AbstractExecutorService {
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {}
    public <T> ForkJoinTask<T> submit(Callable<T> task) {}
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {}
    public ForkJoinTask<?> submit(Runnable task) {}
}

從上面來看,ForkJoinPool所提供的submit()方法中,有幾個(gè)重載。

ForkJoinPool自身也擁有工作隊(duì)列,這些工作隊(duì)列的作用是用來接收由外部線程(非 ForkJoinThread 線程)提交過來的任務(wù),而這些工作隊(duì)列被稱為 submitting queue 。

ForkJoinTask

從上面的例子,我們可以知道,任務(wù)的操作,重要的是fork()join(),我們可以假設(shè)這兩個(gè)的作用:

  • fork():開啟一個(gè)新線程(或是重用線程池內(nèi)的空閑線程),將任務(wù)交給該線程處理。
  • join():等待該任務(wù)的處理線程處理完畢,獲得返回值。

但對(duì)我的這個(gè)假設(shè),很明顯就不對(duì)的,當(dāng)任務(wù)分解得越來越細(xì)時(shí),所需要的線程數(shù)就會(huì)越來越多,而且大部分線程處于等待狀態(tài)。從ForkJoinPool的構(gòu)造函數(shù)中,可以知道,工作線程的數(shù)量是指定的,或者說是按照系統(tǒng)默認(rèn)的。
??可以得出,我的假設(shè)是錯(cuò)誤的,因此,并不是每個(gè) fork() 都會(huì)促成一個(gè)新線程被創(chuàng)建,而每個(gè) join() 也不是一定會(huì)造成線程被阻塞。這一點(diǎn)可以體現(xiàn)出work stealing 算法的優(yōu)勢(shì)。

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    /**
     * 在當(dāng)前任務(wù)正在運(yùn)行的池中異步執(zhí)行此任務(wù)(如果適用)
     * 或使用ForkJoinPool.commonPool()(如果不是ForkJoinWorkerThread實(shí)例)進(jìn)行異步執(zhí)行 
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
}

  1. fork() 做的工作只有一件事,既是把任務(wù)推入當(dāng)前工作線程的工作隊(duì)列里。
  2. join() 的工作則復(fù)雜得多,也是join() 可以使得線程免于被阻塞的原因
    1. 檢查調(diào)用 join() 的線程是否是 ForkJoinThread 線程。如果不是(例如 main 線程),則阻塞當(dāng)前線程,等待任務(wù)完成。如果是,則不阻塞。
    2. 查看任務(wù)的完成狀態(tài),如果已經(jīng)完成,直接返回結(jié)果。
    3. 如果任務(wù)尚未完成,但處于自己的工作隊(duì)列內(nèi),則完成它。
    4. 如果任務(wù)已經(jīng)被其他的工作線程偷走,則竊取這個(gè)小偷的工作隊(duì)列內(nèi)的任務(wù)(以 FIFO 方式),執(zhí)行,以期幫助它早日完成欲 join 的任務(wù)。
    5. 如果偷走任務(wù)的小偷也已經(jīng)把自己的任務(wù)全部做完,正在等待需要 join 的任務(wù)時(shí),則找到小偷的小偷,幫助它完成它的任務(wù)。
    6. 遞歸地執(zhí)行第5步。
image

以上部分內(nèi)容引用于Java 并發(fā)編程筆記:如何使用 ForkJoinPool 以及原理

BlockingQueue

引用一篇相關(guān)文章的一段話,初探BlockingQueue:BlockingQueue
??多線程環(huán)境中,通過隊(duì)列可以很容易實(shí)現(xiàn)數(shù)據(jù)共享,比如經(jīng)典的“生產(chǎn)者”和“消費(fèi)者”模型中,通過隊(duì)列可以很便利地實(shí)現(xiàn)兩者之間的數(shù)據(jù)共享。假設(shè)我們有若干生產(chǎn)者線程,另外又有若干個(gè)消費(fèi)者線程。如果生產(chǎn)者線程需要把準(zhǔn)備好的數(shù)據(jù)共享給消費(fèi)者線程,利用隊(duì)列的方式來傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問題。但如果生產(chǎn)者和消費(fèi)者在某個(gè)時(shí)間段內(nèi),萬一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費(fèi)者消費(fèi)的速度,并且當(dāng)生產(chǎn)出來的數(shù)據(jù)累積到一定程度的時(shí)候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程),以便等待消費(fèi)者線程把累積的數(shù)據(jù)處理完畢,反之亦然。然而,在concurrent包發(fā)布以前,在多線程環(huán)境下,我們每個(gè)程序員都必須去自己控制這些細(xì)節(jié),尤其還要兼顧效率和線程安全,而這會(huì)給我們的程序帶來不小的復(fù)雜度。好在此時(shí),強(qiáng)大的concurrent包橫空出世了,而他也給我們帶來了強(qiáng)大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞,在某些情況下會(huì)掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒)

image

BlockingQueue即為阻塞隊(duì)列,是一個(gè)先進(jìn)先出的隊(duì)列,在某些情況下,對(duì)阻塞隊(duì)列的訪問可能會(huì)造成阻塞,被阻塞的情況主要有兩種:

  1. 當(dāng)隊(duì)列滿時(shí),進(jìn)行入隊(duì)列操作。當(dāng)一個(gè)線程試圖對(duì)一個(gè)已經(jīng)滿了的隊(duì)列進(jìn)行入隊(duì)操作時(shí), 他將會(huì)阻塞,除非有另一個(gè)線程做了出隊(duì)列的操作。
  2. 當(dāng)隊(duì)列空時(shí),進(jìn)行出隊(duì)列操作。當(dāng)一個(gè)線程試圖對(duì)一個(gè)空隊(duì)列進(jìn)行出隊(duì)操作時(shí),他也將會(huì)被阻塞,除非有另一個(gè)線程做了入隊(duì)的操作。

阻塞隊(duì)列是線程安全的,主要用在生產(chǎn)者與消費(fèi)者的場(chǎng)景。上圖就是線程生產(chǎn)和消費(fèi)的場(chǎng)景,負(fù)責(zé)生產(chǎn)的線程不斷的制造新對(duì)象并插入到阻塞隊(duì)列中,直到達(dá)到隊(duì)列的上限值,從而被阻塞,直到消費(fèi)線程對(duì)隊(duì)列進(jìn)行消費(fèi)。同理,負(fù)責(zé)消費(fèi)的線程不斷的從隊(duì)列中消費(fèi)對(duì)象,直到這個(gè)隊(duì)列為空,這時(shí)消費(fèi)線程將會(huì)被阻塞,除非隊(duì)列中有新的隊(duì)列被生產(chǎn)加入。

public interface BlockingQueue<E> extends Queue<E> {}
public interface Queue<E> extends Collection<E> {}

BlockingQueue 是一個(gè)接口,繼承自 Queue,所以其實(shí)現(xiàn)類也可以作為 Queue 的實(shí)現(xiàn)來使用,而 Queue 又繼承自 Collection 接口。

BlockingQueue對(duì)插入操作、移除操作、獲取元素操作提供了四種不同的方法用于不同的場(chǎng)景中使用。我們使用不同的方法,都會(huì)有不同的表現(xiàn)。BlockingQueue 的各個(gè)實(shí)現(xiàn)都遵循了這些規(guī)則:

Throws Exception Special Value Blocks Times Out
insert add(o) offer(o) put(o) offer(o,timeout,timeunit)
remove remove(o) poll() take() poll(timeout,timeunit)
examine element() peek() not applicable not applicable
  1. Throws Exception:拋出異常。如果不能馬上進(jìn)行,則拋出異常。
  2. Special Value:如果不能馬上進(jìn)行,則返回特殊值,一般是True或False
  3. Blocks:如果不能馬上進(jìn)行,則操作會(huì)被阻塞,直到這個(gè)操作成功
  4. Times Out:如果不能馬上進(jìn)行,操作會(huì)被阻塞指定的時(shí)間。如果指定時(shí)間還未執(zhí)行,則返回特殊值,一般是True或False。

對(duì)于BlockingQueue,關(guān)注點(diǎn)應(yīng)該在它的puttake方法上,因?yàn)檫@兩個(gè)方法是帶阻塞的。

BlockingQueue 不接受 null 值的插入,相應(yīng)的方法在碰到null 的插入時(shí)會(huì)拋出 NullPointerException 異常。null 值在這里通常用于作為特殊值返回(表格中的第三列),代表 poll 失敗。所以,如果允許插入 null 值的話,那獲取的時(shí)候,就不能很好地用 null 來判斷到底是代表失敗,還是獲取的值就是 null 值。

前面說了,它實(shí)現(xiàn)了 java.util.Collection 接口。例如,我們可以用 remove(x) 來刪除任意一個(gè)元素,但是,這類操作通常并不高效,所以盡量只在少數(shù)的場(chǎng)合使用,比如一條消息已經(jīng)入隊(duì),但是需要做取消操作的時(shí)候。

BlockingQueue 的實(shí)現(xiàn)都是線程安全的,但是批量的集合操作如 addAll, containsAll, retainAllremoveAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途拋出異常,此時(shí) BlockingQueue 中已經(jīng)添加了部分元素,這個(gè)是允許的,取決于具體的實(shí)現(xiàn)。

BlockingQueue 在生產(chǎn)者-消費(fèi)者的場(chǎng)景中,是支持多消費(fèi)者和多生產(chǎn)者的,說的其實(shí)就是線程安全問題。BlockingQueue 是一個(gè)比較簡(jiǎn)單的線程安全容器。作為BlockingQueue的使用者,我們?cè)僖膊恍枰P(guān)心什么時(shí)候需要阻塞線程,什么時(shí)候需要喚醒線程,因?yàn)檫@一切BlockingQueue都給你一手包辦了。

這里補(bǔ)充一點(diǎn),一般所說的無界隊(duì)列,并不是大小不限制的,只是它的大小是Integer.MAX_VALUE,即int類型能夠表示的最大值,也可以理解為大小是(2的31次方)-1

BlockingQueue家庭中實(shí)現(xiàn)類主要有以下幾個(gè),常用的是ArrayBlockingQueueLinkedBlockingQueue,下文將會(huì)對(duì)這兩個(gè)類作詳細(xì)介紹。其他成員將簡(jiǎn)單介紹。

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • DelayQueue:
  • PriorityBlockingQueue
  • SynchronousQueue

參考:解讀 Java 并發(fā)隊(duì)列 BlockingQueue

ArrayBlockingQueue
Introdution

有界的阻塞隊(duì)列,內(nèi)部實(shí)現(xiàn)是一個(gè)數(shù)組,有邊界的意思是:容量是有限的,必須初始化時(shí),指定它的容量大小,以先進(jìn)先出的方式存儲(chǔ)數(shù)據(jù),最新插入的對(duì)象在尾部,最先移除的對(duì)象在頭部。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
    /** 隊(duì)列元素 */
    final Object[] items;

    /** 下一次讀取操作的位置, poll, peek or remove */
    int takeIndex;

    /** 下一次寫入操作的位置, offer, or add */
    int putIndex;

    /** 元素?cái)?shù)量 */
    int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     * 它采用一個(gè) ReentrantLock 和相應(yīng)的兩個(gè) Condition 來實(shí)現(xiàn)。
     */

    /** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;

    /** 指定大小 */
    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    /** 
     * 指定容量大小與指定訪問策略 
     * @param fair 指定獨(dú)占鎖是公平鎖還是非公平鎖。非公平鎖的吞吐量比較高,公平鎖可以保證每次都是等待最久的線程獲取到鎖;
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {}

    /** 
     * 指定容量大小、指定訪問策略與最初包含給定集合中的元素 
     * @param c 將此集合中的元素在構(gòu)造方法期間就先添加到隊(duì)列中 
     */
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {}
}

從上面的類結(jié)構(gòu),可以知道:

  1. ArrayBlockingQueue 在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲取數(shù)據(jù),都是共用同一個(gè)鎖對(duì)象,由此也意味著兩者無法真正并行運(yùn)行。按照實(shí)現(xiàn)原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行。Doug Lea之所以沒這樣去做,也許是因?yàn)?code>ArrayBlockingQueue的數(shù)據(jù)寫入和獲取操作已經(jīng)足夠輕巧,以至于引入獨(dú)立的鎖機(jī)制,除了給代碼帶來額外的復(fù)雜性外,其在性能上完全占不到任何便宜。
  2. 通過構(gòu)造函數(shù)得知,參數(shù)fair控制對(duì)象的內(nèi)部鎖是否采用公平鎖,默認(rèn)采用非公平鎖。
  3. items、takeIndex、putIndex、count等屬性并沒有使用volatile修飾,這是因?yàn)樵L問這些變量(通過方法獲?。┦褂枚际窃阪i塊內(nèi),并不存在可見性問題,如size()
  4. 另外有個(gè)獨(dú)占鎖lock用來對(duì)出入隊(duì)操作加鎖,這導(dǎo)致同時(shí)只有一個(gè)線程可以訪問入隊(duì)出隊(duì)。
Put()

我們通過源碼,分析一下Put方法的實(shí)現(xiàn):

    /** 進(jìn)行入隊(duì)操作 */
    public void put(E e) throws InterruptedException {
        //e為null,則拋出NullPointerException異常
        checkNotNull(e);
        //獲取獨(dú)占鎖
        final ReentrantLock lock = this.lock;
        /**
         * lockInterruptibly()
         * 獲取鎖定,除非當(dāng)前線程為interrupted
         * 如果鎖沒有被另一個(gè)線程占用并且立即返回,則將鎖定計(jì)數(shù)設(shè)置為1。
         * 如果當(dāng)前線程已經(jīng)保存此鎖,則保持計(jì)數(shù)將遞增1,該方法立即返回。
         * 如果鎖被另一個(gè)線程保持,則當(dāng)前線程將被禁用以進(jìn)行線程調(diào)度,并且處于休眠狀態(tài)
         * 
         */
        lock.lockInterruptibly();
        try {
            //空隊(duì)列
            while (count == items.length)
                //進(jìn)行條件等待處理
                notFull.await();
            //入隊(duì)操作
            enqueue(e);
        } finally {
            //釋放鎖
            lock.unlock();
        }
    }

    /** 真正的入隊(duì) */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        //獲取當(dāng)前元素
        final Object[] items = this.items;
        //按下一個(gè)插入索引進(jìn)行元素添加
        items[putIndex] = x;
        // 計(jì)算下一個(gè)元素應(yīng)該存放的下標(biāo),可以理解為循環(huán)隊(duì)列
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //喚起消費(fèi)者
        notEmpty.signal();
    }

這里由于在操作共享變量前加了鎖,所以不存在內(nèi)存不可見問題,加過鎖后獲取的共享變量都是從主內(nèi)存獲取的,而不是在CPU緩存或者寄存器里面的值,釋放鎖后修改的共享變量值會(huì)刷新會(huì)主內(nèi)存中。

另外這個(gè)隊(duì)列是使用循環(huán)數(shù)組實(shí)現(xiàn),所以計(jì)算下一個(gè)元素存放下標(biāo)時(shí)候有些特殊。另外insert后調(diào)用 notEmpty.signal();是為了激活調(diào)用notEmpty.await();阻塞后放入notEmpty條件隊(duì)列中的線程。

Take()

我們通過源碼,分析一下take方法的實(shí)現(xiàn):

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        //這里有些特殊
        if (itrs != null)
            //保持隊(duì)列中的元素和迭代器的元素一致
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

從上面分析可以知道,其實(shí)Put操作與Take操作很相似。但是有一點(diǎn)我在上面代碼中標(biāo)識(shí)了,繼續(xù)深入了解:

    //該類的迭代器,所有的迭代器共享數(shù)據(jù),隊(duì)列改變會(huì)影響所有的迭代器
    transient Itrs itrs = null; //其存放了目前所創(chuàng)建的所有迭代器。
    /**
      * 迭代器和它們的隊(duì)列之間的共享數(shù)據(jù),允許隊(duì)列元素被刪除時(shí)更新迭代器的修改。
      */
    class Itrs {
        void elementDequeued() {
            // assert lock.getHoldCount() == 1;
            if (count == 0)
                //隊(duì)列中數(shù)量為0的時(shí)候,隊(duì)列就是空的,會(huì)將所有迭代器進(jìn)行清理并移除
                queueIsEmpty();
            //takeIndex的下標(biāo)是0,意味著隊(duì)列從尾中取完了,又回到頭部獲取
            else if (takeIndex == 0)
                takeIndexWrapped();
        }

        /**
         * 當(dāng)隊(duì)列為空的時(shí)候做的事情
         * 1\. 通知所有迭代器隊(duì)列已經(jīng)為空
         * 2\. 清空所有的弱引用,并且將迭代器置空
         */
        void queueIsEmpty() {}

        /**
         * 將takeIndex包裝成0
         * 并且通知所有的迭代器,并且刪除已經(jīng)過期的任何對(duì)象(個(gè)人理解是置空對(duì)象)
         * 也直接的說就是在Blocking隊(duì)列進(jìn)行出隊(duì)的時(shí)候,進(jìn)行迭代器中的數(shù)據(jù)同步,保持隊(duì)列中的元素和迭代器的元素是一致的。
         */
        void takeIndexWrapped() {}
    }

分析到這里,就有個(gè)疑問了,這個(gè)迭代器到底是什么時(shí)候生成的呢?而且他在出隊(duì)時(shí),是判斷了迭代器不為空的時(shí)候才進(jìn)行操作,而肯定會(huì)存在一種情況,那就是迭代器是空的,并未創(chuàng)建,則不進(jìn)行操作。

通過在源碼奔走,我找到了相關(guān)內(nèi)容,如下,還是在我們的ArrayBlockingQueue的源碼中:

    //從這里知道,在ArrayBlockingQueue對(duì)象中調(diào)用此方法,才會(huì)生成這個(gè)對(duì)象
    //那么就可以理解為,只要并未調(diào)用此方法,則ArrayBlockingQueue對(duì)象中的Itrs對(duì)象則為空
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        Itr() {
            //這里就是生產(chǎn)它的地方
            //count等于0的時(shí)候,創(chuàng)建的這個(gè)迭代器是個(gè)無用的迭代器,可以直接移除,進(jìn)入detach模式。
            //否則就把當(dāng)前隊(duì)列的讀取位置給迭代器當(dāng)做下一個(gè)元素,cursor存儲(chǔ)下個(gè)元素的位置。
            if (count == 0) {
                // assert itrs == null;
                cursor = NONE;
                nextIndex = NONE;
                prevTakeIndex = DETACHED;
            } else {
                final int takeIndex = ArrayBlockingQueue.this.takeIndex;
                prevTakeIndex = takeIndex;
                nextItem = itemAt(nextIndex = takeIndex);
                cursor = incCursor(takeIndex);
                if (itrs == null) {
                    itrs = new Itrs(this);
                } else {
                    itrs.register(this); // in this order
                    itrs.doSomeSweeping(false);
                }
                prevCycles = itrs.cycles;
                // assert takeIndex >= 0;
                // assert prevTakeIndex == takeIndex;
                // assert nextIndex >= 0;
                // assert nextItem != null;
                }
        }
    }

LinkedBlockingQueue
Introduction

基于鏈表的阻塞隊(duì)列,同ArrayListBlockingQueue類似,其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由一個(gè)鏈表構(gòu)成),當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí)(LinkedBlockingQueue可以通過構(gòu)造函數(shù)指定該值),才會(huì)阻塞生產(chǎn)者隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理。

LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來提高整個(gè)隊(duì)列的并發(fā)性能。
??作為開發(fā)者,我們需要注意的是,如果構(gòu)造一個(gè)LinkedBlockingQueue對(duì)象,而沒有指定其容量大小,LinkedBlockingQueue會(huì)默認(rèn)一個(gè)類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費(fèi)者的速度,也許還沒有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。

LinkedBlockingQueue是一個(gè)使用鏈表完成隊(duì)列操作的阻塞隊(duì)列。鏈表是單向鏈表,而不是雙向鏈表。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
    //隊(duì)列的容量,指定大小或?yàn)槟J(rèn)值Integer.MAX_VALUE
    private final int capacity;

    //元素的數(shù)量
    private final AtomicInteger count = new AtomicInteger();

    //隊(duì)列頭節(jié)點(diǎn),始終滿足head.item==null
    transient Node<E> head;

    //隊(duì)列的尾節(jié)點(diǎn),始終滿足last.next==null
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    //出隊(duì)的鎖:take, poll, peek 等讀操作的方法需要獲取到這個(gè)鎖
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    //當(dāng)隊(duì)列為空時(shí),保存執(zhí)行出隊(duì)的線程:如果讀操作的時(shí)候隊(duì)列是空的,那么等待 notEmpty 條件
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    //入隊(duì)的鎖:put, offer 等寫操作的方法需要獲取到這個(gè)鎖
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    //當(dāng)隊(duì)列滿時(shí),保存執(zhí)行入隊(duì)的線程:如果寫操作的時(shí)候隊(duì)列是滿的,那么等待 notFull 條件
    private final Condition notFull = putLock.newCondition();

    //傳說中的無界隊(duì)列
    public LinkedBlockingQueue() {}
    //傳說中的有界隊(duì)列
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }
    //傳說中的無界隊(duì)列
    public LinkedBlockingQueue(Collection<? extends E> c){}

    /**
     * 鏈表節(jié)點(diǎn)類
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - 真正的繼任者節(jié)點(diǎn)
         * - 這個(gè)節(jié)點(diǎn),意味著繼任者是head.next
         * - 空,意味著沒有后繼者(這是最后一個(gè)節(jié)點(diǎn))
         */
        Node<E> next;

        Node(E x) { item = x; }
    }
}

通過其構(gòu)造函數(shù),得知其可以當(dāng)做無界隊(duì)列也可以當(dāng)做有界隊(duì)列來使用。

這里用了兩把鎖分別是takeLockputLock、兩個(gè)Condition分別是notEmptynotFull,它們是這樣搭配的:

  • 如果要獲?。╰ake)一個(gè)元素,需要獲取 takeLock 鎖,但是獲取了鎖還不夠,如果隊(duì)列此時(shí)為空,還需要隊(duì)列不為空(notEmpty)這個(gè)條件(Condition)。
  • 如果要插入(put)一個(gè)元素,需要獲取 putLock 鎖,但是獲取了鎖還不夠,如果隊(duì)列此時(shí)已滿,還需要隊(duì)列不是滿的(notFull)這個(gè)條件(Condition)。

注意:從上面的構(gòu)造函數(shù)中,這里會(huì)初始化一個(gè)空的頭結(jié)點(diǎn),那么第一個(gè)元素入隊(duì)的時(shí)候,隊(duì)列中就會(huì)有兩個(gè)元素。讀取元素時(shí),也總是獲取頭節(jié)點(diǎn)后面的一個(gè)節(jié)點(diǎn)。count 的計(jì)數(shù)值不包括這個(gè)頭節(jié)點(diǎn)。

Put()

通過源碼分析,透析put()方法的流程

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {    
    /**
     * 將指定元素插入到此隊(duì)列的尾部,如有必要,則等待空間變得可用。
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // 如果你糾結(jié)這里為什么是 -1,可以看看 offer 方法。這就是個(gè)標(biāo)識(shí)成功、失敗的標(biāo)志而已。
        int c = -1;
        //包裝成node節(jié)點(diǎn)
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //獲取鎖定
        putLock.lockInterruptibly();
        try {
            /** 如果隊(duì)列滿,等待 notFull 的條件滿足。 */
            while (count.get() == capacity) {
                notFull.await();
            }
            //入隊(duì)
            enqueue(node);
            //原子性自增
            c = count.getAndIncrement();
            // 如果這個(gè)元素入隊(duì)后,還有至少一個(gè)槽可以使用,調(diào)用 notFull.signal() 喚醒等待線程。
            // 哪些線程會(huì)等待在 notFull 這個(gè) Condition 上呢?
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
        //解鎖
            putLock.unlock();
        }
        // 如果 c == 0,那么代表隊(duì)列在這個(gè)元素入隊(duì)前是空的(不包括head空節(jié)點(diǎn)),
        // 那么所有的讀線程都在等待 notEmpty 這個(gè)條件,等待喚醒,這里做一次喚醒操作
        if (c == 0)
            signalNotEmpty();
    }

    /** 鏈接節(jié)點(diǎn)在隊(duì)列末尾 */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        // 入隊(duì)的代碼非常簡(jiǎn)單,就是將 last 屬性指向這個(gè)新元素,并且讓原隊(duì)尾的 next 指向這個(gè)元素
        //last.next = node;
        //last = node;
        // 這里入隊(duì)沒有并發(fā)問題,因?yàn)橹挥蝎@取到 putLock 獨(dú)占鎖以后,才可以進(jìn)行此操作
        last = last.next = node;
    }

    /**
     * 等待PUT信號(hào)
     * 僅在 take/poll 中調(diào)用
     * 也就是說:元素入隊(duì)后,如果需要,則會(huì)調(diào)用這個(gè)方法喚醒讀線程來讀
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();//喚醒
        } finally {
            putLock.unlock();
        }
    }
}

Take
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {   
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //首先,需要獲取到 takeLock 才能進(jìn)行出隊(duì)操作
        takeLock.lockInterruptibly();
        try {
            // 如果隊(duì)列為空,等待 notEmpty 這個(gè)條件滿足再繼續(xù)執(zhí)行
            while (count.get() == 0) {
                notEmpty.await();
            }
            //// 出隊(duì)
            x = dequeue();
            //count 進(jìn)行原子減 1
            c = count.getAndDecrement();
            // 如果這次出隊(duì)后,隊(duì)列中至少還有一個(gè)元素,那么調(diào)用 notEmpty.signal() 喚醒其他的讀線程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    /**
     * 出隊(duì)
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }
}

與 ArrayBlockingQueue 對(duì)比
  1. ArrayBlockingQueue是共享鎖,粒度大,入隊(duì)與出隊(duì)的時(shí)候只能有1個(gè)被執(zhí)行,不允許并行執(zhí)行。LinkedBlockingQueue是獨(dú)占鎖,入隊(duì)與出隊(duì)是可以并行進(jìn)行的。當(dāng)然這里說的是讀和寫進(jìn)行并行,兩者的讀讀與寫寫是不能并行的??偨Y(jié)就是LinkedBlockingQueue可以并發(fā)讀寫。
  2. ArrayBlockingQueueLinkedBlockingQueue間還有一個(gè)明顯的不同之處在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的Node對(duì)象。這在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對(duì)于GC的影響還是存在一定的區(qū)別。
DelayQueue

DelayQueue是一個(gè)無界阻塞隊(duì)列,只有在延遲期滿時(shí)才能從中提取元素。該隊(duì)列的頭部是延遲期滿后保存時(shí)間最長(zhǎng)的Delayed元素。
??存放到DelayDeque的元素必須繼承Delayed接口。Delayed接口使對(duì)象成為延遲對(duì)象,它使存放在DelayQueue類中的對(duì)象具有了激活日期,該接口強(qiáng)制執(zhí)行下列兩個(gè)方法:

  1. CompareTo(Delayed o):Delayed接口繼承了Comparable接口,因此有了這個(gè)方法
  2. getDelay(TimeUnit unit):這個(gè)方法返回到激活日期的剩余時(shí)間,時(shí)間單位由單位參數(shù)指定

使用場(chǎng)景

  1. 關(guān)閉空閑連接。服務(wù)器中,有很多客戶端的連接,空閑一段時(shí)間之后需要關(guān)閉之。
  2. 緩存。緩存中的對(duì)象,超過了空閑時(shí)間,需要從緩存中移出。
  3. 任務(wù)超時(shí)處理。在網(wǎng)絡(luò)協(xié)議滑動(dòng)窗口請(qǐng)求應(yīng)答式交互時(shí),處理超時(shí)未響應(yīng)的請(qǐng)求。
PriorityBlockingQueue
SynchronousQueue

它是一個(gè)特殊的隊(duì)列,它的名字其實(shí)就蘊(yùn)含了它的特征 – - 同步的隊(duì)列。為什么說是同步的呢?這里說的并不是多線程的并發(fā)問題,而是因?yàn)楫?dāng)一個(gè)線程往隊(duì)列中寫入一個(gè)元素時(shí),寫入操作不會(huì)立即返回,需要等待另一個(gè)線程來將這個(gè)元素拿走;同理,當(dāng)一個(gè)讀線程做讀操作的時(shí)候,同樣需要一個(gè)相匹配的寫線程的寫操作。這里的 Synchronous 指的就是讀線程和寫線程需要同步,一個(gè)讀線程匹配一個(gè)寫線程,同理一個(gè)寫線程匹配一個(gè)讀線程。

不像ArrayBlockingQueueLinkedBlockingDeque之類的阻塞隊(duì)列依賴AQS實(shí)現(xiàn)并發(fā)操作,SynchronousQueue直接使用CAS實(shí)現(xiàn)線程的安全訪問。

較少使用到 SynchronousQueue 這個(gè)類,不過它在線程池的實(shí)現(xiàn)類 ScheduledThreadPoolExecutor 中得到了應(yīng)用。

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    //內(nèi)部棧
    static final class TransferStack<E> extends Transferer<E> {}
    //內(nèi)部隊(duì)列
    static final class TransferQueue<E> extends Transferer<E> {}
    public SynchronousQueue() {this(false);}
    public SynchronousQueue(boolean fair) {
        transferer = fair ? 
                 new TransferQueue<E>() : new TransferStack<E>();
    }
}

可以參考:java并發(fā)之SynchronousQueue實(shí)現(xiàn)原理

原文鏈接:http://m.itdecent.cn/p/37df9fe68ece

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容