1、AQS介紹
AQS全稱AbstractQueuedSynchronizer,是一個同步器,用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架。內(nèi)部主要使用一個volatile修飾的state變量和一個FIFO雙向隊(duì)列來實(shí)現(xiàn)的。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
ReentrantLock、Semaphore、CountDownLatch等都是基于AQS實(shí)現(xiàn)的。
同步器的主要使用方式是繼承,子類通過繼承同步器并實(shí)現(xiàn)它的抽象方法來管理同步狀 態(tài),在抽象方法的實(shí)現(xiàn)過程中免不了要對同步狀態(tài)進(jìn)行更改,這時就需要使用同步器提供的3 個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))來進(jìn)行操 作,因?yàn)樗鼈兡軌虮WC狀態(tài)的改變是安全的。

子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來 供自定義同步組件使用,同步器既可以支持獨(dú)占式地獲取同步狀態(tài),也可以支持共享式地獲 取同步狀態(tài),這樣就可以方便實(shí)現(xiàn)不同類型的同步組件(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等)
public class Main {
static class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
return super.tryAcquire(arg);
}
@Override
protected boolean tryRelease(int arg) {
return super.tryRelease(arg);
}
@Override
protected int tryAcquireShared(int arg) {
return super.tryAcquireShared(arg);
}
@Override
protected boolean tryReleaseShared(int arg) {
return super.tryReleaseShared(arg);
}
@Override
protected boolean isHeldExclusively() {
return super.isHeldExclusively();
}
}
}
同步器是實(shí)現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語義??梢赃@樣理解二者之間的關(guān)系:鎖是面向使用者的,它定義了使用者與鎖交 互的接口(比如可以允許兩個線程并行訪問),隱藏了實(shí)現(xiàn)細(xì)節(jié);同步器面向的是鎖的實(shí)現(xiàn)者,它簡化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域。
2、同步器的接口與示例
同步器的設(shè)計(jì)是基于模板方法模式的,也就是說,使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法將會調(diào)用使用者重寫的方法。 這句話聽起來很繞,慢慢往后看就懂了。
我們在繼承AQS并重寫那5個方法的時候,需要調(diào)用AQS提供的幾個方法去訪問或修改stats變量的狀態(tài)。
// 獲取當(dāng)前同步狀態(tài)
protected final int getState() {
return state;
}
// 設(shè)置當(dāng)前同步狀態(tài)
protected final void setState(int newState) {
state = newState;
}
// 使用CAS設(shè)置當(dāng)前狀態(tài),CAS能保證操作的原子性
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
我們在繼承同步器時可重寫的方法如下

在實(shí)現(xiàn)自定義同步組件時,也會調(diào)用同步器提供的模板方法,部分模板方法如下

