AbstractQueuedSynchronizer(AQS)深入剖析

上一篇文章中我們對(duì)LockAbstractQueuedSynchronizer(AQS)有了初步的認(rèn)識(shí)。在同步組件的實(shí)現(xiàn)中,AQS是核心部分,同步組件的實(shí)現(xiàn)者通過使用AQS提供的模板方法實(shí)現(xiàn)同步組件語義,AQS則實(shí)現(xiàn)了對(duì)同步狀態(tài)的管理,以及對(duì)阻塞線程進(jìn)行排隊(duì),等待通知等等一些底層的實(shí)現(xiàn)處理。AQS的核心也包括了這些方面:同步隊(duì)列,獨(dú)占式鎖的獲取和釋放,共享鎖的獲取和釋放以及可中斷鎖,超時(shí)等待鎖獲取這些特性的實(shí)現(xiàn),而這些實(shí)際上則是AQS提供出來的模板方法,歸納整理如下:

獨(dú)占式鎖:

void acquire(int arg):獨(dú)占式獲取同步狀態(tài),如果獲取失敗則插入同步隊(duì)列進(jìn)行等待;
void acquireInterruptibly(int arg):acquire方法相同,但在同步隊(duì)列中進(jìn)行等待的時(shí)候可以檢測中斷;
boolean tryAcquireNanos(int arg, long nanosTimeout):acquireInterruptibly基礎(chǔ)上增加了超時(shí)等待功能,在超時(shí)時(shí)間內(nèi)沒有獲得同步狀態(tài)返回false;
boolean release(int arg):釋放同步狀態(tài),該方法會(huì)喚醒在同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)

共享式鎖:

void acquireShared(int arg):共享式獲取同步狀態(tài),與獨(dú)占式的區(qū)別在于同一時(shí)刻有多個(gè)線程獲取同步狀態(tài);
void acquireSharedInterruptibly(int arg):在acquireShared方法基礎(chǔ)上增加了能響應(yīng)中斷的功能;
boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基礎(chǔ)上增加了超時(shí)等待的功能;
boolean releaseShared(int arg):共享式釋放同步狀態(tài)

要想掌握AQS的底層實(shí)現(xiàn),其實(shí)也就是對(duì)這些模板方法的邏輯進(jìn)行學(xué)習(xí)。在學(xué)習(xí)這些模板方法之前,我們得首先了解下AQS中的同步隊(duì)列是一種什么樣的數(shù)據(jù)結(jié)構(gòu),因?yàn)橥疥?duì)列是AQS對(duì)同步狀態(tài)的管理的基石。

2. 同步隊(duì)列

AQS提供了一個(gè)基于FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡稱同步器)利用了一個(gè)int來表示狀態(tài),期望它能夠成為實(shí)現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類通過繼承同步器并需要實(shí)現(xiàn)它的方法來管理其狀態(tài),管理的方式就是通過類似acquire和release的方式來操縱狀態(tài)。然而多線程環(huán)境中對(duì)狀態(tài)的操縱必須確保原子性,因此子類對(duì)于狀態(tài)的把握,需要使用這個(gè)同步器提供的以下三個(gè)方法對(duì)狀態(tài)進(jìn)行操作:

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
  • java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
  • java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類推薦被定義為自定義同步裝置的內(nèi)部類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當(dāng)它被定義為一個(gè)排他模式時(shí),其他線程對(duì)其的獲取就被阻止,而共享模式對(duì)于多個(gè)線程獲取都可以成功。

同步器是實(shí)現(xiàn)鎖的關(guān)鍵,利用同步器將鎖的語義實(shí)現(xiàn),然后在鎖的實(shí)現(xiàn)中聚合同步器??梢赃@樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為,而每個(gè)鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個(gè)線程進(jìn)行加鎖,排除兩個(gè)以上的線程),但是實(shí)現(xiàn)是依托給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對(duì)資源是否能夠獲取以及線程的排隊(duì)等操作。鎖和同步器很好的隔離了二者所需要關(guān)注的領(lǐng)域,嚴(yán)格意義上講,同步器可以適用于除了鎖以外的其他同步設(shè)施上(包括鎖)。
同步器的開始提到了其實(shí)現(xiàn)依賴于一個(gè)FIFO隊(duì)列,那么隊(duì)列中的元素Node就是保存著線程引用和線程狀態(tài)的容器,每個(gè)線程對(duì)同步器的訪問,都可以看做是隊(duì)列中的一個(gè)節(jié)點(diǎn)。Node的主要包含以下成員變量:

Node {
    int waitStatus;
    Node prev;
    Node next;
    Node nextWaiter;
    Thread thread;
}

以上五個(gè)成員變量主要負(fù)責(zé)保存該節(jié)點(diǎn)的線程引用,同步等待隊(duì)列(以下簡稱sync隊(duì)列)的前驅(qū)和后繼節(jié)點(diǎn),同時(shí)也包括了同步狀態(tài)。

屬性名稱 描述
int waitStatus 表示節(jié)點(diǎn)的狀態(tài)。其中包含的狀態(tài)有:1. CANCELLED,值為1,表示當(dāng)前的線程被取消; 2. SIGNAL,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線程需要運(yùn)行,也就是unpark; 3. CONDITION,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中; 4. PROPAGATE,值為-3,表示當(dāng)前場景下后續(xù)的acquireShared能夠得以執(zhí)行; 5. 值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖。
Node prev 前驅(qū)節(jié)點(diǎn),比如當(dāng)前節(jié)點(diǎn)被取消,那就需要前驅(qū)節(jié)點(diǎn)和后繼節(jié)點(diǎn)來完成連接。
Node next 后繼節(jié)點(diǎn)。
Node nextWaiter 存儲(chǔ)condition隊(duì)列中的后繼節(jié)點(diǎn)。
Thread thread 入隊(duì)列時(shí)的當(dāng)前線程。

