了解LockSupport?
LockSupport 定義了一組的公共靜態(tài)方法,這些方法提供了最基本的線程阻 塞和喚醒功能,而 LockSupport 也成為構(gòu)建同步組件的基礎(chǔ)工具。
LockSupport 定義了一組以 park 開頭的方法用來阻塞當(dāng)前線程,以及 unpark(Threadthread)方法來喚醒一個被阻塞的線程。LockSupport 增加了 park(Objectblocker)、parkNanos(Objectblocker,longnanos)和 parkUntil(Object blocker,longdeadline)3 個方法,用于實現(xiàn)阻塞當(dāng)前線程的功能,其中參數(shù) blocker 是用來標(biāo)識當(dāng)前線程在等待的對象(以下稱為阻塞對象),該對象主要用于問題 排查和系統(tǒng)監(jiān)控。
CLH隊列鎖?
CLH 隊列鎖即 Craig,Landin,andHagersten(CLH)locks。?
CLH 隊列鎖也是一種基于鏈表的可擴展、高性能、公平的自旋鎖,申請線程 僅僅在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束 自旋。?
當(dāng)一個線程需要獲取鎖時:
1. 創(chuàng)建一個的QNode,將其中的locked設(shè)置為true表示需要獲取鎖, myPred 表示對其前驅(qū)結(jié)點的引用

2. 線程 A 對 tail 域調(diào)用 getAndSet 方法,使自己成為隊列的尾部,同時獲取 一個指向其前驅(qū)結(jié)點的引用 myPred

線程 B 需要獲得鎖,同樣的流程再來一遍

3.線程就在前驅(qū)結(jié)點的 locked 字段上旋轉(zhuǎn),直到前驅(qū)結(jié)點釋放鎖(前驅(qū)節(jié)點 的鎖值 locked==false)
4.當(dāng)一個線程需要釋放鎖時,將當(dāng)前結(jié)點的 locked 域設(shè)置為 false,同時回收前驅(qū)結(jié)點

如上圖所示,前驅(qū)結(jié)點釋放鎖,線程 A 的 myPred 所指向的前驅(qū)結(jié)點的 locked 字段變?yōu)?false,線程 A 就可以獲取到鎖。
CLH 隊列鎖的優(yōu)點是空間復(fù)雜度低(如果有 n 個線程,L 個鎖,每個線程每 次只獲取一個鎖,那么需要的存儲空間是 O(L+n),n 個線程有 n 個 myNode, L 個鎖有 L 個 tail)。CLH 隊列鎖常用在 SMP 體系結(jié)構(gòu)下。
Java 中的 AQS 是 CLH 隊列鎖的一種變體實現(xiàn)。
擴展知識點
SMP(SymmetricMulti-Processor) 。即 對 稱 多 處 理 器 結(jié) 構(gòu) ,指 server中 多 個 CPU對 稱 工 作 ,每 一 個 CPU訪 問 內(nèi) 存 地 址 所 需 時 間 同 樣 。 其 主 要 特 征 是 共 享 , 包 括 對 CPU , 內(nèi) 存 , I/O等 進(jìn)行 共 享 。 SMP的 長 處 是 可 以 保 證 內(nèi) 存 一 致 性 。 缺 點 是 這 些 共 享 的 資 源 非 常 可 能 成 為 性 能 瓶、頸 。 隨 著 CPU數(shù) 量 的 添 加 , 每 一 個 CPU都 要 訪 問 同 樣 的 內(nèi) 存 資 源 , 可 能 導(dǎo) 致 內(nèi) 存 訪 問 沖 突 ,可 能 會 導(dǎo) 致 CPU資 源 的 浪 費 。 經(jīng) 常 使 用 的 PC機 就 屬 于 這 樣 的 。

非 一 致 存 儲 訪 問 , 將 CPU分 為 CPU模 塊 , 每 個 CPU模 塊 由 多 個 CPU組 成 , 并 且 具 有 獨立 的 本 地 內(nèi) 存 、 I/O槽 口 等 , 模 塊 之 間 可 以 通 過 互 聯(lián) 模 塊 相 互 訪 問 , 訪 問 本 地 內(nèi) 存 ( 本 CPU 模 塊 的 內(nèi) 存 ) 的 速 度 將 遠(yuǎn) 遠(yuǎn) 高 于 訪 問 遠(yuǎn) 地 內(nèi) 存 ( 其 他 CPU 模 塊 的 內(nèi) 存 ) 的 速 度 , 這 也 是 非 一 致 存 儲 訪 問 的 由 來 。 NUMA 較 好 地 解 決 SMP 的 擴 展 問 題 , 當(dāng) CPU 數(shù) 量 增 加 時 , 因 為 訪 問 遠(yuǎn) 地 內(nèi) 存 的 延 時 遠(yuǎn) 遠(yuǎn) 超 過 本 地 內(nèi) 存 , 系 統(tǒng) 性 能 無 法 線 性 增 加。

CLH唯 一 的 缺 點 是 在 NUMA系 統(tǒng) 結(jié) 構(gòu) 下 性 能 很 差 , 但 是 在 SMP系 統(tǒng) 結(jié) 構(gòu) 下 該法還是非常有效的 。解決 NUMA系 統(tǒng) 結(jié) 構(gòu) 的 思 路 是 MCS隊 列 鎖 。
AbstractQueuedSynchronizer
學(xué)習(xí)AQS的必要性?
隊列同步器 AbstractQueuedSynchronizer(以下簡稱同步器或 AQS),是用 來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它使用了一個 int 成員變量表示同步狀 態(tài),通過內(nèi)置的 FIFO 隊列來完成資源獲取線程的排隊工作。并發(fā)包的大師(Doug Lea)期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。
AQS使用方式和其中的設(shè)計模式
AQS 的主要使用方式是繼承,子類通過繼承 AQS 并實現(xiàn)它的抽象方法來管 理同步狀態(tài),在 AQS 里由一個 int 型的 state 來代表這個狀態(tài),在抽象方法的實 現(xiàn)過程中免不了要對同步狀態(tài)進(jìn)行更改,這時就需要使用同步器提供的 3 個方法 (getState()、setState(intnewState)和 compareAndSetState(intexpect,intupdate)) 來進(jìn)行操作,因為它們能夠保證狀態(tài)的改變是安全的。

