本篇不寫前言,直接扒衣服!
1:concurrent包結(jié)構(gòu)

最底層:
volatile變量:volatile保證變量在內(nèi)存中的可見性。java線程模型包括線程的私有內(nèi)存和所有線程的公共內(nèi)存,這與cpu緩存和內(nèi)存類似。線程對變量進行操作的時候一般是從公共內(nèi)存中拷貝變量的副本,等修改完之后重新寫入到公共內(nèi)存,這存在并發(fā)風險。而被volatile標注的變量通過CPU原語保證了變量的內(nèi)存可見性,也就是說一個線程讀到工作內(nèi)存中的變量一定是其他線程已經(jīng)更新到內(nèi)存中的變量。為簡單起見,你可以想象成所有線程都在公共內(nèi)存中操作volatile變量,這樣一個線程對volatile的更改其他線程都看的見,即內(nèi)存可見性!
CAS:compare and swap,比較-相等-替換新值,跟底層CPU有關(guān),一般CAS操作的都是volatile變量。CAS操作包含內(nèi)存位置(V)、預期原值(A)和新值(B),?如果內(nèi)存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值 。否則,處理器不做任何操作。如方法a.CAS(b,c),如果內(nèi)存中a與預期值b相等,那么把a更新成c。cas操作在java中由sun.misc.Unsafe類全權(quán)代理了!
最底層結(jié):concurrent包中用cas死循環(huán)修改volatile變量直到修改成功是最常見的手法!
第二層:
AtomicXXX類:常用基本類型和引用都有Atomic實現(xiàn),其中最重要的兩點當然是一個volatile變量和一個CAS操作。++i和i++操作就是volatile和cas的典型用法,incrementAndget和getAndincrement是兩個實現(xiàn)方法,死循環(huán):首先獲取volatile變量值a,然后執(zhí)行a.cas(a,a+1)直到修改成功。
AQS框架:總有面試官會問你AQS框架,盡管我懷疑他們并不真正了解所有細節(jié),Doug lea最屌的思想應該都在AQS框架里了,直接上圖(借用,侵刪)