節(jié)點(diǎn)成為sync隊(duì)列和condition隊(duì)列構(gòu)建的基礎(chǔ),在同步器中就包含了sync隊(duì)列。同步器擁有三個(gè)成員變量:sync隊(duì)列的頭結(jié)點(diǎn)head、sync隊(duì)列的尾節(jié)點(diǎn)tail和狀態(tài)state。對(duì)于鎖的獲取,請(qǐng)求形成節(jié)點(diǎn),將其掛載在尾部,而鎖資源的轉(zhuǎn)移(釋放再獲?。┦菑念^部開始向后進(jìn)行。對(duì)于同步器維護(hù)的狀態(tài)state,多個(gè)線程對(duì)其的獲取將會(huì)產(chǎn)生一個(gè)鏈?zhǔn)降慕Y(jié)構(gòu)。


image

現(xiàn)在我們知道了節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu)類型,并且每個(gè)節(jié)點(diǎn)擁有其前驅(qū)和后繼節(jié)點(diǎn),很顯然這是一個(gè)雙向隊(duì)列。同樣的我們可以用一段demo看一下。

public class LockDemo {
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(() -> {
                lock.lock();
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            });
            thread.start();
        }
    }
}

實(shí)例代碼中開啟了5個(gè)線程,先獲取鎖之后再睡眠10S中,實(shí)際上這里讓線程睡眠是想模擬出當(dāng)線程無法獲取鎖時(shí)進(jìn)入同步隊(duì)列的情況。通過debug,當(dāng)Thread-4(在本例中最后一個(gè)線程)獲取鎖失敗后進(jìn)入同步時(shí),AQS時(shí)現(xiàn)在的同步隊(duì)列如圖所示:

LockDemo debug png

Thread-0先獲得鎖后進(jìn)行睡眠,其他線程(Thread-1,Thread-2,Thread-3,Thread-4)獲取鎖失敗進(jìn)入同步隊(duì)列,同時(shí)也可以很清楚的看出來每個(gè)節(jié)點(diǎn)有兩個(gè)域:prev(前驅(qū))和next(后繼),并且每個(gè)節(jié)點(diǎn)用來保存獲取同步狀態(tài)失敗的線程引用以及等待狀態(tài)等信息。另外AQS中有兩個(gè)重要的成員變量:

private transient volatile Node head;
private transient volatile Node tail;

也就是說AQS實(shí)際上通過頭尾指針來管理同步隊(duì)列,同時(shí)實(shí)現(xiàn)包括獲取鎖失敗的線程進(jìn)行入隊(duì),釋放鎖時(shí)對(duì)同步隊(duì)列中的線程進(jìn)行通知等核心方法。其示意圖如下:

png

通過對(duì)源碼的理解以及做實(shí)驗(yàn)的方式,現(xiàn)在我們可以清楚的知道這樣幾點(diǎn):

  1. 節(jié)點(diǎn)的數(shù)據(jù)結(jié)構(gòu),即AQS的靜態(tài)內(nèi)部類Node,節(jié)點(diǎn)的等待狀態(tài)等信息;
  2. 同步隊(duì)列是一個(gè)雙向隊(duì)列,AQS通過持有頭尾指針管理同步隊(duì)列;

那么,節(jié)點(diǎn)如何進(jìn)行入隊(duì)和出隊(duì)是怎樣做的了?實(shí)際上這對(duì)應(yīng)著鎖的獲取和釋放兩個(gè)操作:獲取鎖失敗進(jìn)行入隊(duì)操作,獲取鎖成功進(jìn)行出隊(duì)操作。

3. 獨(dú)占鎖

3.1 獨(dú)占鎖的獲取(acquire方法)

我們繼續(xù)通過看源碼和debug的方式來看,還是以上面的demo為例,調(diào)用lock()方法是獲取獨(dú)占式鎖,獲取失敗就將當(dāng)前線程加入同步隊(duì)列,成功則線程執(zhí)行。而lock()方法實(shí)際上會(huì)調(diào)用AQSacquire()方法,源碼如下

public final void acquire(int arg) {
        //先看同步狀態(tài)是否獲取成功,如果成功則方法結(jié)束返回
        //若失敗則先調(diào)用addWaiter()方法再調(diào)用acquireQueued()方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

上述邏輯主要包括:

  1. 嘗試獲取(調(diào)用tryAcquire更改狀態(tài),需要保證原子性);
    在tryAcquire方法中使用了同步器提供的對(duì)state操作的方法,利用compareAndSet保證只有一個(gè)線程能夠?qū)顟B(tài)進(jìn)行成功修改,而沒有成功修改的線程將進(jìn)入sync隊(duì)列排隊(duì)。
  2. 如果獲取不到,將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)Node并加入sync隊(duì)列;
    進(jìn)入隊(duì)列的每個(gè)線程都是一個(gè)節(jié)點(diǎn)Node,從而形成了一個(gè)雙向隊(duì)列,類似CLH隊(duì)列,這樣做的目的是線程間的通信會(huì)被限制在較小規(guī)模(也就是兩個(gè)節(jié)點(diǎn)左右)。
  3. 再次嘗試獲取,如果沒有獲取到那么將當(dāng)前線程從線程調(diào)度器上摘下,進(jìn)入等待狀態(tài)。

