1 volatile 的工作原理

眾所周知,在如今的計(jì)算機(jī)時(shí)代,CPU的運(yùn)算處理速度與內(nèi)存讀寫速度的差異非常巨大,為了解決這種差異充分利用CPU的使用效率,于是在CPU 和內(nèi)存之間增加 CPU Cache(高速緩存),將運(yùn)算需要的數(shù)據(jù)先復(fù)制到CPU Cache中,在進(jìn)行運(yùn)算時(shí)CPU不再和內(nèi)存打交道,而是直接對CPU Cache 進(jìn)行讀寫。除此之外還可以減少 CPU 與 I/O 設(shè)備爭搶訪存,由于 CPU 和 I/O 設(shè)備會競爭同一條內(nèi)存總線,有可能出現(xiàn) CPU 等待 I/O 設(shè)備訪存的情況。而如果 CPU 能直接從緩存中獲取數(shù)據(jù),就可以減少競爭,提高 CPU 的使用率。
1.1 緩存一致性問題
CPU Cache 會引起緩存一致性問題, 在分析緩存一致性問題時(shí),忽略 L1 / L2 / L3 的多級緩存結(jié)構(gòu),提出緩存一致性抽象模型:

在單核 CPU 中,只需要考慮 Cache 與內(nèi)存的一致性。但是在多核 CPU 中,由于每個(gè)核心都有一份獨(dú)占的 Cache,就會存在一個(gè)核心修改Cache 數(shù)據(jù)后,兩個(gè)核心 Cache 數(shù)據(jù)不一致的問題。因此CPU 的緩存一致性問題應(yīng)該從 2 個(gè)維度考慮:
Cache 與內(nèi)存的一致性問題: 在修改 Cache 數(shù)據(jù)后,如何同步回內(nèi)存?
-
多核心 Cache 的一致性問題: 在一個(gè)核心修改 Cache 數(shù)據(jù)后,如何同步給其他核心 Cache?
1.1.1 Cache 與內(nèi)存的一致性問題
1> 寫直達(dá)策略(Write-Through)
寫直達(dá)策略的讀取過程:
1、CPU 在訪問內(nèi)存地址時(shí),會先檢查該地址的數(shù)據(jù)是否已經(jīng)加載到 Cache 中;
2、如果數(shù)據(jù)在 Cache 中,則直接讀取 Cache 上的數(shù)據(jù)到 CPU 中;
-
3、如果數(shù)據(jù)不在 Cache 中:
如果 Cache 已裝滿或者 Cache Line 被占用,先執(zhí)行替換策略,騰出空閑位置;
訪問內(nèi)存地址,并將內(nèi)存地址所處的整個(gè)Cache LIne 寫入到映射的 Cache Line 中;
讀取 Cache Line上的數(shù)據(jù)到 CPU 中。
寫直達(dá)策略的寫入過程:
1、如果數(shù)據(jù)不在 Cache 中,則直接將數(shù)據(jù)寫入內(nèi)存;
2、如果數(shù)據(jù)已經(jīng)加載到 Cache 中,則不僅要將數(shù)據(jù)寫入 Cache,還要將數(shù)據(jù)寫入內(nèi)存。
寫直達(dá)的優(yōu)點(diǎn)和缺點(diǎn):
優(yōu)點(diǎn): 每次讀取操作就是純粹的讀取,不涉及對內(nèi)存的寫入操作,讀取速度更快;
-
缺點(diǎn): 每次寫入操作都需要同時(shí)寫入 Cache 和寫入內(nèi)存,在寫入操作上失去了 CPU 高速緩存的價(jià)值,需要花費(fèi)更多時(shí)間。
2> 寫回策略(Write Back)
既然寫直達(dá)策略在每次寫入操作都會寫內(nèi)存,那么有沒有什么辦法可以減少寫回內(nèi)存的次數(shù)呢?這就是寫回策略:
1、寫回策略會在每個(gè) Cache Line上增加一個(gè) “臟(Dirty)” 標(biāo)記位 ,當(dāng)一個(gè) Cache Line 被標(biāo)記為臟時(shí),說明它的數(shù)據(jù)與內(nèi)存數(shù)據(jù)是不一致的;
2、在寫入操作時(shí),我們只需要修改 Cache Line 并將其標(biāo)記為臟,而不需要寫入內(nèi)存;
-
3、那么,什么時(shí)候才將臟數(shù)據(jù)寫回內(nèi)存呢?—— 就發(fā)生在 Cache 塊被替換出去的時(shí)候:
在寫入操作中,如果目標(biāo)內(nèi)存塊不在 Cache 中,需要先將內(nèi)存塊數(shù)據(jù)讀取到 Cache 中。如果替換策略換出的舊 Cache Line 是臟的,就會觸發(fā)一次寫回內(nèi)存操作;
在讀取操作中,如果目標(biāo)內(nèi)存塊不在 Cache 中,且替換策略換出的舊 Cache 塊是臟的,就會觸發(fā)一次寫回內(nèi)存操作;
可以看到,寫回策略只有當(dāng)一個(gè) Cache 數(shù)據(jù)將被替換出去時(shí)判斷數(shù)據(jù)的狀態(tài),清(未修改過,數(shù)據(jù)與內(nèi)存一致) 的 Cache 塊不需要寫回內(nèi)存,臟 的 Cache 塊才需要寫回內(nèi)存。這個(gè)策略能夠減少寫回內(nèi)存的次數(shù),性能會比寫直達(dá)更高。
當(dāng)然,寫回策略在讀取的時(shí)候,有可能不是純粹的讀取了,因?yàn)檫€可能會觸發(fā)一次臟 Cache Line 的寫入。

