看這篇文章前最好看下http://m.itdecent.cn/p/89132109d49d
回顧一下java中的等待/通知機(jī)制
我們有時會遇到這樣的場景:線程A執(zhí)行到某個點的時候,因為某個條件condition不滿足,需要線程A暫停;等到線程B修改了條件condition,使condition滿足了線程A的要求時,A再繼續(xù)執(zhí)行。
自旋實現(xiàn)的等待通知
最簡單的實現(xiàn)方法就是將condition設(shè)為一個volatile的變量,當(dāng)A線程檢測到條件不滿足時就自旋,類似下面:
public class Test {
private static volatile int condition = 0;
public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
while (!(condition == 1)) {
// 條件不滿足,自旋
}
System.out.println("a executed");
}
});
A.start();
Thread.sleep(2000);
condition = 1;
}
}
這種方式的問題在于自旋非常耗費CPU資源,當(dāng)然如果在自旋的代碼塊里加入Thread.sleep(time)將會減輕CPU資源的消耗,但是如果time設(shè)的太大,A線程就不能及時響應(yīng)condition的變化,如果設(shè)的太小,依然會造成CPU的消耗。
Object提供的等待通知
因此,java在Object類里提供了wait()和notify()方法,使用方法如下:
class Test1 {
private static volatile int condition = 0;
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
while (!(condition == 1)) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("a executed by notify");
}
}
});
A.start();
Thread.sleep(2000);
condition = 1;
synchronized (lock) {
lock.notify();
}
}
}
通過代碼可以看出,在使用一個對象的wait()、notify()方法前必須要獲取這個對象的鎖。
當(dāng)線程A調(diào)用了lock對象的wait()方法后,線程A將釋放持有的lock對象的鎖,然后將自己掛起,直到有其他線程調(diào)用notify()/notifyAll()方法或被中斷??梢钥吹皆趌ock.wait()前面檢測condition條件的時候使用了一個while循環(huán)而不是if,那是因為當(dāng)有其他線程把condition修改為滿足A線程的要求并調(diào)用notify()后,A線程會重新等待獲取鎖,獲取到鎖后才從lock.wait()方法返回,而在A線程等待鎖的過程中,condition是有可能再次變化的。
因為wait()、notify()是和synchronized配合使用的,因此如果使用了顯示鎖Lock,就不能用了。所以顯示鎖要提供自己的等待/通知機(jī)制,Condition應(yīng)運而生。
顯示鎖提供的等待通知
我們用Condition實現(xiàn)上面的例子:
class Test2 {
private static volatile int condition = 0;
private static Lock lock = new ReentrantLock();
private static Condition lockCondition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
while (!(condition == 1)) {
lockCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
System.out.println("a executed by condition");
}
});
A.start();
Thread.sleep(2000);
condition = 1;
lock.lock();
try {
lockCondition.signal();
} finally {
lock.unlock();
}
}
}
可以看到通過 lock.newCondition() 可以獲得到 lock 對應(yīng)的一個Condition對象lockCondition ,lockCondition的await()、signal()方法分別對應(yīng)之前的Object的wait()和notify()方法。整體上和Object的等待通知是類似的。
實現(xiàn)一個阻塞隊列
上面我們看到了Condition實現(xiàn)的等待通知和Object的等待通知是非常類似的,而Condition提供的等待通知功能更強大,最重要的一點是,一個lock對象可以通過多次調(diào)用 lock.newCondition() 獲取多個Condition對象,也就是說,在一個lock對象上,可以有多個等待隊列,而Object的等待通知在一個Object上,只能有一個等待隊列。用下面的例子說明,下面的代碼實現(xiàn)了一個阻塞隊列,當(dāng)隊列已滿時,add操作被阻塞有其他線程通過remove方法刪除元素;當(dāng)隊列已空時,remove操作被阻塞直到有其他線程通過add方法添加元素。
public class BoundedQueue1<T> {
public List<T> q; //這個列表用來存隊列的元素
private int maxSize; //隊列的最大長度
private Lock lock = new ReentrantLock();
private Condition addConditoin = lock.newCondition();
private Condition removeConditoin = lock.newCondition();
public BoundedQueue1(int size) {
q = new ArrayList<>(size);
maxSize = size;
}
public void add(T e) {
lock.lock();
try {
while (q.size() == maxSize) {
addConditoin.await();
}
q.add(e);
removeConditoin.signal(); //執(zhí)行了添加操作后喚醒因隊列空被阻塞的刪除操作
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public T remove() {
lock.lock();
try {
while (q.size() == 0) {
removeConditoin.await();
}
T e = q.remove(0);
addConditoin.signal(); //執(zhí)行刪除操作后喚醒因隊列滿而被阻塞的添加操作
return e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock();
}
}
}
demo
Condition是JUC里面提供于控制線程釋放鎖, 然后進(jìn)行等待其他獲取鎖的線程發(fā)送 signal 信號來進(jìn)行喚醒的工具類.
- Condition內(nèi)部主要是由一個裝載線程節(jié)點 Node 的 Condition Queue 實現(xiàn)。
- 對 Condition 的方法(await, signal等) 的調(diào)用必需是在本線程獲取了獨占鎖的前提下。
- 因為 操作Condition的方法的前提是獲取獨占鎖, 所以 Condition Queue 內(nèi)部是一條不支持并發(fā)安全的單向 queue
public class ConditionTest {
private static final Logger logger = Logger.getLogger(ConditionTest.class);
static final Lock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();
public static void main(String[] args) throws Exception{
final Thread thread1 = new Thread("Thread 1 "){
@Override
public void run() {
lock.lock(); // 線程 1獲取 lock
logger.info(Thread.currentThread().getName() + " 正在運行 .....");
try {
Thread.sleep(2 * 1000);
logger.info(Thread.currentThread().getName() + " 停止運行, 等待一個 signal ");
condition.await(); // 調(diào)用 condition.await 進(jìn)行釋放鎖, 將當(dāng)前節(jié)點封裝成一個 Node 放入 Condition Queue 里面, 等待喚醒
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + " 獲取一個 signal, 繼續(xù)執(zhí)行 ");
lock.unlock(); // 釋放鎖
}
};
thread1.start(); // 線程 1 線運行
Thread.sleep(1 * 1000);
Thread thread2 = new Thread("Thread 2 "){
@Override
public void run() {
lock.lock(); // 線程 2獲取lock
logger.info(Thread.currentThread().getName() + " 正在運行.....");
thread1.interrupt(); // 對線程1 進(jìn)行中斷 看看中斷后會怎么樣? 結(jié)果 線程 1還是獲取lock, 并且最后還進(jìn)行 lock.unlock()操作
try {
Thread.sleep(2 * 1000);
}catch (Exception e){
}
condition.signal(); // 發(fā)送喚醒信號 從 AQS 的 Condition Queue 里面轉(zhuǎn)移 Node 到 Sync Queue
logger.info(Thread.currentThread().getName() + " 發(fā)送一個 signal ");
logger.info(Thread.currentThread().getName() + " 發(fā)送 signal 結(jié)束");
lock.unlock(); // 線程 2 釋放鎖
}
};
thread2.start();
}
}
執(zhí)行過程
線程 1 開始執(zhí)行, 獲取 lock, 然后開始睡眠 2秒
當(dāng)線程1睡眠到 1秒時, 線程2開始執(zhí)行, 但是lock被線程1獲取, 所以 等待
線程 1 睡足2秒 調(diào)用 condition.await() 進(jìn)行鎖的釋放, 并且將 線程1封裝成一個 node 放到 condition 的 Condition Queue里面,等待其他獲取鎖的線程給他 signal, 或?qū)ζ溥M(jìn)行中斷,中斷后可以到 Sync Queue里面進(jìn)而獲取 鎖
線程 2 獲取鎖成功, 中斷 線程1, 線程被中斷后, node 從 Condition Queue 轉(zhuǎn)移到 Sync Queue 里面, 但是 lock 還是被 線程2獲取者, 所以 node呆在 Sync Queue 里面等待獲取 lock
線程 2睡了 2秒, 開始用signal喚醒 Condition Queue 里面的節(jié)點(此時代表 線程1的node已經(jīng)到 Sync Queue 里面
線程 2釋放lock, 并且在 Sync Queue 里面進(jìn)行喚醒等待獲取鎖的節(jié)點 node
7.線程1 得到喚醒, 獲取鎖
- 線程1 釋放鎖
[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在運行 .....
[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止運行, 等待一個 signal
[2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在運行.....
java.lang.InterruptedException
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 發(fā)送一個 signal
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 發(fā)送 signal 結(jié)束
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 獲取一個 signal, 繼續(xù)執(zhí)行
at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)
源碼分析
之前我們介紹AQS的時候說過,AQS的同步排隊用了一個隱式的雙向隊列,同步隊列的每個節(jié)點是一個AbstractQueuedSynchronizer.Node實例。
Node的主要字段有:
- waitStatus:等待狀態(tài),所有的狀態(tài)見下面的表格。
- prev:前驅(qū)節(jié)點
- next:后繼節(jié)點
- thread:當(dāng)前節(jié)點代表的線程
- nextWaiter:Node既可以作為同步隊列節(jié)點使用,也可以作為Condition的等待隊列節(jié)點使用(將會在后面講Condition時講到)。在作為同步隊列節(jié)點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED標(biāo)識當(dāng)前節(jié)點是獨占模式還是共享模式;在作為等待隊列節(jié)點使用時,nextWaiter保存后繼節(jié)點。
| 狀態(tài) | 值 | 含義 |
|---|---|---|
| CANCELLED | 1 | 當(dāng)前節(jié)點因為超時或中斷被取消同步狀態(tài)獲取,該節(jié)點進(jìn)入該狀態(tài)不會再變化 |
| SIGNAL | -1 | 標(biāo)識后繼的節(jié)點處于阻塞狀態(tài),當(dāng)前節(jié)點在釋放同步狀態(tài)或被取消時,需要通知后繼節(jié)點繼續(xù)運行。每個節(jié)點在阻塞前,需要標(biāo)記其前驅(qū)節(jié)點的狀態(tài)為SIGNAL。 |
| CONDITION | -2 | 標(biāo)識當(dāng)前節(jié)點是作為等待隊列節(jié)點使用的。 |
| PROPAGATE | -3 | |
| 0 | 0 | 初始狀態(tài) |
Condition實現(xiàn)等待的時候內(nèi)部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每一個節(jié)點也是一個AbstractQueuedSynchronizer.Node實例。
每個Condition對象中保存了firstWaiter和lastWaiter作為隊列首節(jié)點和尾節(jié)點,每個節(jié)點使用Node.nextWaiter保存下一個節(jié)點的引用,因此等待隊列是一個單向隊列。
每當(dāng)一個線程調(diào)用Condition.await()方法,那么該線程會釋放鎖,構(gòu)造成一個Node節(jié)點加入到等待隊列的隊尾。
等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //構(gòu)造一個新的等待隊列Node加入到隊尾
int savedState = fullyRelease(node); //釋放當(dāng)前線程的獨占鎖,不管重入幾次,都把state釋放為0
int interruptMode = 0;
//如果當(dāng)前節(jié)點沒有在同步隊列上,即還沒有被signal,則將當(dāng)前線程阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//后面的藍(lán)色代碼都是和中斷相關(guān)的,主要是區(qū)分兩種中斷:是在被signal前中斷還是在被signal后中斷,如果是被signal前就被中斷則拋出 InterruptedException,否則執(zhí)行 Thread.currentThread().interrupt();
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中斷則直接退出自旋
break;
}
//退出了上面自旋說明當(dāng)前節(jié)點已經(jīng)在同步隊列上,但是當(dāng)前節(jié)點不一定在同步隊列隊首。acquireQueued將阻塞直到當(dāng)前節(jié)點成為隊首,即當(dāng)前線程獲得了鎖。然后await()方法就可以退出了,讓線程繼續(xù)執(zhí)行await()后的代碼。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
//如果當(dāng)前節(jié)點狀態(tài)是CONDITION或node.prev是null,則證明當(dāng)前節(jié)點在等待隊列上而不是同步隊列上。之所以可以用node.prev來判斷,是因為一個節(jié)點如果要加入同步隊列,在加入前就會設(shè)置好prev字段。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果node.next不為null,則一定在同步隊列上,因為node.next是在節(jié)點加入同步隊列后設(shè)置的
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node); //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是不是當(dāng)前節(jié)點。
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通知
Condition.signal() 方法的源碼如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException(); //如果同步狀態(tài)不是被當(dāng)前線程獨占,直接拋出異常。從這里也能看出來,Condition只能配合獨占類同步組件使用。
Node first = firstWaiter;
if (first != null)
doSignal(first); //通知等待隊列隊首的節(jié)點。
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && //transferForSignal方法嘗試喚醒當(dāng)前節(jié)點,如果喚醒失敗,則繼續(xù)嘗試喚醒當(dāng)前節(jié)點的后繼節(jié)點。
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//如果當(dāng)前節(jié)點狀態(tài)為CONDITION,則將狀態(tài)改為0準(zhǔn)備加入同步隊列;如果當(dāng)前狀態(tài)不為CONDITION,說明該節(jié)點等待已被中斷,則該方法返回false,doSignal()方法會繼續(xù)嘗試喚醒當(dāng)前節(jié)點的后繼節(jié)點
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //將節(jié)點加入同步隊列,返回的p是節(jié)點在同步隊列中的先驅(qū)節(jié)點
int ws = p.waitStatus;
//如果先驅(qū)節(jié)點的狀態(tài)為CANCELLED(>0) 或設(shè)置先驅(qū)節(jié)點的狀態(tài)為SIGNAL失敗,那么就立即喚醒當(dāng)前節(jié)點對應(yīng)的線程,線程被喚醒后會執(zhí)行acquireQueued方法,該方法會重新嘗試將節(jié)點的先驅(qū)狀態(tài)設(shè)為SIGNAL并再次park線程;如果當(dāng)前設(shè)置前驅(qū)節(jié)點狀態(tài)為SIGNAL成功,那么就不需要馬上喚醒線程了,當(dāng)它的前驅(qū)節(jié)點成為同步隊列的首節(jié)點且釋放同步狀態(tài)后,會自動喚醒它。
//其實筆者認(rèn)為這里不加這個判斷條件應(yīng)該也是可以的。只是對于CAS修改前驅(qū)節(jié)點狀態(tài)為SIGNAL成功這種情況來說,如果不加這個判斷條件,提前喚醒了線程,等進(jìn)入acquireQueued方法了節(jié)點發(fā)現(xiàn)自己的前驅(qū)不是首節(jié)點,還要再阻塞,等到其前驅(qū)節(jié)點成為首節(jié)點并釋放鎖時再喚醒一次;而如果加了這個條件,線程被喚醒的時候它的前驅(qū)節(jié)點肯定是首節(jié)點了,線程就有機(jī)會直接獲取同步狀態(tài)從而避免二次阻塞,節(jié)省了硬件資源。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Condition等待通知的本質(zhì)
總的來說,Condition的本質(zhì)就是等待隊列和同步隊列的交互:
當(dāng)一個持有鎖的線程調(diào)用Condition.await()時,它會執(zhí)行以下步驟:
- 構(gòu)造一個新的等待隊列節(jié)點加入到等待隊列隊尾
- 釋放鎖
- 自旋,直到它在等待隊列上的節(jié)點移動到了同步隊列(通過其他線程調(diào)用signal())或被中斷
- 阻塞當(dāng)前節(jié)點,直到它獲取到了鎖,也就是它在同步隊列上的節(jié)點排隊排到了隊首。
當(dāng)一個持有鎖的線程調(diào)用Condition.signal()時,它會執(zhí)行以下操作:
- 從等待隊列的隊首開始,嘗試對隊首節(jié)點執(zhí)行喚醒操作;如果節(jié)點CANCELLED,就嘗試喚醒下一個節(jié)點;如果再CANCELLED則繼續(xù)迭代。
對每個節(jié)點執(zhí)行喚醒操作時,首先將節(jié)點加入同步隊列,此時await()操作的步驟3的解鎖條件就已經(jīng)開啟了。然后分兩種情況討論:
- 如果先驅(qū)節(jié)點的狀態(tài)為CANCELLED(>0) 或設(shè)置先驅(qū)節(jié)點的狀態(tài)為SIGNAL失敗,那么就立即喚醒當(dāng)前節(jié)點對應(yīng)的線程,此時await()方法就會完成步驟3,進(jìn)入步驟4.
- 如果成功把先驅(qū)節(jié)點的狀態(tài)設(shè)置為了SIGNAL,那么就不立即喚醒了。等到先驅(qū)節(jié)點成為同步隊列首節(jié)點并釋放了同步狀態(tài)后,會自動喚醒當(dāng)前節(jié)點對應(yīng)線程的,這時候await()的步驟3才執(zhí)行完成,而且有很大概率快速完成步驟4.
如果知道Object的等待通知機(jī)制,Condition的使用是比較容易掌握的,因為和Object等待通知的使用基本一致。
對Condition的源碼理解,主要就是理解等待隊列,等待隊列可以類比同步隊列,而且等待隊列比同步隊列要簡單,因為等待隊列是單向隊列,同步隊列是雙向隊列。
之所以同步隊列要設(shè)計成雙向的,是因為在同步隊列中,節(jié)點喚醒是接力式的,由每一個節(jié)點喚醒它的下一個節(jié)點,如果是由next指針獲取下一個節(jié)點,是有可能獲取失敗的,因為虛擬隊列每添加一個節(jié)點,是先用CAS把tail設(shè)置為新節(jié)點,然后才修改原tail的next指針到新節(jié)點的。因此用next向后遍歷是不安全的,但是如果在設(shè)置新節(jié)點為tail前,為新節(jié)點設(shè)置prev,則可以保證從tail往前遍歷是安全的。因此要安全的獲取一個節(jié)點Node的下一個節(jié)點,先要看next是不是null,如果是null,還要從tail往前遍歷看看能不能遍歷到Node。
而等待隊列就簡單多了,等待的線程就是等待者,只負(fù)責(zé)等待,喚醒的線程就是喚醒者,只負(fù)責(zé)喚醒,因此每次要執(zhí)行喚醒操作的時候,直接喚醒等待隊列的首節(jié)點就行了。等待隊列的實現(xiàn)中不需要遍歷隊列,因此也不需要prev指針。
Condition 構(gòu)造函數(shù)級基本屬性
主要是Condition Queue 的頭尾節(jié)點
/** First node of condition queue */
/** Condition Queue 里面的頭節(jié)點 */
private transient Node firstWaiter;
/** Last node of condition queue */
/** Condition Queue 里面的尾節(jié)點 */
private transient Node lastWaiter;
/** Creates a new {@code ConditionObject} instance */
/** 構(gòu)造函數(shù) */
public ConditionObject(){}
Condition Queue enqueue節(jié)點方法 addConditionWaiter
addConditionWaiter方法主要用于調(diào)用 Condition.await 時將當(dāng)前節(jié)點封裝成 一個Node, 加入到 Condition Queue里面。
/**
* Adds a new waiter to wait queue
* 將當(dāng)前線程封裝成一個 Node 節(jié)點 放入大 Condition Queue 里面
* 大家可以注意到, 下面對 Condition Queue 的操作都沒考慮到 并發(fā)(Sync Queue 的隊列是支持并發(fā)操作的), 這是為什么呢? 因為在進(jìn)行操作 Condition 是當(dāng)前的線程已經(jīng)獲取了AQS的獨占鎖, 所以不需要考慮并發(fā)的情況
* @return
*/
private Node addConditionWaiter(){
Node t = lastWaiter; // 1. Condition queue 的尾節(jié)點
// If lastWaiter is cancelled, clean out // 2.尾節(jié)點已經(jīng)Cancel, 直接進(jìn)行清除,
// 這里有1個問題, 1 何時出現(xiàn)t.waitStatus != Node.CONDITION -> 在對線程進(jìn)行中斷時 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 導(dǎo)致這種情況一般是 線程中斷或 await 超時
// 一個注意點: 當(dāng)Condition進(jìn)行 awiat 超時或被中斷時, Condition里面的節(jié)點是沒有被刪除掉的, 需要其他 await 在將線程加入 Condition Queue 時調(diào)用addConditionWaiter而進(jìn)而刪除, 或 await 操作差不多結(jié)束時, 調(diào)用 "node.nextWaiter != null" 進(jìn)行判斷而刪除 (PS: 通過 signal 進(jìn)行喚醒時 node.nextWaiter 會被置空, 而中斷和超時時不會)
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters(); // 3. 調(diào)用 unlinkCancelledWaiters 對 "waitStatus != Node.CONDITION" 的節(jié)點進(jìn)行刪除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
t = lastWaiter; // 4. 獲取最新的 lastWaiter
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 將線程封裝成 node 準(zhǔn)備放入 Condition Queue 里面
if(t == null){
firstWaiter = node; // 6 .Condition Queue 是空的
}else{
t.nextWaiter = node; // 7. 最加到 queue 尾部
}
lastWaiter = node; // 8. 重新賦值 lastWaiter
return node;
}
對 Condition Queue 的操作都沒考慮到 并發(fā)(Sync Queue 的隊列是支持并發(fā)操作的), 這是為什么呢? 因為在進(jìn)行操作 Condition 是當(dāng)前的線程已經(jīng)獲取了AQS的獨占鎖, 所以不需要考慮并發(fā)的情況

Condition 喚醒 first節(jié)點方法 doSignal
這里的喚醒指的是將節(jié)點從 Condition Queue 轉(zhuǎn)移到 Sync Queue 里面
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters
* @param first
*/
/**
* 喚醒 Condition Queue 里面的頭節(jié)點, 注意這里的喚醒只是將 Node 從 Condition Queue 轉(zhuǎn)到 Sync Queue 里面(這時的 Node 也還是能被 Interrupt)
*/
private void doSignal(Node first){
do{
if((firstWaiter = first.nextWaiter) == null){ // 1. 將 first.nextWaiter 賦值給 nextWaiter 為下次做準(zhǔn)備
lastWaiter = null; // 2. 這時若 nextWaiter == null, 則說明 Condition 為空了, 所以直接置空 lastWaiter
}
first.nextWaiter = null; // 3. first.nextWaiter == null 是判斷 Node 從 Condition queue 轉(zhuǎn)移到 Sync Queue 里面是通過 signal 還是 timeout/interrupt
}while(!transferForSignal(first) && (first = firstWaiter) != null); // 4. 調(diào)用 transferForSignal將 first 轉(zhuǎn)移到 Sync Queue 里面, 返回不成功的話, 將 firstWaiter 賦值給 first
}
Condition 喚醒 所有 節(jié)點方法 doSignalAll
/**
* Removes and transfers all nodes
* @param first (non-null) the first node on condition queue
*/
/**
* 喚醒 Condition Queue 里面的所有的節(jié)點
*/
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null; // 1. 將 lastWaiter, firstWaiter 置空
do{
Node next = first.nextWaiter; // 2. 初始化下個換新的節(jié)點
first.nextWaiter = null; // 3. first.nextWaiter == null 是判斷 Node 從 Condition queue 轉(zhuǎn)移到 Sync Queue 里面是通過 signal 還是 timeout/interrupt
transferForSignal(first); // 4. 調(diào)用 transferForSignal將 first 轉(zhuǎn)移到 Sync Queue 里面
first = next; // 5. 開始換新 next 節(jié)點
}while(first != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Condition 刪除取消節(jié)點的方法 unlinkCancelledWaiters
一般的節(jié)點都會被 signal 喚醒, 從 Condition Queue 轉(zhuǎn)移到 Sync Queue, 而若遇到 interrupt 或 等待超時, 則直接改變 node 的狀態(tài)(從 CONDITION 變成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的節(jié)點,所以需要下面的函數(shù)
/**
* 在 調(diào)用 addConditionWaiter 將線程放入 Condition Queue 里面時 或 awiat 方法獲取 差不多結(jié)束時 進(jìn)行清理 Condition queue 里面的因 timeout/interrupt 而還存在的節(jié)點
* 這個刪除操作比較巧妙, 其中引入了 trail 節(jié)點, 可以理解為traverse整個 Condition Queue 時遇到的最后一個有效的節(jié)點
*/
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter; // 1. 先初始化 next 節(jié)點
if(t.waitStatus != Node.CONDITION){ // 2. 節(jié)點不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
t.nextWaiter = null; // 3. Node.nextWaiter 置空
if(trail == null){ // 4. 一次都沒有遇到有效的節(jié)點
firstWaiter = next; // 5. 將 next 賦值給 firstWaiter(此時 next 可能也是無效的, 這只是一個臨時處理)
}else{
trail.nextWaiter = next; // 6. next 賦值給 trail.nextWaiter, 這一步其實就是刪除節(jié)點 t
}
if(next == null){ // 7. next == null 說明 已經(jīng) traverse 完了 Condition Queue
lastWaiter = trail;
}
}else{
trail = t; // 8. 將有效節(jié)點賦值給 trail
}
t = next;
}
}
這是一段非常精巧的queue節(jié)點刪除, 主要還是在 節(jié)點 trail 上, trail 節(jié)點可以理解為traverse整個 Condition Queue 時遇到的最后一個有效的節(jié)點
Condition 喚醒首節(jié)點方法 signal
/**
* 將 Condition queue 的頭節(jié)點轉(zhuǎn)移到 Sync Queue 里面
* 在進(jìn)行調(diào)用 signal 時, 當(dāng)前的線程必須獲取了 獨占的鎖
*/
@Override
public void signal() {
if(!isHeldExclusively()){ // 1. 判斷當(dāng)前的線程是否已經(jīng)獲取 獨占鎖
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first); // 2. 調(diào)用 doSignal 進(jìn)行轉(zhuǎn)移
}
}
Condition 喚醒所有節(jié)點方法 signalAll
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
* return {@code false}
*/
/**
* 將 Condition Queue 里面的節(jié)點都轉(zhuǎn)移到 Sync Queue 里面
*/
public final void signalAll(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignalAll(first);
}
}
Condition 釋放鎖進(jìn)行等待方法 awaitUninterruptibly
awaitUninterruptibly 方法是一個不響應(yīng) 中斷的方法
整個流程
將當(dāng)前的線程封裝成 Node 加入到 Condition 里面
丟棄當(dāng)前線程所擁有的 獨占鎖
等待 其他獲取 獨占鎖的線程的喚醒, 喚醒從 Condition Queue 到 Sync Queue 里面, 進(jìn)而獲取 獨占鎖
最后獲取 lock 之后, 在根據(jù)線程喚醒的方式(signal/interrupt) 進(jìn)行處理
/**
* 不響應(yīng)線程中斷的方式進(jìn)行 await
*/
public final void awaitUninterruptibly(){
Node node = addConditionWaiter(); // 1. 將當(dāng)前線程封裝成一個 Node 放入 Condition Queue 里面
int savedState = fullyRelease(node); // 2. 釋放當(dāng)前線程所獲取的所有的獨占鎖(PS: 獨占的鎖支持重入), 等等, 為什么要釋放呢? 以為你調(diào)用 awaitUninterruptibly 方法的前提就是你已經(jīng)獲取了 獨占鎖
boolean interrupted = false; // 3. 線程中斷標(biāo)識
while(!isOnSyncQueue(node)){ // 4. 這里是一個 while loop, 調(diào)用 isOnSyncQueue 判斷當(dāng)前的 Node 是否已經(jīng)被轉(zhuǎn)移到 Sync Queue 里面
LockSupport.park(this); // 5. 若當(dāng)前 node 不在 sync queue 里面, 則先 block 一下等待其他線程調(diào)用 signal 進(jìn)行喚醒; (這里若有其他線程對當(dāng)前線程進(jìn)行 中斷的換, 也能進(jìn)行喚醒)
if(Thread.interrupted()){ // 6. 判斷這是喚醒是 signal 還是 interrupted(Thread.interrupted()會清楚線程的中斷標(biāo)記, 但沒事, 我們有步驟7中的interrupted進(jìn)行記錄)
interrupted = true; // 7. 說明這次喚醒是被中斷而喚醒的,這個標(biāo)記若是true的話, 在 awiat 離開時還要 自己中斷一下(selfInterrupt), 其他的函數(shù)可能需要線程的中斷標(biāo)識
}
}
if(acquireQueued(node, savedState) || interrupted){ // 8. acquireQueued 返回 true 說明線程在 block 的過程中式被 inetrrupt 過(其實 acquireQueued 返回 true 也有可能其中有一次喚醒是 通過 signal)
selfInterrupt(); // 9. 自我中斷, 外面的線程可以通過這個標(biāo)識知道, 整個 awaitUninterruptibly 運行過程中 是否被中斷過
}
}
Condition 中斷標(biāo)示
/**
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire
*/
/**
* 下面兩個是用于追蹤 調(diào)用 awaitXXX 方法時線程有沒有被中斷過
* 主要的區(qū)別是
* REINTERRUPT: 代表線程是在 signal 后被中斷的 (REINTERRUPT = re-interrupt 再次中斷 最后會調(diào)用 selfInterrupt)
* THROW_IE: 代表在接受 signal 前被中斷的, 則直接拋出異常 (Throw_IE = throw inner exception)
*/
/** Mode meaning to reinterrupt on exit from wait */
/** 在離開 awaitXX方法, 退出前再次 自我中斷 (調(diào)用 selfInterrupt)*/
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
/** 在離開 awaitXX方法, 退出前再次, 以為在 接受 signal 前被中斷, 所以需要拋出異常 */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted
*/
/**
* 檢查 在 awaitXX 方法中的這次喚醒是否是中斷引起的
* 若是中斷引起的, 則將 Node 從 Condition Queue 轉(zhuǎn)移到 Sync Queue 里面
* 返回值的區(qū)別:
* 0: 此次喚醒是通過 signal -> LockSupport.unpark
* THROW_IE: 此次的喚醒是通過 interrupt, 并且 在 接受 signal 之前
* REINTERRUPT: 線程的喚醒是 接受過 signal 而后又被中斷
*/
private int checkInterruptWhileWaiting(Node node){
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode
*/
`Condition 中斷處理方法 reportInterruptAfterWait`
/**
* 這個方法是在 awaitXX 方法離開前調(diào)用的, 主要是根據(jù)
* interrupMode 判斷是拋出異常, 還是自我再中斷一下
*/
private void reportInterruptAfterWait(int interrupMode) throws InterruptedException{
if(interrupMode == THROW_IE){
throw new InterruptedException();
}
else if(interrupMode == REINTERRUPT){
selfInterrupt();
}
}
Condition 釋放鎖 進(jìn)行等待的方法 await
await 此方法響應(yīng)中斷請求, 當(dāng)接受到中斷請求后會將節(jié)點從 Condition Queue 轉(zhuǎn)移到 Sync Queue
/**
* Implements interruptible condition wait
*
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Blocking until signalled or interrupted
* Reacquire by invoking specifized version of
* {@link #acquire(int)} with saved state as argument.
* If interrupted while blocked in step 4, throw InterruptedException
* </li>
*
* @throws InterruptedException
*/
/**
* 支持 InterruptedException 的 await <- 注意這里即使是線程被中斷,
* 還是需要獲取了獨占的鎖后, 再 調(diào)用 lock.unlock 進(jìn)行釋放鎖
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){ // 1. 判斷線程是否中斷
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 將線程封裝成一個 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 釋放當(dāng)前線程所獲取的所有的鎖 (PS: 調(diào)用 await 方法時, 當(dāng)前線程是必須已經(jīng)獲取了獨占的鎖)
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判斷當(dāng)前線程是否在 Sync Queue 里面(這里 Node 從 Condtion Queue 里面轉(zhuǎn)移到 Sync Queue 里面有兩種可能 (1) 其他線程調(diào)用 signal 進(jìn)行轉(zhuǎn)移 (2) 當(dāng)前線程被中斷而進(jìn)行Node的轉(zhuǎn)移(就在checkInterruptWhileWaiting里面進(jìn)行轉(zhuǎn)移))
LockSupport.park(this); // 5. 當(dāng)前線程沒在 Sync Queue 里面, 則進(jìn)行 block
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 6. 判斷此次線程的喚醒是否因為線程被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進(jìn)行節(jié)點的轉(zhuǎn)移; 返回值 interruptMode != 0
break; // 說明此是通過線程中斷的方式進(jìn)行喚醒, 并且已經(jīng)進(jìn)行了 node 的轉(zhuǎn)移, 轉(zhuǎn)移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 7. 調(diào)用 acquireQueued在 Sync Queue 里面進(jìn)行 獨占鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // clean up if cancelled // 8. 通過 "node.nextWaiter != null" 判斷 線程的喚醒是中斷還是 signal, 因為通過中斷喚醒的話, 此刻代表線程的 Node 在 Condition Queue 與 Sync Queue 里面都會存在
unlinkCancelledWaiters(); // 9. 進(jìn)行 cancelled 節(jié)點的清除
}
if(interruptMode != 0){ // 10. "interruptMode != 0" 代表通過中斷的方式喚醒線程
reportInterruptAfterWait(interruptMode); // 11. 根據(jù) interruptMode 的類型決定是拋出異常, 還是自己再中斷一下
}
}
Condition 釋放鎖 進(jìn)行等待的方法 awaitNanos
awaitNanos 具有超時功能, 與響應(yīng)中斷的功能, 不管中斷還是超時都會 將節(jié)點從 Condition Queue 轉(zhuǎn)移到 Sync Queue
**
* Impelemnts timed condition wait
*
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Block until aignalled, interrupted, or timed out
* Reacquire by invoking specified version of
* {@link #acquire(int)} with saved state as argument
* If interrupted while blocked in step 4, throw InterruptedException
* </li>
*/
/**
* 所有 awaitXX 方法其實就是
* 0. 將當(dāng)前的線程封裝成 Node 加入到 Condition 里面
* 1. 丟到當(dāng)前線程所擁有的 獨占鎖,
* 2. 等待 其他獲取 獨占鎖的線程的喚醒, 喚醒從 Condition Queue 到 Sync Queue 里面, 進(jìn)而獲取 獨占鎖
* 3. 最后獲取 lock 之后, 在根據(jù)線程喚醒的方式(signal/interrupt) 進(jìn)行處理
* 4. 最后還是需要調(diào)用 lock./unlock 進(jìn)行釋放鎖
*/
@Override
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if(Thread.interrupted()){ // 1. 判斷線程是否中斷
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 將線程封裝成一個 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 釋放當(dāng)前線程所獲取的所有的鎖 (PS: 調(diào)用 await 方法時, 當(dāng)前線程是必須已經(jīng)獲取了獨占的鎖)
final long deadline = System.nanoTime() + nanosTimeout; // 4. 計算 wait 的截止時間
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 5. 判斷當(dāng)前線程是否在 Sync Queue 里面(這里 Node 從 Condtion Queue 里面轉(zhuǎn)移到 Sync Queue 里面有兩種可能 (1) 其他線程調(diào)用 signal 進(jìn)行轉(zhuǎn)移 (2) 當(dāng)前線程被中斷而進(jìn)行Node的轉(zhuǎn)移(就在checkInterruptWhileWaiting里面進(jìn)行轉(zhuǎn)移))
if(nanosTimeout <= 0L){ // 6. 等待時間超時(這里的 nanosTimeout 是有可能 < 0),
transferAfterCancelledWait(node); // 7. 調(diào)用 transferAfterCancelledWait 將 Node 從 Condition 轉(zhuǎn)移到 Sync Queue 里面
break;
}
if(nanosTimeout >= spinForTimeoutThreshold){ // 8. 當(dāng)剩余時間 < spinForTimeoutThreshold, 其實函數(shù) spin 比用 LockSupport.parkNanos 更高效
LockSupport.parkNanos(this, nanosTimeout); // 9. 進(jìn)行線程的 block
}
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 10. 判斷此次線程的喚醒是否因為線程被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進(jìn)行節(jié)點的轉(zhuǎn)移; 返回值 interruptMode != 0
break; // 說明此是通過線程中斷的方式進(jìn)行喚醒, 并且已經(jīng)進(jìn)行了 node 的轉(zhuǎn)移, 轉(zhuǎn)移到 Sync Queue 里面
}
nanosTimeout = deadline - System.nanoTime(); // 11. 計算剩余時間
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 12. 調(diào)用 acquireQueued在 Sync Queue 里面進(jìn)行 獨占鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 13. 通過 "node.nextWaiter != null" 判斷 線程的喚醒是中斷還是 signal, 因為通過中斷喚醒的話, 此刻代表線程的 Node 在 Condition Queue 與 Sync Queue 里面都會存在
unlinkCancelledWaiters(); // 14. 進(jìn)行 cancelled 節(jié)點的清除
}
if(interruptMode != 0){ // 15. "interruptMode != 0" 代表通過中斷的方式喚醒線程
reportInterruptAfterWait(interruptMode); // 16. 根據(jù) interruptMode 的類型決定是拋出異常, 還是自己再中斷一下
}
return deadline - System.nanoTime(); // 17 這個返回值代表是 通過 signal 還是 超時
}
Condition 釋放鎖 進(jìn)行等待的方法 awaitUntil
/**
* Implements absolute timed condition wait
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Block until signalled, interrupted, or timed out
* Reacquire by invoking specialized version of
* {@link #acquire(int)} with saved state as argument
* if interrupted while blocked in step 4, throw InterruptedException
* If timeed out while blocked in step 4, return false, else true
* </li>
*/
/**
* 所有 awaitXX 方法其實就是
* 0. 將當(dāng)前的線程封裝成 Node 加入到 Condition 里面
* 1. 丟到當(dāng)前線程所擁有的 獨占鎖,
* 2. 等待 其他獲取 獨占鎖的線程的喚醒, 喚醒從 Condition Queue 到 Sync Queue 里面, 進(jìn)而獲取 獨占鎖
* 3. 最后獲取 lock 之后, 在根據(jù)線程喚醒的方式(signal/interrupt) 進(jìn)行處理
* 4. 最后還是需要調(diào)用 lock./unlock 進(jìn)行釋放鎖
*
* awaitUntil 和 awaitNanos 差不多
*/
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
long abstime = deadline.getTime(); // 1. 判斷線程是否中斷
if(Thread.interrupted()){
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 將線程封裝成一個 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 釋放當(dāng)前線程所獲取的所有的鎖 (PS: 調(diào)用 await 方法時, 當(dāng)前線程是必須已經(jīng)獲取了獨占的鎖)
boolean timeout = false;
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判斷當(dāng)前線程是否在 Sync Queue 里面(這里 Node 從 Condtion Queue 里面轉(zhuǎn)移到 Sync Queue 里面有兩種可能 (1) 其他線程調(diào)用 signal 進(jìn)行轉(zhuǎn)移 (2) 當(dāng)前線程被中斷而進(jìn)行Node的轉(zhuǎn)移(就在checkInterruptWhileWaiting里面進(jìn)行轉(zhuǎn)移))
if(System.currentTimeMillis() > abstime){ // 5. 計算是否超時
timeout = transferAfterCancelledWait(node); // 6. 調(diào)用 transferAfterCancelledWait 將 Node 從 Condition 轉(zhuǎn)移到 Sync Queue 里面
break;
}
LockSupport.parkUntil(this, abstime); // 7. 進(jìn)行 線程的阻塞
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 8. 判斷此次線程的喚醒是否因為線程被中斷, 若是被中斷, 則會在checkInterruptWhileWaiting的transferAfterCancelledWait 進(jìn)行節(jié)點的轉(zhuǎn)移; 返回值 interruptMode != 0
break; // 說明此是通過線程中斷的方式進(jìn)行喚醒, 并且已經(jīng)進(jìn)行了 node 的轉(zhuǎn)移, 轉(zhuǎn)移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 9. 調(diào)用 acquireQueued在 Sync Queue 里面進(jìn)行 獨占鎖的獲取, 返回值表明在獲取的過程中有沒有被中斷過
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 10. 通過 "node.nextWaiter != null" 判斷 線程的喚醒是中斷還是 signal, 因為通過中斷喚醒的話, 此刻代表線程的 Node 在 Condition Queue 與 Sync Queue 里面都會存在
unlinkCancelledWaiters(); // 11. 進(jìn)行 cancelled 節(jié)點的清除
}
if(interruptMode != 0){ // 12. "interruptMode != 0" 代表通過中斷的方式喚醒線程
reportInterruptAfterWait(interruptMode); // 13. 根據(jù) interruptMode 的類型決定是拋出異常, 還是自己再中斷一下
}
return !timeout; // 13. 返回是否通過 interrupt 進(jìn)行線程的喚醒
}