背景
早前寫過一篇深入理解 AQS 的文章
過了一段時間后,我發(fā)現(xiàn)有些地方記得不清楚了,所以我感覺其實(shí)理解的并不深入透徹,于是決定再好好找資料看一看,把之前一知半解的地方徹底弄清楚!
看文檔
先看下源碼, AbstractQueuedSynchronizer 這個類有代碼+注釋共有 2335 行,可以說很長,我們先看第一手資料即源碼注釋是怎么寫的,通過 AbstractQueuedSynchronizer 類的注釋提綱挈領(lǐng)地了解一下這個類。
類注釋
這里就不貼英文原文了,比較長,直接上翻譯內(nèi)容:
本類提供了一個用于實(shí)現(xiàn)阻塞鎖和同步器(信號量、事件等)的框架,該框架依賴先進(jìn)先出 (FIFO) 的等待隊(duì)列來實(shí)現(xiàn)。它為大多數(shù)通過單個 int 類型的原子值來表示狀態(tài)的同步器提供了實(shí)現(xiàn)基礎(chǔ)。子類必須重寫更改同步器狀態(tài)的 protected 方法,并定義該狀態(tài)在獲取或釋放子類對象方面的具體含義?;诖?,此類中的其他方法實(shí)現(xiàn)所有排隊(duì)和阻塞機(jī)制。子類可以維護(hù)其他狀態(tài)字段,但就同步而言,只有使用了方法 getState、 setState 和 compareAndSetState 操作以原子方式更新的 int 值才能夠被跟蹤。
子類應(yīng)定義為非公共內(nèi)部輔助類,用于實(shí)現(xiàn)其外部類的同步屬性。AbstractQueuedSynchronizer 類沒有實(shí)現(xiàn)任何同步接口。相反,它定義了諸如 acquireInterruptably 之類的方法,這些方法可以由具體鎖和相關(guān)同步器適當(dāng)調(diào)用以實(shí)現(xiàn)它們的公共方法。
此類默認(rèn)支持獨(dú)占模式和共享模式獲取同步狀態(tài)。當(dāng)以獨(dú)占模式獲取同步狀態(tài)時,其他線程嘗試獲取不會成功。共享模式下,多個線程獲取同步狀態(tài)一般(但不一定)會成功。這個類會忽略這兩種模式在機(jī)制上的差異,那就是當(dāng)線程在共享模式下成功獲取到同步狀態(tài)時,下一個等待的線程(如果存在)也必須確定它是否也可以獲取該狀態(tài)。
不同模式下的等待線程共享一個 FIFO 隊(duì)列。
通常子類只實(shí)現(xiàn)一種模式,但兩者可以同時工作,例如 ReadWriteLock。只支持一種模式的子類,無需再為另一種模式定義方法。
AQS 定義了一個內(nèi)部類 ConditionObject,可以當(dāng)做 Condition 的實(shí)例供支持獨(dú)占模式的子類使用。在獨(dú)占模式下,方法 #isHeldExclusively 用于表示當(dāng)前線程是否在獨(dú)占子類對象,以方法#getState 的返回值為入?yún)ⅲ{(diào)用方法#release 可以完全釋放子類對象。使用這個值調(diào)用#acquire 方法,最終恢復(fù)到前一個獲取鎖的狀態(tài)。 AbstractQueuedSynchronizer 中沒有其他方法創(chuàng)建這個條件,所以如果不能滿足這個約束,就不要使用它。 AbstractQueuedSynchronizer.ConditionObject 的行為當(dāng)然是基于它的實(shí)現(xiàn)類的語義。
該類為內(nèi)部類提供了 檢查/儀表化/監(jiān)視 方法,以及類似的用于 condition 對象的方法。 這些可以導(dǎo)出到使用 AbstractQueuedSynchronizer 作為同步機(jī)制的類中。
該類的序列化僅僅存儲原子整數(shù)的維護(hù)狀態(tài),所以反序列化對象得到的線程隊(duì)列是空的。子類若需要序列化,要定義 readObject() 用于恢復(fù)自己到一個已知的初始狀態(tài)。
用法:
若將該類用作同步器的基礎(chǔ),請使用 getState()/setState()/cas 這些用于檢查或修改同步狀態(tài)的方法對下面的方法進(jìn)行重新定義:
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
這些方法默認(rèn)拋出 UnsupportedOperationException, 這些方法內(nèi)部必須保證線程安全,并且通常應(yīng)該是簡短且無鎖的。 定義這些方法是使用該類的意義所在。其他方法因?yàn)椴荒塥?dú)立的變化,所以聲明為 final。
你也能看到繼承自 AbstractOwnableSynchronizer 的方法,用于跟蹤那些獨(dú)占了同步器的線程。鼓勵你使用這些方法,它們開啟了監(jiān)視和診斷功能,可以幫助用戶判斷那些線程持有鎖。
即使該類基于內(nèi)部 FIFO 隊(duì)列,但也不會執(zhí)行默認(rèn)的 FIFO 策略,獨(dú)占同步的核心使用以下策略:
Acquire:
while (!tryAcquire(arg)) {
enqueue thread if it is not already queued;
possibly block current thread;
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
(共享模式類似,但可能涉及級聯(lián)信號。)
因?yàn)閷Λ@取鎖的檢查是在入隊(duì)之前進(jìn)行,所以一個新線程可能會插入到其他已經(jīng)入隊(duì)的線程之前。不管咋地吧,你要是愿意,可以通過調(diào)用內(nèi)部的一個或多個檢查方法去定義 tryAcquire() / tryAcquireShared() 方法,從而提供一個公平的 FIFO 獲取順序。特別是,大多數(shù)公平同步器在 hasQueuedPredecessors()(這是一個專門設(shè)計用于公平同步器的)返回 true 的時候,讓 tryAcquire() 返回 false. 當(dāng)然,其他方法也有可能。
對于默認(rèn)的插隊(duì)策略(也叫 greedy/renouncement/convoy-avoidance), 吞吐量和可伸縮性通常最高。盡管這不能保證公平,但允許早入隊(duì)的線程在后來的線程之前重新競爭,每個重新競爭的線程擁有公平的機(jī)會打敗新來的線程。盡管獲取行為通常不會一直進(jìn)行,但線程在被阻塞之前,伴隨著其他計算過程,他們可能會多次調(diào)用 tryAcquire(). 當(dāng)獨(dú)占同步器只是短暫的被持有時,這對于自旋來說很有好處,沒有太多負(fù)擔(dān)。你可以通過前面的調(diào)用增強(qiáng)這一點(diǎn),以獲得帶有"fast-path"的方法。如果同步器可能沒有被爭奪,或許只會預(yù)先檢查 hasContended() 和 hasQueuedThreads()。
這個類為同步提供了有效的可擴(kuò)展的基礎(chǔ),部分原因是把范圍集中在那些依賴數(shù)字狀態(tài)、 獲取/釋放參數(shù)、 以及內(nèi)部 FIFO 的同步器上。當(dāng)這些不能滿足需要時,你可以使用原子類、 你自己的 java.util.Queue 類、 LockSuppor 來構(gòu)建更底層的同步器。
用法示例:
這是一個不可重入互斥鎖,0 表示打開,1 表示鎖定。雖然不可重入鎖無需嚴(yán)格記錄本地?fù)碛姓叩木€程,但這個類還是這么做了,以便更容易監(jiān)視。同時也支持 conditions 并且公開了一個檢測方法。
class Mutex implements Lock, java.io.Serializable {
// Our internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
// Acquires the lock if state is zero
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// Releases the lock by setting state to zero
protected boolean tryRelease(int releases) {
assert releases == 1; // Otherwise unused
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// Reports whether in locked state
public boolean isLocked() {
return getState() != 0;
}
public boolean isHeldExclusively() {
// a data race, but safe due to out-of-thin-air guarantees
return getExclusiveOwnerThread() == Thread.currentThread();
}
// Provides a Condition
public Condition newCondition() {
return new ConditionObject();
}
// Deserializes properly
private void readObject(ObjectInputStream s)
throws IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// The sync object does all the hard work. We just forward to it.
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isLocked();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
這是一個類似于 java.util.concurrent.CountDownLatch CountDownLatch 的閂鎖類,只是它只需要一個信號即可觸發(fā)。 因?yàn)殒i存器是非獨(dú)占的,所以它使用共享的獲取和釋放方法。
class BooleanLatch {
private static class Sync extends AbstractQueuedSynchronizer {
boolean isSignalled() {
return getState() != 0;
}
protected int tryAcquireShared(int ignore) {
return isSignalled() ? 1 : -1;
}
protected boolean tryReleaseShared(int ignore) {
setState(1);
return true;
}
}
private final Sync sync = new Sync();
public boolean isSignalled() {
return sync.isSignalled();
}
public void signal() {
sync.releaseShared(1);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
如果對上面的翻譯理解起來還是費(fèi)勁,那還放那兒,等看完下面的,再回到上面看一遍就通透了。
看設(shè)計
AQS 本質(zhì)上是一個 FIFO 的雙向隊(duì)列,線程被包裝成結(jié)點(diǎn)的形式,基于自旋機(jī)制在隊(duì)列中等待獲取資源(這里的資源可以簡單理解為對象鎖)

總攬一下這個類,可以看到有兩個內(nèi)部類,剩下的就是一堆成員變量和成員方法。

設(shè)計思路
這是 AQS 的模型:

AQS 主要由三部分組成,state 同步狀態(tài)、Node 組成的 CLH 隊(duì)列、ConditionObject 條件變量(包含 Node 組成的條件單向隊(duì)列)。
state 用 volatile 來修飾,保證了我們操作的可見性,所以任何線程通過 getState() 獲得狀態(tài)都是可以得到最新值,但是 setState() 無法保證原子性,因此 AQS 給我們提供了 compareAndSetState 方法利用底層 UnSafe 的 CAS 功能來實(shí)現(xiàn)原子性。
對于 AQS 來說,線程同步的關(guān)鍵是對 state 的操作,可以說獲取、釋放資源是否成功都是由 state 決定的,比如 state>0 代表可獲取資源,否則無法獲取,所以 state 的具體語義由實(shí)現(xiàn)者去定義,現(xiàn)有的 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 定義的 state 語義都不一樣。
- ReentrantLock 的 state 用來表示是否有鎖資源,變量記錄了鎖的重入次數(shù)
- ReentrantReadWriteLock 的 state 高 16 位代表讀鎖狀態(tài),低 16 位代表寫鎖狀態(tài)
- Semaphore 的 state 用來表示可用信號的個數(shù)
- CountDownLatch 的 state 用來表示計數(shù)器的值
AQS 實(shí)現(xiàn)了兩類隊(duì)列,即 同步隊(duì)列 和 條件隊(duì)列。
同步隊(duì)列服務(wù)于線程阻塞等待獲取資源,而條件隊(duì)列則服務(wù)于線程因某個條件不滿足而進(jìn)入等待狀態(tài)。
條件隊(duì)列中的線程實(shí)際上已經(jīng)獲取到了資源,但是沒有能夠繼續(xù)執(zhí)行下去的條件,所以被打入條件隊(duì)列并釋放持有的資源,以讓渡其它線程執(zhí)行,如果未來某個時刻條件得以滿足,則該線程會被從條件隊(duì)列轉(zhuǎn)移到同步隊(duì)列,繼續(xù)參與競爭資源,以繼續(xù)向下執(zhí)行。
同步隊(duì)列
CLH
同步隊(duì)列是基于鏈表實(shí)現(xiàn)的雙向隊(duì)列,也是 CLH 鎖的變種。CLH 鎖是 AQS 隊(duì)列同步器實(shí)現(xiàn)的基礎(chǔ)。
來看一下 CLH 隊(duì)列

- CLH 鎖是有由 Craig, Landin, and Hagersten 這三個人發(fā)明的鎖,取了三個人名字的首字母,所以叫 CLH Lock。
- CLH 鎖是一個自旋鎖。能確保無饑餓性。提供先來先服務(wù)的公平性。
- CLH 隊(duì)列鎖也是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請線程僅僅在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。
Node
AQS 以內(nèi)部類 Node 的形式定義了同步隊(duì)列結(jié)點(diǎn)。這就是前文看到的第一個內(nèi)部類。
static final class Node {
/** 模式定義 */
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** 線程狀態(tài) */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 線程等待狀態(tài) */
volatile int waitStatus;
/** 前驅(qū)結(jié)點(diǎn) */
volatile Node prev;
/** 后置結(jié)點(diǎn) */
volatile Node next;
/** 持有的線程對象 */
volatile Thread thread;
/** 對于獨(dú)占模式而言,指向下一個處于 CONDITION 等待狀態(tài)的結(jié)點(diǎn);對于共享模式而言,則為 SHARED 結(jié)點(diǎn) */
Node nextWaiter;
// ... 省略方法定義
}

Node 在 CLH 的基礎(chǔ)上進(jìn)行了變種。CLH 是單向隊(duì)列,其主要特點(diǎn)是自旋檢查前驅(qū)節(jié)點(diǎn)的 locked 狀態(tài)。而 AQS 同步隊(duì)列是雙向隊(duì)列,每個節(jié)點(diǎn)也有狀態(tài) waitStatus,而其并不是一直對前驅(qū)節(jié)點(diǎn)的狀態(tài)自旋判斷,而是自旋一段時間后阻塞讓出 cpu 時間片(上下文切換),等待前驅(qū)節(jié)點(diǎn)主動喚醒后繼節(jié)點(diǎn)。

waitStatus 有如下 5 中狀態(tài):
- CANCELLED = 1
表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)超時或被中斷(響應(yīng)中斷的情況下),會觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會再變化。 - SIGNAL = -1
表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊(duì)時,會將前繼結(jié)點(diǎn)的狀態(tài)更新為 SIGNAL。 - CONDITION = -2
表示結(jié)點(diǎn)等待在 Condition 上,當(dāng)其他線程調(diào)用了 Condition 的 signal() 方法后,CONDITION 狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。 - PROPAGATE = -3
共享模式下,前繼結(jié)點(diǎn)不僅會喚醒其后繼結(jié)點(diǎn),同時也可能會喚醒后繼的后繼結(jié)點(diǎn)。 - INITIAL = 0
新結(jié)點(diǎn)入隊(duì)時的默認(rèn)狀態(tài)。
從上面的代碼中可以看出,位于 CLH 鏈表中的線程以 2 種模式在等待資源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示獨(dú)占模式。共享模式與獨(dú)占模式的主要區(qū)別在于,同一時刻獨(dú)占模式只能有一個線程獲取到資源,而共享模式在同一時刻可以有多個線程獲取到資源。典型的場景就是讀寫鎖,讀操作可以有多個線程同時獲取到讀鎖資源,而寫操作同一時刻只能有一個線程獲取到寫鎖資源,其它線程在嘗試獲取資源時都會被阻塞。
同步隊(duì)列主要行為
AQS 類成員變量 head 和 tail 字段分別指向同步隊(duì)列的頭結(jié)點(diǎn)和尾結(jié)點(diǎn):
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
其中 head 表示同步隊(duì)列的頭結(jié)點(diǎn),而 tail 則表示同步隊(duì)列的尾結(jié)點(diǎn),具體組織形式如下圖:

當(dāng)調(diào)用 AQS 的 acquire 方法獲取資源時,如果資源不足則當(dāng)前線程會被封裝成 Node 結(jié)點(diǎn)添加到同步隊(duì)列的末端(入隊(duì)),頭結(jié)點(diǎn) head 用于記錄當(dāng)前正在持有資源的線程結(jié)點(diǎn),而 head 的后繼結(jié)點(diǎn)就是下一個將要被調(diào)度的線程結(jié)點(diǎn),當(dāng) release 方法被調(diào)用時,該結(jié)點(diǎn)上的線程將被喚醒(出隊(duì)),繼續(xù)獲取資源。
同步隊(duì)列的主要行為是 :入隊(duì)、出隊(duì)
入隊(duì)
獲取資源失敗的線程需要封裝成 Node 節(jié)點(diǎn),接著尾部入隊(duì),在 AQS 中提供 addWaiter 函數(shù)完成 Node 節(jié)點(diǎn)的創(chuàng)建與入隊(duì)。添加節(jié)點(diǎn)的時候,如 CLH 隊(duì)列已經(jīng)存在,通過 CAS 快速將當(dāng)前節(jié)點(diǎn)添加到隊(duì)列尾部,如果添加失敗或隊(duì)列不存在,則初始化同步隊(duì)列。
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
總結(jié):入隊(duì)列,線程獲取鎖失敗,入隊(duì)列將新節(jié)點(diǎn)加到 tail 后面,然后對 tail 進(jìn)行 CAS 操作,將 tail 指針后移到新節(jié)點(diǎn)上。
出隊(duì)
CLH 隊(duì)列中的節(jié)點(diǎn)都是獲取資源失敗的線程節(jié)點(diǎn),當(dāng)持有資源的線程釋放資源時,會將 head.next 指向的線程節(jié)點(diǎn)喚醒(C L H 隊(duì)列的第二個節(jié)點(diǎn)),如果喚醒的線程節(jié)點(diǎn)獲取資源成功,線程節(jié)點(diǎn)清空信息設(shè)置為頭部節(jié)點(diǎn)(新哨兵節(jié)點(diǎn)),原頭部節(jié)點(diǎn)出隊(duì)(原哨兵節(jié)點(diǎn))
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 如果 state=0 了,就是可以釋放鎖了
free = true;
setExclusiveOwnerThread(null); // 將拿鎖線程置為 null
}
setState(c); // 重置同步器的 state
return free; // 返回是否成功釋放
}
private void unparkSuccessor(Node node) {
// node 節(jié)點(diǎn)是當(dāng)前釋放鎖的節(jié)點(diǎn),也是同步隊(duì)列的頭節(jié)點(diǎn)
int ws = node.waitStatus;
// 如果節(jié)點(diǎn)已經(jīng)被取消了,把節(jié)點(diǎn)的狀態(tài)置為初始化
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 拿出隊(duì)二 s
Node s = node.next;
// s 為空,表示 node 的后一個節(jié)點(diǎn)為空
// s.waitStatus 大于 0,代表 s 節(jié)點(diǎn)已經(jīng)被取消了
// 遇到以上這兩種情況,就從隊(duì)尾開始,向前遍歷,找到第一個 waitStatus 字段不是被取消的
if (s == null || s.waitStatus > 0) {
s = null;
// 結(jié)束條件是前置節(jié)點(diǎn)就是 head 了
for (Node t = tail; t != null && t != node; t = t.prev)
// t.waitStatus <= 0 說明 t 當(dāng)前沒有被取消,肯定還在等待被喚醒
if (t.waitStatus <= 0)
s = t;
}
// 喚醒以上代碼找到的線程
if (s != null)
LockSupport.unpark(s.thread);
}
當(dāng)線程被喚醒后,又繼續(xù)執(zhí)行 acquireQueued 方法,進(jìn)入循環(huán)
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
//獲取前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
//如果前驅(qū)節(jié)點(diǎn)是首節(jié)點(diǎn),獲取資源(子類實(shí)現(xiàn))
if (p == head && tryAcquire(arg)) {
//獲取資源成功,設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn),清空當(dāng)前節(jié)點(diǎn)的信息,把當(dāng)前節(jié)點(diǎn)變成哨兵節(jié)點(diǎn)
setHead(node);
//原來首節(jié)點(diǎn)下個節(jié)點(diǎn)指向?yàn)?null
p.next = null; // help GC
//返回線程中斷狀態(tài)
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
總結(jié):出隊(duì)列,鎖釋放喚醒 head 的后繼節(jié)點(diǎn),head 的后繼節(jié)點(diǎn)從阻塞中醒來,開始搶鎖,獲取鎖成功,此時 head 指針向后移一個位置,原先 head 的后繼節(jié)點(diǎn)成為新的 head。
最后是一個 同步隊(duì)列的流程概述

條件隊(duì)列
一個 AQS 可以對應(yīng)多個條件變量

ConditionObject 內(nèi)部維護(hù)著一個單向條件隊(duì)列,不同于 CLH 隊(duì)列,條件隊(duì)列只入隊(duì)執(zhí)行 await 的線程節(jié)點(diǎn),并且加入條件隊(duì)列的節(jié)點(diǎn),不能在 CLH 隊(duì)列, 條件隊(duì)列出隊(duì)的節(jié)點(diǎn),會入隊(duì)到 CLH 隊(duì)列。
當(dāng)某個線程執(zhí)行了 ConditionObject 的 await 函數(shù),阻塞當(dāng)前線程,線程會被封裝成 Node 節(jié)點(diǎn)添加到條件隊(duì)列的末端,其他線程執(zhí)行 ConditionObject 的 signal 函數(shù),會將條件隊(duì)列頭部線程節(jié)點(diǎn)轉(zhuǎn)移到 CLH 隊(duì)列參與競爭資源,具體流程如下圖:

一個 Condition 對象就有一個單項(xiàng)的等待任務(wù)隊(duì)列。在一個多線程任務(wù)中我們可以 new 出多個等待任務(wù)隊(duì)列。比如我們 new 出來兩個等待隊(duì)列。
private Lock lock = new ReentrantLock();
private Condition FirstCond = lock.newCondition();
private Condition SecondCond = lock.newCondition();
所以真正的 AQS 任務(wù)中一般是一個任務(wù)隊(duì)列 N 個等待隊(duì)列的,因此我們盡量調(diào)用 signal 而少用 signalAll,因?yàn)樵谥付ǖ膶?shí)例化等待隊(duì)列中只有一個可以拿到鎖的。
設(shè)計模式
從設(shè)計模式的角度講,AbstractQueuedSynchronizer 是個抽象類,所有用到方法的類都要繼承此類的若干方法,對應(yīng)的設(shè)計模式就是模版模式。 這樣就解決了實(shí)現(xiàn)同步器時涉及的大量細(xì)節(jié)問題,能夠極大地減少實(shí)現(xiàn)工作。
若干方法就是上文注釋翻譯中描述的這些方法:
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
說明一下各個方法的作用:
- tryAcquire :嘗試以獨(dú)占模式獲取資源,如果獲取成功則返回 true,否則返回 false。
- tryRelease :嘗試以獨(dú)占模式釋放資源,如果釋放成功則返回 true,否則返回 false。
- tryAcquireShared :嘗試以共享模式獲取資源,如果返回正數(shù)則說明獲取成功,且還有可用的剩余資源;如果返回 0 則說明獲取成功,但是沒有可用的剩余資源;如果返回負(fù)數(shù)則說明獲取資源失敗。
- tryReleaseShared :嘗試以共享模式釋放資源,如果釋放成功則返回 true,否則返回 false。
- isHeldExclusively :判斷當(dāng)前線程是否正在獨(dú)占資源,如果是則返回 true,否則返回 false。
AbstractQueuedSynchronizer 中的方法實(shí)現(xiàn)按照功能劃分可以分為兩大類,即獲取資源(acquire)和釋放資源(release),同時區(qū)分獨(dú)占模式和共享模式
可以看到具體實(shí)現(xiàn)類需要對資源在不同模式下的獲得和釋放進(jìn)行具體定義。
具體實(shí)現(xiàn)舉例來說可以到 ReentrantReadWriteLock 中的 內(nèi)部類 Sync 看一下。
持鎖的當(dāng)前線程:exclusiveOwnerThread
自實(shí)現(xiàn)
AQS 定義了一套多線程訪問共享資源的同步模板,解決了實(shí)現(xiàn)同步器時涉及的大量細(xì)節(jié)問題,能夠極大地減少實(shí)現(xiàn)工作,現(xiàn)在我們基于 AQS 實(shí)現(xiàn)一個不可重入的獨(dú)占鎖,直接使用 AQS 提供的獨(dú)占式模板,只需明確 state 的語義與實(shí)現(xiàn) tryAcquire 與 tryRelease 函數(shù)(獲取資源與釋放資源)。在這里 state 為 0 表示鎖沒有被線程持有,state 為 1 表示鎖已經(jīng)被某個線程持有,由于是不可重入鎖,所以不需要記錄持有鎖線程的獲取鎖次數(shù)。
不可重入的獨(dú)占鎖代碼如下
public class NonReentrantLock implements Lock {
/**
*
* @Description 自定義同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 鎖是否被線程持有
*/
@Override
protected boolean isHeldExclusively() {
//0:未持有 1:已持有
return super.getState() == 1;
}
/**
* 獲取鎖
*/
@Override
protected boolean tryAcquire(int arg) {
if (arg != 1) {
//獲取鎖操作,是需要把 state 更新為 1,所以 arg 必須是 1
throw new RuntimeException("arg not is 1");
}
if (compareAndSetState(0, arg)) {//cas 更新 state 為 1 成功,代表獲取鎖成功
//設(shè)置持有鎖線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 釋放鎖
*/
@Override
protected boolean tryRelease(int arg) {
if (arg != 0) {
//釋放鎖操作,是需要把 state 更新為 0,所以 arg 必須是 0
throw new RuntimeException("arg not is 0");
}
//清空持有鎖線程
setExclusiveOwnerThread(null);
//設(shè)置 state 狀態(tài)為 0,此處不用 cas,因?yàn)橹挥蝎@取鎖成功的線程才會執(zhí)行該函數(shù),不需要考慮線程安全問題
setState(arg);
return true;
}
/**
* 提供創(chuàng)建條件變量入口
*/
public ConditionObject createConditionObject() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
/**
* 獲取鎖
*/
@Override
public void lock() {
//Aqs 獨(dú)占式-獲取資源模板函數(shù)
sync.acquire(1);
}
/**
* 獲取鎖-響應(yīng)中斷
*/
@Override
public void lockInterruptibly() throws InterruptedException {
//Aqs 獨(dú)占式-獲取資源模板函數(shù)(響應(yīng)線程中斷)
sync.acquireInterruptibly(1);
}
/**
* 獲取鎖是否成功-不阻塞
*/
@Override
public boolean tryLock() {
//子類實(shí)現(xiàn)
return sync.tryAcquire(1);
}
/**
* 獲取鎖-超時機(jī)制
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//Aqs 獨(dú)占式-獲取資源模板函數(shù)(超時機(jī)制)
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
/**
* 釋放鎖
*/
@Override
public void unlock() {
//Aqs 獨(dú)占式-釋放資源模板函數(shù)
sync.release(0);
}
/**
* 創(chuàng)建條件變量
*/
@Override
public Condition newCondition() {
return sync.createConditionObject();
}
}
NonReentrantLock 定義了一個內(nèi)部類 Sync,Sync 用來實(shí)現(xiàn)具體的鎖操作,它繼承了 AQS,因?yàn)槭褂玫氖仟?dú)占式模板,所以重寫 tryAcquire 與 tryRelease 函數(shù),另外提供了一個創(chuàng)建條件變量的入口,下面使用自定義的獨(dú)占鎖來同步兩個線程對 j++。
private static int j = 0;
public static void main(String[] agrs) throws InterruptedException {
NonReentrantLock nonReentrantLock = new NonReentrantLock();
Runnable runnable = () -> {
//獲取鎖
nonReentrantLock.lock();
for (int i = 0; i < 100000; i++) {
j++;
}
//釋放鎖
nonReentrantLock.unlock();
};
Thread thread = new Thread(runnable);
Thread threadTwo = new Thread(runnable);
thread.start();
threadTwo.start();
thread.join();
threadTwo.join();
System.out.println(j);
}
無論執(zhí)行多少次輸出內(nèi)容都是:
200000
其他
LockSupport 輔助類
LockSupport 是一個線程阻塞工具類,所有的方法都是靜態(tài)方法,可以讓線程在任意位置阻塞,當(dāng)然阻塞之后肯定得有喚醒的方法。
park 是因?yàn)?park 英文意思為停車。我們?nèi)绻?Thread 看成一輛車的話,park 就是讓車停下,unpark 就是讓車啟動然后跑起來。
park/unpark 調(diào)用的是 Unsafe(提供 CAS 操作) 中的 native 代碼。
說明
本文參考了非常多網(wǎng)絡(luò)圖片和文章資料,嚴(yán)格意義上已經(jīng)不算原創(chuàng)文章了,如本文引有的內(nèi)容有侵權(quán),請聯(lián)系刪除。
參考
- https://blog.csdn.net/lpf463061655/article/details/87290730
- https://www.codenong.com/cs106963035/
- https://mp.weixin.qq.com/s/bxWgo9IuggDpE1l37JqEhQ
- https://www.modb.pro/db/108644
- https://mp.weixin.qq.com/s/BLnZYa4lbXUx3KEQm7Z7tA
- https://mp.weixin.qq.com/s/Y4GbMdNmSDvHtomxtObRSg
- https://www.baiyp.ren/CLH%E9%98%9F%E5%88%97%E9%94%81.html
- https://juejin.cn/post/6873020483755884552
- https://mp.weixin.qq.com/s?__biz=MzU4NzU0MDIzOQ==&mid=2247488891&idx=1&sn=227928446c692aaa0085557682ed732d&scene=21#wechat_redirect
更多精彩內(nèi)容,請關(guān)注公眾號 “小盒子的技術(shù)分享”