1 AQS原理
全稱是 AbstractQueuedSynchronizer ,阻塞式鎖和相關同步工具的框架
1.1 特點
用state屬性來表示資源的狀態(tài)(分獨占模式和共享模式),子類需要定義如何維護這個狀態(tài),控制如何獲取鎖和釋放鎖
getState 獲取state狀態(tài)
setState 設置state 狀態(tài)
compareAndState cas機制設置state狀態(tài)
獨占模式是只有一個線程能夠訪問資源,而共享模式可以允許多個線程訪問資源
提供了基于FIFO的等待隊列,類似于Monitor 的EntryList
條件變量來實現(xiàn)等待、喚醒機制,支持多個條件變量,類似于Monitor 的WaitSet
子類主要實現(xiàn)這樣的一些方法(默認拋出UnsupportedOperationException)
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
獲取鎖的姿勢
//如果獲取鎖失敗
if (! tryAcquire(arg)) {
//入隊,可以選擇阻塞當前線程 park unpark 機制
}
釋放鎖的姿勢
//如果釋放鎖成功
if (tryRelease(arg)) {
//讓阻塞線程恢復運行
}
2. 自定義鎖
自定義鎖實現(xiàn) Lock 接口,定義一個內部類繼承 AbstractQueuedSynchronizer 類
自定義鎖是一個不可重入鎖
public class AqsTest {
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(()->{
lock.lock();
try{
System.out.println("lock" + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
System.out.println("unlock" + Thread.currentThread().getName());
lock.unlock();
}
}).start();
new Thread(()->{
lock.lock();
try{
System.out.println("lock" + Thread.currentThread().getName());
}finally {
System.out.println("unlock" + Thread.currentThread().getName());
lock.unlock();
}
}).start();
}
}
/**
* 自定義不可重入鎖
*/
class MyLock implements Lock {
class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)) {
//表示加上鎖了,設置owner為當前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//注意兩個方法順序
setExclusiveOwnerThread(null);
//state 使用volatile修飾,禁止指令重排
setState(0); //表示解鎖
return true;
}
/**
* 是否持有獨占鎖
* @return
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
public Condition newCondition(){
return new ConditionObject();
}
}
private MySync sync = new MySync();
/**
* 不成功進入等待隊列等待
*/
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* 嘗試加鎖,只會嘗試一次
* @return
*/
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
/**
* 創(chuàng)建條件變量
* @return
*/
@Override
public Condition newCondition() {
return null;
}
}
測試結果:
lockThread-0
unlockThread-0
lockThread-1
unlockThread-1
3. ReentrantLock 原理
ReentrantLock UML類圖

3.1 非公平鎖實現(xiàn)原理
3.1.1 加解鎖流程
加鎖流程
沒有競爭時

第一個競爭出現(xiàn)時
Thread-1 執(zhí)行了
- CAS 嘗試將state 由 0 改為 1 ,結果失敗
- 進入tryAcquire邏輯,這時 state 已經是 1 ,結果仍然失敗
- 接下來進入addWaiter邏輯,構造Node隊列
- 途中黃色三角表示該Node的waitStatus 狀態(tài),其中 0 為默認正常狀態(tài)
- Node的創(chuàng)建時懶惰的
- 其中第一個Node成為 Dummy(啞元) 或哨兵,用來占位,并不關聯(lián)線程

- 當前線程進入acquireQueued 邏輯
- acquireQueued 會在一個死循環(huán)中不斷嘗試獲得鎖,失敗后進入park 阻塞
- 如果自己是緊鄰著 head (排第二位),那么再次tryAcquire 嘗試獲取鎖,當然這時state 仍為1,失敗
- 進入 shouldParkAfterFailedAcquire(p, node) 邏輯,將前驅node ,即head 的waitStatus 改為 -1 ,這次返回false (-1 表示有責任喚醒它的后繼節(jié)點)

- shouldParkAfterFailedAcquire(p, node) 執(zhí)行完畢后回到acquireQueued, 再次tryAcquire 嘗試獲得鎖,當然這時的 state 仍為1,失敗
- 當再次進入 shouldParkAfterFailedAcquire 時,這時因為其前驅node 的waitStatus 已經是 -1,這次返回true
- 進入 parkAndCheckInterrupt, Thread-1 park(灰色表示)

- 再次有多個線程經歷上述過程競爭失敗,變成這個樣子

解鎖流程
1.Thread-0 釋放鎖,進入tryRelease 流程,如果成功
- 設置 execlusiveOwnerThread 為null
- state = 0

2.當前隊列不為null , 并且 head 的waitStatus = -1,進入unparkSuccessor 流程
- 找到隊列中離head 最近的一個Node (沒取消的),unpark 恢復其運行,本例中為Thread-1
3.回到Thread-1 的acquireQueue流程

