CountDownLatch

????CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。
????它是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始化值為線程的數(shù)量。每當一個線程完成了自己的任務后,計數(shù)器的值就相應得減1。當計數(shù)器到達0時,表示所有的線程都已完成任務,然后在閉鎖上等待的線程就可以恢復執(zhí)行任務。

CountDownLatch的組成

image.png

????首先來一下Sync,它是CountDownLatch的同步控件(計數(shù)器),使用AQS狀態(tài)表示計數(shù)。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
      
        // 重寫AQS獲取共享鎖的方法,AQSstate==0的時候,表示鎖是空閑的
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
      
        // 重寫AQS釋放共享鎖的方法
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                 // 計數(shù)-1,CAS更新計數(shù)。等于0的時候釋放成功
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
構造函數(shù)
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

構造函數(shù)就是初始化Sync的計數(shù),也就是AQS的state屬性

等待方法

await 使當前線程等待,直到計數(shù)到零為止。除非該線程被中斷了

  public void await() throws InterruptedException {
         // 調(diào)用AQS的獲取共享鎖的方法(可中斷),加入同步隊列,并掛起當前線程
        sync.acquireSharedInterruptibly(1);
    }
減少計數(shù)方法

減少計數(shù),如果計數(shù)達到零,則釋放所有等待線程。

 public void countDown() {
        sync.releaseShared(1);
    }

????releaseShared源碼里可以看到只釋放了head.next節(jié)點的線程,它是怎么釋放所有等待線程的?
????這里貼一下releaseShared的源碼,可以看到releaseShared調(diào)用了tryReleaseShared獲取共享模式的鎖,如果獲取成功了則調(diào)用doReleaseShared喚醒后繼節(jié)點

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared必須要子類實現(xiàn),下面看下CountDownLatch的tryReleaseShared

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                // 減CountDownLatch計數(shù)
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    // 如果計數(shù)是0那么返回true
                    return nextc == 0;
            }
        }

如果tryReleaseShared返回true就會調(diào)用doReleaseShared喚醒后繼節(jié)點

 private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 舊head的waitStatus=-1,那么調(diào)用unparkSuccessor喚醒后繼節(jié)點
                 // 并且把舊head的waitStatus更新成0
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                // 舊head的waitStatus已經(jīng)是0了,那么更新成-3,也就是PROPAGATE
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // head沒有變過,break
            if (h == head)                   // loop if head changed
                break;
        }
    }

????關鍵點就在調(diào)用unparkSuccessor這,喚醒了后繼節(jié)點的線程,那么這個被掛起的線程是在哪被掛起的呢?是調(diào)用了CountDownLatch的await方法之后最終調(diào)到AQS的doAcquireSharedInterruptibly方法,parkAndCheckInterrupt里面掛起的

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

????被喚醒的節(jié)點被掛起的線程,還會繼續(xù)循環(huán),此時tryAcquireShared的返回值一定會大于0,因為此時CountDownLatch的計數(shù)是0(上面tryAcquireShared方法)。那么進入到setHeadAndPropagate方法,這個方法會喚醒node.next節(jié)點的線程,依次類推,node.next還會喚醒node.next.next

????也就是說調(diào)用countDown方法,如果CountDownLatch的計數(shù)是0了,那么會喚醒head.next節(jié)點的線程,被喚醒的線程還會喚醒他后繼節(jié)點的線程.

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容