AQS提供的模板方法主要分為三類:
(1)獨(dú)占式獲取和釋放同步狀態(tài)。
(2)共享式獲取和釋放同步狀態(tài)。
(3)查詢同步隊(duì)列中等待的線程情況。
3、實(shí)現(xiàn)一個獨(dú)占鎖
獨(dú)占鎖,就是同一時刻只能一個線程擁有鎖?;贏QS,我們可以很方便的實(shí)現(xiàn)。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 基于AQS實(shí)現(xiàn)獨(dú)占鎖
*/
public class ExclusiveLock implements Lock {
/**
* 實(shí)現(xiàn)自定義的同步器
*/
private static class Sync extends AbstractQueuedSynchronizer {
/**
* 獲取鎖
* @param arg
* @return
*/
@Override
protected boolean tryAcquire(int arg) {
// 設(shè)置同步變量state為1
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
/**
* 是否處于占用狀態(tài)
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 每個condition都包含一個condition隊(duì)列
Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
一般在實(shí)現(xiàn)自定義同步器時,都會把它作為靜態(tài)內(nèi)部類去實(shí)現(xiàn)。上面實(shí)現(xiàn)的ExclusiveLock類中就是定義了一個靜態(tài)內(nèi)部類Sync去繼承AQS實(shí)現(xiàn)獨(dú)占鎖邏輯的。通過CAS設(shè)置同步變量state值為1表示獲取鎖成功,釋放鎖時把state設(shè)置為0即可。
4、AQS的實(shí)現(xiàn)分析
4.1 同步隊(duì)列
AQS內(nèi)部依賴同步隊(duì)列(雙向FIFO隊(duì)列)來進(jìn)行線程的管理,當(dāng)線程獲取鎖失敗時,會將線程以及等待狀態(tài)信息構(gòu)造為一個節(jié)點(diǎn)Node并將其放入同步隊(duì)列隊(duì)尾,然后阻塞該線程。當(dāng)鎖釋放的時候,會把隊(duì)首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
Node是AQS中的一個靜態(tài)內(nèi)部類,用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū)和后繼節(jié)點(diǎn),主要字段如下
static final class Node {
// 表示節(jié)點(diǎn)是共享模式
static final Node SHARED = new Node();
// 表示節(jié)點(diǎn)是獨(dú)占模式
static final Node EXCLUSIVE = null;
// 由于在同步隊(duì)列中等待的線程等待超時或者被中斷,需要從同步隊(duì)列中取消等待,節(jié)點(diǎn)進(jìn)入該狀態(tài)將不會改變。
static final int CANCELLED = 1;
// 后繼節(jié)點(diǎn)的線程處于等待狀態(tài),而當(dāng)前節(jié)點(diǎn)的線程如果釋放了同步狀態(tài)或者被取消,將會通知后繼節(jié)點(diǎn),使后繼節(jié)點(diǎn)的線程得以運(yùn)行。
static final int SIGNAL = -1;
// 節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)線程等待在Condition上,當(dāng)其他線程對Condition調(diào)用了signal()方法后,該節(jié)點(diǎn)將會從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,加入到對同步狀態(tài)的獲取中。
static final int CONDITION = -2;
// 表示下一次共享式同步狀態(tài)獲取將會無條件的傳播下去。
static final int PROPAGATE = -3;
/**
* 等待狀態(tài),取值如下
* SIGNAL 值為-1,
*
* CANCELLED 值為1,
*
* CONDITION 值為-2,
*
* PROPAGATE 值為-3,
*
* INITIAL 值為0,表示初始狀態(tài)。
*/
volatile int waitStatus;
// 前驅(qū)節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)加入同步隊(duì)列時被設(shè)置(尾部添加)
volatile Node prev;
// 后繼節(jié)點(diǎn)
volatile Node next;
// 獲取同步狀態(tài)的線程
volatile Thread thread;
// 等待隊(duì)列中的后繼節(jié)點(diǎn)。如果當(dāng)前節(jié)點(diǎn)是共享的,那么這個字段將是一個SHARED常量,也就是說節(jié)點(diǎn)類型(獨(dú)占和共享)和等待隊(duì)列中的后繼節(jié)點(diǎn)共用同一個字段
Node nextWaiter;
}
沒有成功獲取同步狀態(tài)的線程將會加入同步隊(duì)列的尾部,這個加入的過程也必須要保證線程安全,因此AQS提供了compareAndSetTail(Node expect, Node update)方法。同步隊(duì)列的結(jié)構(gòu)大致如下

同步隊(duì)列設(shè)置尾結(jié)點(diǎn)的過程大致如下

同步隊(duì)列的首節(jié)點(diǎn)是獲取鎖成功的線程,首節(jié)點(diǎn)的線程在釋放鎖后,將會喚醒后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)如果獲取鎖成功的同時會把自己設(shè)置為頭結(jié)點(diǎn)

設(shè)置頭結(jié)點(diǎn)是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個線程能成功獲取到同步狀態(tài),因此設(shè)置頭結(jié)點(diǎn)的方法并不需要CAS來保證,它只需要將首節(jié)點(diǎn)設(shè)置為頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)然后斷開首節(jié)點(diǎn)即可。
4.2 獨(dú)占式同步狀態(tài)獲取和釋放
我們在實(shí)現(xiàn)自定義的獨(dú)占式同步器時,主要重寫了AQS的tryAcquire和tryRelease方法,通過操作同步變量state完成同步狀態(tài)的獲取和釋放。
我們可以調(diào)用AQS對外提供的acquire獲取同步狀態(tài),該方法會調(diào)用我們重新的tryAcquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
如果調(diào)用tryAcquire方法返回為false,則通過addWaiter把線程加入同步隊(duì)列隊(duì)尾,并標(biāo)志位獨(dú)占Node.EXCLUSIVE。通過CAS確保節(jié)點(diǎn)能安全的添加到隊(duì)列尾。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
加入隊(duì)列尾后,通過CAS不斷的嘗試獲取同步狀態(tài)。只有在前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)的情況下,才有可能獲取到同步狀態(tài)。如果獲取不到則阻塞節(jié)點(diǎn)中的線程,而被阻塞線程的喚醒主要依靠前驅(qū)節(jié)點(diǎn)的出隊(duì)或阻塞線程被中斷來實(shí)現(xiàn)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireQueued(final Node node,int arg)方法中,當(dāng)前線程在“死循環(huán)”中嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn)才能夠嘗試獲取同步狀態(tài),這是為什么?原因有兩個,如下。
(1)頭節(jié)點(diǎn)是成功獲取到同步狀態(tài)的節(jié)點(diǎn),而頭節(jié)點(diǎn)的線程釋放了同步狀態(tài)之后,將會喚醒其后繼節(jié)點(diǎn),后繼節(jié)點(diǎn)的線程被喚醒后需要檢查自己的前驅(qū)節(jié)點(diǎn)是否是頭節(jié)點(diǎn)。
(2)維護(hù)同步隊(duì)列的FIFO原則。

