????CountDownLatch是一個同步工具類,它允許一個或多個線程一直等待,直到其他線程執(zhí)行完后再執(zhí)行。
????它是通過一個計數(shù)器來實現(xiàn)的,計數(shù)器的初始化值為線程的數(shù)量。每當一個線程完成了自己的任務后,計數(shù)器的值就相應得減1。當計數(shù)器到達0時,表示所有的線程都已完成任務,然后在閉鎖上等待的線程就可以恢復執(zhí)行任務。
CountDownLatch的組成

????首先來一下Sync,它是CountDownLatch的同步控件(計數(shù)器),使用AQS狀態(tài)表示計數(shù)。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 重寫AQS獲取共享鎖的方法,AQSstate==0的時候,表示鎖是空閑的
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 重寫AQS釋放共享鎖的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 計數(shù)-1,CAS更新計數(shù)。等于0的時候釋放成功
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
構造函數(shù)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
構造函數(shù)就是初始化Sync的計數(shù),也就是AQS的state屬性
等待方法
await 使當前線程等待,直到計數(shù)到零為止。除非該線程被中斷了
public void await() throws InterruptedException {
// 調(diào)用AQS的獲取共享鎖的方法(可中斷),加入同步隊列,并掛起當前線程
sync.acquireSharedInterruptibly(1);
}
減少計數(shù)方法
減少計數(shù),如果計數(shù)達到零,則釋放所有等待線程。
public void countDown() {
sync.releaseShared(1);
}
????releaseShared源碼里可以看到只釋放了head.next節(jié)點的線程,它是怎么釋放所有等待線程的?
????這里貼一下releaseShared的源碼,可以看到releaseShared調(diào)用了tryReleaseShared獲取共享模式的鎖,如果獲取成功了則調(diào)用doReleaseShared喚醒后繼節(jié)點
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared必須要子類實現(xiàn),下面看下CountDownLatch的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 減CountDownLatch計數(shù)
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 如果計數(shù)是0那么返回true
return nextc == 0;
}
}
如果tryReleaseShared返回true就會調(diào)用doReleaseShared喚醒后繼節(jié)點
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 舊head的waitStatus=-1,那么調(diào)用unparkSuccessor喚醒后繼節(jié)點
// 并且把舊head的waitStatus更新成0
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 舊head的waitStatus已經(jīng)是0了,那么更新成-3,也就是PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// head沒有變過,break
if (h == head) // loop if head changed
break;
}
}
????關鍵點就在調(diào)用unparkSuccessor這,喚醒了后繼節(jié)點的線程,那么這個被掛起的線程是在哪被掛起的呢?是調(diào)用了CountDownLatch的await方法之后最終調(diào)到AQS的doAcquireSharedInterruptibly方法,parkAndCheckInterrupt里面掛起的
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
????被喚醒的節(jié)點被掛起的線程,還會繼續(xù)循環(huán),此時tryAcquireShared的返回值一定會大于0,因為此時CountDownLatch的計數(shù)是0(上面tryAcquireShared方法)。那么進入到setHeadAndPropagate方法,這個方法會喚醒node.next節(jié)點的線程,依次類推,node.next還會喚醒node.next.next
????也就是說調(diào)用countDown方法,如果CountDownLatch的計數(shù)是0了,那么會喚醒head.next節(jié)點的線程,被喚醒的線程還會喚醒他后繼節(jié)點的線程.