在實現(xiàn)上,子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,AQS 自身沒有 實現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取和釋放的方法來供自定義 同步組件使用,同步器既可以支持獨占式地獲取同步狀態(tài),也可以支持共享式地 獲取同步狀態(tài),這樣就可以方便實現(xiàn)不同類型的同步組件(ReentrantLock、 ReentrantReadWriteLock 和 CountDownLatch 等)。
同步器是實現(xiàn)鎖(也可以是任意同步組件)的關(guān)鍵,在鎖的實現(xiàn)中聚合同步器??梢赃@樣理解二者之間的關(guān)系:
鎖是面向使用者的,它定義了使用者與鎖交互的接口(比如可以允許兩個線程并行訪問),隱藏了實現(xiàn)細(xì)節(jié);
同步器面向的是鎖的實現(xiàn)者,它簡化了鎖的實現(xiàn)方式,屏蔽了同步狀態(tài)管理、 線程的排隊、等待與喚醒等底層操作。鎖和同步器很好地隔離了使用者和實現(xiàn)者 所需關(guān)注的領(lǐng)域。
實現(xiàn)者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步 組件的實現(xiàn)中,并調(diào)用同步器提供的模板方法,而這些模板方法將會調(diào)用使用者 重寫的方法。
模板方法模式
同步器的設(shè)計基于模板方法模式。模板方法模式的意圖是,定義一個操作中 的算法的骨架,而將一些步驟的實現(xiàn)延遲到子類中。模板方法使得子類可以不改變一個算法的結(jié)構(gòu)即可重定義該算法的某些特定步驟。我們最常見的就是 Spring 框架里的各種 Template。
實際例子?
我們開了個蛋糕店,蛋糕店不能只賣一種蛋糕呀,于是我們決定先賣奶油蛋 糕,芝士蛋糕和慕斯蛋糕。三種蛋糕在制作方式上一樣,都包括造型,烘焙和涂 抹蛋糕上的東西。所以可以定義一個抽象蛋糕模型

然后就可以批量生產(chǎn)三種蛋糕




這樣一來,不但可以批量生產(chǎn)三種蛋糕,而且如果日后有擴展,只需要繼承 抽象蛋糕方法就可以了,十分方便,我們天天生意做得越來越賺錢。突然有一天, 我們發(fā)現(xiàn)市面有一種最簡單的小蛋糕銷量很好,這種蛋糕就是簡單烘烤成型就可以賣,并不需要涂抹什么食材,由于制作簡單銷售量大,這個品種也很賺錢,于是我們也想要生產(chǎn)這種蛋糕。但是我們發(fā)現(xiàn)了一個問題,抽象蛋糕是定義了抽象的涂抹方法的,也就是說擴展的這種蛋糕是必須要實現(xiàn)涂抹方法,這就很雞兒蛋疼了。怎么辦?我們可以將原來的模板修改為帶鉤子的模板。

做小蛋糕的時候通過 flag 來控制是否涂抹,其余已有的蛋糕制作不需要任何 修改可以照常進(jìn)行。

AQS中的方法
模板方法
實現(xiàn)自定義同步組件時,將會調(diào)用同步器提供的模板方法

這些模板方法同步器提供的模板方法基本上分為 3 類:獨占式獲取與釋放同 步狀態(tài)、共享式獲取與釋放、同步狀態(tài)和查詢同步隊列中的等待線程情況。
可重寫的方法