通過寫直達(dá)或?qū)懟夭呗?,我們已?jīng)能夠解決 在修改 Cache 數(shù)據(jù)后,如何同步回內(nèi)存 的問題
1.1.2 多核心 Cache 的一致性問題
在單核 CPU 中,我們通過寫直達(dá)策略或?qū)懟夭呗员3至薈ache 與內(nèi)存的一致性。但是在多核 CPU 中,由于每個(gè)核心都有一份獨(dú)占的 Cache,就會存在一個(gè)核心修改數(shù)據(jù)后,兩個(gè)核心 Cache 不一致的問題。
舉個(gè)例子:
1、Core 1 和 Core 2 讀取了同一個(gè)內(nèi)存塊的數(shù)據(jù),在兩個(gè) Core 都緩存了一份內(nèi)存塊的副本。此時(shí)Cache 和內(nèi)存塊是一致的;
-
2、Core 1 執(zhí)行內(nèi)存寫入操作:
在寫直達(dá)策略中,新數(shù)據(jù)會直接寫回內(nèi)存,此時(shí),Cache 和內(nèi)存塊一致。但由于之前 Core 2 已經(jīng)讀過這塊數(shù)據(jù),所以 Core 2 緩存的數(shù)據(jù)還是舊的。此時(shí),Core 1 和 Core 2 不一致;
在寫回策略中,新數(shù)據(jù)會延遲寫回內(nèi)存,此時(shí) Cache 和內(nèi)存塊不一致。不管 Core 2 之前有沒有讀過這塊數(shù)據(jù),Core 2 的數(shù)據(jù)都是舊的。此時(shí),Core 1 和 Core 2 不一致。
-
3、由于 Core 2 無法感知到 Core 1 的寫入操作,如果繼續(xù)使用過時(shí)的數(shù)據(jù),就會出現(xiàn)邏輯問題。
可以看到:由于兩個(gè)核心的工作是獨(dú)立的,在一個(gè)核心上的修改行為不會被其它核心感知到,所以不管 CPU 使用寫直達(dá)策略還是寫回策略,都會出現(xiàn)緩存不一致問題。 所以,我們需要一種機(jī)制,將多個(gè)核心的工作聯(lián)合起來,共同保證多個(gè)核心下的 Cache 一致性,這就是緩存一致性機(jī)制。
1.1.3 寫傳播 & 事務(wù)串行化
緩存一致性機(jī)制必須解決的如下兩個(gè)問題:
-
寫傳播(Write Propagation): 一個(gè)CPU核心對Cache中的值進(jìn)行了修改,需要傳播其他CPU核心,也就是需要用到寫更新或者寫無效策略。
當(dāng)某個(gè) CPU 核心的Cache中執(zhí)行寫操作時(shí),其他 CPU 核心中對應(yīng)的Cache Line 的更新策略也有兩種
寫更新(Write Update):一個(gè)CPU核心對Cache中的值進(jìn)行修改時(shí),該 CPU 核心都必須先發(fā)起一次總線請求,通知其他 CPU 核心將它們的CPU Cache 值更新為剛寫入的值,所以寫更新會很占用總線帶寬。
寫無效(Write Invalidate) : 一個(gè)CPU核心對Cache中的值進(jìn)行修改時(shí),該 CPU 核心都必須先發(fā)起一次總線請求,通知其他 CPU 核心將它們的CPU Cache 設(shè)置為無效。MESI協(xié)議用的就是這個(gè)策略,也是絕大多數(shù) CPU 都會采用緩存一致性協(xié)議。這是因?yàn)槎啻螌懖僮髦恍枰l(fā)起一次總線事件即可,第一次寫已經(jīng)將其他緩存的值置為無效,之后的寫不必再更新狀態(tài),這樣可以有效地節(jié)省 CPU 核間總線帶寬。
事務(wù)串行化(Transaction Serialization): 各個(gè) CPU 核心所有寫入操作的順序,在所有 CPU 核心看起來是一致。
寫傳播解決了 感知 問題,如果一個(gè)核心修改了數(shù)據(jù),就需要同步給其它核心。但只做到同步還不夠,如果各個(gè)核心收到的同步信號順序不一致,那最終的同步結(jié)果也會不一致。
舉個(gè)例子:假如 CPU 有 4 個(gè)核心,Core 2 將共享數(shù)據(jù)修改為 1000,隨后 Core 1 將共享數(shù)據(jù)修改為 2000。在寫傳播下,“修改為 1000” 和 “修改為 2000” 兩個(gè)事務(wù)會同步到 Core 3 和 Core 4。但是如果沒有事務(wù)串行化,不同核心收到的事務(wù)順序可能是不同的,最終數(shù)據(jù)還是不一致。

1.1.4 總線嗅探 & 總線仲裁
寫傳播和事務(wù)串行化在 CPU 中是如何實(shí)現(xiàn)的呢?
寫傳播 - 總線嗅探(bus snooping): 本質(zhì)上就是進(jìn)行Cache讀寫操作時(shí)發(fā)送到總線請求,然后讓各個(gè)核心去嗅探這些請求,再根據(jù)本地的情況進(jìn)行響應(yīng);
事務(wù)串行化 - 總線仲裁: 總線的獨(dú)占性要求同一時(shí)刻最多只有一個(gè)模塊占用總線,天然地會將所有核心對內(nèi)存的讀寫操作串行化。如果多個(gè)核心同時(shí)發(fā)起總線事務(wù),此時(shí)總線仲裁單元會對競爭做出仲裁,未獲勝的事務(wù)只能等待獲勝的事務(wù)處理完成后才能執(zhí)行。
基于總線嗅探和總線仲裁,現(xiàn)代 CPU 逐漸形成了各種緩存一致性協(xié)議,例如 MESI 協(xié)議。
MESI 協(xié)議使用的是寫回策略和寫無效策略,MESI 協(xié)議通過Cache Line 的四種狀態(tài)非常有效地降低了 CPU 核間帶寬
1.1.5 MESI 協(xié)議

