1 概述
哪里使用AQS?我們最常用的ReentrantLock類其實(shí)就是使用CAS和AQS來實(shí)現(xiàn)的。
ReentrantLock的構(gòu)造方法中,sync對(duì)象其實(shí)就是繼承了AbstractQueuedSynchronizer(AQS)。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
....
}
public ReentrantLock() {
sync = new NonfairSync();
}
AQS翻譯過來是,抽象隊(duì)列同步器,其定義了一套多線程訪問共享變量的同步器框架。
內(nèi)部類Sync就是自定義同步器,采用內(nèi)部類的方式也是作者建議的。
AbstractQueuedSynchronizer的源碼里作者還給了一個(gè)圖。
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
畫詳細(xì)點(diǎn)

2 state
state我們可以理解成為表示當(dāng)前同步情況的一個(gè)狀態(tài)變量。state是使用volatile修飾的一個(gè)變量,它的值代表鎖的狀態(tài),state > 0表示鎖被使用,state < 0表示鎖被釋放。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
getState用于獲取state的值,比如一個(gè)線程調(diào)用該方法查看state的值是不是0,來判斷當(dāng)前是不是鎖定的狀態(tài)。
setState用來直接將state的值修改成其他,比如重入鎖重入的時(shí)候?qū)tate的值+1。
compareAndSetState使用CAS的方式修改state的值,比如線程獲取到鎖嘗試修改state的值為1。
state有兩種共享方式:Exclusive(獨(dú)占,只能有一個(gè)線程使用state)和 Share(共享,多個(gè)線程可以同時(shí)使用state)。
3 CLH同步隊(duì)列
這里的CLH同步隊(duì)列是一個(gè)FIFO(先進(jìn)先出的)雙向隊(duì)列,元素的節(jié)點(diǎn)類型為Node,并且通過head和tail來記錄隊(duì)首和隊(duì)尾的元素。Node節(jié)點(diǎn)對(duì)象用來維護(hù)需要獲取鎖的線程,當(dāng)前一個(gè)節(jié)點(diǎn)釋放鎖的時(shí)候,該節(jié)點(diǎn)的線程就會(huì)被喚醒。
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
說明一下幾個(gè)重要的常量和變量:
SHARED:表示被標(biāo)記的線程是因?yàn)橐怨蚕淼姆绞将@取state失敗而進(jìn)入阻塞隊(duì)列的;
EXCLUSIV:表示被標(biāo)記的線程是因?yàn)橐元?dú)占的方式獲取state失敗而進(jìn)入阻塞隊(duì)列的;
-
waitStatus:表示線程等待的狀態(tài)
static final int CANCELLED = 1; 表示線程因?yàn)橹袛嗷蛘叩却瑫r(shí),需要從等待隊(duì)列中取消等待;
static final int SIGNAL = -1; 鎖被占用,隊(duì)列中的head(僅僅代表頭結(jié)點(diǎn),里面沒有存放線程引用)的后繼結(jié)點(diǎn)node1處于等待狀態(tài),如果已占有鎖的線程釋放鎖或被CANCEL之后就會(huì)通知這個(gè)結(jié)點(diǎn)node1去獲取鎖執(zhí)行。
static final int CONDITION = -2; 表示結(jié)點(diǎn)在等待隊(duì)列中(這里指的是等待在某個(gè)lock的condition上,關(guān)于Condition的原理下面會(huì)寫到),當(dāng)持有鎖的線程調(diào)用了Condition的signal()方法之后,結(jié)點(diǎn)會(huì)從該condition的等待隊(duì)列轉(zhuǎn)移到該lock的同步隊(duì)列上,去競(jìng)爭lock。(注意:這里的同步隊(duì)列就是我們說的AQS維護(hù)的FIFO隊(duì)列,等待隊(duì)列則是每個(gè)condition關(guān)聯(lián)的隊(duì)列)
static final int PROPAGATE = -3; 表示下一次共享狀態(tài)獲取將會(huì)傳遞給后繼結(jié)點(diǎn)獲取這個(gè)共享同步狀態(tài)。
我們可以通過繼承AbstractQueuedSynchronizer類來自己實(shí)現(xiàn)一個(gè)鎖,使用的時(shí)候需要重寫一些指定的方法(模板方法模式)。
//獨(dú)占式的獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢當(dāng)前狀態(tài)并判斷同步狀態(tài)是否符合預(yù)期,然后再進(jìn)行CAS設(shè)置同步狀態(tài)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//獨(dú)占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機(jī)會(huì)獲取同步狀態(tài)
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的獲取同步狀態(tài)
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//嘗試將狀態(tài)設(shè)置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用。
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//當(dāng)前同步器是否在獨(dú)占模式下被線程占用,一般該方法表示是否被當(dāng)前線程所獨(dú)占
protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
4 使用ReentrantLock分析
我們通過ReentrantLock來幫助了解一下加鎖和釋放鎖時(shí)state和CLH隊(duì)列的具體操作情況。
4.1 先來看lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
很好理解,state沒有被占用的時(shí)候,當(dāng)前線程使用鎖,并將獨(dú)占鎖的線程持有者設(shè)置為自己。
否則,調(diào)用acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第一個(gè)條件
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//(1)獲取當(dāng)前線程
final Thread current = Thread.currentThread();
//(2)獲得當(dāng)前同步狀態(tài)state
int c = getState();
//(3)如果state==0,表示沒有線程獲取
if (c == 0) {
//(3-1)那么就嘗試以CAS的方式更新state的值
if (compareAndSetState(0, acquires)) {
//(3-2)如果更新成功,就設(shè)置當(dāng)前獨(dú)占模式下同步狀態(tài)的持有者為當(dāng)前線程
setExclusiveOwnerThread(current);
//(3-3)獲得成功之后,返回true
return true;
}
}
//(4)這里是重入鎖的邏輯
else if (current == getExclusiveOwnerThread()) {
//(4-1)判斷當(dāng)前占有state的線程就是當(dāng)前來再次獲取state的線程之后,就計(jì)算重入后的state
int nextc = c + acquires;
//(4-2)這里是風(fēng)險(xiǎn)處理
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//(4-3)通過setState無條件的設(shè)置state的值,(因?yàn)檫@里也只有一個(gè)線程操作state的值,即
//已經(jīng)獲取到的線程,所以沒有進(jìn)行CAS操作)
setState(nextc);
return true;
}
//(5)沒有獲得state,也不是重入,就返回false
return false;
}
第一個(gè)條件總結(jié)來說就是再次嘗試占用state,如果成功當(dāng)前線程使用鎖,如果失敗并且當(dāng)前線程和正在使用鎖的線程是同一個(gè),那么重入該鎖。如果都不是,返回false。
第二個(gè)條件又分為幾個(gè)方法,我們都看一下
private Node addWaiter(Node mode) {
//(1)將當(dāng)前線程以及阻塞原因(是因?yàn)镾HARED模式獲取state失敗還是EXCLUSIVE獲取失敗)構(gòu)造為Node結(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), mode);
//(2)這一步是快速將當(dāng)前線程插入隊(duì)列尾部
Node pred = tail;
if (pred != null) {
//(2-1)將構(gòu)造后的node結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)設(shè)置為tail
node.prev = pred;
//(2-2)以CAS的方式設(shè)置當(dāng)前的node結(jié)點(diǎn)為tail結(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
//(2-3)CAS設(shè)置成功,就將原來的tail的next結(jié)點(diǎn)設(shè)置為當(dāng)前的node結(jié)點(diǎn)。這樣這個(gè)雙向隊(duì)
//列就更新完成了
pred.next = node;
return node;
}
}
//(3)執(zhí)行到這里,說明要么當(dāng)前隊(duì)列為null,要么存在多個(gè)線程競(jìng)爭失敗都去將自己設(shè)置為tail結(jié)點(diǎn),
//那么就會(huì)有線程在上面(2-2)的CAS設(shè)置中失敗,就會(huì)到這里調(diào)用enq方法
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
//(4)還是先獲取當(dāng)前隊(duì)列的tail結(jié)點(diǎn)
Node t = tail;
//(5)如果tail為null,表示當(dāng)前同步隊(duì)列為null,就必須初始化這個(gè)同步隊(duì)列的head和tail(建
//立一個(gè)哨兵結(jié)點(diǎn))
if (t == null) {
//(5-1)初始情況下,多個(gè)線程競(jìng)爭失敗,在檢查的時(shí)候都發(fā)現(xiàn)沒有哨兵結(jié)點(diǎn),所以需要CAS的
//設(shè)置哨兵結(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
}
//(6)tail不為null
else {
//(6-1)直接將當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)設(shè)置為tail結(jié)點(diǎn)
node.prev = t;
//(6-2)前驅(qū)結(jié)點(diǎn)設(shè)置完畢之后,還需要以CAS的方式將自己設(shè)置為tail結(jié)點(diǎn),如果設(shè)置失敗,
//就會(huì)重新進(jìn)入循環(huán)判斷一遍
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//在這樣一個(gè)循環(huán)中嘗試tryAcquire同步狀態(tài)
for (;;) {
//獲取前驅(qū)結(jié)點(diǎn)
final Node p = node.predecessor();
//(1)如果前驅(qū)結(jié)點(diǎn)是頭節(jié)點(diǎn),就嘗試取獲取同步狀態(tài),這里的tryAcquire方法相當(dāng)于還是調(diào)
//用NofairSync的tryAcquire方法,在上面已經(jīng)說過
if (p == head && tryAcquire(arg)) {
//如果前驅(qū)結(jié)點(diǎn)是頭節(jié)點(diǎn)并且tryAcquire返回true,那么就重新設(shè)置頭節(jié)點(diǎn)為node
setHead(node);
p.next = null; //將原來的頭節(jié)點(diǎn)的next設(shè)置為null,交由GC去回收它
failed = false;
return interrupted;
}
//(2)如果不是頭節(jié)點(diǎn),或者雖然前驅(qū)結(jié)點(diǎn)是頭節(jié)點(diǎn)但是嘗試獲取同步狀態(tài)失敗就會(huì)將node結(jié)點(diǎn)
//的waitStatus設(shè)置為-1(SIGNAL),并且park自己,等待前驅(qū)結(jié)點(diǎn)的喚醒。至于喚醒的細(xì)節(jié)
//下面會(huì)說到
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//(1)獲取前驅(qū)結(jié)點(diǎn)的waitStatus
int ws = pred.waitStatus;
//(2)如果前驅(qū)結(jié)點(diǎn)的waitStatus為SINGNAL,就直接返回true
if (ws == Node.SIGNAL)
//前驅(qū)結(jié)點(diǎn)的狀態(tài)為SIGNAL,那么該結(jié)點(diǎn)就能夠安全的調(diào)用park方法阻塞自己了。
return true;
if (ws > 0) {
//(3)這里就是將所有的前驅(qū)結(jié)點(diǎn)狀態(tài)為CANCELLED的都移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//CAS操作將這個(gè)前驅(qū)節(jié)點(diǎn)設(shè)置成SIGHNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
模擬一下,
- 假設(shè)線程A已經(jīng)持有鎖,并將state修改成1;
- 線程B通過lock方法進(jìn)入到addWaiter,此時(shí)還沒有維護(hù)隊(duì)列,tail=nill;
- 進(jìn)入enq方法的自旋中;
- 第一次循環(huán) t = tail = null,進(jìn)入if條件,head = tail = new Node();
- 第二次循環(huán) t = tail = new Node(),進(jìn)入else,將node(線程B)的上一個(gè)節(jié)點(diǎn)設(shè)置為t,將node設(shè)置為新的tail,將t的下一個(gè)節(jié)點(diǎn)設(shè)置為node,退出循環(huán);
- 進(jìn)入到acquireQueued方法,還是自旋,先獲取node的上一個(gè)結(jié)點(diǎn);
- 第一次循環(huán),判斷上一個(gè)節(jié)點(diǎn)是head并且當(dāng)前線程B可以獲取到state;
- 第7步通過,將head設(shè)置為node,返回false;
- 顯然第7步不通過,進(jìn)入下面的條件,先進(jìn)入shouldParkAfterFailedAcquire方法;
- 判斷上一個(gè)節(jié)點(diǎn)(pred)的waitStatus不是 -1,進(jìn)入else,將上一個(gè)節(jié)點(diǎn)(pred)的waitStatus設(shè)置為 -1,返回false;
- 第二次循環(huán),進(jìn)入shouldParkAfterFailedAcquire方法,由于上一次循環(huán)修改waitStatus為-1,直接返回true;
- 再看parkAndCheckInterrupt方法,將線程B pack阻塞;
- 自旋并沒有結(jié)束,只是被掛起了,這個(gè)自旋在后面還有用。
這時(shí),隊(duì)列是這樣的

再來一個(gè)線程C,變成這樣

4.2 再來看unlock方法
public void unlock() {
sync.release(1); //這里ReentrantLock的unlock方法調(diào)用了AQS的release方法
}
public final boolean release(int arg) {
//這里調(diào)用了子類的tryRelease方法,即ReentrantLock的內(nèi)部類Sync的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先我們能看到,重入鎖在獲取鎖和釋放鎖的時(shí)候?qū)tate的操作參數(shù)都是1,這也就是為什么lock了幾次就要unlock幾次的原因。
protected final boolean tryRelease(int releases) {
//(1)獲取當(dāng)前的state,然后減1,得到要更新的state
int c = getState() - releases;
//(2)判斷當(dāng)前調(diào)用的線程是不是持有鎖的線程,如果不是拋出IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//(3)判斷更新后的state是不是0
if (c == 0) {
free = true;
//(3-1)將當(dāng)前鎖持者設(shè)為null
setExclusiveOwnerThread(null);
}
//(4)設(shè)置當(dāng)前state=c=getState()-releases
setState(c);
//(5)只有state==0,才會(huì)返回true
return free;
}
private void unparkSuccessor(Node node) {
//(1)獲得node的waitStatus
int ws = node.waitStatus;
//(2)判斷waitStatus是否小于0
if (ws < 0)
//(2-1)如果waitStatus小于0需要將其以CAS的方式設(shè)置為0
compareAndSetWaitStatus(node, ws, 0);
//(2)獲得s的后繼結(jié)點(diǎn),這里即head的后繼結(jié)點(diǎn)
Node s = node.next;
//(3)判斷后繼結(jié)點(diǎn)是否已經(jīng)被移除,或者其waitStatus==CANCELLED
if (s == null || s.waitStatus > 0) {
//(3-1)如果s!=null,但是其waitStatus=CANCELLED需要將其設(shè)置為null
s = null;
//(3-2)會(huì)從尾部結(jié)點(diǎn)開始尋找,找到離head最近的不為null并且node.waitStatus的結(jié)點(diǎn)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//(4)node.next!=null或者找到的一個(gè)離head最近的結(jié)點(diǎn)不為null
if (s != null)
//(4-1)喚醒這個(gè)結(jié)點(diǎn)中的線程
LockSupport.unpark(s.thread);
}
我們還是分析一下代碼
- 進(jìn)入tryRelease方法,將state的值 -1 ,如果state = 0 就返回true,否則返回 false;
- 進(jìn)入unparkSuccessor方法,先將head的waitStatus修改成0,在判斷head的后續(xù)節(jié)點(diǎn)的waitStatus是不是 < 0 ;
- 小于 0 就喚醒,否則一直向下查找直到出現(xiàn)waitStatus小于 0 的節(jié)點(diǎn),并且將它喚醒;
后面程序是怎么執(zhí)行的呢?
- 回憶一下上面的lock邏輯,這時(shí)候線程B被喚醒了,繼續(xù)自旋;
- 線程B獲取state成功,就會(huì)從acquireQueued方法中退出,最后執(zhí)行自己鎖住的代碼塊中的程序。