CountDownLatch的使用與解析

引言

CountDownLatch是jdk1.5開始concurrent包里提供的,并發(fā)編程工具類。

這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行,可用于多線程的并發(fā)執(zhí)行。

例如,應(yīng)用程序的主線程希望在多個(gè)網(wǎng)絡(luò)請求線程并發(fā)執(zhí)行完后,刷新頁面,避免串行請求導(dǎo)致網(wǎng)絡(luò)請求耗時(shí)長。

CountDownLatch的使用

CountDownLatch的主要使用步驟是

1、初始化,指定線程個(gè)數(shù),CountDownLatch latch = new CountDownLatch(3);

參數(shù)4代表線程的總數(shù)

2、每個(gè)線程執(zhí)行后執(zhí)行latch.countDown();,代表一個(gè)線程執(zhí)行完成,待完成的線程數(shù)減1。

3、在線程添加latch.await();,阻塞該線程,等待其他子線程完成。

Demo如下

package com.example.zzh.myapplication;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        // Let us create task that is going to
        // wait for four threads before it starts
        CountDownLatch latch = new CountDownLatch(3);

        long start = System.currentTimeMillis();

        // Let us create four worker
        // threads and start them.
        WorkerThread first = new WorkerThread(1000, latch, "worker-1");
        WorkerThread second = new WorkerThread(2000, latch, "worker-2");
        WorkerThread third = new WorkerThread(3000, latch, "worker-3");

        first.start();
        second.start();
        third.start();

        // The main task waits for four threads
        latch.await();

        // Main thread has started
        System.out.println(Thread.currentThread().getName() + " has finished. Spend Time = " + (System.currentTimeMillis() - start));
    }

    // A class to represent threads for which
    // the main thread waits.
    static class WorkerThread extends Thread {

        private int delay;
        private CountDownLatch latch;

        public WorkerThread(int delay, CountDownLatch latch, String name) {
            super(name);
            this.delay = delay;
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                latch.countDown();
                System.out.println(Thread.currentThread().getName() + " finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

運(yùn)行結(jié)果

worker-1 finished
worker-2 finished
worker-3 finished
main has finished. Spend Time = 3006

CountDownLatch的解析

1、什么是AQS(AbstractQueuedSynchronizer)

深入CountDownLatch源碼,需要了解AQS(AbstractQueuedSynchronizer),因?yàn)镃ountDownLatch的底層原理是通過AQS(AbstractQueuedSynchronizer)里面的共享鎖來實(shí)現(xiàn)的。

推薦閱讀:【死磕Java并發(fā)】—–J.U.C之AQS(一篇就夠了)

以下是上述文章的引用:

AQS:AbstractQueuedSynchronizer,即隊(duì)列同步器。它是構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,JUC并發(fā)包的作者(Doug Lea)期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。它是JUC并發(fā)包中的核心基礎(chǔ)組件。

AQS解決了實(shí)現(xiàn)同步器時(shí)涉及當(dāng)?shù)拇罅考?xì)節(jié)問題,例如獲取同步狀態(tài)、FIFO同步隊(duì)列。基于AQS來構(gòu)建同步器可以帶來很多好處。它不僅能夠極大地減少實(shí)現(xiàn)工作,而且也不必處理在多個(gè)位置上發(fā)生的競爭問題。

AQS使用一個(gè)int類型的成員變量state來表示同步狀態(tài),當(dāng)state>0時(shí)表示已經(jīng)獲取了鎖,當(dāng)state = 0時(shí)表示釋放了鎖。它提供了三個(gè)方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態(tài)state進(jìn)行操作,當(dāng)然AQS可以確保對state的操作是安全的。

AQS通過內(nèi)置的FIFO同步隊(duì)列來完成資源獲取線程的排隊(duì)工作,如果當(dāng)前線程獲取同步狀態(tài)失?。ㄦi)時(shí),AQS則會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node)并將其加入同步隊(duì)列,同時(shí)會阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),則會把節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。