訪問或修改同步狀態(tài)的方法
重寫同步器指定的方法時,需要使用同步器提供的如下 3 個方法來訪問或修 改同步狀態(tài)。?
?getState():獲取當(dāng)前同步狀態(tài)。?
?setState(intnewState):設(shè)置當(dāng)前同步狀態(tài)。?
?compareAndSetState(intexpect,intupdate):使用 CAS 設(shè)置當(dāng)前狀態(tài),該方 法能夠保證狀態(tài)設(shè)置的原子性。
實現(xiàn)一個自己的獨占鎖
/**
*不可重入鎖
*類說明:實現(xiàn)我們自己獨占鎖,不可重入
*/
public class SelfLockimplements Lock {
// 靜態(tài)內(nèi)部類,自定義同步器
? ? private static class Syncextends AbstractQueuedSynchronizer {
/*判斷處于占用狀態(tài)*/
? ? ? ? @Override
? ? ? ? protected boolean isHeldExclusively() {
return getState()==1;
}
/*獲得鎖*/
? ? ? ? @Override
? ? ? ? protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/*釋放鎖*/
? ? ? ? @Override
? ? ? ? protected boolean tryRelease(int arg) {
if(getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
//compareAndSetState(1,0);
? ? ? ? ? ? return true;
}
// 返回一個Condition,每個condition都包含了一個condition隊列
? ? ? ? Condition newCondition() {
return new ConditionObject();
}
}
// 僅需要將操作代理到Sync上即可
? ? private final Syncsync =new Sync();
@Override
? ? public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
@Override
? ? public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
? ? public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
@Override
? ? public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
@Override
? ? public void lockInterruptibly()throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
? ? public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
調(diào)用我們自定義的鎖
**
*類說明:對自定義的鎖測試
*/
public class TestMyLock {
public void test() {
final Lock lock =new SelfLock();
class Workerextends Thread {
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
sleep(1000);
}finally {
lock.unlock();
}
}
}
// 啟動4個子線程
? ? ? ? for (int i =0; i <4; i++) {
Worker w =new Worker();
//w.setDaemon(true);
? ? ? ? ? ? w.start();
}
// 主線程每隔1秒換行
? ? ? ? for (int i =0; i <10; i++) {
SleepTools.second(1);
//System.out.println();
? ? ? ? }
}
public static void main(String[] args) {
TestMyLock testMyLock =new TestMyLock();
testMyLock.test();
}
}
這把鎖當(dāng)我們遞歸調(diào)用就會發(fā)現(xiàn),該鎖會將自己鎖死,原因是該鎖并不具備重復(fù)性調(diào)用
深入源碼
AQS中 的 數(shù) 據(jù) 結(jié) 構(gòu) 節(jié) 點 和 同 步 隊 列
節(jié)點Node
既然說 Java 中的 AQS 是 CLH 隊列鎖的一種變體實現(xiàn),毫無疑問,作為隊列來 說,必然要有一個節(jié)點的數(shù)據(jù)結(jié)構(gòu)來保存我們前面所說的各種域,比如前驅(qū)節(jié)點, 節(jié)點的狀態(tài)等,這個數(shù)據(jù)結(jié)構(gòu)就是 AQS 中的內(nèi)部類 Node。作為這個數(shù)據(jù)結(jié)構(gòu)應(yīng) 該關(guān)心些什么信息?
1、線程信息,肯定要知道我是哪個線程;?
2、隊列中線程狀態(tài),既然知道是哪一個線程,肯定還要知道線程當(dāng)前處在 什么狀態(tài),是已經(jīng)取消了“獲鎖”請求,還是在“”等待中”,或者說“即將得 到鎖”?
3、前驅(qū)和后繼線程,因為是一個等待隊列,那么也就需要知道當(dāng)前線程前 面的是哪個線程,當(dāng)前線程后面的是哪個線程(因為當(dāng)前線程釋放鎖以后,理當(dāng) 立馬通知后繼線程去獲取鎖)。?
所以這個 Node 類是這么設(shè)計的:

其中包括了:?
線程的 2 種等待模式:?
SHARED:表示線程以共享的模式等待鎖(如 ReadLock)
?EXCLUSIVE:表示線程以互斥的模式等待鎖(如 ReetrantLock),互斥就是一 把鎖只能由一個線程持有,不能同時存在多個線程使用同一個鎖?
線程在隊列中的狀態(tài)枚舉:?
CANCELLED:值為 1,表示線程的獲鎖請求已經(jīng)“取消” SIGNAL:值為-1,表示該線程一切都準(zhǔn)備好了,就等待鎖空閑出來給我?
CONDITION:值為-2,表示線程等待某一個條件(Condition)被滿足?
PROPAGATE:值為-3,當(dāng)線程處在“SHARED”模式時,該字段才會被使用 上 初始化 Node 對象時,默認(rèn)為 0
?成員變量:?
waitStatus:該 int 變量表示線程在隊列中的狀態(tài),其值就是上述提到的 CANCELLED、SIGNAL、CONDITION、PROPAGATE?
prev:該變量類型為 Node 對象,表示該節(jié)點的前一個 Node 節(jié)點(前驅(qū))
next:該變量類型為 Node 對象,表示該節(jié)點的后一個 Node 節(jié)點(后繼)?
thread:該變量類型為 Thread 對象,表示該節(jié)點的代表的線程?
nextWaiter:該變量類型為 Node 對象,表示等待 condition 條件的 Node 節(jié) 點
當(dāng)前線程獲取同步狀態(tài)失敗時,同步器會將當(dāng)前線程以及等待狀態(tài)等信息構(gòu) 造成為一個節(jié)點(Node)并將其加入同步隊列,同時會阻塞當(dāng)前線程,當(dāng)同步 狀態(tài)釋放時,會把首節(jié)點中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。同步隊列 中的節(jié)點(Node)用來保存獲取同步狀態(tài)失敗的線程引用、等待狀態(tài)以及前驅(qū) 和后繼節(jié)點。?
head和tail?
AQS 還擁有首節(jié)點(head)和尾節(jié)點(tail)兩個引用,一個指向隊列頭節(jié) 點,而另一個指向隊列尾節(jié)點。
注 意 : 因 為 首 節(jié) 點 head是 不 保 存 線 程 信 息 的 節(jié) 點 , 僅 僅 是 因 為 數(shù) 據(jù) 結(jié) 構(gòu) 設(shè) 計 上 的 需 要 , 在 數(shù) 據(jù) 結(jié) 構(gòu) 上 , 這 種 做 法 往 往 叫 做 “ 空 頭 節(jié) 點 鏈 表 ” 。 對 應(yīng) 的 就 有 “ 非 空 頭 結(jié) 點 鏈 表 ”
節(jié)點在同步隊列中的增加和移出
節(jié)點加入到同步隊列
當(dāng)一個線程成功地獲取了同步狀態(tài)(或者鎖),其他線程將無法獲取到同步 狀態(tài),也就是獲取同步狀態(tài)失敗,AQS 會將這個線程以及等待狀態(tài)等信息構(gòu)造成 為一個節(jié)點(Node)并將其加入同步隊列的尾部。而這個加入隊列的過程必須要保證線程安全,因此同步器提供了一個基于 CAS 的設(shè)置尾節(jié)點的方法: compareAndSetTail(Nodeexpect,Nodeupdate),它需要傳遞當(dāng)前線程“認(rèn)為”的尾 節(jié)點和當(dāng)前節(jié)點,只有設(shè)置成功后,當(dāng)前節(jié)點才正式與之前的尾節(jié)點建立關(guān)聯(lián)。