總結(jié): 使用LockSupport將當(dāng)前線程unpark,關(guān)于LockSupport后續(xù)會(huì)詳細(xì)介紹。關(guān)鍵信息請(qǐng)看注釋,acquire根據(jù)當(dāng)前獲得同步狀態(tài)成功與否做了兩件事情:1. 成功,則方法結(jié)束返回,2. 失敗,則先調(diào)用addWaiter()然后在調(diào)用acquireQueued()方法。

獲取同步狀態(tài)失敗,入隊(duì)操作

當(dāng)線程獲取獨(dú)占式鎖失敗后就會(huì)將當(dāng)前線程加入同步隊(duì)列,那么加入隊(duì)列的方式是怎樣的了?我們接下來就應(yīng)該去研究一下addWaiter()acquireQueued()addWaiter()源碼如下:

private Node addWaiter(Node mode) {
        // 1\. 將當(dāng)前線程構(gòu)建成Node類型
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 2\. 當(dāng)前尾節(jié)點(diǎn)是否為null?
        Node pred = tail;
        if (pred != null) {
            // 2.2 將當(dāng)前節(jié)點(diǎn)尾插入的方式插入同步隊(duì)列中
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 2.1\. 當(dāng)前同步隊(duì)列尾節(jié)點(diǎn)為null,說明當(dāng)前線程是第一個(gè)加入同步隊(duì)列進(jìn)行等待的線程
        enq(node);
        return node;
}

上述邏輯主要包括:

  1. 使用當(dāng)前線程構(gòu)造Node;
    對(duì)于一個(gè)節(jié)點(diǎn)需要做的是將當(dāng)節(jié)點(diǎn)前驅(qū)節(jié)點(diǎn)指向尾節(jié)點(diǎn)(current.prev = tail),尾節(jié)點(diǎn)指向它(tail = current),原有的尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點(diǎn)的設(shè)置來保證的,也就是compareAndSetTail來完成的。
  2. 先行嘗試在隊(duì)尾添加;
    如果尾節(jié)點(diǎn)已經(jīng)有了,然后做如下操作:
    (1)分配引用T指向尾節(jié)點(diǎn);
    (2)將節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)更新為尾節(jié)點(diǎn)(current.prev = tail);
    (3)如果尾節(jié)點(diǎn)是T,那么將當(dāng)尾節(jié)點(diǎn)設(shè)置為該節(jié)點(diǎn)(tail = current,原子更新);
    (4)T的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)(T.next = current)。
    注意第3點(diǎn)是要求原子的。
    這樣可以以最短路徑O(1)的效果來完成線程入隊(duì),是最大化減少開銷的一種方式。
  3. 如果隊(duì)尾添加失敗或者是第一個(gè)入隊(duì)的節(jié)點(diǎn)。
    如果是第1個(gè)節(jié)點(diǎn),也就是sync隊(duì)列沒有初始化,那么會(huì)進(jìn)入到enq這個(gè)方法,進(jìn)入的線程可能有多個(gè),或者說在addWaiter中沒有成功入隊(duì)的線程都將進(jìn)入enq這個(gè)方法。
    可以看到enq的邏輯是確保進(jìn)入的Node都會(huì)有機(jī)會(huì)順序的添加到sync隊(duì)列中,而加入的步驟如下:
    (1)如果尾節(jié)點(diǎn)為空,那么原子化的分配一個(gè)頭節(jié)點(diǎn),并將尾節(jié)點(diǎn)指向頭節(jié)點(diǎn),這一步是初始化;
    (2)然后是重復(fù)在addWaiter中做的工作,但是在一個(gè)while(true)的循環(huán)中,直到當(dāng)前節(jié)點(diǎn)入隊(duì)為止。
    進(jìn)入sync隊(duì)列之后,接下來就是要進(jìn)行鎖的獲取,或者說是訪問控制了,只有一個(gè)線程能夠在同一時(shí)刻繼續(xù)的運(yùn)行,而其他的進(jìn)入等待狀態(tài)。而每個(gè)線程都是一個(gè)獨(dú)立的個(gè)體,它們自省的觀察,當(dāng)條件滿足的時(shí)候(自己的前驅(qū)是頭結(jié)點(diǎn)并且原子性的獲取了狀態(tài)),那么這個(gè)線程能夠繼續(xù)運(yùn)行。
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //1\. 構(gòu)造頭結(jié)點(diǎn)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 2\. 尾插入,CAS操作失敗自旋嘗試
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
}

在上面的分析中我們可以看出在第1步中會(huì)先創(chuàng)建頭結(jié)點(diǎn),說明同步隊(duì)列是帶頭結(jié)點(diǎn)的鏈?zhǔn)酱鎯?chǔ)結(jié)構(gòu)。帶頭結(jié)點(diǎn)與不帶頭結(jié)點(diǎn)相比,會(huì)在入隊(duì)和出隊(duì)的操作中獲得更大的便捷性,因此同步隊(duì)列選擇了帶頭結(jié)點(diǎn)的鏈?zhǔn)酱鎯?chǔ)結(jié)構(gòu)。那么帶頭節(jié)點(diǎn)的隊(duì)列初始化時(shí)機(jī)是什么?自然而然是在tail為null時(shí),即當(dāng)前線程是第一次插入同步隊(duì)列。compareAndSetTail(t, node)方法會(huì)利用CAS操作設(shè)置尾節(jié)點(diǎn),如果CAS操作失敗會(huì)在for (;;)for死循環(huán)中不斷嘗試,直至成功return返回為止。因此,對(duì)enq()方法可以做這樣的總結(jié):

  1. 在當(dāng)前線程是第一個(gè)加入同步隊(duì)列時(shí),調(diào)用compareAndSetHead(new Node())方法,完成鏈?zhǔn)疥?duì)列的頭結(jié)點(diǎn)的初始化
  2. 自旋不斷嘗試CAS尾插入節(jié)點(diǎn)直至成功為止。

