概述
AbstractQueuedSynchronizer,即隊列同步器。它是構(gòu)建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并發(fā)包的作者(Doug Lea)期望它能夠成為實現(xiàn)大部分同步需求的基礎。它是JUC并發(fā)包中的核心基礎組件。
設計
AQS的基本設計思想并不復雜:
獲取鎖:
while (synchronization state does not allow acquire) {
enqueue current thread if not already queued;
possibly block current thread;
}
dequeue current thread if it was queued;
釋放鎖:
update synchronization state;
if (state may permit a blocked thread to acquire)
unblock one or more queued threads;
這兩段代碼中有三個基本組件:
- state
一個同步操作的變量,用以表示鎖的狀態(tài),加鎖之前需要去修改state,修改成功了說明獲取鎖成功。當然,解鎖時也需要更新鎖的狀態(tài) - queue
等待隊列,獲取鎖失敗的線程需要進入這個隊列,等待著被喚醒,再次嘗試獲取鎖。獲取到鎖的線程,需要出隊列(如果線程在等待隊列中的話) - block and unblock
在線程進入等待隊列后,需要阻塞自己。釋放鎖時,需要喚醒等待隊列中的一個或多個線程,以讓它獲取鎖。
Synchronization State
state是整個機制中競爭訪問最激烈的對象了,它的操作應該具備以下三個條件:
- 原子性
state的更新必定得具備原子性,以保證更新動作的完整,i++這種錯誤在state的更新中是絕對不允許的。 - 可見性
state的更新對其他線程應當是立即可見的。 - 有序性
鑒于Java的指令重排序優(yōu)化,使得有序性成為同步操作的一個必要條件??紤]一下單例模式的雙鎖檢測實現(xiàn)方式,正是因為亂序執(zhí)行引發(fā)的錯誤(java 1.5后用volatile可修復)
在AQS的實現(xiàn)中,state是一個volatile修飾的int變量。volatile變量可以滿足state操作內(nèi)存可見性、有序性(內(nèi)存屏障)的要求。但是volatile并不保證原子性,所以需要Unsale包的CAS操作來保證更新的原子性。
比較并交換(compare and swap, CAS),是原子操作的一種,可用于在多線程編程中實現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時改寫某一數(shù)據(jù)時由于執(zhí)行順序不確定性以及中斷的不可預知性產(chǎn)生的數(shù)據(jù)不一致問題。 該操作通過將內(nèi)存中的值與指定數(shù)據(jù)進行比較,當數(shù)值一樣時將內(nèi)存中的數(shù)據(jù)替換為新的值。
---- wikipedia
CLH Queue
CLH同步隊列是一個FIFO雙向隊列,AQS依賴它來完成同步狀態(tài)的管理,當前線程如果獲取同步狀態(tài)失敗時,AQS則會將當前線程已經(jīng)等待狀態(tài)等信息構(gòu)造成一個節(jié)點(Node)并將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態(tài)釋放時,會把首節(jié)點喚醒(公平鎖),使其再次嘗試獲取同步狀態(tài)。
CLH隊列的優(yōu)勢在于它的入隊和出隊都是快速的無鎖無障礙操作(CAS樂觀鎖機制),即使是在線程爭用的情況下,也能保證有一個線程能快速入隊出隊。檢測是否有線程在等待也是快速的,只需要判斷head==tail就可以了,無需加鎖。
在CLH同步隊列中,一個節(jié)點表示一個線程,它保存著線程的引用(thread)、狀態(tài)(waitStatus)、前驅(qū)節(jié)點(prev)、后繼節(jié)點(next),其定義如下:
static final class Node {
// 標記當前結(jié)點是共享模式
static final Node SHARED = new Node();
// 標記當前結(jié)點是獨占模式
static final Node EXCLUSIVE = null;
// 代表線程已經(jīng)被取消
static final int CANCELLED = 1;
// 代表后續(xù)節(jié)點需要喚醒
static final int SIGNAL = -1;
// 代表線程在condition queue中,等待某一條件
static final int CONDITION = -2;
// 代表后續(xù)結(jié)點會傳播喚醒的操作,共享模式下起作用
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
// 指向條件隊列的下一個結(jié)點,或者特殊的SHARED結(jié)點
Node nextWaiter;
}
入隊:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到,入隊是用了CAS樂觀鎖來控制并發(fā)的。這里有一個小優(yōu)化,先快速嘗試入隊,失敗了再進入自旋循環(huán)反復嘗試。
需要注意的是,compareAndSetTail(t, node)和t.next = node這兩步操作并非原子操作,也就是說在通過t.next并不一定能找到下一個結(jié)點。原始的CLH隊列中僅僅是一個單向的隊列(從tail到head),next是AQS的一個小優(yōu)化,可以迅速找到下一個結(jié)點,找不到的時候,需要從tail結(jié)點回溯,因為t.next = node可能還沒來得及執(zhí)行。
出隊:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
出隊只需要設置新的head就可以了。
Block and Unblock
LockSupport類通過native方法提供了線程的阻塞(park)和喚醒(unpark)方法。并且,park方法還可以設置時間或者設置許可(blocker,等待對象),很明顯設置超時時間可以用于解決鎖的超時問題,設置許可可以解決鎖的條件等待問題。
使用
AQS采用模板方法設計模式,AQS提供了大量的方法幫助我們實現(xiàn)同步,子類只需要繼承它并根據(jù)需求選擇實現(xiàn)它的部分抽象方法來管理抽象狀態(tài)就可以了。
AQS提供了兩種模式的同步——獨占式和共享式,以及這兩種模式下的同步狀態(tài)獲取方法(包括 acquire、acquireInterruptibly、tryAcquireNanos)和釋放(release方法)。
獨占式
獨占式:同一時刻僅有一個線程持有同步狀態(tài)。
acquire
acquire是最最常用的一種方式,基于AQS實現(xiàn)的鎖一般實現(xiàn)lock方法都是直接調(diào)用這個方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
-
tryAcquire是由子類實現(xiàn)的,嘗試獲取鎖,如果成功了返回true。該方法必須必須保證線程安全地對state進行操作。 -
addWaiter在上文中已經(jīng)分析過了,將當前線程加入等待隊列。 -
acquireQueued方法比較復雜,我們來看看源碼:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
final Node p = node.predecessor();
// 當前線程是head時,嘗試獲取,成功則返回
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判斷是否需要park,需要則park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 檢測到中斷,記錄在案,接著干我的(中斷就中斷,關(guān)我啥事)
interrupted = true;
}
} finally {
// 失敗了,取消獲取
if (failed)
cancelAcquire(node);
}
}
-
selfInterrupt獲取失敗,自行了斷(中斷)
acquireInterruptibly
acquire方法對中斷不響應,對線程進行中斷操作后,該線程會依然位于CLH同步隊列中等待著獲取同步狀態(tài)。為了響應中斷,AQS提供了acquireInterruptibly(int arg)方法,該方法在等待獲取同步狀態(tài)時,如果當前線程被中斷了,會立刻響應中斷拋出異常InterruptedException。
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 檢測中斷
if (Thread.interrupted())
throw new InterruptedException();
// 照例先嘗試獲取
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
doAcquireInterruptibly
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 跟acquireQueued唯一的區(qū)別在于,檢測到中斷,立刻拋出異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireNanos
tryAcquireNanos是tryAcquire的增強版,在響應中斷的同時,增加了超時機制
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算deadline
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 判斷是否超時
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 使用LockSupport.parkNanos方法,超時自動喚醒
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
-
tryRelease由子類實現(xiàn),同步設置state -
unparkSuccessor喚醒等待隊列的head線程
共享式
共享式:在同一時刻可以有多個線程獲取同步狀態(tài)
acquireShared
acquireShared對應于acquire
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
-
tryAcquireShared也是由子類實現(xiàn)的,同步嘗試修改state值 doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 跟doAcquire的不同在于此
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 嘗試喚醒后繼的結(jié)點:<br />
* propagate > 0說明許可還有能夠繼續(xù)被線程acquire;<br />
* 或者 之前的head被設置為PROPAGATE(PROPAGATE可以被轉(zhuǎn)換為SIGNAL)說明需要往后傳遞;<br />
* 或者為null,我們還不確定什么情況。 <br />
* 并且 后繼結(jié)點是共享模式或者為如上為null。
* <p>
* 上面的檢查有點保守,在有多個線程競爭獲取/釋放的時候可能會導致不必要的喚醒。<br />
*
*/
if (propagate > 0 || h == null || h.waitStatus < 0) {
Node s = node.next;
// 后繼結(jié)是共享模式或者s == null(不知道什么情況)
// 如果后繼是獨占模式,那么即使剩下的許可大于0也不會繼續(xù)往后傳遞喚醒操作
// 即使后面有結(jié)點是共享模式。
if (s == null || s.isShared())
// 喚醒后繼結(jié)點
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 隊列不為空且有后繼結(jié)點
if (h != null && h != tail) {
int ws = h.waitStatus;
// 不管是共享還是獨占只有結(jié)點狀態(tài)為SIGNAL才嘗試喚醒后繼結(jié)點
if (ws == Node.SIGNAL) {
// 將waitStatus設置為0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);// 喚醒后繼結(jié)點
// 如果狀態(tài)為0則更新狀態(tài)為PROPAGATE,更新失敗則重試
} else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果過程中head被修改了則重試。
if (h == head) // loop if head changed
break;
}
}
tryAcquireSharedInterruptibly、tryAcquireSharedNanos跟tryAcquireInterruptibly、tryAcquireNanos類似,都是在tryAcquireShared的基礎上加了中斷響應和超時機制,這里不做分析。
releaseShared
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
-
tryReleaseShared同樣的由子類實現(xiàn) -
doReleaseShared見上文分析
結(jié)論
AQS基于volatile關(guān)鍵字、cas操作、自旋鎖、park和unpark支持等簡單高效的操作,為java構(gòu)建了一套高效、通用的同步框架。AQS的設計思想簡單易懂,然而基于AQS的同步類卻復雜高效,且用途各異,實在牛逼。
本文尚有不少欠缺,未能涉及Condition部分,也沒有關(guān)于ReentrantLock、Semaphore等基于AQS框架實現(xiàn)的同步器的分析,后續(xù)當繼續(xù)分析。
參考
Java 源碼
The java.util.concurrent Synchronizer Framework
java 7 doc
J.U.C之AQS
Java Concurrency In Practice