首節(jié)點的變化
首節(jié)點是獲取同步狀態(tài)成功的節(jié)點,首節(jié)點的線程在釋放同步狀態(tài)時,將會 喚醒后繼節(jié)點,而后繼節(jié)點將會在獲取同步狀態(tài)成功時將自己設(shè)置為首節(jié)點。設(shè) 置首節(jié)點是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個線程能夠成功 獲取到同步狀態(tài),因此設(shè)置頭節(jié)點的方法并不需要使用 CAS 來保證,它只需要將 首節(jié)點設(shè)置成為原首節(jié)點的后繼節(jié)點并斷開原首節(jié)點的 next 引用即可。
獨占式同步狀態(tài)獲取與釋放

獲?。?br>
通過調(diào)用同步器的 acquire(intarg)方法可以獲取同步狀態(tài),主要完成了同步 狀態(tài)獲取、節(jié)點構(gòu)造、加入同步隊列以及在同步隊列中自旋等待的相關(guān)工作,其 主要邏輯是:

首先調(diào)用自定義同步器實現(xiàn)的 tryAcquire(intarg)方法,該方法需要保證線程 安全的獲取同步狀態(tài)。
如果同步狀態(tài)獲取失?。╰ryAcquire 返回 false),則構(gòu)造同步節(jié)點(獨占式 Node.EXCLUSIVE,同一時刻只能有一個線程成功獲取同步狀態(tài))并通過 addWaiter(Nodenode)方法將該節(jié)點加入到同步隊列的尾部,?
最后調(diào)用 acquireQueued(Nodenode,intarg)方法,使得該節(jié)點以“死循環(huán)” 的方式獲取同步狀態(tài)。如果獲取不到則阻塞節(jié)點中的線程,而被阻塞線程的喚醒 主要依靠前驅(qū)節(jié)點的出隊或阻塞線程被中斷來實現(xiàn)。
addWaiter(Nodenode)方法中

將當(dāng)前線程包裝成 Node 后,隊列不為空的情況下,先嘗試把當(dāng)前節(jié)點加入 隊列并成為尾節(jié)點,如果不成功或者隊列為空進(jìn)入 enq(finalNodenode)方法。

在 enq(finalNodenode)方法中,同步器通過“死循環(huán)”來保證節(jié)點的正確添 加,這個死循環(huán)中,做了兩件事,第一件,如果隊列為空,初始化隊列,new 出 一個空節(jié)點,并讓首節(jié)點(head)和尾節(jié)點(tail)兩個引用都指向這個空節(jié)點; 第二件事,把當(dāng)前節(jié)點加入隊列。
在“死循環(huán)”中只有通過 CAS 將節(jié)點設(shè)置成為尾節(jié)點之后,當(dāng)前線程才能從 該方法返回,否則,當(dāng)前線程不斷地嘗試設(shè)置。
節(jié)點進(jìn)入同步隊列之后,觀察 acquireQueued(Nodenode,intarg)方法

其實就是一個自旋的過程,每個節(jié)點(或者說每個線程)都在自省地觀察, 當(dāng)條件滿足,獲取到了同步狀態(tài),就可以從這個自旋過程中退出,否則依舊留在 這個自旋過程中(并會阻塞節(jié)點的線程)。
在 acquireQueued(finalNodenode,intarg)方法中,當(dāng)前線程在“死循環(huán)”中 嘗試獲取同步狀態(tài),而只有前驅(qū)節(jié)點是頭節(jié)點才能夠嘗試獲取同步狀態(tài),這是為 什么?原因有兩個。
第一,頭節(jié)點是成功獲取到同步狀態(tài)的節(jié)點,而頭節(jié)點的線程釋放了同步狀 態(tài)之后,將會喚醒其后繼節(jié)點,后繼節(jié)點的線程被喚醒后需要檢查自己的前驅(qū)節(jié) 點是否是頭節(jié)點。
第二,維護(hù)同步隊列的 FIFO 原則。 當(dāng)前線程獲取到同步狀態(tài)后,讓首節(jié)點(head)這個引用指向自己所在節(jié)點。 當(dāng)同步狀態(tài)獲取成功后,當(dāng)前線程就從 acquire 方法返回了。如果同步器實現(xiàn)的 是鎖,那就代表當(dāng)前線程獲得了鎖。
釋放:
當(dāng)前線程獲取同步狀態(tài)并執(zhí)行了相應(yīng)邏輯之后,就需要釋放同步狀態(tài),使得 后續(xù)節(jié)點能夠繼續(xù)獲取同步狀態(tài)。通過調(diào)用同步器的 release(intarg)方法可以釋 放同步狀態(tài),該方法在釋放了同步狀態(tài)之后,會喚醒其后繼節(jié)點(進(jìn)而使后繼節(jié) 點重新嘗試獲取同步狀態(tài))。

該方法執(zhí)行時,會喚醒首節(jié)點(head)所指向節(jié)點的后繼節(jié)點線程, unparkSuccessor(Nodenode)方法使用 LockSupport 來喚醒處于等待狀態(tài)的線程。 而在 unparkSuccessor 中,