該類最主要的兩部分就是state狀態(tài)和Node節(jié)點(即隊列中的元素),前一個是資源狀態(tài),后一個還是cas操作volatile的典型應用,Node還分為獨占模式和共享模式
第二層源碼解析:
以獨占模式為例,想象醫(yī)院某診室排隊看病。
A:如何競爭資源(去看病)?
public final void acquire(int arg) {
? ? ? ? if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
? ? ? ? ? ? selfInterrupt();
}
這是競爭資源的入口,不管有多少線程、是富得流油還是窮的吃土都要從這進!
B:獲取資源后發(fā)生了啥?
首先,嘗試獲取資源,成true敗false,此方法每個子類有自己的實現(xiàn),就像有的醫(yī)院需要排隊(公平鎖),有的可以花錢不用排隊(非公平鎖)。
protected boolean tryAcquire(int arg) {
? ? ? ? throw new UnsupportedOperationException();
}
然后,為當前線程創(chuàng)建節(jié)點Node并設置成獨占模式(相當于每個看病的人取了一個號,并且是獨占診室的那種號),先用快速方法加入隊尾,加入失敗的話調(diào)用enq方法死循環(huán)加入,總之要保證每個病人都取到了號,并且進入排隊狀態(tài)。
private Node addWaiter(Node mode) {
? ? ? ? Node node = new Node(Thread.currentThread(), mode);
? ? ? ? // 快速加入隊尾,如果失敗就調(diào)用enq方法死循環(huán)加入
? ? ? ? Node pred = tail;
? ? ? ? if (pred != null) {
? ? ? ? ? ? node.prev = pred;
? ? ? ? ? ? if (compareAndSetTail(pred, node)) {
? ? ? ? ? ? ? ? pred.next = node;
? ? ? ? ? ? ? ? return node;
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? enq(node);
? ? ? ? return node;
? ? }
private Node enq(final Node node) {
? ? ? ? //CAS外加死循環(huán)操作volatile的Node節(jié)點,這個套路我們太熟悉了,保證了隊尾競爭的線程安全
? ? ? ? for (;;) {
? ? ? ? ? ? Node t = tail;
? ? ? ? ? ? if (t == null) { // Must initialize
? ? ? ? ? ? ? ? if (compareAndSetHead(new Node()))
? ? ? ? ? ? ? ? ? ? tail = head;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? node.prev = t;
? ? ? ? ? ? ? ? if (compareAndSetTail(t, node)) {
? ? ? ? ? ? ? ? ? ? t.next = node;
? ? ? ? ? ? ? ? ? ? return t;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
再次,線程不能一直等待,需要休息,于是線程輪詢查看前驅(qū)節(jié)點是否占用了資源,如果是那么自己嘗試獲取資源,獲取到了就把自己設置為頭結(jié)點。如果不是,那么沒關(guān)系,調(diào)用shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()兩個方法。
final boolean acquireQueued(final Node node, int arg) {
? ? ? ? boolean failed = true;
? ? ? ? try {
? ? ? ? ? ? boolean interrupted = false;
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? final Node p = node.predecessor();
? ? ? ? ? ? ? ? if (p == head && tryAcquire(arg)) {
? ? ? ? ? ? ? ? ? ? setHead(node);
? ? ? ? ? ? ? ? ? ? p.next = null; // 幫助垃圾回收
? ? ? ? ? ? ? ? ? ? failed = false;
? ? ? ? ? ? ? ? ? ? return interrupted;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
? ? ? ? ? ? ? ? ? ? interrupted = true;
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (failed)
? ? ? ? ? ? ? ? cancelAcquire(node);
? ? ? ? }
? ? }
然后,假如當前線程前驅(qū)節(jié)點沒有占用資源或者人家占用資源還沒釋放,那么調(diào)用shouldParkAfterFailedAcquire(p, node)方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
? ? ? ? int ws = pred.waitStatus;
? ? ? ? if (ws == Node.SIGNAL)
? ? ? ? ? ? return true;
? ? ? ? if (ws > 0) {
? ? ? ? ? ? do {
? ? ? ? ? ? ? ? node.prev = pred = pred.prev;
? ? ? ? ? ? } while (pred.waitStatus > 0);
? ? ? ? ? ? pred.next = node;
? ? ? ? } else {
? ? ? ? ? ? compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
? ? ? ? }
? ? ? ? return false;
? ? }

看上面代碼,ws是當前線程前驅(qū)節(jié)點的狀態(tài),如果前驅(qū)節(jié)點狀態(tài)是SIGNAL,意思是后繼節(jié)點應該等待,那么返回true。否則,如果前驅(qū)節(jié)點狀態(tài)大于0,意思就是取消了(不排隊了)那么就移動到不排隊的節(jié)點前面去,不排隊的線程最后會被垃圾回收器回收。如果前驅(qū)節(jié)點狀態(tài)是其他,那么就把前驅(qū)節(jié)點狀態(tài)設置成SIGNAL,告訴他當前線程需要被等待。以上過程執(zhí)行完當前線程就可以等待了parkAndCheckInterrupt()(跟前面排隊的病人溝通,告訴他我要休息一下,快叫到我的時候通知我一下)。
private final boolean parkAndCheckInterrupt() {
? ? ? ? LockSupport.park(this);
? ? ? ? return Thread.interrupted();
? ? }
最后,可見這里面的等待實際上是調(diào)用LockSupport.park方法實現(xiàn)的,這里的park實際上與wait方法類似。
C:有借有還,再借不難,釋放資源
public final boolean release(int arg) {
? ? ? ? if (tryRelease(arg)) {
? ? ? ? ? ? Node h = head;
? ? ? ? ? ? if (h != null && h.waitStatus != 0)
? ? ? ? ? ? ? ? unparkSuccessor(h);
? ? ? ? ? ? return true;
? ? ? ? }
? ? ? ? return false;
? ? }
首先,釋放資源入口是release方法,release實際上與acquire恰好鏡像,如tryRelease可以自己實現(xiàn),然后真正釋放的時候?qū)嶋H上調(diào)用了unparkSuccessor(h)方法
private void unparkSuccessor(Node node) {
? ? ? ? int ws = node.waitStatus;
? ? ? ? if (ws < 0)
? ? ? ? ? ? compareAndSetWaitStatus(node, ws, 0);
? ? ? ? Node s = node.next;
? ? ? ? if (s == null || s.waitStatus > 0) {
? ? ? ? ? ? s = null;
? ? ? ? ? ? for (Node t = tail; t != null && t != node; t = t.prev)
? ? ? ? ? ? ? ? if (t.waitStatus <= 0)
? ? ? ? ? ? ? ? ? ? s = t;
? ? ? ? }
? ? ? ? if (s != null)
? ? ? ? ? ? LockSupport.unpark(s.thread);
? ? }
解除占用方法,可以看到,首先將占用資源的線程節(jié)點狀態(tài)由設置成0,然后調(diào)用LockSupport.unpark方法喚醒next節(jié)點的線程,相當于await/signal方法中的signal。
獨占模式說完下面分析下共享模式。
A:如何競爭資源(想想排隊上廁所)?
猜也能猜個大概,肯定是acquireShare巴拉巴拉的。
public final void acquireShared(int arg) {
? ? ? ? if (tryAcquireShared(arg) < 0)
? ? ? ? ? ? doAcquireShared(arg);
? ? }
上面代碼是獲取共享資源的,首先嘗試獲取,這個與獨占模式類似,稍有不同的地方是其返回值,-1表示獲取失敗,0表示獲取成功但沒有可用資源,1表示獲取成功并且有資源。-1表示廁所門都沒進去、0表示進廁所門了但是沒有坑位、1表示即進入了廁所門又有坑位。
重點:共享資源可以有多份,這樣可以同時讓多個線程共享資源,所以PROPAGATE狀態(tài)可以保證在一個線程釋放資源后其他狀態(tài)為PROPAGATE的線程都能被喚醒。
B:怎么競爭資源?
private void doAcquireShared(int arg) {
? ? ? ? final Node node = addWaiter(Node.SHARED);
? ? ? ? boolean failed = true;
? ? ? ? try {
? ? ? ? ? ? boolean interrupted = false;
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? final Node p = node.predecessor();
? ? ? ? ? ? ? ? if (p == head) {
? ? ? ? ? ? ? ? ? ? int r = tryAcquireShared(arg);
? ? ? ? ? ? ? ? ? ? if (r >= 0) {
? ? ? ? ? ? ? ? ? ? ? ? setHeadAndPropagate(node, r);
? ? ? ? ? ? ? ? ? ? ? ? p.next = null; // help GC
? ? ? ? ? ? ? ? ? ? ? ? if (interrupted)
? ? ? ? ? ? ? ? ? ? ? ? ? ? selfInterrupt();
? ? ? ? ? ? ? ? ? ? ? ? failed = false;
? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&
? ? ? ? ? ? ? ? ? ? parkAndCheckInterrupt())
? ? ? ? ? ? ? ? ? ? interrupted = true;
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (failed)
? ? ? ? ? ? ? ? cancelAcquire(node);
? ? ? ? }
? ? }
上述代碼在嘗試獲取資源失敗后進入(即沒進入廁所門就進入該代碼塊)。addWaiter獨占模式中已經(jīng)講過了,先快速入隊,快速失敗后enq死循環(huán)cas操作隊尾入隊,只不過這里設置的模式使共享而已。然后看死循環(huán)里面的內(nèi)容,是不是太熟悉了?過程實際上是一樣的,先看下排在自己前面的人是不是占著坑位,如果是就嘗試獲取下資源,如果失敗就告訴前面排隊的你的下一位是我,釋放之后通知我,我要去等待了(實際上就是把前驅(qū)節(jié)點狀態(tài)設置成SIGNAL)。
private void setHeadAndPropagate(Node node, int propagate) {
? ? ? ? Node h = head; // Record old head for check below
? ? ? ? setHead(node);
? ? ? ? if (propagate > 0 || h == null || h.waitStatus < 0 ||
? ? ? ? ? ? (h = head) == null || h.waitStatus < 0) {
? ? ? ? ? ? Node s = node.next;
? ? ? ? ? ? if (s == null || s.isShared())
? ? ? ? ? ? ? ? doReleaseShared();
? ? ? ? }
? ? }
注意上面代碼與獨占模式的小區(qū)別,setHeadAndPropagate()執(zhí)行的時候表示當前節(jié)點線程已經(jīng)獲取到了資源。此時它把自己設置為頭結(jié)點,并需要喚醒其后繼節(jié)點(如果有的話),喚醒過程實際上調(diào)用了doReleaseShared方法。
C:有借有還,再借不難,釋放資源
public final boolean releaseShared(int arg) {
? ? ? ? if (tryReleaseShared(arg)) {
? ? ? ? ? ? doReleaseShared();
? ? ? ? ? ? return true;
? ? ? ? }
? ? ? ? return false;
? ? }
ok,套路與獨占模式相同,先嘗試釋放(子類自己實現(xiàn)),然后執(zhí)行釋放
private void doReleaseShared() {
? ? ? ? for (;;) {
? ? ? ? ? ? Node h = head;
? ? ? ? ? ? if (h != null && h != tail) {
? ? ? ? ? ? ? ? int ws = h.waitStatus;
? ? ? ? ? ? ? ? if (ws == Node.SIGNAL) {
? ? ? ? ? ? ? ? ? ? if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
? ? ? ? ? ? ? ? ? ? ? ? continue;? ? ? ? ? ? // loop to recheck cases
? ? ? ? ? ? ? ? ? ? unparkSuccessor(h);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
? ? ? ? ? ? ? ? ? ? continue;? ? ? ? ? ? ? ? // loop on failed CAS
? ? ? ? ? ? }
? ? ? ? ? ? if (h == head)? ? ? ? ? ? ? ? ? // loop if head changed
? ? ? ? ? ? ? ? break;
? ? ? ? }
? ? }
上述代碼是共享模式下釋放資源的實質(zhì)代碼。首先查看后繼線程是否需要被喚醒,如果需要那么執(zhí)行喚醒,讓它過來搶占資源,然后會把自身狀態(tài)設置成PROPAGATE保證傳播喚醒。
非阻塞數(shù)據(jù)結(jié)構(gòu):這一層的各種數(shù)據(jù)結(jié)構(gòu)如concurrentHashMap,LinkedBlockingQueue咱們后續(xù)再寫,先趁熱打鐵把AQS的子子孫孫們扒個精光!
最頂層:
Lock:頂層接口,里面有l(wèi)ock(),unlock(),tryLock()等重要方法需要子類實現(xiàn)
Condition:頂層接口,里面有await(),signal(),signalAll()等重要方法類似于wait/notify等待通知模型。
ReadWriteLock:頂層接口,兩個方法,readLock()和writeLock()分別獲取讀鎖和寫鎖。
ReentrantLock類:

該類包含了Sync和其子類FairSync、NonfairSync,前者是公平鎖,后者是非公平鎖。默認構(gòu)造函數(shù)是非公平鎖,可以通過boolean值構(gòu)造公平鎖。既然是一個鎖,那么最重要的兩個方法當然是關(guān)鎖和開鎖,該類中對開鎖和關(guān)鎖分別實現(xiàn)了公平和非公平兩個方法,我們來看下具體實現(xiàn):
非公平鎖關(guān)鎖的實現(xiàn):
final void lock() {
????if (compareAndSetState(0, 1))
????????setExclusiveOwnerThread(Thread.currentThread());
????else
????????acquire(1);
}
public final void acquire(int arg) {
????if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
????????selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
? ? ? ? ? ? final Thread current = Thread.currentThread();
? ? ? ? ? ? int c = getState();
? ? ? ? ? ? if (c == 0) {
? ? ? ? ? ? ? ? if (compareAndSetState(0, acquires)) {
? ? ? ? ? ? ? ? ? ? setExclusiveOwnerThread(current);
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? else if (current == getExclusiveOwnerThread()) {
? ? ? ? ? ? ? ? int nextc = c + acquires;
? ? ? ? ? ? ? ? if (nextc < 0) // overflow
? ? ? ? ? ? ? ? ? ? throw new Error("Maximum lock count exceeded");
? ? ? ? ? ? ? ? setState(nextc);
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? ? ? return false;
? ? ? ? }
上述代碼是非公平鎖的實現(xiàn)方式。先快速占有state資源,如果沒占有成功再調(diào)用aquire去競爭,競爭過程又先調(diào)用了自己實現(xiàn)的tryAcquire方法:”
用當前線程獲取AQS中的資源state,我們前文說過,state等于0表示沒有線程占用該資源,所以,這里設置獨占,而如果發(fā)現(xiàn)當前線程已經(jīng)占用了資源(state>0并且current==owner)那么就在state基礎上加上獲取的次數(shù)1。
非公平鎖開鎖:
protected final boolean tryRelease(int releases) {
? ? ? ? ? ? int c = getState() - releases;
? ? ? ? ? ? if (Thread.currentThread() != getExclusiveOwnerThread())
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? boolean free = false;
? ? ? ? ? ? if (c == 0) {
? ? ? ? ? ? ? ? free = true;
? ? ? ? ? ? ? ? setExclusiveOwnerThread(null);
? ? ? ? ? ? }
? ? ? ? ? ? setState(c);
? ? ? ? ? ? return free;
? ? ? ? }
上述代碼是非公平鎖解鎖過程,之前state被重入了多少次這里就需要釋放多少次,并且把當前占有資源的線程設置為null!直接退出就行。
公平鎖關(guān)鎖過程:
final void lock() {? ?
????acquire(1);
}
public final void acquire(int arg) {???
????if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))????
????????selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
? ? ? ? ? ? final Thread current = Thread.currentThread();
? ? ? ? ? ? int c = getState();
? ? ? ? ? ? if (c == 0) {
? ? ? ? ? ? ? ? if (!hasQueuedPredecessors() &&
? ? ? ? ? ? ? ? ? ? compareAndSetState(0, acquires)) {
? ? ? ? ? ? ? ? ? ? setExclusiveOwnerThread(current);
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? else if (current == getExclusiveOwnerThread()) {
? ? ? ? ? ? ? ? int nextc = c + acquires;
? ? ? ? ? ? ? ? if (nextc < 0)
? ? ? ? ? ? ? ? ? ? throw new Error("Maximum lock count exceeded");
? ? ? ? ? ? ? ? setState(nextc);
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? ? ? return false;
? ? ? ? }
上述代碼是公平鎖實現(xiàn)過程,重點在?if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) 這個hasQueuePredescessors方法!只有這個地方公平鎖與非公平鎖存在差異,該方法顧名思義,判斷當前線程節(jié)點是否有前驅(qū)節(jié)點,如果沒有才會獨占資源。
公平鎖的釋放過程與非公平鎖相同!
那么公平性與非公平性體現(xiàn)在哪?兩個方面:
A:非公平鎖在lock()的時候直接用當前線程占用資源失敗之后才會調(diào)用acquire方法,這是其一
B:非公平鎖在acquire方法調(diào)用過程中調(diào)用了自己實現(xiàn)的tryAcquire方法,該方法不會等待判斷當前線程是否有前驅(qū)節(jié)點,而是直接用當前線程占用state資源,這是其二
小結(jié):現(xiàn)在可以理解多線程執(zhí)行一個用lock和unlock括起來的代碼發(fā)生的事情了。
多個線程到達lock()方法
如果是公平鎖,那么多個線程同時競爭,競爭成功的就占有state資源,競爭失敗的就去clh隊列里面排隊,排隊過程中自旋那么一兩次進入await等待被前驅(qū)線程喚醒。
如果是非公平鎖,那么每個線程過來直接嘗試占有資源,如果沒有成功,那么也是不排隊直接嘗試獲取資源,如果再不成功還是要進入clh隊列排隊,自旋一兩次await等待被前驅(qū)線程喚醒。
一個線程釋放資源到達unlock方法
直接通知他的后繼節(jié)點過來爭搶資源,這個過程還存在新來的線程直接獲取state的風險(非公平)。
CountDownLatch類:
先說下該類是干啥的,調(diào)用CountDownLatch.await()方法的線程會等待構(gòu)造方法(CountDownLatch(int i))中i個線程都執(zhí)行過countDown方法后才會繼續(xù)執(zhí)行。說白了就是調(diào)用await方法的線程請客,等所有線程都到齊之后這個線程才開始做飯。
下面分析下源碼實現(xiàn):
構(gòu)造方法:構(gòu)造方法實際上把state資源設置成了多份。
public CountDownLatch(int count) {
? ? ? ? if (count < 0) throw new IllegalArgumentException("count < 0");
? ? ? ? this.sync = new Sync(count);
}
Sync(int count) {
? ? ? ? ? ? setState(count);
? }
await方法:實際上是共享模式下獲取資源,當前線程在沒有獲取到資源的情況下會進入到資源競爭隊列,共享模式下獲取資源。假如現(xiàn)在state=10共10個資源,等待隊列里有10個線程,前8個線程獲取到了10個資源,第一個和第二個線程分別占用了兩個資源,那么當?shù)谝粋€線程釋放了2個資源后會通知整個隊列的所有標記為shared的10個線程來競爭資源,由于競爭過程是公平的,所以如果此時第9個和第10個線程分別需要1個資源,那么他們兩個都會得到滿足,加入第9個需要3個資源,那么他需要等待。這就很好的解釋了CountDownLatch的await方法,由于等待的線程獲取到共享鎖之后加入到了隊列尾部,它等待的實際上是state變?yōu)?,即所有的線程都釋放,這個時候r>0執(zhí)行return;否則state!=0證明有線程在占用共享資源,那么它可能被LockSupport》park方法await等待。
public void await() throws InterruptedException {
? ? ? ? sync.acquireSharedInterruptibly(1);
}
private void doAcquireSharedInterruptibly(int arg)
? ? ? ? throws InterruptedException {
? ? ? ? final Node node = addWaiter(Node.SHARED);
? ? ? ? boolean failed = true;
? ? ? ? try {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? final Node p = node.predecessor();
? ? ? ? ? ? ? ? if (p == head) {
? ? ? ? ? ? ? ? ? ? int r = tryAcquireShared(arg);
? ? ? ? ? ? ? ? ? ? if (r >= 0) {
? ? ? ? ? ? ? ? ? ? ? ? setHeadAndPropagate(node, r);
? ? ? ? ? ? ? ? ? ? ? ? p.next = null; // help GC
? ? ? ? ? ? ? ? ? ? ? ? failed = false;
? ? ? ? ? ? ? ? ? ? ? ? return;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&
? ? ? ? ? ? ? ? ? ? parkAndCheckInterrupt())
? ? ? ? ? ? ? ? ? ? throw new InterruptedException();
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (failed)
? ? ? ? ? ? ? ? cancelAcquire(node);
? ? ? ? }
? ? }
countDown方法:可以看到該方法實際上是把資源state減一!?。?!
public void countDown() {
? ? ? ? sync.releaseShared(1);
? ? }
public final boolean releaseShared(int arg) {
? ? ? ? if (tryReleaseShared(arg)) {
? ? ? ? ? ? doReleaseShared();
? ? ? ? ? ? return true;
? ? ? ? }
? ? ? ? return false;
? ? }
protected boolean tryReleaseShared(int releases) {
? ? ? ? ? ? // Decrement count; signal when transition to zero
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? int c = getState();
? ? ? ? ? ? ? ? if (c == 0)
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? int nextc = c-1;
? ? ? ? ? ? ? ? if (compareAndSetState(c, nextc))
? ? ? ? ? ? ? ? ? ? return nextc == 0;
? ? ? ? ? ? }
? ? ? ? }
CyclicBarrier類:
先說下這個類是干嘛的,多個線程執(zhí)行任務,每個任務內(nèi)部先調(diào)用了CyclicBarrier.await()方法后就會進入等待,直到構(gòu)造方法中i個線程都執(zhí)行了await方法才會繼續(xù)執(zhí)行任務。相當于4個人打麻將,所有人都到齊之后4個人才開始玩起來。
這里的源碼就不做解釋了,不是基于第二層的,而是基于Condition lock等最頂層的,簡單寫個用法:
public class CyclicBarrierTest2 {
????static CyclicBarrier c = new CyclicBarrier(2, new A());
????public static void main(String[] args) {
????????new Thread(new Runnable() {
????????@Override
????????public void run() {
????????try {
????????c.await();
????????} catch (Exception e) {
????????}
????????System.out.println(1);
}
????????}).start();
????try {
????c.await();
????} catch (Exception e) {
????}
????????System.out.println(2);
????}
????static class A implements Runnable {
????????@Override
????????public void run() {
????????????System.out.println(3);
????????}
????}
}
Semphore類:
Semaphore可以控制某個資源可被同時訪問的個數(shù),acquire()獲取一個許可,如果沒有就等待,而release()釋放一個許可。比如在Windows下可以設置共享文件的最大客戶端訪問個數(shù)。
Semphore類本質(zhì)上把state資源分成多份,通過Shared模式獲取和釋放資源,并實現(xiàn)了公平獲取釋放和非公平獲取釋放兩種操作,我們以非公平獲取釋放為例查看其源碼:
protected int tryAcquireShared(int acquires) {
? ? ? ? ? ? return nonfairTryAcquireShared(acquires);
?}
final int nonfairTryAcquireShared(int acquires) {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? int available = getState();
? ? ? ? ? ? ? ? int remaining = available - acquires;
? ? ? ? ? ? ? ? if (remaining < 0 ||
? ? ? ? ? ? ? ? ? ? compareAndSetState(available, remaining))
? ? ? ? ? ? ? ? ? ? return remaining;
? ? ? ? ? }
? ? }
上述代碼就是非公平鎖模式下獲取信號量資源,首先獲取state值(比如信號量總數(shù)為50),然后當前線程需要的數(shù)acquires與可用的數(shù)做差看是否夠用,如果不夠用或者設置資源余量成功那么返回資源余量。注意設置資源余量方法是一定會執(zhí)行的。如果資源余量小于0又會進入到AQS中的方法,涉及到排隊,等待等等。這里不再贅述了!
if (tryAcquireShared(arg) < 0)
? ? ? ? ? ? doAcquireSharedInterruptibly(arg);
? ? }
protected final boolean tryReleaseShared(int releases) {
? ? ? ? ? ? for (;;) {
? ? ? ? ? ? ? ? int current = getState();
? ? ? ? ? ? ? ? int next = current + releases;
? ? ? ? ? ? ? ? if (next < current) // overflow
? ? ? ? ? ? ? ? ? ? throw new Error("Maximum permit count exceeded");
? ? ? ? ? ? ? ? if (compareAndSetState(current, next))
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? }
上述是非公平模式下信號量的釋放,同樣是操作state資源,不說了,自己看吧
公平鎖的模式就加了一個hasQueuedPredecessors判斷。。。。前文已經(jīng)解釋過了。。。。。
ConditionObject類:
Condition接口的唯一實現(xiàn)類,該類用于生產(chǎn)者消費者模式,與Object.wait()/notify()類似但是比其更加強大,支持await()一段時間,還支持多個等待隊列。conditionObject最重要的兩個方法當然是構(gòu)造方法、await()和signal()幾個方法了,下面分析其源碼:
public Condition newCondition() {
? ? ? ? return sync.newCondition();
}
上述代碼是ConditionObject的構(gòu)造方法,該Condition只能從lock中獲取,所以線程獲取Condition有兩個時機,一種是調(diào)用lock()方法前,一種是調(diào)用lock()方法后。我們看下await()方法:
public final void await() throws InterruptedException {
? ? ? ? ? ? if (Thread.interrupted())
? ? ? ? ? ? ? ? throw new InterruptedException();
? ? ? ? ? ? Node node = addConditionWaiter();? //加入到當前Condition的等待隊列
? ? ? ? ? ? int savedState = fullyRelease(node);? //完全釋放自己占有的state資源
? ? ? ? ? ? int interruptMode = 0;
? ? ? ? ? ? while (!isOnSyncQueue(node)) {? ? ? ? //如果當前線程節(jié)點不在同步隊列里就一直掛起
? ? ? ? ? ? ? ? LockSupport.park(this);
? ? ? ? ? ? ? ? if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? }
? ? ? ? ? ? if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
? ? ? ? ? ? ? ? interruptMode = REINTERRUPT;
? ? ? ? ? ? if (node.nextWaiter != null) // clean up if cancelled
? ? ? ? ? ? ? ? unlinkCancelledWaiters();
? ? ? ? ? ? if (interruptMode != 0)
? ? ? ? ? ? ? ? reportInterruptAfterWait(interruptMode);
? ? }
我們看上面代碼,關(guān)鍵部分已經(jīng)做了注釋,關(guān)鍵點有三,其一,Condition等待隊列是啥?其二,從哪獲取資源了需要釋放?其三,為什么判斷當前線程是否在同步隊列里?我們先看addCondtionWaiter方法:
private Node addConditionWaiter() {
? ? ? ? ? ? Node t = lastWaiter;? ? ? ? ? ? ?
????????????//這里的隊列與同步CLH隊列不是同一個隊列,通過nextWaiter而不是next指針串起來的,每個condtion都有自己的隊列
? ? ? ? ? ? if (t != null && t.waitStatus != Node.CONDITION) { //如果最后一個節(jié)點被取消了就清除
? ? ? ? ? ? ? ? unlinkCancelledWaiters();
? ? ? ? ? ? ? ? t = lastWaiter;
? ? ? ? ? ? }
? ? ? ? ? ? Node node = new Node(Thread.currentThread(), Node.CONDITION); //加入對了之前新建的狀態(tài)是Condition而不是0
? ? ? ? ? ? if (t == null)
? ? ? ? ? ? ? ? firstWaiter = node;
? ? ? ? ? ? else
? ? ? ? ? ? ? ? t.nextWaiter = node;? ? ? ? ?//這里體現(xiàn)出來不是CLH同步隊列,因為調(diào)用的是nextWaiter=node
? ? ? ? ? ? lastWaiter = node;
? ? ? ? ? ? return node;
? ? ? ? }
上述代碼,每一個Condition都有一個自己的等待隊列,使用nextWaiter指針而不是CLH隊列使用的next指針維護著隊列。如果不調(diào)用lock()的條件下直接多線程調(diào)用Condition.await方法顯而易見會出現(xiàn)并發(fā)問題,所以一般await()方法都在lock.lock()鎖內(nèi)進行。因為在lock()鎖內(nèi)進行,所以肯定存在一個CLH隊列維護著多線程的節(jié)點,而且當前線程一定競爭到了資源才會執(zhí)行await方法,現(xiàn)在要釋放鎖讓其他線程進來,所以調(diào)用:
final int fullyRelease(Node node) {
? ? ? ? boolean failed = true;
? ? ? ? try {
? ? ? ? ? ? int savedState = getState();? ? ? //獲取當前線程占有的資源數(shù)目
? ? ? ? ? ? if (release(savedState)) {? ? ? ? ? ?//釋放當前占有的資源,喚醒后繼節(jié)點線程,返回釋放資源的數(shù)量
? ? ? ? ? ? ? ? failed = false;
? ? ? ? ? ? ? ? return savedState;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? }
? ? ? ? } finally {
? ? ? ? ? ? if (failed)
? ? ? ? ? ? ? ? node.waitStatus = Node.CANCELLED;
? ? ? ? }
? ? }
上述代碼表示當前線程在真正等待之前首先釋放了資源并喚醒了CLH隊列中的后繼線程。釋放了資源之后現(xiàn)在該考慮掛起了。
final boolean isOnSyncQueue(Node node) {? ? //while循環(huán)執(zhí)行該代碼
? ? ? ? if (node.waitStatus == Node.CONDITION || node.prev == null)? //如果當前線程節(jié)點狀態(tài)是Condition或者不在CLH隊列中返回false
? ? ? ? ? ? return false;
? ? ? ? if (node.next != null)? ? ? ? ? ? ? ? ? ? ? //如果當前線程節(jié)點有后繼節(jié)點證明在CLH隊列中,返回True
? ? ? ? ? ? return true;
? ? ? ? return findNodeFromTail(node);? ?//該方法同樣是判斷是否在同步隊列中
? ? }
private boolean findNodeFromTail(Node node) {
????Node t = tail;
????for (;;) {
????????if (t == node)
????????????return true;
????????if (t == null)
????????????return false;
????????????t = t.prev;
????}
}
當前線程不在CLH隊列中就會把自己掛起,我們看下signal方法來解釋下為什么不在CLH隊列中就執(zhí)行掛起
public final void signal() {
? ? ? ? ? ? if (!isHeldExclusively())
? ? ? ? ? ? ? ? throw new IllegalMonitorStateException();
? ? ? ? ? ? Node first = firstWaiter;
? ? ? ? ? ? if (first != null)? ? ? ? ? ? ? ? //如果Condition的等待隊列里面有等待線程就執(zhí)行通知,否則啥也不做。
? ? ? ? ? ? ? ? doSignal(first);
? ? ? ? }
private void doSignal(Node first) {
? ? ? ? ? ? do {? ? ? ? ? ? //do while循環(huán),關(guān)鍵方法在transferForSignal方法
? ? ? ? ? ? ? ? if ( (firstWaiter = first.nextWaiter) == null)
? ? ? ? ? ? ? ? ? ? lastWaiter = null;
? ? ? ? ? ? ? ? first.nextWaiter = null;
? ? ? ? ? ? } while (!transferForSignal(first) &&(first = firstWaiter) != null);
? ? ? ? }
如果Condition等待隊列的頭結(jié)點沒有把狀態(tài)變換成SIGNAL就一直執(zhí)行do循環(huán)清空等待隊列,我們看下這個方法做了什么?
final boolean transferForSignal(Node node) {
? ? ? ? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))? ??
? ? ? ? ? ? return false;
? ? ? ? //把當前節(jié)點放入CLH隊列,這也解釋了之前為什么不在CLH隊列就一直掛起!
? ? ? ? Node p = enq(node);
? ? ? ? int ws = p.waitStatus;
? ? ? ? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果當前節(jié)點被取消或者設置SIGNAL狀態(tài)失敗那么喚醒該線程。
? ? ? ? ? ? LockSupport.unpark(node.thread);
? ? ? ? return true;
? ? }
上面就是喚醒Condition線程的關(guān)鍵代碼,你可能會有疑問,從哪喚醒了等待隊列中的線程呢?哈哈,關(guān)鍵就在于enq(node)就是把當前線程加入到了CLH隊列中并把等待節(jié)點的狀態(tài)設置成了SIGNAL,還記得之前CLH隊列中的設置SIGNAL嗎?(告訴前面線程如果排到號了通知我),沒錯,實際情況就是signal方法調(diào)用之后當前線程需要重新進入到CLH隊列競爭鎖,而且是排在隊尾哦。。。
總結(jié):
最高層里面大多數(shù)的類都依賴AQS框架的state和acquire/release方法。這就是AQS框架的精妙所在,模板方法模式,一勞永逸。
本文很長,后續(xù)還會對阻塞隊列、并發(fā)容器、執(zhí)行器做源碼分析。