獨(dú)占式同步狀態(tài)獲取流程,也就是acquire(int arg)方法調(diào)用流程大致如下

上圖中,當(dāng)同步狀態(tài)獲取成功之后,當(dāng)前線程從acquire(int arg)方法返回,如果對于鎖這種并發(fā)組件而言,代表著當(dāng)前線程獲取了鎖。
當(dāng)線程獲取同步狀態(tài)并執(zhí)行完相應(yīng)的邏輯后,就需要釋放同步狀態(tài),通過調(diào)用AQS提供的release方法。該方法在釋放了同步狀態(tài)之后,會喚醒其后繼節(jié)點(diǎn)(進(jìn)而使后繼節(jié)點(diǎn)重新嘗試獲取同步狀態(tài))。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
該方法執(zhí)行時,會喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處于等待狀態(tài)的線程。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
分析了獨(dú)占式同步狀態(tài)獲取和釋放過程后,適當(dāng)做個總結(jié):在獲取同步狀態(tài)時,同步器維護(hù)一個同步隊(duì)列,獲取狀態(tài)失敗的線程都會被加入到隊(duì)列中并在隊(duì)列中進(jìn)行自旋;移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
4.3 共享式同步狀態(tài)獲取和釋放
共享式獲取與獨(dú)占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時獲取到同步狀態(tài)。以文件的讀寫為例,如果一個程序在對文件進(jìn)行讀操作,那么這一時刻對于該文件的寫操作均被阻塞,而讀操作能夠同時進(jìn)行。寫操作要求對資源的獨(dú)占式訪問,而讀操作可以是共享式訪問。
通過調(diào)用同步器的acquireShared(int arg)方法可以共享式地獲取同步狀態(tài)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在acquireShared方法中,會調(diào)用我們重寫的tryAcquireShared方法嘗試獲取同步狀態(tài),該方法返回值為int,返回值大于等于0時,表示獲取成功。因此,在共享式獲取的自旋過程中,成功獲取到同步狀態(tài)并退出自旋的條件就是tryAcquireShared(int arg)方法返回值大于等于0??梢钥吹?,在doAcquireShared(int arg)方法的自旋過程中,如果當(dāng)前節(jié)點(diǎn)的前驅(qū)為頭節(jié)點(diǎn)時,嘗試獲取同步狀態(tài),如果返回值大于等于0,表示該次獲取同步狀態(tài)成功并從自旋過程中退出。
與獨(dú)占式一樣,共享式獲取也需要釋放同步狀態(tài),通過調(diào)用releaseShared(int arg)方法可以釋放同步狀態(tài)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
該方法在釋放同步狀態(tài)之后,將會喚醒后續(xù)處于等待狀態(tài)的節(jié)點(diǎn)。對于能夠支持多個線程同時訪問的并發(fā)組件(比如Semaphore),它和獨(dú)占式主要區(qū)別在于tryReleaseShared(int arg)方法必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放,一般是通過循環(huán)和CAS來保證的,因?yàn)獒尫磐綘顟B(tài)的操作會同時來自多個線程。
4.4 超時獲取同步狀態(tài)
通過調(diào)用同步器的doAcquireNanos(int arg,long nanosTimeout)和doAcquireSharedNanos(int arg, long nanosTimeout)方法可以超時獲取同步狀態(tài),即在指定的時間段內(nèi)獲取同步狀態(tài),如果獲取到同步狀態(tài)則返回true,否則,返回false。該方法提供了傳統(tǒng)Java同步操作(比如synchronized關(guān)鍵字)所不具備的特性。
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 從現(xiàn)在起經(jīng)過nanosTimeout后超時
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 調(diào)用tryAcquire方法獨(dú)占式獲取同步狀態(tài)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 更新超時時間
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
大致流程如下