這段代碼的意思,一般情況下,被喚醒的是 head 指向節(jié)點的后繼節(jié)點線程, 如果這個后繼節(jié)點處于被 cancel 狀態(tài),(我推測開發(fā)者的思路這樣的:后繼節(jié)點 處于被 cancel 狀態(tài),意味著當(dāng)鎖競爭激烈時,隊列的第一個節(jié)點等了很久(一直 被還未加入隊列的節(jié)點搶走鎖),包括后續(xù)的節(jié)點 cancel 的幾率都比較大,所以) 先從尾開始遍歷,找到最前面且沒有被 cancel 的節(jié)點。
總結(jié)
在獲取同步狀態(tài)時,同步器維護(hù)一個同步隊列,獲取狀態(tài)失敗的線程都會被 加入到隊列中并在隊列中進(jìn)行自旋;移出隊列(或停止自旋)的條件是前驅(qū)節(jié)點 為頭節(jié)點且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時,同步器調(diào)用 tryRelease(int arg)方法釋放同步狀態(tài),然后喚醒 head 指向節(jié)點的后繼節(jié)點。
共享式同步狀態(tài)獲取與釋放
共享式獲取與獨占式獲取最主要的區(qū)別在于同一時刻能否有多個線程同時 獲取到同步狀態(tài)。以讀寫為例,如果一個程序在進(jìn)行讀操作,那么這一時刻寫操 作均被阻塞,而讀操作能夠同時進(jìn)行。寫操作要求對資源的獨占式訪問,而讀操 作可以是共享式訪問。?
在 acquireShared(intarg)方法中,同步器調(diào)用 tryAcquireShared(intarg)方法嘗 試獲取同步狀態(tài),tryAcquireShared(intarg)方法返回值為 int 類型,當(dāng)返回值大于 等于 0 時,表示能夠獲取到同步狀態(tài)。因此,在共享式獲取的自旋過程中,成功 獲取到同步狀態(tài)并退出自旋的條件就是 tryAcquireShared(intarg)方法返回值大于 等于 0??梢钥吹?,在 doAcquireShared(intarg)方法的自旋過程中,如果當(dāng)前節(jié)點的前驅(qū)為頭節(jié)點時,嘗試獲取同步狀態(tài),如果返回值大于等于 0,表示該次獲 取同步狀態(tài)成功并從自旋過程中退出。?
該方法在釋放同步狀態(tài)之后,將會喚醒后續(xù)處于等待狀態(tài)的節(jié)點。對于能夠 支持多個線程同時訪問的并發(fā)組件(比如 Semaphore),它和獨占式主要區(qū)別在 于 tryReleaseShared(intarg)方法必須確保同步狀態(tài)(或者資源數(shù))線程安全釋放, 一般是通過循環(huán)和 CAS 來保證的,因為釋放同步狀態(tài)的操作會同時來自多個線程。
共享式的同步工具類?
設(shè)計一個同步工具:該工具在同一時刻,只允許至多 3 個線程同時訪問,超 過 3 個線程的訪問將被阻塞。?
首先,確定訪問模式。TrinityLock 能夠在同一時刻支持多個線程的訪問,這 顯然是共享式訪問,因此,需要使用同步器提供的 acquireShared(intargs)方法等 和 Shared 相關(guān)的方法,這就要求 TwinsLock 必須重寫 tryAcquireShared(intargs) 方法和 tryReleaseShared(intargs)方法,這樣才能保證同步器的共享式同步狀態(tài)的 獲取與釋放方法得以執(zhí)行。?
其次,定義資源數(shù)。TrinityLock 在同一時刻允許至多三個線程的同時訪問, 表明同步資源數(shù)為 3,這樣可以設(shè)置初始狀態(tài) status 為 3,當(dāng)一個線程進(jìn)行獲取, status 減 1,該線程釋放,則 status 加 1,狀態(tài)的合法范圍為 0、1 和 2,3,其中 0 表示當(dāng)前已經(jīng)有 3 個線程獲取了同步資源,此時再有其他線程對同步狀態(tài)進(jìn)行獲 取,該線程只能被阻塞。在同步狀態(tài)變更時,需要使用 compareAndSet(int expect,intupdate)方法做原子性保障。?
最后,組合自定義同步器。前面的章節(jié)提到,自定義同步組件通過組合自定 義同步器來完成同步功能,一般情況下自定義同步器會被定義為自定義同步組件 的內(nèi)部類。
了解Condition的實現(xiàn)
Condition的數(shù)據(jù)結(jié)構(gòu)?
等待隊列是一個 FIFO 的隊列,在隊列中的每個節(jié)點都包含了一個線程引用, 該線程就是在Condition對象上等待的線程,如果一個線程調(diào)用了Condition.await() 方法,那么該線程將會釋放鎖、構(gòu)造成節(jié)點加入等待隊列并進(jìn)入等待狀態(tài)。事實 上,節(jié)點的定義復(fù)用了同步器中節(jié)點的定義,也就是說,同步隊列和等待隊列中 節(jié)點類型都是同步器的靜態(tài)內(nèi)部類。