CPU Cache 是由很多個(gè) Cache Line 組成的,CPU Line 是 CPU 從內(nèi)存讀取數(shù)據(jù)的基本單位,Cache Line 大小通常是64字節(jié)。
CPU對Cache的請求:
PrRd: CPU核心請求讀一個(gè) Cache Line
PrWr: CPU核心請求寫一個(gè) Cache Line
總線對Cache的請求:
BusRd: 其他CPU核心請求讀一個(gè) Cache Line
BusRdX: 其他CPU核心請求寫一個(gè)該CPU核心不擁有的緩存塊
BusUpgr: 其他CPU核心請求寫一個(gè)該CPU核心擁有的緩存塊
Flush: 請求回寫整個(gè)Cache Line到內(nèi)存
FlushOpt: 整個(gè)Cache Line被發(fā)到總線以發(fā)送給另外一個(gè)CPU核心(緩存到緩存的復(fù)制)
| 初始狀態(tài) | 操作 | 響應(yīng) |
|---|---|---|
| Shared(S) | PrRd | - 無總線事務(wù)生成 - 狀態(tài)保持不變 - 讀操作為緩存命中 |
| Shared(S) | PrWr | - 發(fā)出總線事務(wù)BusUpgr信號 - 狀態(tài)轉(zhuǎn)換為(M)Modified - 其他緩存看到BusUpgr總線信號,標(biāo)記其副本為(I)Invalid. |
| Modified(M) | PrRd | - 無總線事務(wù)生成 - 狀態(tài)保持不變 - 讀操作為緩存命中 |
| Modified(M) | PrWr | - 無總線事務(wù)生成 - 狀態(tài)保持不變 - 寫操作為緩存命中 |
| Invalid(I) | PrRd | - 發(fā)出總線事務(wù)BusRd信號 - 其他CPU核心看到BusRd,檢查自己是否有有效的數(shù)據(jù)副本,通知發(fā)出請求的緩存 - 狀態(tài)轉(zhuǎn)換為(S)Shared, 如果其他緩存有有效的副本 - 狀態(tài)轉(zhuǎn)換為(E)Exclusive, 如果其他緩存都沒有有效的副本 - 如果其他緩存有有效的副本, 其中一個(gè)緩存發(fā)出數(shù)據(jù)(FlushOpt);否則從主存獲得數(shù)據(jù) |
| Invalid(I) | PrWr | - 發(fā)出總線事務(wù)BusRdX信號 - 狀態(tài)轉(zhuǎn)換為(M)Modified - 如果其他緩存有有效的副本, 其中一個(gè)緩存發(fā)出數(shù)據(jù)(FlushOpt);否則從主存獲得數(shù)據(jù) - 如果其他緩存有有效的副本, 見到BusRdX信號后無效其副本 - 向Cache Line中寫入修改后的值 |
| Exclusive(E) | PrRd | - 無總線事務(wù)生成 - 狀態(tài)保持不變 - 讀操作為緩存命中 |
| Exclusive(E) | PrWr | - 無總線事務(wù)生成 - 狀態(tài)轉(zhuǎn)換為(M)Modified - 向Cache Line中寫入修改后的值 |
緩存一致性協(xié)議定義了Cache Line的4個(gè)狀態(tài):獨(dú)占(exclusive)、共享(share)、修改(modified)、失效(invalid)。
M(Modified,修改): 表明 Cache Line 被修改過,與內(nèi)存中不一致,只有本地一個(gè)拷貝(專有),并且其它核心的同一個(gè) Cache Line 會失效;
E(Exclusive,獨(dú)占): 表明 Cache LIne 只有本地一個(gè)拷貝(專有);
S(Share,共享): 表明 Cache Line 不僅有本地一個(gè)拷貝并且其它核心也存在其拷貝;
I(Invalid,失效): 未從內(nèi)存加載數(shù)據(jù)或者已失效;
在 獨(dú)占 和 共享 狀態(tài)下,Cache Line的數(shù)據(jù)是干凈的,任何讀取操作可以直接使用該Cache Line的數(shù)據(jù);
在 失效 和 修改 狀態(tài)下,Cache Line 的數(shù)據(jù)是臟的,其數(shù)據(jù)和內(nèi)存中的可能不一致,在讀取或?qū)懭?失效 Cache Line 時(shí),需要先將其它核心 修改 的Cache Line寫回內(nèi)存,再從內(nèi)存讀??;
在 共享 和 失效 狀態(tài),核心沒有獲得 Cache Line 的獨(dú)占權(quán)(鎖)。在修改數(shù)據(jù)時(shí)不能直接修改,而是要先向總線發(fā)起 RFO(Request For Ownership)請求 ,將其它核心的 Cache 置為 失效,等到獲得回應(yīng) ACK 后才算獲得 Cache Line 的獨(dú)占權(quán)。
這個(gè)獨(dú)占權(quán)這有點(diǎn)類似于開發(fā)語言層面的鎖概念,在修改資源之前,需要先獲取資源的鎖;
在 修改 和 獨(dú)占 狀態(tài)下,核心已經(jīng)獲得了 Cache Line 的獨(dú)占權(quán)(鎖)。在修改數(shù)據(jù)時(shí)不需要向總線發(fā)送RFO 請求,能夠減輕總線的通信壓力。
MESI 協(xié)議有一個(gè)非常 nice 的在線體驗(yàn)網(wǎng)站,你可以對照文章內(nèi)容,在網(wǎng)站上操作指令區(qū),并觀察內(nèi)存和緩存的數(shù)據(jù)和狀態(tài)變化。網(wǎng)站地址:(VivioJS MESI)

1.1.6 寫緩沖區(qū) & 失效隊(duì)列
MESI 協(xié)議保證了 CPU Cache 的一致性,但完全地遵循協(xié)議會影響性能。 因此,現(xiàn)代的 CPU 會增加寫緩沖區(qū)和失效隊(duì)列將 MESI 協(xié)議的請求異步化,以提高效率:
- 寫緩沖區(qū)(Store Buffer)
由于在寫入操作之前,CPU Core 需要先發(fā)起 RFO 請求獲得獨(dú)占權(quán),在其它 CPU Core 回應(yīng) ACK 之前,當(dāng)前 CPU Core 只能空等待,這對 CPU 資源是一種浪費(fèi)。因此現(xiàn)代 CPU 會采用 寫緩沖區(qū) 機(jī)制,寫入的數(shù)據(jù)放到寫緩沖區(qū)后并發(fā)送 RFO 請求后,CPU 就可以去執(zhí)行其它任務(wù),等收到 ACK 后再將寫入數(shù)據(jù)寫到 Cache 上。
當(dāng)前CPU核心如果要讀Cache Line中的數(shù)據(jù),需要先掃描Store Buffer之后再讀取Cache Line(Store-Buffer Forwarding)。
- 失效隊(duì)列(Invalidation Queue)
Store Buffer 容量是有限的,當(dāng)Store Buffer滿了之后CPU核心還是要卡住等待ACK。所以其他核心在收到 RFO 請求時(shí),需要及時(shí)回應(yīng) ACK。如果核心很忙不能及時(shí)回復(fù),就會造成發(fā)送 RFO 請求的核心在等待 ACK。因此現(xiàn)代 CPU 會采用 失效隊(duì)列 機(jī)制,先把其它核心發(fā)過來的 RFO 請求放到失效隊(duì)列,然后直接返回 ACK,等當(dāng)前核心處理完任務(wù)后再去處理失效隊(duì)列中的失效請求。
因此核心可能并不知道在它Cache里的某個(gè)Cache Line是Invalid狀態(tài)的,因?yàn)槭ш?duì)列包含有收到但還沒有處理的Invalidation消息。CPU在讀取數(shù)據(jù)的時(shí)候,并不像Store buffer那樣讀取Invalidate queue。

