最近工作不飽和,寫寫文章充充電。何以解憂,唯有Coding。后續(xù)更新的文章涉及的方向有:ThreadPoolExecutor、Spring、MyBatis、ReentrantLock、CyclicBarrier、Semaphore.
同系列文章:
1.看ThreadPoolExecutor源碼前的騷操作

開始講解之前,自定義ThreadPoolExecutor和Task。
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public static class CustomTask<V> extends FutureTask<V> {
public CustomTask(Callable<V> callable) {
super(callable);
}
public CustomTask(Runnable runnable, V result) {
super(runnable, result);
}
}
}
為什么要需要線程池 ?
線程資源必須通過線程池提供,不允許在應(yīng)用中自行顯式的創(chuàng)建線程。使用線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時(shí)間以及系統(tǒng)資源的開銷,解決資源不足的問題。如果不使用線程池的話,有可能造成系統(tǒng)創(chuàng)建大量的同類線程而消耗完線程而導(dǎo)致消耗完內(nèi)存或者過度切換的問題。為什么要不能通過Executors去創(chuàng)建線程池?
因?yàn)镕ixedThreadPool和SingleThreadPool使用的是無界隊(duì)列,會(huì)堆積大量的請(qǐng)求,造成OOM。還有CachedThreadPool和ScheduledThreadPool會(huì)造成Integer.MAX_VALUE,會(huì)創(chuàng)建大量的線程,造成OOM
分析CountDownLatch
CountDownLatch用于協(xié)調(diào)多個(gè)線程的同步,能讓一個(gè)線程在等待其他線程執(zhí)行完任務(wù)后,再繼續(xù)執(zhí)行。內(nèi)部是通過一個(gè)計(jì)數(shù)器去完成實(shí)現(xiàn)。
靜態(tài)內(nèi)部類Sync繼承AQS,通過state變量完成計(jì)數(shù)器的實(shí)現(xiàn)。
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
CountDownLatch構(gòu)造方法,count代表需要執(zhí)行的線程數(shù)量。
/**
* Constructs a {@code CountDownLatch} initialized with the given count.
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
在計(jì)數(shù)值 > 0的情況下,每當(dāng)一個(gè)線程完成任務(wù),計(jì)數(shù)減去1。
/**
* Decrements the count of the latch, releasing all waiting threads if
* the count reaches zero.
*
* <p>If the current count is greater than zero then it is decremented.
* If the new count is zero then all waiting threads are re-enabled for
* thread scheduling purposes.
*
* <p>If the current count equals zero then nothing happens.
*/
public void countDown() {
sync.releaseShared(1);
}
讓當(dāng)前線程等待其他線程,直到計(jì)數(shù)為0,除非當(dāng)前線程被中斷了。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
讓當(dāng)前線程等待其他線程,如果超過指定的timeout時(shí)間范圍,那么忽略要等待的線程,
直接執(zhí)行。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
返回當(dāng)前計(jì)數(shù)量。
/**
* Returns the current count.
*
* <p>This method is typically used for debugging and testing purposes.
*
* @return the current count
*/
public long getCount() {
return sync.getCount();
}
當(dāng)計(jì)數(shù)器應(yīng)該為0,所有的線程執(zhí)行完自己的任務(wù)。在CountDownLatch等待的線程,可以繼續(xù)執(zhí)行的任務(wù)。
使用場(chǎng)景
適用于一個(gè)任務(wù)的執(zhí)行需要等待其他任務(wù)執(zhí)行完畢,方可執(zhí)行的場(chǎng)景。
場(chǎng)景1:一群學(xué)生在教室考試,學(xué)生們都完成了作答,老師才可以進(jìn)行收卷操作。
場(chǎng)景2:110跨欄比賽中,所有運(yùn)動(dòng)員準(zhǔn)備好起跑姿勢(shì),進(jìn)入到預(yù)備狀態(tài),等待裁判一聲槍響。裁判開了槍,所有運(yùn)動(dòng)員才可以開跑。
CountDownLatch是一次性的,只能通過構(gòu)造方法設(shè)置初始計(jì)數(shù)量,計(jì)數(shù)完了無法進(jìn)行復(fù)位,不能達(dá)到復(fù)用。而CyclicBarrier可以實(shí)現(xiàn)復(fù)用。
場(chǎng)景1
一個(gè)線程的執(zhí)行要等待其他線程執(zhí)行完畢后,才能繼續(xù)執(zhí)行。
public static void test1() {
final CountDownLatch countDownLatch = new CountDownLatch(2);
ExecutorService executorService = new CustomThreadPoolExecutor(2,
2, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 2; i++) {
CustomThreadPoolExecutor.CustomTask task = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
System.out.println("子線程" + Thread.currentThread().getName()
+ "正在執(zhí)行...");
System.out.println("子線程" + Thread.currentThread().getName()
+ "執(zhí)行完畢...");
countDownLatch.countDown();
}
}, "success");
executorService.submit(task);
}
try {
System.out.println("等待2個(gè)線程...");
countDownLatch.await();
executorService.shutdown();
System.out.println("2個(gè)線程執(zhí)行完畢...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
輸出結(jié)果
等待2個(gè)線程...
子線程pool-1-thread-2正在執(zhí)行...
子線程pool-1-thread-2執(zhí)行完畢...
子線程pool-1-thread-1正在執(zhí)行...
子線程pool-1-thread-1執(zhí)行完畢...
2個(gè)線程執(zhí)行完畢..
場(chǎng)景2
多個(gè)線程在某一個(gè)時(shí)刻同時(shí)執(zhí)行。
public static void test2() {
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(10);
ExecutorService executorService = new CustomThreadPoolExecutor(10,
10, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
for (int i = 0; i < 10; i++) {
CustomThreadPoolExecutor.CustomTask task = new CustomThreadPoolExecutor.CustomTask(new Runnable() {
@Override
public void run() {
try {
System.out.println("子線程" + Thread.currentThread().getName()
+ "正在執(zhí)行...");
start.await();
System.out.println("子線程" + Thread.currentThread().getName()
+ "執(zhí)行完畢...");
} catch (InterruptedException ex) {
ex.printStackTrace();
} finally {
end.countDown();
}
}
}, "success");
executorService.submit(task);
}
start.countDown();
try {
System.out.println("等待10個(gè)線程...");
end.await();
executorService.shutdown();
System.out.println("10個(gè)線程執(zhí)行完畢...");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
輸出結(jié)果
子線程pool-1-thread-2正在執(zhí)行...
子線程pool-1-thread-1正在執(zhí)行...
子線程pool-1-thread-3正在執(zhí)行...
子線程pool-1-thread-4正在執(zhí)行...
子線程pool-1-thread-5正在執(zhí)行...
子線程pool-1-thread-6正在執(zhí)行...
子線程pool-1-thread-7正在執(zhí)行...
等待10個(gè)線程...
子線程pool-1-thread-2執(zhí)行完畢...
子線程pool-1-thread-8正在執(zhí)行...
子線程pool-1-thread-8執(zhí)行完畢...
子線程pool-1-thread-1執(zhí)行完畢...
子線程pool-1-thread-3執(zhí)行完畢...
子線程pool-1-thread-4執(zhí)行完畢...
子線程pool-1-thread-10正在執(zhí)行...
子線程pool-1-thread-10執(zhí)行完畢...
子線程pool-1-thread-5執(zhí)行完畢...
子線程pool-1-thread-6執(zhí)行完畢...
子線程pool-1-thread-7執(zhí)行完畢...
子線程pool-1-thread-9正在執(zhí)行...
子線程pool-1-thread-9執(zhí)行完畢...
10個(gè)線程執(zhí)行完畢...
CountDownLatch源碼分析
CountDownLatch的構(gòu)造方法初始計(jì)數(shù)器值,是通過其內(nèi)部類Sync的構(gòu)造方法來實(shí)現(xiàn)的。
Sync(int count) {
setState(count);
}
AQS中的state變量可以表示狀態(tài)。對(duì)于ReentrantLock而言,代表著鎖獲取的次數(shù)。而對(duì)于CountDownLatch代表著計(jì)數(shù)器的值。state變量通過volatile修飾,具有可見性,可以在多個(gè)線程中共享變量。
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
AQS中的CAS操作,使其state變量具有原子性。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
我們來看一下AQS中的Node節(jié)點(diǎn)結(jié)構(gòu)。
- 當(dāng)waitStatus = CANCELLED時(shí),說明因?yàn)槌瑫r(shí)或者中斷,節(jié)點(diǎn)會(huì)被設(shè)置為取消狀態(tài)。處于取消狀態(tài)的節(jié)點(diǎn)不會(huì)參與到競(jìng)爭(zhēng)中。它會(huì)一直保持取消狀態(tài),會(huì)轉(zhuǎn)變到其他狀態(tài)。
- 當(dāng)waitStatus = SIGNAL時(shí),說明當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)處于等待狀態(tài)。而當(dāng)前節(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者被取消,將會(huì)通知后繼節(jié)點(diǎn),使后繼節(jié)點(diǎn)的線程可以得到運(yùn)行。
- 當(dāng)waitStatus = CONDITION,說明該節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)線程等待在Condition上。當(dāng)其他線程對(duì)Condition調(diào)用了signal()后,該節(jié)點(diǎn)會(huì)從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,加入到同步狀態(tài)的獲取中。
- 當(dāng)waitStatus = PROPAGATE,說明下一次共享式獲取同步狀態(tài),將會(huì)無條件的傳播下去。
static final class Node {
/** 共享 */
static final Node SHARED = new Node();
/** 獨(dú)占 */
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 等待狀態(tài) */
volatile int waitStatus;
/** 前驅(qū)節(jié)點(diǎn) */
volatile Node prev;
/** 后繼節(jié)點(diǎn) */
volatile Node next;
/** 獲取同步狀態(tài)的線程 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
countDown()實(shí)際是調(diào)用AQS中的releaseShared方法中,達(dá)到計(jì)數(shù)減1的目的。
public void countDown() {
sync.releaseShared(1);
}

以共享模式去釋放鎖,如果tryReleaseShared方法釋放鎖成功,則執(zhí)行AQS中的doReleaseShared方法去喚醒等待線程,并且返回true;否則返回false,說明鎖釋放失敗。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared在CountDownLatch中被重寫。通過輪詢 + CAS方式達(dá)到釋放鎖的目的。第一次循環(huán)的時(shí)候判斷當(dāng)前state變量,如果等于0,說明計(jì)數(shù)器值為0或者說鎖沒有被持有,可以直接返回false。然后進(jìn)行CAS操作,讓獲取鎖的次數(shù)減少1或者說計(jì)數(shù)器值減少1。如果nextc等于0,說明計(jì)數(shù)值為0或者持有鎖的次數(shù)為0,可以讓喚醒等待的線程,所以返回true,否則返回false,代表釋放鎖失敗。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
首先獲得head節(jié)點(diǎn)。如果head節(jié)點(diǎn)不等于空且head節(jié)點(diǎn)不等于tail節(jié)點(diǎn),獲得head節(jié)點(diǎn)的waitStatus。判斷當(dāng)前head節(jié)點(diǎn)狀態(tài)是否是SINGAL。處于SINGAL狀態(tài)的節(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)處于被喚醒的狀態(tài)。如果CAS操作將head節(jié)點(diǎn)的waitStatus重置為0失敗,那么跳出當(dāng)前循環(huán),繼續(xù)執(zhí)行下一次循環(huán)(重新檢查)。如果重置成功,那么調(diào)用unparkSuccessor方法喚醒后繼節(jié)點(diǎn)。 如果當(dāng)前head節(jié)點(diǎn)狀態(tài)等于0,通過CAS操作將waitStatus設(shè)置為PROPAGATE(傳播)狀態(tài),確??梢韵蚝笠粋€(gè)節(jié)點(diǎn)傳播下去。如果CAS操作失敗,那么當(dāng)前循環(huán),繼續(xù)執(zhí)行下一次循環(huán)。最后的h == head,是判斷head節(jié)點(diǎn)是否發(fā)生變化。如果沒有發(fā)生變化,結(jié)束循環(huán)。如果發(fā)生變化,必須再次循環(huán)。
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
獲取head節(jié)點(diǎn)的waitStatus,如果小于0,進(jìn)行CAS操作重置為0。獲取head節(jié)點(diǎn)的后繼節(jié)點(diǎn),如果后繼節(jié)點(diǎn)等于null或者后繼節(jié)點(diǎn)的waitStaus大于0(說明后繼節(jié)點(diǎn)處于CANCELLED狀態(tài)),那么從隊(duì)列從尾部往前進(jìn)行遍歷尋找waitStatus小于等于0的節(jié)點(diǎn)。如果這個(gè)遍歷出來的節(jié)點(diǎn)不等于null的話,那么通過LockSupport.unpark()喚醒這個(gè)節(jié)點(diǎn)中的線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
分析完了CountDownLatch的countDown()方法,接著擼await()方法

await方法會(huì)使當(dāng)前線程在計(jì)數(shù)器值為0之前,一直處于等待狀態(tài),除非中斷。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AQS中的acquireSharedInterruptibly方法,會(huì)判斷線程是否中斷。如果中斷, 拋出InterruptedException異常。值得注意的是Thread.interrupted()方法,是測(cè)試當(dāng)前線程是否中斷。該方法會(huì)清除線程的中斷狀態(tài)。換句話說,如果調(diào)用這個(gè)方法2次,那么第二次會(huì)直接返回false,除非當(dāng)前線程在第一次調(diào)用之后再次被中斷。如果tryAcquireShared()小于0(說明該計(jì)數(shù)器值大于0),繼續(xù)執(zhí)行doAcquireSharedInterruptibly。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
在CountDownLatch中重寫了AQS中的tryAcquireShared()方法,這里只是簡(jiǎn)單的判斷state變量。如果state等于0(說明計(jì)數(shù)值為0),返回1,否則返回-1(說明計(jì)數(shù)器值大于0)。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
很明顯,是通過輪詢的方式去獲取共享鎖。首先將當(dāng)前線程包裝成類型為SHARED的節(jié)點(diǎn),標(biāo)志為共享類型的節(jié)點(diǎn)。獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)。如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為head節(jié)點(diǎn)的話,說明該節(jié)點(diǎn)是在AQS隊(duì)列中等待獲取鎖的第一個(gè)節(jié)點(diǎn)。調(diào)用CountDownLatch中的tryAcquireShared()嘗試去獲取鎖。返回的值大于0的話,說明獲取鎖成功。如果獲取共享鎖成功,那么把當(dāng)前節(jié)點(diǎn)設(shè)置為AQS同步隊(duì)列中的head節(jié)點(diǎn),同時(shí)將p.next置為null(方便GC)?;氐筋^看,如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不是head節(jié)點(diǎn)或者獲取鎖失敗,我們需要調(diào)用shouldParkAfterFailedAcquire()方法判斷當(dāng)前線程是否需要掛起,如果需要掛起調(diào)用 parkAndCheckInterrupt()
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通過當(dāng)前線程構(gòu)造出Node節(jié)點(diǎn),mode用于標(biāo)志是獨(dú)占還是共享。如果隊(duì)列非空,則快速入隊(duì)。通過CAS將node節(jié)點(diǎn)置為tail節(jié)點(diǎn),并返回node節(jié)點(diǎn)。如果CAS失敗或者隊(duì)列為空,那么通過enq()入隊(duì)。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
enq中通過死循環(huán)的方式保證節(jié)點(diǎn)的正確插入。如果隊(duì)列為空,那么創(chuàng)建一個(gè)新的節(jié)點(diǎn)。通過CAS將新節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn),同時(shí)也將head節(jié)點(diǎn)設(shè)置為tail節(jié)點(diǎn)。當(dāng)隊(duì)列只有一個(gè)元素時(shí),head節(jié)點(diǎn)等于tail節(jié)點(diǎn)。在循環(huán)中,唯一跳出循環(huán)的條件是通過CAS將node節(jié)點(diǎn)設(shè)置為tail節(jié)點(diǎn)。這樣的話,enq方法將并發(fā)插入節(jié)點(diǎn)的請(qǐng)求變得串行化了。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這里將node節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn)。經(jīng)過一些條件判斷后,獲取head節(jié)點(diǎn)的后繼節(jié)點(diǎn)。如果后繼節(jié)點(diǎn)等于null或者后繼節(jié)點(diǎn)也是共享節(jié)點(diǎn),那么調(diào)用doReleaseShared去喚醒它。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如果pred節(jié)點(diǎn)(node節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn))的狀態(tài)是SIGNAL,說明該pred節(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者被取消,會(huì)通知其后繼節(jié)點(diǎn)(也就是node節(jié)點(diǎn))。所以我們可以安全讓node節(jié)點(diǎn)的線程掛起。如果pred節(jié)點(diǎn)處于取消狀態(tài),我們進(jìn)行死循環(huán), 直到pred節(jié)點(diǎn)的狀態(tài)不是取消狀態(tài)。通過死循環(huán),我們能確保node節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)不處于取消狀態(tài)。反之,如果一開始pred節(jié)點(diǎn)不處于取消狀態(tài),那么我們通過CAS將pre節(jié)點(diǎn)的狀態(tài)置為SIGNAL,為后面循環(huán)涉及到的park操作進(jìn)行準(zhǔn)備。但是有一點(diǎn)要注意,我們進(jìn)行park之前,要確保當(dāng)前節(jié)點(diǎn)獲取鎖失敗。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
調(diào)用LockSupport.park()掛起當(dāng)前線程,并且返回中斷狀態(tài)。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
CountDownLatch的await(long timeout, TimeUnit unit)和await()方法實(shí)現(xiàn)方式基本一致,只不過加入了超時(shí)機(jī)制而已。
// 返回false,代表超時(shí)
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 返回false,代表超時(shí)。返回true,代表獲得共享鎖成功
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
如果在nanosTimeout時(shí)間范圍內(nèi),還沒有獲取共享鎖成功的話,直接返回false。spinForTimeoutThreadshold的值為1000nanoseconds。如果shouldParkAfterFailedAcquire(p, node)返回true且超時(shí)時(shí)間大于閥值spinForTimeoutThreadshold的話,會(huì)通過LockSupport.parkNanos(this, nanosTimeout);讓線程掛起nanosTimeout時(shí)間。這樣的策略體現(xiàn)是:如果超時(shí)時(shí)間很短的話,就不把當(dāng)前線程掛起,而是通過自旋,這樣線程獲取鎖很快就釋放的情況下,可以減少cpu資源和線程掛起和恢復(fù)的性能損耗。
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
總結(jié)
通過重寫AQS中的模板方法,可以實(shí)現(xiàn)共享鎖和獨(dú)占鎖。
如果獲取共享鎖失敗,請(qǐng)求共享鎖的線程封裝成SHARED類型的Node對(duì)象并且加入到AQS同步隊(duì)列中,并掛起Node對(duì)象對(duì)應(yīng)的線程,等待鎖的釋放。待共享鎖的可以被獲取后,從head節(jié)點(diǎn)開始依次喚醒head節(jié)點(diǎn)之后的所有SHARED類型的節(jié)點(diǎn),實(shí)現(xiàn)共享狀態(tài)的傳播。而獨(dú)占鎖則是,當(dāng)鎖被頭節(jié)點(diǎn)獲取后,只有頭節(jié)點(diǎn)獲得了鎖,其他節(jié)點(diǎn)的線程繼續(xù)沉睡。等待鎖被釋放了,才會(huì)喚醒下一個(gè)節(jié)點(diǎn)的線程,少了setHeadAndPropagate()這一步。
尾言
大家好,我是cmazxiaoma(寓意是沉夢(mèng)昂志的小馬),希望和你們一起成長(zhǎng)進(jìn)步,感謝各位閱讀本文章。
如果您對(duì)這篇文章有什么意見或者錯(cuò)誤需要改進(jìn)的地方,歡迎與我討論。
如果您覺得還不錯(cuò)的話,希望你們可以點(diǎn)個(gè)贊。
希望我的文章對(duì)你能有所幫助。
有什么意見、見解或疑惑,歡迎留言討論。
最后送上:心之所向,素履以往。生如逆旅,一葦以航。