一個 Condition 包含一個等待隊列,Condition 擁有首節(jié)點(firstWaiter)和尾 節(jié)點(lastWaiter)。當(dāng)前線程調(diào)Condition.await()方法,將會以當(dāng)前線程構(gòu)造 節(jié)點,并將節(jié)點從尾部加入等待隊列。Condition 擁有首尾節(jié)點的引用,而新增節(jié)點只需要將原有的尾節(jié)點 nextWaiter 指向它,并且更新尾節(jié)點即可。上述節(jié)點 引用更新的過程并沒有使用 CAS 保證,原因在于調(diào)用 await()方法的線程必定是 獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。?
Lock(更確切地說是同步器)擁有一個同步隊列和多個等待隊列。
調(diào)用 Condition 的 await()方法(或者以 await 開頭的方法),會使當(dāng)前線程 進(jìn)入等待隊列并釋放鎖,同時線程狀態(tài)變?yōu)榈却隣顟B(tài)。當(dāng)從 await()方法返回時, 當(dāng)前線程一定獲取了 Condition 相關(guān)聯(lián)的鎖。?
如果從隊列(同步隊列和等待隊列)的角度看 await()方法,當(dāng)調(diào)用 await() 方法時,相當(dāng)于同步隊列的首節(jié)點(獲取了鎖的節(jié)點)移動到 Condition 的等待 隊列中。調(diào)用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節(jié)點, 該方法會將當(dāng)前線程構(gòu)造成節(jié)點并加入等待隊列中,然后釋放同步狀態(tài),喚醒同 步隊列中的后繼節(jié)點,然后當(dāng)前線程會進(jìn)入等待狀態(tài)。當(dāng)?shù)却犃兄械墓?jié)點被喚 醒,則喚醒節(jié)點的線程開始嘗試獲取同步狀態(tài)。如果不是通過其他線程調(diào)用 Condition.signal()方法喚醒,而是對等待線程進(jìn)行中斷,則會拋出 InterruptedException。

如圖所示,同步隊列的首節(jié)點并不會直接加入等待隊列,而是通過 addConditionWaiter()方法把當(dāng)前線程構(gòu)造成一個新的節(jié)點并將其加入等待隊列 中。