1.2 內(nèi)存屏障
CPU 已經(jīng)實(shí)現(xiàn)了 MESI 協(xié)議,已經(jīng)在硬件層面實(shí)現(xiàn)了寫傳播和事務(wù)串行化,為什么 Java 語言層面還需要定義 volatile 關(guān)鍵字呢?豈不是多此一舉?
MESI 協(xié)議解決了緩存一致性,一致性有強(qiáng)弱之分:
強(qiáng)一致性: 保證在任意時(shí)刻任意副本上的同一份數(shù)據(jù)都是相同的,或者允許不同,但是每次使用前都要刷新確保數(shù)據(jù)一致,所以最終還是一致。
弱一致性: 不保證在任意時(shí)刻任意副本上的同一份數(shù)據(jù)都是相同的,也不要求使用前刷新,但是隨著時(shí)間的遷移,不同副本上的同一份數(shù)據(jù)總是向趨同的方向變化,最終還是趨向一致。
例如,MESI 協(xié)議就是強(qiáng)一致性的,但引入寫緩沖區(qū)或失效隊(duì)列后就變成弱一致性,隨著寫緩沖區(qū)和失效隊(duì)列被消費(fèi),各個(gè)核心 Cache 最終還是會趨向一致狀態(tài)。
引入寫緩沖區(qū)或失效隊(duì)列后會導(dǎo)致內(nèi)存一致性的問題(出現(xiàn)內(nèi)存重排 Memory Reordering),舉個(gè)例子:初始狀態(tài)變量 a 和變量 b 都是 0,現(xiàn)在 Core1 和 Core2 分別執(zhí)行這兩段指令,最終 x 和 y 的結(jié)果是什么?
Core1
a = 1; // A1
x = b; // A2
Core2
b = 2; // B1
y = a; // B2
寫緩存區(qū)造成內(nèi)存重排:

可以看到:從內(nèi)存的視角看,直到 Core1 執(zhí)行 A3 來刷新寫緩沖區(qū),寫操作 A1 才算真正執(zhí)行了。雖然 Core 的執(zhí)行順序是 A1 → A2 → B1 → B2,但內(nèi)存看到的順序卻是 A2 → B1 → B2 → A1,變量 a 寫入沒有同步給對變量 a 的讀取,即發(fā)生了內(nèi)存重排。內(nèi)存重排是硬件上無法解決的問題,這個(gè)時(shí)候就必須加入內(nèi)存屏障來解決這個(gè)問題。
內(nèi)存屏障 (Memory Barrier)分為寫屏障(Store Barrier)、讀屏障(Load Barrier)和全屏障(Full Barrier),寫屏障會阻塞等待Store Buffer中的數(shù)據(jù)同步刷到Cache后再執(zhí)行屏障后面的讀寫操作;讀屏障會阻塞Invalid Queue中的消息理完成后再執(zhí)行屏障后面的讀寫操作。
JVM 并不直接顯露內(nèi)存屏障。(Memory barriers are not directly exposed by the JVM. )反之,為了保證語言級的并發(fā)原語語義,它們被 JVM 插入到指令序列中。(Instead they are inserted into the instruction sequence by the JVM in order to uphold the semantics of language level concurrency primitives. )
還是以上面的例子說明下:
Core1
// volatile 類型
a = 1; // A1
// 插入寫內(nèi)存屏障
// 插入讀內(nèi)存屏障
x = b; // A2
Core2
// volatile 類型
b = 2; // B1
// 插入寫內(nèi)存屏障
// 插入讀內(nèi)存屏障
y = a; // B2
初始狀態(tài)變量 a 和變量 b 都是 0 并且設(shè)置為volatile類型,現(xiàn)在 Core1 和 Core2 分別執(zhí)行這兩段指令
由于在 A1和B1之后插入一個(gè)寫內(nèi)存屏障,所以在執(zhí)行A2和B2之前會將a=1和b=2刷到Cache
由于在 A2前插入一個(gè)讀內(nèi)存屏障,所以在執(zhí)行A2之前會將b=1同步到Core1的Cache
由于在 B2前插入一個(gè)讀內(nèi)存屏障,所以在執(zhí)行B2之前會將a=1同步到Core2的Cache
內(nèi)存屏障解決了內(nèi)存重排的問題,除此意外內(nèi)存屏障可以阻止屏障兩側(cè)的指令重排序(即編譯器重排)
需要注意的是,內(nèi)存屏障并不是Java源代碼中的一部分,它們是在編譯到機(jī)器指令時(shí)由Java內(nèi)存模型隱式插入的,寫Java代碼時(shí)是看不到的。
x86架構(gòu)CPU提供了比較強(qiáng)的緩存一致性支持,但有的ARM構(gòu)的CPU指供的緩存一致性支持就很弱,大部分Android手機(jī)采用的CPU是ARM架構(gòu),所以需要在正確的位置插入內(nèi)存屏障來保證一致性。
對于app開發(fā)者來說理解到這里就可以了,下面從源碼角度分析,有興趣的同學(xué)可以自己看看Hotspot虛擬機(jī)arm架構(gòu)下的模板解釋器中的putfield和getfield方法。
2 synchronized 的工作原理
首先舉個(gè)例子:
public class UnSafeTest {
@SneakyThrows
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(10);
CustomService customService = new CustomService();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
customService.add();
}
countDownLatch.countDown();
}).start();
}
// 等待其他線程執(zhí)行完畢
countDownLatch.await();
System.out.println("num:" + customService.getNum());
}
static class CustomService {
private int num = 0;
public void add() {
num++;
}
public int getNum() {
return num;
}
}
}
上述代碼啟動了10個(gè)線程,每個(gè)線程使num累加100次,期望結(jié)果是1000,但打印通常小于1000,這是一個(gè)典型的線程安全問題。 我們在多線程環(huán)境下調(diào)用了CustomService.add(),因而導(dǎo)致線程不安全,那么為什么會有線程安全問題呢?我們來看關(guān)鍵代碼:
public void add() {
num++;
}
num++ 實(shí)際上并不是原子操作,這需要看其對應(yīng)的字節(jié)碼信息(可以使用javap -v xx.class進(jìn)行查看):
public void add();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getfield #2 // Field num:I
5: iconst_1
6: iadd
7: putfield #2 // Field num:I
10: return
可以發(fā)現(xiàn)num++實(shí)際上由3個(gè)指令組成getfield、iadd、putfield組成。
getfield:獲取變量值
iadd:執(zhí)行+1
putfield:設(shè)置變量值
既然num++實(shí)際上由3步組成,那么在多線程環(huán)境中就無法保證其執(zhí)行過程的原子性,通過下圖可以更加清晰的說明這一點(diǎn):