現(xiàn)在我們已經(jīng)很清楚獲取獨(dú)占式鎖失敗的線程包裝成Node然后插入同步隊(duì)列的過程了?那么緊接著會(huì)有下一個(gè)問題?在同步隊(duì)列中的節(jié)點(diǎn)(線程)會(huì)做什么事情了來保證自己能夠有機(jī)會(huì)獲得獨(dú)占式鎖了?帶著這樣的問題我們就來看看acquireQueued()方法,從方法名就可以很清楚,這個(gè)方法的作用就是排隊(duì)獲取鎖的過程,源碼如下:

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 1\. 獲得當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 2\. 當(dāng)前節(jié)點(diǎn)能否獲取獨(dú)占式鎖                 
                // 2.1 如果當(dāng)前節(jié)點(diǎn)的先驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲取同步狀態(tài),即可以獲得獨(dú)占式鎖
                if (p == head && tryAcquire(arg)) {
                    //隊(duì)列頭指針用指向當(dāng)前節(jié)點(diǎn)
                    setHead(node);
                    //釋放前驅(qū)節(jié)點(diǎn)
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 2.2 獲取鎖失敗,線程進(jìn)入等待狀態(tài)等待獲取獨(dú)占式鎖
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

上述邏輯主要包括:

  1. 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn);
    需要獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),而頭結(jié)點(diǎn)所對(duì)應(yīng)的含義是當(dāng)前站有鎖且正在運(yùn)行。
  2. 當(dāng)前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且能夠獲取狀態(tài),代表該當(dāng)前節(jié)點(diǎn)占有鎖;
    如果滿足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點(diǎn)對(duì)鎖占有的含義,設(shè)置頭結(jié)點(diǎn)為當(dāng)前節(jié)點(diǎn)。
  3. 否則進(jìn)入等待狀態(tài)。
    如果沒有輪到當(dāng)前節(jié)點(diǎn)運(yùn)行,那么將當(dāng)前線程從線程調(diào)度器上摘下,也就是進(jìn)入等待狀態(tài)。

這里針對(duì)acquire做一下總結(jié):

  1. 狀態(tài)的維護(hù);
    需要在鎖定時(shí),需要維護(hù)一個(gè)狀態(tài)(int類型),而對(duì)狀態(tài)的操作是原子和非阻塞的,通過同步器提供的對(duì)狀態(tài)訪問的方法對(duì)狀態(tài)進(jìn)行操縱,并且利用compareAndSet來確保原子性的修改。
  2. 狀態(tài)的獲??;
    一旦成功的修改了狀態(tài),當(dāng)前線程或者說節(jié)點(diǎn),就被設(shè)置為頭節(jié)點(diǎn)。
  3. sync隊(duì)列的維護(hù)。
    在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者沒有獲取到資源)進(jìn)入睡眠狀態(tài),停止線程調(diào)度器對(duì)當(dāng)前節(jié)點(diǎn)線程的調(diào)度。
    這時(shí)引入的一個(gè)釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關(guān)鍵,就是前驅(qū)節(jié)點(diǎn)的通知,而這一個(gè)過程就是釋放,釋放會(huì)通知它的后繼節(jié)點(diǎn)從睡眠中返回準(zhǔn)備運(yùn)行。整體示意圖為下圖:
png

獲取鎖成功,出隊(duì)操作

獲取鎖的節(jié)點(diǎn)出隊(duì)的邏輯是:

//隊(duì)列頭結(jié)點(diǎn)引用指向當(dāng)前節(jié)點(diǎn)
setHead(node);
//釋放前驅(qū)節(jié)點(diǎn)
p.next = null; // help GC
failed = false;
return interrupted;

setHead()方法為:

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
}

將當(dāng)前節(jié)點(diǎn)通過setHead()方法設(shè)置為隊(duì)列的頭結(jié)點(diǎn),然后將之前的頭結(jié)點(diǎn)的next域設(shè)置為null并且pre域也為null,即與隊(duì)列斷開,無任何引用方便GC時(shí)能夠?qū)?nèi)存進(jìn)行回收。示意圖如下:

png

那么當(dāng)獲取鎖失敗的時(shí)候會(huì)調(diào)用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他們做了什么事情。shouldParkAfterFailedAcquire()方法源碼為:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

shouldParkAfterFailedAcquire()方法主要邏輯是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS將節(jié)點(diǎn)狀態(tài)由INITIAL設(shè)置成SIGNAL,表示當(dāng)前線程阻塞。當(dāng)compareAndSetWaitStatus設(shè)置失敗則說明shouldParkAfterFailedAcquire方法返回false,然后會(huì)在acquireQueued()方法中for (;;)死循環(huán)中會(huì)繼續(xù)重試,直至compareAndSetWaitStatus設(shè)置節(jié)點(diǎn)狀態(tài)位為SIGNAL時(shí)shouldParkAfterFailedAcquire返回true時(shí)才會(huì)執(zhí)行方法parkAndCheckInterrupt()方法,該方法的源碼為:

private final boolean parkAndCheckInterrupt() {
        //使得該線程阻塞
        LockSupport.park(this);
        return Thread.interrupted();
}

該方法的關(guān)鍵是會(huì)調(diào)用LookSupport.park()方法(關(guān)于LookSupport會(huì)在以后的文章進(jìn)行討論),該方法是用來阻塞當(dāng)前線程的。因此到這里就應(yīng)該清楚了,acquireQueued()在自旋過程中主要完成了兩件事情:

  1. 如果當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),并且能夠獲得同步狀態(tài)的話,當(dāng)前線程能夠獲得鎖該方法執(zhí)行結(jié)束退出
  2. 獲取鎖失敗的話,先將節(jié)點(diǎn)狀態(tài)設(shè)置成SIGNAL,然后調(diào)用LookSupport.park方法使得當(dāng)前線程阻塞

經(jīng)過上面的分析,獨(dú)占式鎖的獲取過程也就是acquire()方法的執(zhí)行流程如下圖所示:

acquirepng

3.2 獨(dú)占鎖的釋放(release()方法)

獨(dú)占鎖的釋放就相對(duì)來說比較容易理解了,廢話不多說先來看下源碼:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

這段代碼邏輯就比較容易理解了,如果同步狀態(tài)釋放成功(tryRelease返回true)則會(huì)執(zhí)行if塊中的代碼,當(dāng)head指向的頭結(jié)點(diǎn)不為null,并且該節(jié)點(diǎn)的狀態(tài)值不為0的話才會(huì)執(zhí)行unparkSuccessor()方法。unparkSuccessor方法源碼:

private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */

    //頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //后繼節(jié)點(diǎn)不為null時(shí)喚醒該線程
        LockSupport.unpark(s.thread);
}

源碼的關(guān)鍵信息請(qǐng)看注釋,首先獲取頭節(jié)點(diǎn)的后繼節(jié)點(diǎn),當(dāng)后繼節(jié)點(diǎn)的時(shí)候會(huì)調(diào)用LookSupport.unpark()方法,該方法會(huì)喚醒該節(jié)點(diǎn)的后繼節(jié)點(diǎn)所包裝的線程。因此,每一次鎖釋放后就會(huì)喚醒隊(duì)列中該節(jié)點(diǎn)的后繼節(jié)點(diǎn)所引用的線程,從而進(jìn)一步可以佐證獲得鎖的過程是一個(gè)FIFO(先進(jìn)先出)的過程。

到現(xiàn)在我們終于啃下了一塊硬骨頭了,通過學(xué)習(xí)源碼的方式非常深刻的學(xué)習(xí)到了獨(dú)占式鎖的獲取和釋放的過程以及同步隊(duì)列??梢宰鲆幌驴偨Y(jié):

  1. 線程獲取鎖失敗,線程被封裝成Node進(jìn)行入隊(duì)操作,核心方法在于addWaiter()和enq(),同時(shí)enq()完成對(duì)同步隊(duì)列的頭結(jié)點(diǎn)初始化工作以及CAS操作失敗的重試;
  2. 線程獲取鎖是一個(gè)自旋的過程,當(dāng)且僅當(dāng) 當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲得同步狀態(tài)時(shí),節(jié)點(diǎn)出隊(duì)即該節(jié)點(diǎn)引用的線程獲得鎖,否則,當(dāng)不滿足條件時(shí)就會(huì)調(diào)用LookSupport.park()方法使得線程阻塞;
  3. 釋放鎖的時(shí)候會(huì)喚醒后繼節(jié)點(diǎn);

總體來說:在獲取同步狀態(tài)時(shí),AQS維護(hù)一個(gè)同步隊(duì)列,獲取同步狀態(tài)失敗的線程會(huì)加入到隊(duì)列中進(jìn)行自旋;移除隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)并且成功獲得了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器會(huì)調(diào)用unparkSuccessor()方法喚醒后繼節(jié)點(diǎn)。

獨(dú)占鎖特性學(xué)習(xí)

3.3 可中斷式獲取鎖(acquireInterruptibly方法)

我們知道lock相較于synchronized有一些更方便的特性,比如能響應(yīng)中斷以及超時(shí)等待等特性,現(xiàn)在我們依舊采用通過學(xué)習(xí)源碼的方式來看看能夠響應(yīng)中斷是怎么實(shí)現(xiàn)的??身憫?yīng)中斷式鎖可調(diào)用方法lock.lockInterruptibly();而該方法其底層會(huì)調(diào)用AQSacquireInterruptibly方法,源碼為:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        //線程獲取鎖失敗
        doAcquireInterruptibly(arg);
}

在獲取同步狀態(tài)失敗后就會(huì)調(diào)用doAcquireInterruptibly方法:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //將節(jié)點(diǎn)插入到同步隊(duì)列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //獲取鎖出隊(duì)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //線程中斷拋異常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

關(guān)鍵信息請(qǐng)看注釋,現(xiàn)在看這段代碼就很輕松了吧:),與acquire方法邏輯幾乎一致,唯一的區(qū)別是當(dāng)parkAndCheckInterrupt返回true時(shí)即線程阻塞時(shí)該線程被中斷,代碼拋出被中斷異常。

3.4 超時(shí)等待式獲取鎖(tryAcquireNanos()方法)