調(diào)用 Condition 的 signal()方法,將會喚醒在等待隊列中等待時間最長的節(jié)點 (首節(jié)點),在喚醒節(jié)點之前,會將節(jié)點移到同步隊列中。?
調(diào)用該方法的前置條件是當(dāng)前線程必須獲取了鎖,可以看到 signal()方法進(jìn) 行了 isHeldExclusively()檢查,也就是當(dāng)前線程必須是獲取了鎖的線程。接著獲取 等待隊列的首節(jié)點,將其移動到同步隊列并使用 LockSupport 喚醒節(jié)點中的線程。
通過調(diào)用同步器的 enq(Nodenode)方法,等待隊列中的頭節(jié)點線程安全地移 動到同步隊列。當(dāng)節(jié)點移動到同步隊列后,當(dāng)前線程再使用 LockSupport 喚醒該 節(jié)點的線程。?
被喚醒后的線程,將從 await()方法中的 while 循環(huán)中退出 (isOnSyncQueue(Nodenode)方法返回 true,節(jié)點已經(jīng)在同步隊列中),進(jìn)而調(diào) 用同步器的 acquireQueued()方法加入到獲取同步狀態(tài)的競爭中。
成功獲取同步狀態(tài)(或者說鎖)之后,被喚醒的線程將從先前調(diào)用的 await() 方法返回,此時該線程已經(jīng)成功地獲取了鎖。?
Condition 的 signalAll()方法,相當(dāng)于對等待隊列中的每個節(jié)點均執(zhí)行一次 signal()方法,效果就是將等待隊列中所有節(jié)點全部移動到同步隊列中,并喚醒每 個節(jié)點的線程。?
回頭看Lock的實現(xiàn)
ReentrantLock的實現(xiàn)
鎖的可重入
重進(jìn)入是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖所阻塞, 該特性的實現(xiàn)需要解決以下兩個問題。
1)線程再次獲取鎖。鎖需要去識別獲取鎖的線程是否為當(dāng)前占據(jù)鎖的線程, 如果是,則再次成功獲取
2)鎖的最終釋放。線程重復(fù) n 次獲取了鎖,隨后在第 n 次釋放該鎖后,其 他線程能夠獲取到該鎖。鎖的最終釋放要求鎖對于獲取進(jìn)行計數(shù)自增,計數(shù)表示 當(dāng)前鎖被重復(fù)獲取的次數(shù),而鎖被釋放時,計數(shù)自減,當(dāng)計數(shù)等于 0 時表示鎖已 經(jīng)成功釋放。?
nonfairTryAcquire 方法增加了再次獲取同步狀態(tài)的處理邏輯:通過判斷當(dāng)前 線程是否為獲取鎖的線程來決定獲取操作是否成功,如果是獲取鎖的線程再次請 求,則將同步狀態(tài)值進(jìn)行增加并返回 true,表示獲取同步狀態(tài)成功。同步狀態(tài)表 示鎖被一個線程重復(fù)獲取的次數(shù)。?
如果該鎖被獲取了 n 次,那么前(n-1)次 tryRelease(intreleases)方法必須返回 false,而只有同步狀態(tài)完全釋放了,才能返回 true??梢钥吹?,該方法將同步狀 態(tài)是否為 0 作為最終釋放的條件,當(dāng)同步狀態(tài)為 0 時,將占有線程設(shè)置為 null, 并返回 true,表示釋放成功。
公平和非公平鎖?
ReentrantLock 的構(gòu)造函數(shù)中,默認(rèn)的無參構(gòu)造函數(shù)將會把 Sync 對象創(chuàng)建為 NonfairSync 對象,這是一個“非公平鎖”;而另一個構(gòu)造函數(shù) ReentrantLock(booleanfair)傳入?yún)?shù)為 true 時將會把 Sync 對象創(chuàng)建為“公平鎖” FairSync。?
nonfairTryAcquire(intacquires)方法,對于非公平鎖,只要 CAS 設(shè)置同步狀態(tài) 成功,則表示當(dāng)前線程獲取了鎖,而公平鎖則不同。tryAcquire 方法,該方法與 nonfairTryAcquire(intacquires)比較,唯一不同的位置為判斷條件多了 hasQueuedPredecessors()方法,即加入了同步隊列中當(dāng)前節(jié)點是否有前驅(qū)節(jié)點的 判斷,如果該方法返回 true,則表示有線程比當(dāng)前線程更早地請求獲取鎖,因此 需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖。
改造我們的獨占鎖為可重入?
/**
* 類說明:實現(xiàn)我們自己獨占鎖,可重入
*/
public class ReenterSelfLock implements Lock {
? ? // 靜態(tài)內(nèi)部類,自定義同步器
? ? private static class Sync extends AbstractQueuedSynchronizer {
? ? ? ? // 是否處于占用狀態(tài)
? ? ? ? @Override
? ? ? ? protected boolean isHeldExclusively() {
? ? ? ? ? ? return getState() > 0;
? ? ? ? }
? ? ? ? // 當(dāng)狀態(tài)為0的時候獲取鎖
? ? ? ? @Override
? ? ? ? public boolean tryAcquire(int acquires) {
? ? ? ? ? ? if (compareAndSetState(0, 1)) {
? ? ? ? ? ? ? ? setExclusiveOwnerThread(Thread.currentThread());
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? } else if (getExclusiveOwnerThread() == Thread.currentThread()) {
? ? ? ? ? ? ? ? setState(getState() + 1);
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? ? ? return false;
? ? ? ? }
? ? ? ? // 釋放鎖,將狀態(tài)設(shè)置為0
? ? ? ? @Override
? ? ? ? protected boolean tryRelease(int releases) {
? ? ? ? ? ? if (getExclusiveOwnerThread() != Thread.currentThread()) {
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? }
? ? ? ? ? ? if (getState() == 0) {
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? }
? ? ? ? ? ? setState(getState() - 1);
? ? ? ? ? ? if (getState() == 0) {
? ? ? ? ? ? ? ? setExclusiveOwnerThread(null);
? ? ? ? ? ? }
? ? ? ? ? ? return true;
? ? ? ? }
? ? ? ? // 返回一個Condition,每個condition都包含了一個condition隊列
? ? ? ? Condition newCondition() {
? ? ? ? ? ? return new ConditionObject();
? ? ? ? }
? ? }
? ? // 僅需要將操作代理到Sync上即可
? ? private final Sync sync = new Sync();
? ? @Override
? ? public void lock() {
? ? ? ? System.out.println(Thread.currentThread().getName() + " ready get lock");
? ? ? ? sync.acquire(1);
? ? ? ? System.out.println(Thread.currentThread().getName() + " already got lock");
? ? }
? ? @Override
? ? public boolean tryLock() {
? ? ? ? return sync.tryAcquire(1);
? ? }
? ? @Override
? ? public void unlock() {
? ? ? ? System.out.println(Thread.currentThread().getName() + " ready release lock");
? ? ? ? sync.release(1);
? ? ? ? System.out.println(Thread.currentThread().getName() + " already released lock");
? ? }
? ? @Override
? ? public Condition newCondition() {
? ? ? ? return sync.newCondition();
? ? }
? ? public boolean isLocked() {
? ? ? ? return sync.isHeldExclusively();
? ? }
? ? public boolean hasQueuedThreads() {
? ? ? ? return sync.hasQueuedThreads();
? ? }
? ? @Override
? ? public void lockInterruptibly() throws InterruptedException {
? ? ? ? sync.acquireInterruptibly(1);
? ? }
? ? @Override
? ? public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
? ? ? ? return sync.tryAcquireNanos(1, unit.toNanos(timeout));
? ? }
}
實戰(zhàn),實現(xiàn)一個奇葩點的三元共享同步工具類
/**
*類說明:共享同步工具類
*/
public class TrinityLock? implements Lock {
? ? //為n表示允許n個線程同時獲得鎖
? ? private final Sync sync = new Sync(4);
? ? private static final class Sync extends AbstractQueuedSynchronizer {
? ? ? ? //private static final long serialVersionUID = -7889272986162341211L;
? ? ? ? Sync(int count) {
? ? ? ? ? ? if (count <= 0) {
? ? ? ? ? ? ? ? throw new IllegalArgumentException("count must large than zero.");
? ? ? ? ? ? }
? ? ? ? ? ? setState(count);
? ? ? ? }
? ? ? ? /**
? ? ? ? *
? ? ? ? * @param reduceCount? 扣減個數(shù)
? ? ? ? * @return? 返回小于0,表示當(dāng)前線程獲得同步狀態(tài)失敗
? ? ? ? * 大于0,表示當(dāng)前線程獲得同步狀態(tài)成功
? ? ? ? */
? ? ? ? @Override
? ? ? ? public int tryAcquireShared(int reduceCount) {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? int current = getState();
? ? ? ? ? ? ? ? int newCount = current - reduceCount;
? ? ? ? ? ? ? ? if (newCount < 0 || compareAndSetState(current, newCount)) {
? ? ? ? ? ? ? ? ? ? return newCount;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? /**
? ? ? ? *
? ? ? ? * @param returnCount 歸還個數(shù)
? ? ? ? * @return
? ? ? ? */
? ? ? ? @Override
? ? ? ? public boolean tryReleaseShared(int returnCount) {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? int current = getState();
? ? ? ? ? ? ? ? int newCount = current + returnCount;
? ? ? ? ? ? ? ? if (compareAndSetState(current, newCount)) {
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? final ConditionObject newCondition() {
? ? ? ? ? ? return new ConditionObject();
? ? ? ? }
? ? }
? ? @Override
? ? public void lock() {
? ? ? ? sync.acquireShared(1);
? ? }
? ? @Override
? ? public void unlock() {
? ? ? ? sync.releaseShared(1);
? ? }
? ? @Override
? ? public void lockInterruptibly() throws InterruptedException {
? ? ? ? sync.acquireSharedInterruptibly(1);
? ? }
? ? @Override
? ? public boolean tryLock() {
? ? ? ? return sync.tryAcquireShared(1) >= 0;
? ? }
? ? @Override
? ? public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
? ? ? ? return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
? ? }
? ? @Override
? ? public Condition newCondition() {
? ? ? ? return sync.newCondition();
? ? }
}
ReentrantReadWriteLock的實現(xiàn)
讀寫狀態(tài)的設(shè)計
讀寫鎖同樣依賴自定義同步器來實現(xiàn)同步功能,而讀寫狀態(tài)就是其同步器的 同步狀態(tài)。?
回想 ReentrantLock 中自定義同步器的實現(xiàn),同步狀態(tài)表示鎖被一個線程重 復(fù)獲取的次數(shù),而讀寫鎖的自定義同步器需要在同步狀態(tài)(一個整型變量)上維 護(hù)多個讀線程和一個寫線程的狀態(tài),使得該狀態(tài)的設(shè)計成為讀寫鎖實現(xiàn)的關(guān)鍵。
如果在一個整型變量上維護(hù)多種狀態(tài),就一定需要“按位切割使用”這個變 量,讀寫鎖將變量切分成了兩個部分,高 16 位表示讀,低 16 位表示寫,讀寫鎖 是如何迅速確定讀和寫各自的狀態(tài)呢??
答案是通過位運算。假設(shè)當(dāng)前同步狀態(tài)值為 S,寫狀態(tài)等于 S&0x0000FFFF (將高 16 位全部抹去),讀狀態(tài)等于 S>>>16(無符號補 0 右移 16 位)。當(dāng)寫 狀態(tài)增加1時,等于S+1,當(dāng)讀狀態(tài)增加1時,等于S+(1<<16),也就是S+0x00010000。 根據(jù)狀態(tài)的劃分能得出一個推論:S 不等于 0 時,當(dāng)寫狀態(tài)(S&0x0000FFFF)等 于 0 時,則讀狀態(tài)(S>>>16)大于 0,即讀鎖已被獲取。
寫鎖的獲取與釋放
寫鎖是一個支持重進(jìn)入的排它鎖。如果當(dāng)前線程已經(jīng)獲取了寫鎖,則增加寫 狀態(tài)。如果當(dāng)前線程在獲取寫鎖時,讀鎖已經(jīng)被獲取(讀狀態(tài)不為 0)或者該線 程不是已經(jīng)獲取寫鎖的線程,則當(dāng)前線程進(jìn)入等待狀態(tài)。
該方法除了重入條件(當(dāng)前線程為獲取了寫鎖的線程)之外,增加了一個讀 鎖是否存在的判斷。如果存在讀鎖,則寫鎖不能被獲取,原因在于:讀寫鎖要確 保寫鎖的操作對讀鎖可見,如果允許讀鎖在已被獲取的情況下對寫鎖的獲取,那
么正在運行的其他讀線程就無法感知到當(dāng)前寫線程的操作。因此,只有等待其他 讀線程都釋放了讀鎖,寫鎖才能被當(dāng)前線程獲取,而寫鎖一旦被獲取,則其他讀 寫線程的后續(xù)訪問均被阻塞。
寫鎖的釋放與 ReentrantLock 的釋放過程基本類似,每次釋放均減少寫狀態(tài), 當(dāng)寫狀態(tài)為 0 時表示寫鎖已被釋放,從而等待的讀寫線程能夠繼續(xù)訪問讀寫鎖, 同時前次寫線程的修改對后續(xù)讀寫線程可見。
讀鎖的獲取與釋放
讀鎖是一個支持重進(jìn)入的共享鎖,它能夠被多個線程同時獲取,在沒有其他 寫線程訪問(或者寫狀態(tài)為 0)時,讀鎖總會被成功地獲取,而所做的也只是(線 程安全的)增加讀狀態(tài)。如果當(dāng)前線程已經(jīng)獲取了讀鎖,則增加讀狀態(tài)。
如果當(dāng)前線程在獲取讀鎖時,寫鎖已被其他線程獲取,則進(jìn)入等待狀態(tài)。讀 狀態(tài)是所有線程獲取讀鎖次數(shù)的總和,而每個線程各自獲取讀鎖的次數(shù)只能選擇 保存在 ThreadLocal 中,由線程自身維護(hù)。在 tryAcquireShared(intunused)方法中, 如果其他線程已經(jīng)獲取了寫鎖,則當(dāng)前線程獲取讀鎖失敗,進(jìn)入等待狀態(tài)。如果 當(dāng)前線程獲取了寫鎖或者寫鎖未被獲取,則當(dāng)前線程(線程安全,依靠 CAS 保證) 增加讀狀態(tài),成功獲取讀鎖。讀鎖的每次釋放(線程安全的,可能有多個讀線程 同時釋放讀鎖)均減少讀狀態(tài)。
鎖的升降級
鎖降級指的是寫鎖降級成為讀鎖。如果當(dāng)前線程擁有寫鎖,然后將其釋放, 最后再獲取讀鎖,這種分段完成的過程不能稱之為鎖降級。
鎖降級是指把持?。ó?dāng)前擁有的)寫鎖,再獲取到讀鎖,隨后釋放(先前擁 有的)寫鎖的過程。
RentrantReadWriteLock 不支持鎖升級(把持讀鎖、獲取寫鎖,最后釋放讀鎖 的過程)。目的是保證數(shù)據(jù)可見性,如果讀鎖已被多個線程獲取,其中任意線程 成功獲取了寫鎖并更新了數(shù)據(jù),則其更新對其他獲取到讀鎖的線程是不可見的。