上面這種情況是原子性問題導(dǎo)致線程不安全,synchronized是一個(gè)同步鎖,在同一時(shí)刻被修飾的方法或代碼塊只有一個(gè)線程能執(zhí)行,以保證線程安全。很多人都稱之為重量級鎖(也叫做悲觀鎖),但是隨著JDK1.6對synchronized進(jìn)行了鎖升級的優(yōu)化后,在線程競爭不激烈的情況下性能還是不錯(cuò)的,接下來看看synchronized怎么解決線程安全問題:
public synchronized void add() {
num++;
}
或者
public void add() {
synchronized (CustomService.class) {
num++;
}
}
接下來看一下字節(jié)碼
public synchronized void add();
descriptor: ()V
flags: ACC_PUBLIC, ACC_SYNCHRONIZED
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getield #2 // Field num:I
5: iconst_1
6: iadd
7: putfield #2 // Field num:I
10: return
或者
public void add();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=3, locals=3, args_size=1
0: ldc #3 // class com/cytmxk/trainproject/UnSafeTest$CustomService
2: dup
3: astore_1
4: monitorenter
5: aload_0
6: dup
7: getfield #2 // Field num:I
10: iconst_1
11: iadd
12: putfield #2 // Field num:I
15: aload_1
16: monitorexit
17: goto 25
20: astore_2
21: aload_1
22: monitorexit
23: aload_2
24: athrow
25: return
synchronized修飾方法時(shí),會在訪問標(biāo)識符(flags)中加入ACC_SYNCHRONIZED標(biāo)識
方法級同步是隱式執(zhí)行的。當(dāng)調(diào)用這些方法時(shí),如果發(fā)現(xiàn)會ACC_SYNCHRONIZED標(biāo)識,則會進(jìn)入一個(gè)monitor,執(zhí)行方法,然后退出monitor。這時(shí)如果其他線程來請求執(zhí)行方法,會因?yàn)闊o法獲得監(jiān)視器鎖而被阻斷住。
無論方法調(diào)用正常還是發(fā)生異常,都會自動退出monitor,也就是釋放鎖。
synchronized修飾代碼塊時(shí),會增加monitorenter和monitorexit指令
每個(gè)對象都與一個(gè)監(jiān)視器monitor關(guān)聯(lián),執(zhí)行monitorenter指令的線程嘗試獲取鎖對象關(guān)聯(lián)的監(jiān)視器monitor的所有權(quán),此時(shí)其他線程將阻塞等待,直到執(zhí)行monitorexit退出monitor時(shí),
其他線程才有機(jī)會來獲取監(jiān)視器monitor的所有權(quán)。
synchronized保證了有且僅有一個(gè)線程獲取執(zhí)行權(quán)(即保證了原子性)。
synchronized也是基于MESI協(xié)議和內(nèi)存屏障實(shí)現(xiàn)的緩存一致性
在monitorenter指令之后有一個(gè)Load內(nèi)存屏障將CPU Cache更新到最新值
在monitorexit指令之后會有一個(gè)Store內(nèi)存屏障將Store Buffer更新到CPU Cache
3 通過ReentrantLock解決線程安全問題
3.1 LockSupport 的工作原理
AQS使用LockSupport來控制線程的阻塞和喚醒,我們更加熟悉的阻塞喚醒操作是wait/notify方式,它是以O(shè)bject的角度來設(shè)計(jì),而LockSupport提供的park/unpark則是以線程的角度來設(shè)計(jì)。LockSupport主要提供兩類操作:
1 park 操作提供了4個(gè)方法
/**
* 許可證可用則消耗許可證并且調(diào)用立即返回;否則當(dāng)前線程會進(jìn)入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
* 1> 其他線程以當(dāng)前線程為目標(biāo)調(diào)用 unpark
* 2> 其他線程中斷當(dāng)前線程
* 3> 調(diào)用錯(cuò)誤地(沒有原因地)返回
*
* 此方法不報(bào)告導(dǎo)致方法返回的原因,調(diào)用方應(yīng)首先重新檢查導(dǎo)致線程停止的條件,調(diào)用方還可以在返回時(shí)確定線程的中斷狀態(tài)。
*/
public static void park() {
U.park(false, 0L);
}
/**
* 同上
*/
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
/**
* 許可證可用則消耗許可證并且調(diào)用立即返回;否則當(dāng)前線程會進(jìn)入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
* 1> 其他線程以當(dāng)前線程為目標(biāo)調(diào)用 unpark
* 2> 其他線程中斷當(dāng)前線程
* 3> 指定的等待時(shí)間已到
* 4> 調(diào)用錯(cuò)誤地(沒有原因地)返回
*
* 此方法不報(bào)告導(dǎo)致方法返回的原因,調(diào)用方應(yīng)首先重新檢查導(dǎo)致線程停止的條件,調(diào)用方還可以在返回時(shí)確定線程的中斷狀態(tài)或者調(diào)用花費(fèi)的時(shí)間。
*/
public static void parkNanos(Object blocker, long nanos) {
if (nanos > 0) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, nanos);
setBlocker(t, null);
}
}
/**
* 許可證可用則消耗許可證并且調(diào)用立即返回;否則當(dāng)前線程會進(jìn)入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
* 1> 其他線程以當(dāng)前線程為目標(biāo)調(diào)用 unpark
* 2> 其他線程中斷當(dāng)前線程
* 3> 到達(dá)指定的時(shí)間線
* 4> 調(diào)用錯(cuò)誤地(沒有原因地)返回
*
* 此方法不報(bào)告導(dǎo)致方法返回的原因,調(diào)用方應(yīng)首先重新檢查導(dǎo)致線程停止的條件,調(diào)用方還可以在返回時(shí)確定線程的中斷狀態(tài)或者調(diào)用返回時(shí)的時(shí)間。
*/
public static void parkUntil(Object blocker, long deadline) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(true, deadline);
setBlocker(t, null);
}
上面三種形式的park都有blocker參數(shù),此參數(shù)在線程進(jìn)入WAITING或者TIMED_WAITING狀態(tài)時(shí)被記錄,以幫助監(jiān)視工具和診斷工具確定線程受阻塞的原因(通過getBlocker方法獲取blocker)。
看下線程dump的結(jié)果來理解blocker的作用。
2 unpark 操作
/**
* 使給定線程的許可證可用(如果尚未可用),如果線程在 park上被阻塞那么它將解除阻塞,否則保證下一次調(diào)用park不會被阻塞。如果指定的線
* 程尚未啟動,則不能保證此操作有任何效果。
*/
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
歸根結(jié)底,LockSupport調(diào)用的是Unsafa的native代碼:
// park方法
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
...
// 調(diào)用parker的park方法
thread->parker()->park(isAbsolute != 0, time);
...
UNSAFE_END
// unpark方法
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
...
if (p != NULL) {
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
// 調(diào)用parker的unpark方法
p->unpark();
}
UNSAFE_END
每個(gè)JAVA線程都有一個(gè)Parker成員變量:
// JSR166 per-thread parker
private:
Parker* _parker;
public:
Parker* parker() { return _parker; }
park函數(shù)使當(dāng)前線程進(jìn)入WAITING或者TIMED_WAITING狀態(tài),而其unpark函數(shù)則是將指定線程喚醒,看一下Parker的park和unpark方法:
class Parker : public os::PlatformParker {
private:
volatile int _counter ;
...
public:
Parker() : PlatformParker() {
_counter = 0 ; //初始化為0
...
}
...
public:
// For simplicity of interface with Java, all forms of park (indefinite,
// relative, and absolute) are multiplexed into one call.
void park(bool isAbsolute, jlong time);
void unpark();
...
};
os::PlatformParker是對底層的抽象,根據(jù)不同的操作系統(tǒng)有不同的實(shí)現(xiàn),在Linux系統(tǒng)下,是用Posix線程庫pthreads中的mutex(互斥量)和condition(條件變量)實(shí)現(xiàn)的,mutex和condition保護(hù)了一個(gè)counter的變量,當(dāng)park時(shí)counter被設(shè)置為0,當(dāng)unpark時(shí)_counter被設(shè)置為1。接下來看一下Linux系統(tǒng)中的實(shí)現(xiàn):
class PlatformParker : public CHeapObj<mtInternal> {
protected:
enum {
REL_INDEX = 0,
ABS_INDEX = 1
};
int _cur_index; // which cond is in use: -1, 0, 1
pthread_mutex_t _mutex [1] ; // pthread互斥鎖
pthread_cond_t _cond [2] ; // pthread條件變量數(shù)組,一個(gè)用于相對時(shí)間,一個(gè)用于絕對時(shí)間
public: // TODO-FIXME: make dtor private
~PlatformParker() { guarantee (0, "invariant") ; }
public:
PlatformParker() {
int status;
status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
assert_status(status == 0, status, "cond_init rel");
status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
assert_status(status == 0, status, "cond_init abs");
status = pthread_mutex_init (_mutex, NULL);
assert_status(status == 0, status, "mutex_init");
_cur_index = -1; // mark as unused
}
};
下面是park和unpark方法的具體實(shí)現(xiàn):
void Parker::park(bool isAbsolute, jlong time) {
// Ideally we'd do something useful while spinning, such
// as calling unpackTime().
// Optional fast-path check:
// Return immediately if a permit is available.
// We depend on Atomic::xchg() having full barrier semantics
// since we are doing a lock-free update to _counter.
if (Atomic::xchg(0, &_counter) > 0) return; // 使用 xchgl 指令將_counter的值設(shè)置為0,如果_counter原來的值大于0則返回
Thread* thread = Thread::current();
assert(thread->is_Java_thread(), "Must be JavaThread");
JavaThread *jt = (JavaThread *)thread;
// Optional optimization -- avoid state transitions if there's an interrupt pending.
// Check interrupt before trying to wait
if (Thread::is_interrupted(thread, false)) { // 如果線程處于中斷狀態(tài),直接返回
return;
}
// Next, demultiplex/decode time arguments
timespec absTime;
// 如果time小于0,或者isAbsolute是true并且time等于0則直接返回
if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
return;
}
if (time > 0) {
unpackTime(&absTime, isAbsolute, time);
}
// Enter safepoint region
// Beware of deadlocks such as 6317397.
// The per-thread Parker:: mutex is a classic leaf-lock.
// In particular a thread must never block on the Threads_lock while
// holding the Parker:: mutex. If safepoints are pending both the
// the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
ThreadBlockInVM tbivm(jt); // 構(gòu)造當(dāng)前線程的 ThreadBlockInVM
// 如果當(dāng)前線程設(shè)置了中斷標(biāo)志,或者獲取mutex互斥鎖失敗則直接返回
// 由于Parker是每個(gè)線程都有的,所以_counter cond mutex都是每個(gè)線程都有的,不是所有線程共享的,所以加鎖失敗只有兩種情況,
// 1> unpark已經(jīng)加鎖這時(shí)只需要返回即可,對應(yīng)unpark先調(diào)用的情況
// 2> 調(diào)用pthread_mutex_trylock出錯(cuò)
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { // 嘗試獲取鎖,失敗則返回
return;
}
int status ;
// 如果當(dāng)前線程持有許可(即_counter大于0),說明之前已經(jīng)調(diào)用unpark方法將_counter置為了1
if (_counter > 0) {
_counter = 0; // 將許可消耗掉
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
// Paranoia to ensure our locked and lock-free paths interact
// correctly with each other and Java-level accesses.
OrderAccess::fence();
return;
}
#ifdef ASSERT
// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)
sigset_t oldsigs;
sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
jt->set_suspend_equivalent();
// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()
assert(_cur_index == -1, "invariant");
// 如果time等于0,說明是相對時(shí)間也就是isAbsolute是fasle(否則前面就直接返回了)
if (time == 0) {
_cur_index = REL_INDEX; // arbitrary choice when not timed
// 當(dāng)前線程進(jìn)入WAITING狀態(tài)
status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
} else {
// 判斷isAbsolute是false還是true,false的話使用_cond[0],否則用_cond[1]
_cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
// 使用條件變量當(dāng)前線程進(jìn)入TIMED_WAITING狀態(tài)
status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
if (status != 0 && WorkAroundNPTLTimedWaitHang) {
pthread_cond_destroy (&_cond[_cur_index]) ;
pthread_cond_init (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
}
}
// 如果當(dāng)前線程被喚醒則繼續(xù)向下執(zhí)行
_cur_index = -1;
assert_status(status == 0 || status == EINTR ||
status == ETIME || status == ETIMEDOUT,
status, "cond_timedwait");
#ifdef ASSERT
pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif
_counter = 0 ; // 返回后 _counter 狀態(tài)位重置
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
// 使用內(nèi)存屏障使_counter對其它線程可見
OrderAccess::fence();
// If externally suspended while waiting, re-suspend
if (jt->handle_special_suspend_equivalent_condition()) {
jt->java_suspend_self();
}
}
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
// 說明當(dāng)前parker對應(yīng)的線程掛起了,因?yàn)開cur_index初始是-1,并且等待條件變量的線程被喚醒
// 后也會將_cur_index重置-1
if (_cur_index != -1) {
// 如果設(shè)置了WorkAroundNPTLTimedWaitHang先調(diào)用signal再調(diào)用unlock,在hotspot在Linux下默認(rèn)使用這種方式
// 即先調(diào)用signal再調(diào)用unlock
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (&_cond[_cur_index]);
assert (status == 0, "invariant");
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
} else {
// must capture correct index before unlocking
int index = _cur_index;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant");
status = pthread_cond_signal (&_cond[index]);
assert (status == 0, "invariant");
}
} else { // 如果_cur_index == -1說明線程沒在等待條件變量,則直接解鎖
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
} else { // 如果_counter == 1,說明線程調(diào)用了一次或多次unpark但是沒調(diào)用park,則直接解鎖
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
inline jint Atomic::xchg (jint exchange_value, volatile jint* dest) {
__asm__ volatile ( "xchgl (%2),%0"
: "=r" (exchange_value)
: "0" (exchange_value), "r" (dest)
: "memory");
return exchange_value;
}
__asm__ 代表后面的xchgl是匯編指令,xchgl是一個(gè)原子操作,交換exchange_value和dest的值,并且將exchange_value的值返回。
首先說明一下上面用到的三個(gè)方法:
int pthread_mutex_lock(pthread_mutex_t* mutex); // 阻塞式的加鎖
int pthread_mutex_trylock(pthread_mutex_t* mutex); // 非阻塞式的加鎖,加不上鎖就返回
int pthread_mutex_unlock(pthread_mutex_t* mutex); // 解鎖
// 以上函數(shù)成功返回0;失敗返回錯(cuò)誤碼
接下來總結(jié)一下park方法的邏輯:
1> 使用 xchgl 指令將counter修改為 0 返回,如果counter原來的值大于0則返回,即有許可直接消費(fèi)掉
2> pthread_mutex_trylock 嘗試獲取鎖,失敗則返回非0的錯(cuò)誤碼(其他線程unpark該線程時(shí)持有mutex互斥量或者pthread_mutex_trylock調(diào)用出錯(cuò)) 3> 如果持有許可(即counter >0成立),則消費(fèi)掉許可(即執(zhí)行counter = 0),釋放鎖(執(zhí)行pthread_mutex_unlock)然后返回
4> 如果 3 不成立,根據(jù)時(shí)間的不同執(zhí)行不同的等待函數(shù),如果等待正確返回,則消費(fèi)掉許可(即執(zhí)行counter = 0),釋放鎖(執(zhí)行pthread_mutex_unlock)然后返回
unpark的邏輯為:
1> pthread_mutex_lock 獲取鎖,可能會阻塞線程
2> 將參數(shù)線程設(shè)置為持有許可(即_counter 設(shè)置為 1)
3> 判斷 _counter 的舊值:
小于 1 時(shí),調(diào)用 pthread_cond_signal 喚醒在 park 阻塞的線程;
等于 1 時(shí),直接返回
3.2 CAS 的工作原理
說到CAS,第一個(gè)想到的就是原子類型,其實(shí)4.1中的問題就可以通過AtomicInteger解決:
static class CustomService {
private final AtomicInteger num = new AtomicInteger();
public void add() {
num.getAndIncrement();
}
public int getNum() {
return num.get();
}
}
為什么上面的代碼可以解決線程安全的問題呢?先看下AtomicInteger的部分源碼:
// Unsafe 對象,可以直接根據(jù)內(nèi)存地址操作數(shù)據(jù),可以突破JVM的現(xiàn)在直接操作內(nèi)存,所以是不安全的
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// 存儲value屬性在AtomicInteger類實(shí)例內(nèi)部的偏移地址
private static final long VALUE;
static {
try {
// 在類初始化的時(shí)候就獲取就獲取到了value屬性在對象內(nèi)部的偏移地址
VALUE = U.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
// 存儲實(shí)際的值
private volatile int value;
......
/**
* Atomically increments by one the current value.
*
* @return the previous value
*/
public final int getAndIncrement() {
return U.getAndAddInt(this, VALUE, 1);
}
1> 首先內(nèi)部持有一個(gè)Unsafe對象,原子類底層的操作都是基于Unsafe進(jìn)行的。
2> volatile int value 是AtomicInteger實(shí)例的實(shí)際數(shù)值,volatile保證并發(fā)中的可見性和有序性。
3> VALUE 是 value屬性在AtomicInteger對象內(nèi)部的偏移地址,通過Unsafe類是可以在內(nèi)存級別給變量賦值的,要操作AtomicInteger實(shí)例的數(shù)據(jù),首先要知道AtomicInteger實(shí)例的內(nèi)存地址,其次是要知道value屬性在對象內(nèi)部的偏移量VALUE,接著直接給這塊內(nèi)存賦值就行了:

4> AtomicInteger的getAndIncrement()方法是基于Unsafe的getAndAddInt的,Unsafe的getAndAddInt方法源碼:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}
1> o + offset 得到value屬性在內(nèi)存中的地址,然后根據(jù)地址從內(nèi)存中獲取value的值,保存在v中(即期望值)。
2> compareAndSwapInt : v的值跟當(dāng)前內(nèi)存的值(o + offsetSet 地址對應(yīng)的值)進(jìn)行對比,如果相等則將內(nèi)存的值修改為 v + delta(即CAS操作成功),如果值不相等,則進(jìn)入下一次循環(huán),
直到CAS操作成功為止。
3> CAS操作是怎么保證原子性的呢(比較和修改這是兩個(gè)動作,如果比較的時(shí)候是一樣的,修改的時(shí)候被別的線程修改了)?
接下來hotspot中compareAndSwapInt的實(shí)現(xiàn):
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP();
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
__asm_ "cmpxchg面的xchgl是匯編指令,LOCK_IF_MP代表如果設(shè)計(jì)多核心則添加lock,使cmpxchgl指令原子性
CAS保證了加1操作的原子性,volatile確保了可見性、有序性,所以可以解決上面的問題

3.3 ReentrantLock的工作原理
LockSupport 底層使用互斥量mutex和condition,互斥量mutex和condition是內(nèi)核提供的能力,所以性能消耗是非常大的。
首先通過下圖整體看一下ReentrantLock的類結(jié)構(gòu)

首先
ReentrantLock繼承自父類Lock,然后有3個(gè)內(nèi)部類,其中Sync內(nèi)部類繼承自AQS,另外的兩個(gè)內(nèi)部類繼承自Sync,這兩個(gè)類分別是用來實(shí)現(xiàn)公平鎖和非公平鎖的,首先看一下非公平鎖的流程圖:
接下來就是對應(yīng)的源碼:
// ReentrantLock.java
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
// 通過CAS的方式先判斷state == 0是否成立,如果成立則將當(dāng)前state設(shè)置為1
if (compareAndSetState(0, 1))
// state成功設(shè)置為1,說明當(dāng)前線程獲取到資源,則將當(dāng)前線程保存下來
setExclusiveOwnerThread(Thread.currentThread());
else
// 失敗的話將當(dāng)前線程放到等待隊(duì)列
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
// 非公平的嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c == 0 表示沒有線程占有鎖
if (c == 0) {
// 通過CAS的方式先判斷state == 0是否成立,如果成立則將當(dāng)前state設(shè)置為1
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當(dāng)前線程已經(jīng)占有鎖,則state加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
...
}
// AbstractQueuedSynchronizer.java
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long HEAD;
private static final long TAIL;
static {
try {
STATE = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
HEAD = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
TAIL = U.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
/**
* The synchronization state.
*/
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
// 請求鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
Sync通過acquire和release方法獲取個(gè)釋放鎖,所以 ReentrantLock實(shí)現(xiàn)的是AQS的獨(dú)占模式,也就是獨(dú)占鎖。
上面說明了非公平鎖的獲取和釋放流程,接下來看一下公平鎖:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// 將當(dāng)前線程放到等待隊(duì)列
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// c == 0 表示沒有線程占有鎖
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 等待隊(duì)列中不存在有效節(jié)點(diǎn)并且CAS成功的情況下當(dāng)前線程獲取到鎖
setExclusiveOwnerThread(current);
return true;
}
}
// 如果當(dāng)前線程已經(jīng)獲取到鎖,則state加1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
釋放鎖的流程和非公平鎖一樣,不一樣的地方體現(xiàn)在獲取鎖的流程。
接下來看一下AQS中核心等待隊(duì)列的管理:
3.3.1 線程加入隊(duì)列的時(shí)機(jī)
當(dāng)執(zhí)行Acquire(1)時(shí),會通過tryAcquire獲取鎖。在這種情況下,如果獲取鎖失敗,就會調(diào)用addWaiter加入到等待隊(duì)列中去。
獲取鎖失敗后,會執(zhí)行 addWaiter(Node.EXCLUSIVE) 加入等待隊(duì)列,具體實(shí)現(xiàn)方法如下:
private Node addWaiter(Node mode) {
// 創(chuàng)建于當(dāng)前線程關(guān)聯(lián)的Node節(jié)點(diǎn)(即將thread字段設(shè)置為當(dāng)前線程)
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// 將node的prev設(shè)置為oldTail
U.putObject(node, Node.PREV, oldTail);
// 使用CAS的方式判斷oldTail地址和tail的地址是否相同,成立的話將tail設(shè)置為node
if (compareAndSetTail(oldTail, node)) {
// 最終完成將Node插入隊(duì)尾
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
private final void initializeSyncQueue() {
Node h;
// 創(chuàng)建第一個(gè)節(jié)點(diǎn)(虛節(jié)點(diǎn),占位用),head和tail同時(shí)指向該節(jié)點(diǎn)
if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
tail = h;
}

回到公平鎖的代碼,hasQueuedPredecessors是公平鎖加鎖時(shí)判斷等待隊(duì)列中是否存在有效節(jié)點(diǎn)的方法。如果返回False,說明當(dāng)前線程可以爭取共享資源;如果返回True,說明隊(duì)列中存在有效節(jié)點(diǎn),當(dāng)前線程必須加入到等待隊(duì)列中。
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 成立代表等待對列中存在等待的節(jié)點(diǎn)
// (s = h.next) == null 成立代表其他線程正在添加到等待隊(duì)列,進(jìn)行到了tail已經(jīng)指向新Node,但是Head還沒有指向新Node,此時(shí)隊(duì)列中有元素,需要返回true
// s.thread != Thread.currentThread() 成立代表等待隊(duì)列的第一個(gè)有效節(jié)點(diǎn)線程與當(dāng)前線程不同,所以當(dāng)前線程不可以獲取鎖,當(dāng)前線程必須加入進(jìn)等待隊(duì)列
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
回到最初的代碼:
// 請求鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上文解釋了addWaiter方法,這個(gè)方法其實(shí)就是把對應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊(duì)列里,返回的是一個(gè)包含該線程的Node。而這個(gè)Node會作為參數(shù),進(jìn)入到acquireQueued方法中。acquireQueued會把放入隊(duì)列中的線程不斷去獲取鎖,直到獲取成功或者被park掛起:
final boolean acquireQueued(final Node node, int arg) {
try {
// 標(biāo)記等待過程中是否中斷過
boolean interrupted = false;
// 開始自旋,要么獲取鎖,要么中斷
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果p是首節(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)在真實(shí)數(shù)據(jù)隊(duì)列的頭部,就嘗試獲取鎖(別忘了頭結(jié)點(diǎn)是虛節(jié)點(diǎn))
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,head指針移動到當(dāng)前node,node節(jié)點(diǎn)變成虛節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 說明p為首節(jié)點(diǎn)且當(dāng)前沒有獲取到鎖(可能是鎖被其他線程占了)或者是p不是首節(jié)點(diǎn),這個(gè)時(shí)候就要判斷當(dāng)前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點(diǎn)的waitStatus為SIGNAL),
// 防止無限循環(huán)浪費(fèi)資源。具體兩個(gè)方法下面細(xì)細(xì)分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
// 將指定的節(jié)點(diǎn)設(shè)置為首節(jié)點(diǎn)(即虛節(jié)點(diǎn))
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 靠前驅(qū)節(jié)點(diǎn)判斷當(dāng)前線程是否應(yīng)該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅(qū)節(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
int ws = pred.waitStatus;
// 說明前驅(qū)節(jié)點(diǎn)處于SIGNAL狀態(tài)(在獨(dú)占鎖機(jī)制中,waitStatus 只會使用到 CANCELLED 和 SIGNAL 兩個(gè)狀態(tài))
if (ws == Node.SIGNAL)
return true;
// 通過枚舉值我們知道waitStatus>0是取消狀態(tài)
if (ws > 0) {
do {
// 循環(huán)向前查找取消節(jié)點(diǎn),把取消節(jié)點(diǎn)從隊(duì)列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 設(shè)置前驅(qū)節(jié)點(diǎn)等待狀態(tài)為SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt主要用于掛起當(dāng)前線程,阻塞調(diào)用棧,返回當(dāng)前線程的中斷狀態(tài)。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
上述方法的流程圖如下:

從上圖可以看出,跳出當(dāng)前循環(huán)的條件是當(dāng)
前置節(jié)點(diǎn)是頭結(jié)點(diǎn),且當(dāng)前線程獲取鎖成功。為了防止因死循環(huán)導(dǎo)致CPU資源被浪費(fèi),我們會判斷前置節(jié)點(diǎn)的狀態(tài)來決定是否要將當(dāng)前線程掛起,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):
3.3.2 等待隊(duì)列中出隊(duì)時(shí)機(jī)
// ReentrantLock.java
public void unlock() {
sync.release(1);
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
// 嘗試釋放鎖
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果當(dāng)前線程沒有占有鎖則拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// c == 0 代表當(dāng)前線程要釋放鎖
if (c == 0) {
free = true;
// 清空占有鎖的線程對象
setExclusiveOwnerThread(null);
}
// 更新state的值
setState(c);
return free;
}
}
// AbstractQueuedSynchronizer.java
// 釋放鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 解鎖隊(duì)列總的第一個(gè)有效節(jié)點(diǎn)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
// 獲取當(dāng)前節(jié)點(diǎn)的waitStatus
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);
}