通過調(diào)用lock.tryLock(timeout,TimeUnit)方式達(dá)到超時(shí)等待獲取鎖的效果,該方法會(huì)在三種情況下才會(huì)返回:

  1. 在超時(shí)時(shí)間內(nèi),當(dāng)前線程成功獲取了鎖;
  2. 當(dāng)前線程在超時(shí)時(shí)間內(nèi)被中斷;
  3. 超時(shí)時(shí)間結(jié)束,仍未獲得鎖返回false。

我們?nèi)匀煌ㄟ^采取閱讀源碼的方式來學(xué)習(xí)底層具體是怎么實(shí)現(xiàn)的,該方法會(huì)調(diào)用AQS的方法tryAcquireNanos(),源碼為:

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        //實(shí)現(xiàn)超時(shí)等待的效果
        doAcquireNanos(arg, nanosTimeout);
}

很顯然這段源碼最終是靠doAcquireNanos方法實(shí)現(xiàn)超時(shí)等待的效果,該方法源碼如下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    //1\. 根據(jù)超時(shí)時(shí)間和當(dāng)前時(shí)間計(jì)算出截止時(shí)間
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //2\. 當(dāng)前線程獲得鎖出隊(duì)列
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 3.1 重新計(jì)算超時(shí)時(shí)間
            nanosTimeout = deadline - System.nanoTime();
            // 3.2 已經(jīng)超時(shí)返回false
            if (nanosTimeout <= 0L)
                return false;
            // 3.3 線程阻塞等待 
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 3.4 線程被中斷拋出被中斷異常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

程序邏輯如圖所示:

doAcquireNanos

程序邏輯同獨(dú)占鎖可響應(yīng)中斷式獲取基本一致,唯一的不同在于獲取鎖失敗后,對(duì)超時(shí)時(shí)間的處理上,在第1步會(huì)先計(jì)算出按照現(xiàn)在時(shí)間和超時(shí)時(shí)間計(jì)算出理論上的截止時(shí)間,比如當(dāng)前時(shí)間是8h10min,超時(shí)時(shí)間是10min,那么根據(jù)deadline = System.nanoTime() + nanosTimeout計(jì)算出剛好達(dá)到超時(shí)時(shí)間時(shí)的系統(tǒng)時(shí)間就是8h 10min+10min = 8h 20min。然后根據(jù)deadline - System.nanoTime()就可以判斷是否已經(jīng)超時(shí)了,比如,當(dāng)前系統(tǒng)時(shí)間是8h 30min很明顯已經(jīng)超過了理論上的系統(tǒng)時(shí)間8h 20min,deadline - System.nanoTime()計(jì)算出來就是一個(gè)負(fù)數(shù),自然而然會(huì)在3.2步中的If判斷之間返回false。如果還沒有超時(shí)即3.2步中的if判斷為true時(shí)就會(huì)繼續(xù)執(zhí)行3.3步通過LockSupport.parkNanos使得當(dāng)前線程阻塞,同時(shí)在3.4步增加了對(duì)中斷的檢測,若檢測出被中斷直接拋出被中斷異常。

4. 共享鎖

4.1 執(zhí)行過程概述

獲取鎖的過程:

  1. 當(dāng)線程調(diào)用acquireShared()申請(qǐng)獲取鎖資源時(shí),如果成功,則進(jìn)入臨界區(qū)。
  2. 當(dāng)獲取鎖失敗時(shí),則創(chuàng)建一個(gè)共享類型的節(jié)點(diǎn)并進(jìn)入一個(gè)FIFO等待隊(duì)列,然后被掛起等待喚醒。
  3. 當(dāng)隊(duì)列中的等待線程被喚醒以后就重新嘗試獲取鎖資源,如果成功則喚醒后面還在等待的共享節(jié)點(diǎn)并把該喚醒事件傳遞下去,即會(huì)依次喚醒在該節(jié)點(diǎn)后面的所有共享節(jié)點(diǎn),然后進(jìn)入臨界區(qū),否則繼續(xù)掛起等待。

釋放鎖過程:

  1. 當(dāng)線程調(diào)用releaseShared()進(jìn)行鎖資源釋放時(shí),如果釋放成功,則喚醒隊(duì)列中等待的節(jié)點(diǎn),如果有的話。

4.2 源碼深入分析

基于上面所說的共享鎖執(zhí)行流程,我們接下來看下源碼實(shí)現(xiàn)邏輯:
首先來看下獲取鎖的方法acquireShared(),如下

   public final void acquireShared(int arg) {
        //嘗試獲取共享鎖,返回值小于0表示獲取失敗
        if (tryAcquireShared(arg) < 0)
            //執(zhí)行獲取鎖失敗以后的方法
            doAcquireShared(arg);
    }

這里tryAcquireShared()方法是留給用戶去實(shí)現(xiàn)具體的獲取鎖邏輯的。關(guān)于該方法的實(shí)現(xiàn)有兩點(diǎn)需要特別說明:

  1. 該方法必須自己檢查當(dāng)前上下文是否支持獲取共享鎖,如果支持再進(jìn)行獲取。

  2. 該方法返回值是個(gè)重點(diǎn)。其一、由上面的源碼片段可以看出返回值小于0表示獲取鎖失敗,需要進(jìn)入等待隊(duì)列。其二、如果返回值等于0表示當(dāng)前線程獲取共享鎖成功,但它后續(xù)的線程是無法繼續(xù)獲取的,也就是不需要把它后面等待的節(jié)點(diǎn)喚醒。最后、如果返回值大于0,表示當(dāng)前線程獲取共享鎖成功且它后續(xù)等待的節(jié)點(diǎn)也有可能繼續(xù)獲取共享鎖成功,也就是說此時(shí)需要把后續(xù)節(jié)點(diǎn)喚醒讓它們?nèi)L試獲取共享鎖。