AQS的使用方式是繼承,子類通過繼承同步器并實(shí)現(xiàn)它的抽象方法來管理同步狀態(tài)。AQS提供了獨(dú)占鎖和共享鎖必須實(shí)現(xiàn)的方法。

共享鎖則是一種樂觀鎖,它放寬了加鎖策略,允許多個(gè)執(zhí)行讀操作的線程同時(shí)訪問共享資源。對應(yīng)的是獨(dú)占鎖,是一種悲觀鎖,它避免了讀/讀沖突,如果某個(gè)只讀線程獲取鎖,則其他讀線程都只能等待,這樣就限制了不必要的并發(fā)性,因?yàn)樽x操作并不會影響數(shù)據(jù)的一致性。

在AQS中,共享鎖獲取鎖,節(jié)點(diǎn)模式則為Node.SHARED。獨(dú)占鎖獲取鎖時(shí),設(shè)置節(jié)點(diǎn)模式為Node.EXCLUSIVE

CountDownLatch使用的是共享鎖,繼承AQS的方法有:

  • tryAcquireShared(int arg):共享式獲取同步狀態(tài),返回值大于等于0則表示獲取成功,否則獲取失?。?/p>

  • tryReleaseShared(int arg):共享式釋放同步狀態(tài)。

上面Demo的隊(duì)列同步器模型如下(參考這里

image.png

2、初始化源碼解析

/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState(); // 獲取主存中的state值
            if (c == 0)
                return false; //state已經(jīng)為0 直接退出
            int nextc = c-1; // 減一 準(zhǔn)備cas更新該值
            if (compareAndSetState(c, nextc)) //cas更新status值為nextc
                return nextc == 0; //更新成功 判斷是否為0 退出;更新失敗則繼續(xù)for循環(huán),直到線程并發(fā)更新成功
        }
    }
}

private final Sync sync;

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

初始化做的工作是創(chuàng)建同步器實(shí)例,這個(gè)同步器就是上文提到的繼承AQS的類,并實(shí)現(xiàn)共享鎖方法。

3、latch.countDown()解析

public void countDown() {
    sync.releaseShared(1);
}
    
//AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

其中tryReleaseShared是上文實(shí)現(xiàn)的方法,主要的工作是CAS更新state值減一,并判斷是否為0,如果為0返回true,說明所有線程都執(zhí)行完成,可以做喚醒的工作doReleaseShared。

//AbstractQueuedSynchronizer.java
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

上面的邏輯是:

如果當(dāng)前節(jié)點(diǎn)是SIGNAL意味著,它正在等待一個(gè)信號,或者說它在等待被喚醒,因此做兩件事,一是重置waitStatus標(biāo)志位,二是重置成功后,喚醒下一個(gè)節(jié)點(diǎn)。

如果本身頭節(jié)點(diǎn)的waitStatus是出于重置狀態(tài)(waitStatus==0)的,將其設(shè)置為“傳播”狀態(tài)。意味著需要將狀態(tài)向后一個(gè)節(jié)點(diǎn)傳播。

這個(gè)死循環(huán),退出的路只有一條,那就是h==head,即該線程是頭節(jié)點(diǎn),且狀態(tài)為共享狀態(tài)。

4、latch.await()解析

await是阻塞當(dāng)前線程(中斷被拋中斷異常),等待被喚醒,源碼如下

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

上面的邏輯是:

如果線程被中斷,則拋出異常。然后判斷tryAcquireShared方法的返回值是否小于0,這個(gè)方法是第2步初始化實(shí)現(xiàn)的,當(dāng)(getState() == 0)時(shí)則返回1,否則返回-1,即當(dāng)state還沒有減少到0時(shí),則執(zhí)行doAcquireSharedInterruptibly(arg)