如果加鎖成功(沒有競爭),會設置
- exclusiveOwnerThread 為Thread-1 ,state = 1;
- head 指向剛剛Thread-1 所在的Node,該Node清空Thread
- 原本的head因為從鏈表斷開,而可被垃圾回收
4.如果這時候有其他線程來競爭(非公平的體現(xiàn)),例如這時Thread-4 來了
如果不巧被Thread-4占了先機
- Thread-4 被設置為exclusiveOwnerThread , state = 1
- Thread-1 再次進入acquireQueued 流程,獲取鎖失敗,重新進入park 阻塞
源碼
先從構造器看,默認為非公平鎖實現(xiàn)
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSync 繼承自 Sync , Sync 繼承自AQS
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //獲取前驅節(jié)點
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//創(chuàng)建一個節(jié)點,waitStatus = null
Node node = new Node(Thread.currentThread(), mode);
//如果當前隊列已經有節(jié)點了,直接將該節(jié)點加入到AQS隊列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果當前隊列為空,要先初始化,new Node()
enq(node);
return node;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
3.1.2 可重入原理
具體分析見源碼
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//當前線程沒有獲得鎖,嘗試修改狀態(tài)0->1
if (compareAndSetState(0, acquires)) {
//修改狀態(tài)成功,表示獲得鎖。設置owner為當前線程
setExclusiveOwnerThread(current);
return true;
}
}
//如果已經獲得了鎖,線程還是當前線程,表示發(fā)生了鎖重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//設置當前的state, 同一個線程鎖重入state++
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c==0時重新設置 owner 返回true,否則只更新state值,返回false
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//是否獨占鎖
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
...
}
3.1.3 可打斷原理
不可打斷模式
在此模式下,即使它被打斷,仍會駐留在AQS 隊列中,等獲得鎖后方能繼續(xù)運行(是繼續(xù)運行,只是打斷標記被設置為true)
private final boolean parkAndCheckInterrupt() {
// 如果打斷標記已經是true,則park 會失效
LockSupport.park(this);
// interrupted 會清除打斷標記
return Thread.interrupted();
}
//未獲得到鎖時,調用該方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
//還是需要獲得鎖后,才能返回打斷狀態(tài)
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果是因為interrupt 被喚醒,返回打斷狀態(tài)為true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果打斷狀態(tài)為true
selfInterrupt();
}
static void selfInterrupt() {
// 重新產生一次中斷
Thread.currentThread().interrupt();
}
可打斷模式
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 如果沒有獲得鎖,進入
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//在park過程中如果被interrupt 會進入此
// 這時拋出異常,而不會再次進入 for(;;)
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4. 公平鎖實現(xiàn)原理
非公平鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果還沒有獲得鎖
if (c == 0) {
//嘗試用CAS獲得鎖,這里體現(xiàn)非公平性:不去檢查AQS隊列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已經獲得了鎖,線程還是當前線程,表示發(fā)生了鎖重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//獲取失敗,回到調用處
return false;
}
公平鎖實現(xiàn)
// 與非公平鎖的主要區(qū)別在于tryAcquire 方法的實現(xiàn)
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先檢查AQS隊列中是否有前驅節(jié)點
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
5. 條件變量的實現(xiàn)原理
每個條件變量其實就對應著一個等待隊列,實現(xiàn)類是ConditionObject
await 流程
開始 Thread-0持有鎖,調用await,進入 ConditionObject 的 addConditionWaiter 流程創(chuàng)建新的Node狀態(tài)為-2(Node.CONDITION),關聯(lián) Thread-0,加入等待隊列的尾部

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//調用ConditionObject.await() 先進入addConditionWaiter()流程
Node node = addConditionWaiter();
//接下來進入AQS的fullyRelease 流程,釋放同步器上的鎖
//為什么這里要調用 fullyRelease(node)? 有可能一個線程重入好幾次
int savedState = fullyRelease(node);
int interruptMode = 0;
//進入一個循環(huán),判斷
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//給當前線程創(chuàng)建一個新節(jié)點,連接到鏈表上
//waitStatus == Node.CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
當前線程添加到 ConditionObject隊列中后,當前線程要釋放鎖
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//釋放鎖成功,返回當前狀態(tài);若釋放失敗,設置當前節(jié)點的狀態(tài)為Node.CANCELLED
//release(savedState) 還會喚醒等待隊列中的下一個節(jié)點
//unpark AQS隊列的下一個節(jié)點,競爭鎖,假設沒有其他線程競爭,Thread-1競爭鎖成功
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
signal 流程
假設Thread-1 要來喚醒 Thread-0

進入ConditionObject 的doSignal()流程
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
*/
public final void signal() {
//如果當前線程沒有持有鎖,調用signal()拋異常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//取出等待隊列的頭結點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
執(zhí)行 transferForSignal 流程,將該Node 加入 AQS隊列尾部,將Thread-0 的waitStatus 改為 0;將Thread-3的 waitStatus 改為 -1
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
//將這個節(jié)點從 condition隊列轉移到 sync隊列,如果成功則返回true
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//首先嘗試cas操作將waitStatus 改為0,若失敗,直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//拼接將該節(jié)點到 aqs 隊尾。調用 enq()返回該節(jié)點的前置節(jié)點
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true
}
Node節(jié)點定義
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}