有了上面的約定,我們?cè)賮砜聪耫oAcquireShared方法的實(shí)現(xiàn):

//參數(shù)不多說,就是傳給acquireShared()的參數(shù)
private void doAcquireShared(int arg) {
    //添加等待節(jié)點(diǎn)的方法跟獨(dú)占鎖一樣,唯一區(qū)別就是節(jié)點(diǎn)類型變?yōu)榱斯蚕硇?,不再贅?    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //表示前面的節(jié)點(diǎn)已經(jīng)獲取到鎖,自己會(huì)嘗試獲取鎖
            if (p == head) {
                int r = tryAcquireShared(arg);
                //注意上面說的, 等于0表示不用喚醒后繼節(jié)點(diǎn),大于0需要
                if (r >= 0) {
                    //這里是重點(diǎn),獲取到鎖以后的喚醒操作,后面詳細(xì)說
                    setHeadAndPropagate(node, r);
                    p.next = null;
                    //如果是因?yàn)橹袛嘈褋韯t設(shè)置中斷標(biāo)記位
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //掛起邏輯跟獨(dú)占鎖一樣,不再贅述
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //獲取失敗的取消邏輯跟獨(dú)占鎖一樣,不再贅述
        if (failed)
            cancelAcquire(node);
    }
}

獨(dú)占鎖模式獲取成功以后設(shè)置頭結(jié)點(diǎn)然后返回中斷狀態(tài),結(jié)束流程。而共享鎖模式獲取成功以后,調(diào)用了setHeadAndPropagate方法,從方法名就可以看出除了設(shè)置新的頭結(jié)點(diǎn)以外還有一個(gè)傳遞動(dòng)作,一起看下代碼:

    //兩個(gè)入?yún)ⅲ粋€(gè)是當(dāng)前成功獲取共享鎖的節(jié)點(diǎn),一個(gè)就是tryAcquireShared方法的返回值,注意上面說的,它可能大于0也可能等于0
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //記錄當(dāng)前頭節(jié)點(diǎn)
        //設(shè)置新的頭節(jié)點(diǎn),即把當(dāng)前獲取到鎖的節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
        //注:這里是獲取到鎖之后的操作,不需要并發(fā)控制
        setHead(node);
        //這里意思有兩種情況是需要執(zhí)行喚醒操作
        //1.propagate > 0 表示調(diào)用方指明了后繼節(jié)點(diǎn)需要被喚醒
        //2.頭節(jié)點(diǎn)后面的節(jié)點(diǎn)需要被喚醒(waitStatus<0),不論是老的頭結(jié)點(diǎn)還是新的頭結(jié)點(diǎn)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)是共享類型或者沒有后繼節(jié)點(diǎn),則進(jìn)行喚醒
            //這里可以理解為除非明確指明不需要喚醒(后繼等待節(jié)點(diǎn)是獨(dú)占類型),否則都要喚醒
            if (s == null || s.isShared())
                //后面詳細(xì)說
                doReleaseShared();
        }
    }

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

最終的喚醒操作也很復(fù)雜,專門拿出來分析一下:
注:這個(gè)喚醒操作在releaseShare()方法里也會(huì)調(diào)用。

private void doReleaseShared() {
        for (;;) {
            //喚醒操作由頭結(jié)點(diǎn)開始,注意這里的頭節(jié)點(diǎn)已經(jīng)是上面新設(shè)置的頭結(jié)點(diǎn)了
            //其實(shí)就是喚醒上面新獲取到共享鎖的節(jié)點(diǎn)的后繼節(jié)點(diǎn)
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //表示后繼節(jié)點(diǎn)需要被喚醒
                if (ws == Node.SIGNAL) {
                    //這里需要控制并發(fā),因?yàn)槿肟谟衧etHeadAndPropagate跟release兩個(gè),避免兩次unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;      
                    //執(zhí)行喚醒操作      
                    unparkSuccessor(h);
                }
                //如果后繼節(jié)點(diǎn)暫時(shí)不需要喚醒,則把當(dāng)前節(jié)點(diǎn)狀態(tài)設(shè)置為PROPAGATE確保以后可以傳遞下去
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                
            }
            //如果頭結(jié)點(diǎn)沒有發(fā)生變化,表示設(shè)置完成,退出循環(huán)
            //如果頭結(jié)點(diǎn)發(fā)生變化,比如說其他線程獲取到了鎖,為了使自己的喚醒動(dòng)作可以傳遞,必須進(jìn)行重試
            if (h == head)                   
                break;
        }
    }

接下來看下釋放共享鎖的過程:

public final boolean releaseShared(int arg) {
        //嘗試釋放共享鎖
        if (tryReleaseShared(arg)) {
            //喚醒過程,詳情見上面分析
            doReleaseShared();
            return true;
        }
        return false;
    }

注:上面的setHeadAndPropagate()方法表示等待隊(duì)列中的線程成功獲取到共享鎖,這時(shí)候它需要喚醒它后面的共享節(jié)點(diǎn)(如果有),但是當(dāng)通過releaseShared()方法去釋放一個(gè)共享鎖的時(shí)候,接下來等待獨(dú)占鎖跟共享鎖的線程都可以被喚醒進(jìn)行嘗試獲取。

4.3 總結(jié)

