概述
對(duì)于Java來說萬物皆對(duì)象,所有的Java對(duì)象的最終父類都是Object,所以它們都擁有一組監(jiān)視器方法,主要包括:wait(),wait(long timeout),notify()和notifyAll(),這些方法與Synchronized關(guān)鍵字配合,可以實(shí)現(xiàn)等待/通知機(jī)制。
Condition也提供了類似Object的監(jiān)控方法,與Lock接口配合能夠?qū)崿F(xiàn)等待/通知機(jī)制,但是這兩者在使用方式和功能特性上有一定的區(qū)別。下面是一個(gè)Object的監(jiān)視器方法與Condition接口的對(duì)比:
| 對(duì)比項(xiàng) | Object監(jiān)視器方法 | Condition接口 |
|---|---|---|
| 前置條件 | 獲取對(duì)象鎖 | 1.調(diào)用Lock.lock()方法 2.調(diào)用Lock.newCondition()方法獲取Condition對(duì)象 |
| 調(diào)用方式 | 直接調(diào)用。例如Object.wait() | 直接調(diào)用。例如condition.await()方法 |
| 等待隊(duì)列個(gè)數(shù) | 一個(gè) | 多個(gè) |
| 當(dāng)前線程釋放鎖,進(jìn)入等待狀態(tài) | 支持 | 支持 |
| 當(dāng)前線程釋放鎖,進(jìn)入等待狀態(tài), 在等待狀態(tài)中不響應(yīng)中斷 |
不支持 | 支持 |
| 當(dāng)前線程釋放鎖,并進(jìn)入超時(shí)等待狀態(tài) | 支持 | 支持 |
| 當(dāng)前線程釋放鎖,進(jìn)入等待狀態(tài)到某個(gè)時(shí)間 | 不支持 | 支持 |
| 喚醒等待隊(duì)列中的一個(gè)線程 | 支持 | 支持 |
| 喚醒等待隊(duì)列中的所有線程 | 支持 | 支持 |
Condition的使用方式
Condition接口中定義了等待、通知兩種類型的方法,具體如下圖:

我們知道Condition是由Lock.newCondition()創(chuàng)建來的,也就是說condition是依賴于Lock對(duì)象的。在調(diào)用上圖的方法時(shí),必須先獲取到Condition對(duì)象關(guān)聯(lián)的鎖。Condition的使用方式如下:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest {
public static final Lock lock = new ReentrantLock();
public static final Condition condition = lock.newCondition();
public static void conditionWait() throws InterruptedException{
lock.lock();
try {
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 調(diào)用await!");
condition.await();
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 被喚醒,重新獲得了鎖!");
} finally {
lock.unlock();
System.out.println(System.currentTimeMillis() + " :" + Thread.currentThread().getName()+" 釋放了鎖!");
}
}
public static void conditionSignal() throws InterruptedException{
lock.lock();
try {
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 調(diào)用signal方法!");
condition.signal();
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 調(diào)用了signal方法!");
} finally {
lock.unlock();
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 釋放了鎖!");
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 開始執(zhí)行!");
conditionWait();
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 結(jié)束執(zhí)行!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 開始執(zhí)行!");
conditionSignal();
System.out.println(System.currentTimeMillis() + " :" +Thread.currentThread().getName()+" 結(jié)束執(zhí)行!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
一般都會(huì)將Condition對(duì)象作為成員變量。當(dāng)調(diào)用await()方法后,當(dāng)前線程會(huì)釋放鎖進(jìn)入等待狀態(tài),而其他線程在獲取鎖,進(jìn)行自己的業(yè)務(wù)邏輯后調(diào)用了Condition對(duì)象的Signal()方法,通知當(dāng)前線程后,當(dāng)前線程才從await()方法返回,并在返回前已經(jīng)獲取了鎖。
關(guān)于Condition方法的描述如下:
-
void await()當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知(signal/signalAll)或中斷(其他線程調(diào)用interrupt()方法),當(dāng)前線程將進(jìn)入運(yùn)行狀態(tài)且從await()方法返回,此時(shí)當(dāng)前線程已經(jīng)獲取到了對(duì)應(yīng)鎖。 -
void awaitUninterruptibly()當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知,但是該方法不響應(yīng)中斷 -
long awaitNanos(long nanosTimeout)當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知、中斷或者超時(shí)。返回值表示剩余時(shí)間,如果在nanosTimeout納秒之前被喚醒,那么返回值為(nanosTimeout - 實(shí)際耗時(shí))。返回值為0或負(fù)值,則表示已經(jīng)超時(shí)了。 -
boolean awaitUntil(Date deadline)當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知、中斷或者某個(gè)時(shí)間。如果沒有到達(dá)指定時(shí)間就被通知,則返回true;否則返回false。 -
void signal()喚醒一個(gè)等待在Condition上的線程,該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖; -
void signalAll()喚醒所有等待在Condition上的線程,能夠從等待方法返回的線程必須獲得與Condition相關(guān)聯(lián)的鎖;
Condition實(shí)現(xiàn)分析
上面我們說到Condition是由Lock.newCondition()方法創(chuàng)建出來的,而查看ReentrantLock中的源碼,可以看到,newCondition()方法實(shí)際上會(huì)new一個(gè)ConditionObject對(duì)象。具體如下:
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject對(duì)象是AQS的一個(gè)內(nèi)部類,之前說到Condition是依賴于Lock來使用的,那么ConditionObject是AQS的內(nèi)部類也順理成章了。每一個(gè)Condition對(duì)象都維護(hù)者一個(gè)隊(duì)列,即等待隊(duì)列,該隊(duì)列是Condition實(shí)現(xiàn)等待/通知機(jī)制的關(guān)鍵。等待隊(duì)列是一個(gè)FIFO的隊(duì)列,在等待隊(duì)列的每一個(gè)節(jié)點(diǎn)都包含了一個(gè)線程引用,如果一個(gè)線程調(diào)用condition.await()方法,那么該線程將會(huì)釋放鎖、構(gòu)造成節(jié)點(diǎn)加入到等待隊(duì)列并進(jìn)入等待狀態(tài)。這里說到的節(jié)點(diǎn)Node其實(shí)與之前AQS中提到的Node是同一個(gè)內(nèi)部類AbstractQueuedSynchronizer.Node。
這里我們還需注意ConditionObject中包含兩個(gè)成員變量:
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
分別表示Condition的頭尾指針,還有Node中還有一個(gè)屬性是需要大家注意的Node nextWaiter,nextWaiter表示等待隊(duì)列中的后繼節(jié)點(diǎn),而Node中關(guān)于同步隊(duì)列的相關(guān)屬性卻有兩個(gè):prev和next。那么由此我們可以判斷等待隊(duì)列是一個(gè)單向隊(duì)列,每個(gè)節(jié)點(diǎn)只保存其后一個(gè)節(jié)點(diǎn)的引用。而等待隊(duì)列的基本結(jié)構(gòu)則如下圖:

如上圖所示,Condition擁有首節(jié)點(diǎn)的引用,而新增節(jié)點(diǎn)只需要將原尾節(jié)點(diǎn)的nextWaiter指向它,并更新尾節(jié)點(diǎn)即可。需要注意的是節(jié)點(diǎn)更新的過程是沒有使用CAS方法的,原因是調(diào)用await 方法的線程必定獲取了鎖。我們可以不止一次的調(diào)用lock.newCondition方法,這說明AQS中不止維護(hù)了一個(gè)等待隊(duì)列。object監(jiān)視器上只能擁有一個(gè)同步隊(duì)列和一個(gè)等待隊(duì)列,而AQS卻擁有一個(gè)同步隊(duì)列,多個(gè)等待隊(duì)列。具體如下圖:

如上圖所示,Condition的實(shí)現(xiàn)是同步器的內(nèi)部類,因此每個(gè)Condition實(shí)例都能夠訪問同步器提供的方法,相當(dāng)于每個(gè)Condition都擁有同步器的引用。
condition.await()方法
廢話不多說,直接擼源碼:
public final void await() throws InterruptedException {
//如果線程被中斷,那么拋出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
//將線程構(gòu)建成Node節(jié)點(diǎn),并加入等待隊(duì)列
Node node = addConditionWaiter();
//釋放當(dāng)前線程所占用的鎖,并喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//當(dāng)前線程進(jìn)入等待狀態(tài)
LockSupport.park(this);
//判斷是否被中斷
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//自旋等待獲取同步狀態(tài)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//處理被中斷狀態(tài)
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
當(dāng)前線程調(diào)用condition.await()方法后,會(huì)使得當(dāng)前線程釋放鎖并進(jìn)入等待隊(duì)列中,直到被signal/signalAll方法喚醒后會(huì)使當(dāng)前線程從等待隊(duì)列移至同步隊(duì)列中去,知道獲取鎖后返回,或者在等待過程中被中斷做中斷處理。那么這中間的細(xì)節(jié)是如何處理的呢?當(dāng)前線程是如何加入等待隊(duì)列中的?又是怎么釋放鎖的呢?釋放之后await方法如何退出呢?這些我們都還不清楚,下面我們來仔細(xì)分析下源碼中調(diào)用的幾個(gè)方法。
private Node addConditionWaiter() {
//獲取尾節(jié)點(diǎn)指針
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果尾節(jié)點(diǎn)不為null,并且尾節(jié)點(diǎn)等待狀態(tài)不是CONDITION,那么刪除等待隊(duì)列中所有非CONDITION狀態(tài)的節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
//重新獲取尾節(jié)點(diǎn)
t = lastWaiter;
}
//將當(dāng)前線程構(gòu)建成節(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//如果尾節(jié)點(diǎn)為空,則將頭結(jié)點(diǎn)指針指向當(dāng)前節(jié)點(diǎn),否則將尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
//然后將尾節(jié)點(diǎn)指針指向當(dāng)前節(jié)點(diǎn)
lastWaiter = node;
return node;
}
從上面這段代碼可以看到,該方法將當(dāng)前線程構(gòu)建成節(jié)點(diǎn),判斷頭結(jié)點(diǎn)firstWaiter是否為空,如果為空,則將firstWaiter指向當(dāng)前節(jié)點(diǎn),如果不為空,則更新尾節(jié)點(diǎn)。這就解決了如何加入等待隊(duì)列的問題,下面由fullRelease方法來釋放鎖,具體源碼如下;
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//嘗試釋放鎖,并喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
if (release(savedState)) {
//成功則返回同步狀態(tài)
failed = false;
return savedState;
} else {
//不成功拋出異常
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
這段代碼就很容易理解了,調(diào)用AQS中的release()方法釋放鎖,并喚醒同步隊(duì)列中頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)引用的線程,如果釋放成功則正常返回,釋放失敗則拋出異常。然后在回到await()方法的源碼中,發(fā)現(xiàn)以上方法調(diào)用完后有這么一段邏輯:
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
final boolean isOnSyncQueue(Node node) {
//如果當(dāng)前節(jié)點(diǎn)為等待狀態(tài),或前置節(jié)點(diǎn)為空,那么返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)next不為空,這說明在同步隊(duì)列中,返回true
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
//再同步隊(duì)列中尋找當(dāng)前節(jié)點(diǎn),找到返回true,未找到返回false
return findNodeFromTail(node);
}
很顯然要想退出await方法,需要先跳出該循環(huán)。而從代碼中可以看出跳出循環(huán)的方法兩種:1、!isOnSyncQueue(node)返回false;2、(interruptMode = checkInterruptWhileWaiting(node)) != 0等于true。從上面的源碼可以看出isOnSyncQueue(node)方法,用來判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中,即另外線程調(diào)用signal/signalAll方法。第二個(gè)條件判斷當(dāng)前線程是否被中斷。
總結(jié)為:退出await方法的前提條件是當(dāng)前線程被中斷或其他線程調(diào)用signal/signalAll方法將當(dāng)前線程移動(dòng)到同步隊(duì)列中。當(dāng)跳出while循環(huán)后,會(huì)繼續(xù)調(diào)用acquireQueued(node, savedState)方法,自旋獲取同步狀態(tài),直到成功,這樣說明了要跳出await方法必須要獲得鎖。到這里我們已經(jīng)解決了上面提出的疑問,對(duì)await方法也理解的更加透徹了。下面是await方法的示意圖:

signal/signalAll
調(diào)用Condition的signal()和signalAll()方法,將會(huì)喚醒等待隊(duì)列中等待時(shí)間最長(zhǎng)的節(jié)點(diǎn)(即首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中。下面先看下Signal()方法的源碼:
public final void signal() {
//先判斷當(dāng)前線程是否獲取到了鎖,沒有的話,拋出異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//喚醒等待隊(duì)列中的頭結(jié)點(diǎn)
doSignal(first);
}
從上面代碼可以看出,首先會(huì)判斷當(dāng)前線程是否獲取到了鎖,如果沒有獲取到,則會(huì)拋出異常。如果獲取到了鎖,那么先拿到等待隊(duì)列的頭指針引用的節(jié)點(diǎn),之后喚醒等待隊(duì)列中的頭結(jié)點(diǎn),具體細(xì)節(jié)在doSignal(first)方法中,具體看下源碼;
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
//將頭結(jié)點(diǎn)從等待隊(duì)列中移除
first.nextWaiter = null;
//對(duì)頭結(jié)點(diǎn)做處理的部分在transferForSignal(first)中
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
使用CAS將等待狀態(tài)改為0,如果失敗返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//將節(jié)點(diǎn)移入同步隊(duì)列
Node p = enq(node);
int ws = p.waitStatus;
//如果該節(jié)點(diǎn)等待狀態(tài)>0或者嘗試修改等待狀態(tài)為SIGNAL失敗,則喚醒該節(jié)點(diǎn)對(duì)應(yīng)的線程,返回true
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
閱讀源碼,能夠發(fā)現(xiàn),doSignal主要做了一下幾件事:1、將頭結(jié)點(diǎn)從等待隊(duì)列移除;2、將頭結(jié)點(diǎn)狀態(tài)由CONDITION改為0,即初始狀態(tài);3、將節(jié)點(diǎn)從同步隊(duì)列尾部插入;4、喚醒該節(jié)點(diǎn)。由此我們可以得出結(jié)論:調(diào)用Condition.signal()方法的前提是當(dāng)前線程已經(jīng)獲取到了鎖,該方法會(huì)將等待隊(duì)列中的頭結(jié)點(diǎn)移除并從同步隊(duì)列的尾節(jié)點(diǎn)插入,并喚醒當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程。

signalAll()
signalAll()方法與signal()方法的區(qū)別僅僅體現(xiàn)在doSignal和doSignalAll方法上,我們看下doSignalAll方法的源碼:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
可以看到這里的區(qū)別就是,doSignalAll會(huì)將等待隊(duì)列中的所有節(jié)點(diǎn)都移動(dòng)到同步隊(duì)列中,并喚醒全部對(duì)應(yīng)節(jié)點(diǎn)的線程。
總結(jié)
下面是我自己總結(jié)的關(guān)于condition.await方法和signal方法的運(yùn)行流程圖:


如果有什么問題的話,歡迎大家留言指正,謝謝!
注:本文參考《Java并發(fā)編程的藝術(shù)》