//AbstractQueuedSynchronizer.java
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);// 往同步隊(duì)列中添加節(jié)點(diǎn)
    boolean failed = true;
    try {
        for (;;) { // 一個(gè)死循環(huán) 跳出循環(huán)只有下面兩個(gè)途徑
            final Node p = node.predecessor(); // 當(dāng)前線程的前一個(gè)節(jié)點(diǎn)
            if (p == head) {
                int r = tryAcquireShared(arg); //當(dāng)getState()==0時(shí)則返回1,否則返回-1
                if (r >= 0) {
                    setHeadAndPropagate(node, r);// 處理后續(xù)節(jié)點(diǎn)
                    p.next = null; // help GC
                    failed = false;
                    return;//當(dāng)getState為0,并且為頭節(jié)點(diǎn),則跳出循環(huán)
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();// 響應(yīng)打斷 跳出循環(huán)
        }
    } finally {
        if (failed)
            cancelAcquire(node); //如果是打斷退出的,則移除同步隊(duì)列節(jié)點(diǎn)
    }
}

在同步隊(duì)列中掛起的線程,它們自旋的形式查看自己是否滿足條件醒來(state==0,且為頭節(jié)點(diǎn)),如果成立(即被喚醒),將調(diào)用setHeadAndPropagate這個(gè)方法

private void setHeadAndPropagate(Node node, int propagate) {
     Node h = head; // Record old head for check below
     setHead(node);
     if (propagate > 0 || h == null || h.waitStatus < 0) {
         Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

這個(gè)方法是將當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn),且它也調(diào)用了doReleaseShared這個(gè)方法,在第3步解析latch.countDown中提到,這個(gè)方法就是將頭節(jié)點(diǎn)設(shè)置為共享狀態(tài)的,由此,共享狀態(tài)傳播下去。

擴(kuò)展內(nèi)容

1、CountDownLatch的優(yōu)缺點(diǎn)

優(yōu)點(diǎn):

對使用者而言,你只需要傳入一個(gè)int型變量控制任務(wù)數(shù)量即可,至于同步隊(duì)列的出隊(duì)入隊(duì)維護(hù),state變量值的維護(hù)對使用者都是透明的,使用方便。

缺點(diǎn):

CountDownLatch設(shè)置了state后就不能更改,也不能循環(huán)使用。

2、CountDownLatch的超時(shí)處理

如果線程等待超過一定時(shí)間,可以取消阻塞被喚醒,那么可以通過設(shè)置await的參數(shù)

//等待超過2s,自動被喚醒
latch.await(2000, TimeUnit.MILLISECONDS);

參考

Java CountDownLatch解析(上)

Java CountDownLatch解析(下)

【死磕Java并發(fā)】—–J.U.C之AQS(一篇就夠了)

Java并發(fā)-獨(dú)占鎖與共享鎖

java共享鎖實(shí)現(xiàn)原理及CountDownLatch解析

CountDownLatch in Java

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文是我自己在秋招復(fù)習(xí)時(shí)的讀書筆記,整理的知識點(diǎn),也是為了防止忘記,尊重勞動成果,轉(zhuǎn)載注明出處哦!如果你也喜歡,那...
    波波波先森閱讀 11,652評論 4 56
  • CountDownLatch 介紹 CountDownLatch是一個(gè)同步協(xié)助類,允許一個(gè)或多個(gè)線程等待,直到其他...
    tomas家的小撥浪鼓閱讀 3,411評論 0 9
  • 理解多線程的并發(fā)鎖,可結(jié)合多進(jìn)程的分布式鎖(如Zookeeper的互斥鎖、讀寫鎖的實(shí)現(xiàn)原理),本質(zhì)是相通的 介紹 ...
    jiangmo閱讀 822評論 0 1
  • 雙十一最想打折的是什么? 自己的手。
    ifuntouch閱讀 141評論 0 0
  • 將那一點(diǎn)童話的玻璃心 小心封存 不讓變了模樣的自己 去褻瀆 于是 美好從不可奢求 變成了 人們心底的柔軟
    久又九閱讀 416評論 0 2

友情鏈接更多精彩內(nèi)容