跟獨(dú)占鎖相比,共享鎖的主要特征在于當(dāng)一個(gè)在等待隊(duì)列中的共享節(jié)點(diǎn)成功獲取到鎖以后(它獲取到的是共享鎖),既然是共享,那它必須要依次喚醒后面所有可以跟它一起共享當(dāng)前鎖資源的節(jié)點(diǎn),毫無疑問,這些節(jié)點(diǎn)必須也是在等待共享鎖(這是大前提,如果等待的是獨(dú)占鎖,那前面已經(jīng)有一個(gè)共享節(jié)點(diǎn)獲取鎖了,它肯定是獲取不到的)。當(dāng)共享鎖被釋放的時(shí)候,可以用讀寫鎖為例進(jìn)行思考,當(dāng)一個(gè)讀鎖被釋放,此時(shí)不論是讀鎖還是寫鎖都是可以競爭資源的。

5. 總結(jié):

如果獲取共享鎖失敗后,將請(qǐng)求共享鎖的線程封裝成Node對(duì)象放入AQS的隊(duì)列中,并掛起Node對(duì)象對(duì)應(yīng)的線程,實(shí)現(xiàn)請(qǐng)求鎖線程的等待操作。待共享鎖可以被獲取后,從頭節(jié)點(diǎn)開始,依次喚醒頭節(jié)點(diǎn)及其以后的所有共享類型的節(jié)點(diǎn)。實(shí)現(xiàn)共享狀態(tài)的傳播。這里有幾點(diǎn)值得注意:
1. 與AQS的獨(dú)占功能一樣,共享鎖是否可以被獲取的判斷為空方法,交由子類去實(shí)現(xiàn)。
2. 與AQS的獨(dú)占功能不同,當(dāng)鎖被頭節(jié)點(diǎn)獲取后,獨(dú)占功能是只有頭節(jié)點(diǎn)獲取鎖,其余節(jié)點(diǎn)的線程繼續(xù)沉睡,等待鎖被釋放后,才會(huì)喚醒下一個(gè)節(jié)點(diǎn)的線程,而共享功能是只要頭節(jié)點(diǎn)獲取鎖成功,就在喚醒自身節(jié)點(diǎn)對(duì)應(yīng)的線程的同時(shí),繼續(xù)喚醒AQS隊(duì)列中的下一個(gè)節(jié)點(diǎn)的線程,每個(gè)節(jié)點(diǎn)在喚醒自身的同時(shí)還會(huì)喚醒下一個(gè)節(jié)點(diǎn)對(duì)應(yīng)的線程,以實(shí)現(xiàn)共享狀態(tài)的“向后傳播”,從而實(shí)現(xiàn)共享功能。

以上的分析都是從AQS子類的角度去看待AQS的部分功能的,而如果直接看待AQS,或許可以這么去解讀:
首先,AQS并不關(guān)心“是什么鎖”,對(duì)于AQS來說它只是實(shí)現(xiàn)了一系列的用于判斷“資源”是否可以訪問的API,并且封裝了在“訪問資源”受限時(shí)將請(qǐng)求訪問的線程的加入隊(duì)列、掛起、喚醒等操作,AQS只關(guān)心“資源不可以訪問時(shí),怎么處理?”、“資源是可以被同時(shí)訪問,還是在同一時(shí)間只能被一個(gè)線程訪問?”、“如果有線程等不及資源了,怎么從AQS的隊(duì)列中退出?”等一系列圍繞資源訪問的問題,而至于“資源是否可以被訪問?”這個(gè)問題則交給AQS的子類去實(shí)現(xiàn)。
當(dāng)AQS的子類是實(shí)現(xiàn)獨(dú)占功能時(shí),例如ReentrantLock,“資源是否可以被訪問”被定義為只要AQS的state變量不為0,并且持有鎖的線程不是當(dāng)前線程,則代表資源不能訪問。
當(dāng)AQS的子類是實(shí)現(xiàn)共享功能時(shí),例如:CountDownLatch,“資源是否可以被訪問”被定義為只要AQSstate變量不為0,說明資源不能訪問。這是典型的將規(guī)則和操作分開的設(shè)計(jì)思路:規(guī)則子類定義,操作邏輯因?yàn)榫哂泄眯?,放在父類中去封裝。當(dāng)然,正式因?yàn)?code>AQS只是關(guān)心“資源在什么條件下可被訪問”,所以子類還可以同時(shí)使用AQS的共享功能和獨(dú)占功能的API以實(shí)現(xiàn)更為復(fù)雜的功能。
比如:ReentrantReadWriteLock,我們知道ReentrantReadWriteLock的中也有一個(gè)叫Sync的內(nèi)部類繼承了AQS,而AQS的隊(duì)列可以同時(shí)存放共享鎖和獨(dú)占鎖,對(duì)于ReentrantReadWriteLock來說分別代表讀鎖和寫鎖,當(dāng)隊(duì)列中的頭節(jié)點(diǎn)為讀鎖時(shí),代表讀操作可以執(zhí)行,而寫操作不能執(zhí)行,因此請(qǐng)求寫操作的線程會(huì)被掛起,當(dāng)讀操作依次推出后,寫鎖成為頭節(jié)點(diǎn),請(qǐng)求寫操作的線程被喚醒,可以執(zhí)行寫操作,而此時(shí)的讀請(qǐng)求將被封裝成Node放入AQS的隊(duì)列中。如此往復(fù),實(shí)現(xiàn)讀寫鎖的讀寫交替進(jìn)行。

參考文獻(xiàn)

《java并發(fā)編程的藝術(shù)》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容