徹底理解 AQS 我是懂了,你呢?

背景

早前寫過一篇深入理解 AQS 的文章

https://mp.weixin.qq.com/s?__biz=MzI3Njk5ODg4OQ==&mid=2247484343&idx=1&sn=0c0ac16161f09cadd00483addbf6e598&chksm=eb6dbc31dc1a35278931f76fce310d6ead4aba125fc2370aeb52a03b2dc4a78c0d4d95fae420&token=1325698290&lang=zh_CN#rd

過了一段時間后,我發(fā)現(xiàn)有些地方記得不清楚了,所以我感覺其實(shí)理解的并不深入透徹,于是決定再好好找資料看一看,把之前一知半解的地方徹底弄清楚!

看文檔

先看下源碼, AbstractQueuedSynchronizer 這個類有代碼+注釋共有 2335 行,可以說很長,我們先看第一手資料即源碼注釋是怎么寫的,通過 AbstractQueuedSynchronizer 類的注釋提綱挈領(lǐng)地了解一下這個類。

類注釋

這里就不貼英文原文了,比較長,直接上翻譯內(nèi)容:

本類提供了一個用于實(shí)現(xiàn)阻塞鎖和同步器(信號量、事件等)的框架,該框架依賴先進(jìn)先出 (FIFO) 的等待隊(duì)列來實(shí)現(xiàn)。它為大多數(shù)通過單個 int 類型的原子值來表示狀態(tài)的同步器提供了實(shí)現(xiàn)基礎(chǔ)。子類必須重寫更改同步器狀態(tài)的 protected 方法,并定義該狀態(tài)在獲取或釋放子類對象方面的具體含義?;诖?,此類中的其他方法實(shí)現(xiàn)所有排隊(duì)和阻塞機(jī)制。子類可以維護(hù)其他狀態(tài)字段,但就同步而言,只有使用了方法 getState、 setState 和 compareAndSetState 操作以原子方式更新的 int 值才能夠被跟蹤。

子類應(yīng)定義為非公共內(nèi)部輔助類,用于實(shí)現(xiàn)其外部類的同步屬性。AbstractQueuedSynchronizer 類沒有實(shí)現(xiàn)任何同步接口。相反,它定義了諸如 acquireInterruptably 之類的方法,這些方法可以由具體鎖和相關(guān)同步器適當(dāng)調(diào)用以實(shí)現(xiàn)它們的公共方法。

此類默認(rèn)支持獨(dú)占模式和共享模式獲取同步狀態(tài)。當(dāng)以獨(dú)占模式獲取同步狀態(tài)時,其他線程嘗試獲取不會成功。共享模式下,多個線程獲取同步狀態(tài)一般(但不一定)會成功。這個類會忽略這兩種模式在機(jī)制上的差異,那就是當(dāng)線程在共享模式下成功獲取到同步狀態(tài)時,下一個等待的線程(如果存在)也必須確定它是否也可以獲取該狀態(tài)。

不同模式下的等待線程共享一個 FIFO 隊(duì)列。

通常子類只實(shí)現(xiàn)一種模式,但兩者可以同時工作,例如 ReadWriteLock。只支持一種模式的子類,無需再為另一種模式定義方法。

AQS 定義了一個內(nèi)部類 ConditionObject,可以當(dāng)做 Condition 的實(shí)例供支持獨(dú)占模式的子類使用。在獨(dú)占模式下,方法 #isHeldExclusively 用于表示當(dāng)前線程是否在獨(dú)占子類對象,以方法#getState 的返回值為入?yún)ⅲ{(diào)用方法#release 可以完全釋放子類對象。使用這個值調(diào)用#acquire 方法,最終恢復(fù)到前一個獲取鎖的狀態(tài)。 AbstractQueuedSynchronizer 中沒有其他方法創(chuàng)建這個條件,所以如果不能滿足這個約束,就不要使用它。 AbstractQueuedSynchronizer.ConditionObject 的行為當(dāng)然是基于它的實(shí)現(xiàn)類的語義。

該類為內(nèi)部類提供了 檢查/儀表化/監(jiān)視 方法,以及類似的用于 condition 對象的方法。 這些可以導(dǎo)出到使用 AbstractQueuedSynchronizer 作為同步機(jī)制的類中。
該類的序列化僅僅存儲原子整數(shù)的維護(hù)狀態(tài),所以反序列化對象得到的線程隊(duì)列是空的。子類若需要序列化,要定義 readObject() 用于恢復(fù)自己到一個已知的初始狀態(tài)。

用法:

若將該類用作同步器的基礎(chǔ),請使用 getState()/setState()/cas 這些用于檢查或修改同步狀態(tài)的方法對下面的方法進(jìn)行重新定義:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

這些方法默認(rèn)拋出 UnsupportedOperationException, 這些方法內(nèi)部必須保證線程安全,并且通常應(yīng)該是簡短且無鎖的。 定義這些方法是使用該類的意義所在。其他方法因?yàn)椴荒塥?dú)立的變化,所以聲明為 final。

你也能看到繼承自 AbstractOwnableSynchronizer 的方法,用于跟蹤那些獨(dú)占了同步器的線程。鼓勵你使用這些方法,它們開啟了監(jiān)視和診斷功能,可以幫助用戶判斷那些線程持有鎖。

即使該類基于內(nèi)部 FIFO 隊(duì)列,但也不會執(zhí)行默認(rèn)的 FIFO 策略,獨(dú)占同步的核心使用以下策略:

Acquire:
    while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
 
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

(共享模式類似,但可能涉及級聯(lián)信號。)

因?yàn)閷Λ@取鎖的檢查是在入隊(duì)之前進(jìn)行,所以一個新線程可能會插入到其他已經(jīng)入隊(duì)的線程之前。不管咋地吧,你要是愿意,可以通過調(diào)用內(nèi)部的一個或多個檢查方法去定義 tryAcquire() / tryAcquireShared() 方法,從而提供一個公平的 FIFO 獲取順序。特別是,大多數(shù)公平同步器在 hasQueuedPredecessors()(這是一個專門設(shè)計用于公平同步器的)返回 true 的時候,讓 tryAcquire() 返回 false. 當(dāng)然,其他方法也有可能。

對于默認(rèn)的插隊(duì)策略(也叫 greedy/renouncement/convoy-avoidance), 吞吐量和可伸縮性通常最高。盡管這不能保證公平,但允許早入隊(duì)的線程在后來的線程之前重新競爭,每個重新競爭的線程擁有公平的機(jī)會打敗新來的線程。盡管獲取行為通常不會一直進(jìn)行,但線程在被阻塞之前,伴隨著其他計算過程,他們可能會多次調(diào)用 tryAcquire(). 當(dāng)獨(dú)占同步器只是短暫的被持有時,這對于自旋來說很有好處,沒有太多負(fù)擔(dān)。你可以通過前面的調(diào)用增強(qiáng)這一點(diǎn),以獲得帶有"fast-path"的方法。如果同步器可能沒有被爭奪,或許只會預(yù)先檢查 hasContended() 和 hasQueuedThreads()。

這個類為同步提供了有效的可擴(kuò)展的基礎(chǔ),部分原因是把范圍集中在那些依賴數(shù)字狀態(tài)、 獲取/釋放參數(shù)、 以及內(nèi)部 FIFO 的同步器上。當(dāng)這些不能滿足需要時,你可以使用原子類、 你自己的 java.util.Queue 類、 LockSuppor 來構(gòu)建更底層的同步器。

用法示例:

這是一個不可重入互斥鎖,0 表示打開,1 表示鎖定。雖然不可重入鎖無需嚴(yán)格記錄本地?fù)碛姓叩木€程,但這個類還是這么做了,以便更容易監(jiān)視。同時也支持 conditions 并且公開了一個檢測方法。

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Reports whether in locked state
        public boolean isLocked() {
            return getState() != 0;
        }

        public boolean isHeldExclusively() {
            // a data race, but safe due to out-of-thin-air guarantees
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // Provides a Condition
        public Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isLocked();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

這是一個類似于 java.util.concurrent.CountDownLatch CountDownLatch 的閂鎖類,只是它只需要一個信號即可觸發(fā)。 因?yàn)殒i存器是非獨(dú)占的,所以它使用共享的獲取和釋放方法。


class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}

如果對上面的翻譯理解起來還是費(fèi)勁,那還放那兒,等看完下面的,再回到上面看一遍就通透了。

看設(shè)計

AQS 本質(zhì)上是一個 FIFO 的雙向隊(duì)列,線程被包裝成結(jié)點(diǎn)的形式,基于自旋機(jī)制在隊(duì)列中等待獲取資源(這里的資源可以簡單理解為對象鎖)

image

總攬一下這個類,可以看到有兩個內(nèi)部類,剩下的就是一堆成員變量和成員方法。

image

設(shè)計思路

這是 AQS 的模型:


image

AQS 主要由三部分組成,state 同步狀態(tài)、Node 組成的 CLH 隊(duì)列、ConditionObject 條件變量(包含 Node 組成的條件單向隊(duì)列)。

state 用 volatile 來修飾,保證了我們操作的可見性,所以任何線程通過 getState() 獲得狀態(tài)都是可以得到最新值,但是 setState() 無法保證原子性,因此 AQS 給我們提供了 compareAndSetState 方法利用底層 UnSafe 的 CAS 功能來實(shí)現(xiàn)原子性。

對于 AQS 來說,線程同步的關(guān)鍵是對 state 的操作,可以說獲取、釋放資源是否成功都是由 state 決定的,比如 state>0 代表可獲取資源,否則無法獲取,所以 state 的具體語義由實(shí)現(xiàn)者去定義,現(xiàn)有的 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 定義的 state 語義都不一樣。

  • ReentrantLock 的 state 用來表示是否有鎖資源,變量記錄了鎖的重入次數(shù)
  • ReentrantReadWriteLock 的 state 高 16 位代表讀鎖狀態(tài),低 16 位代表寫鎖狀態(tài)
  • Semaphore 的 state 用來表示可用信號的個數(shù)
  • CountDownLatch 的 state 用來表示計數(shù)器的值

AQS 實(shí)現(xiàn)了兩類隊(duì)列,即 同步隊(duì)列 和 條件隊(duì)列。

同步隊(duì)列服務(wù)于線程阻塞等待獲取資源,而條件隊(duì)列則服務(wù)于線程因某個條件不滿足而進(jìn)入等待狀態(tài)。
條件隊(duì)列中的線程實(shí)際上已經(jīng)獲取到了資源,但是沒有能夠繼續(xù)執(zhí)行下去的條件,所以被打入條件隊(duì)列并釋放持有的資源,以讓渡其它線程執(zhí)行,如果未來某個時刻條件得以滿足,則該線程會被從條件隊(duì)列轉(zhuǎn)移到同步隊(duì)列,繼續(xù)參與競爭資源,以繼續(xù)向下執(zhí)行。

同步隊(duì)列

CLH

同步隊(duì)列是基于鏈表實(shí)現(xiàn)的雙向隊(duì)列,也是 CLH 鎖的變種。CLH 鎖是 AQS 隊(duì)列同步器實(shí)現(xiàn)的基礎(chǔ)。

來看一下 CLH 隊(duì)列

image
  • CLH 鎖是有由 Craig, Landin, and Hagersten 這三個人發(fā)明的鎖,取了三個人名字的首字母,所以叫 CLH Lock。
  • CLH 鎖是一個自旋鎖。能確保無饑餓性。提供先來先服務(wù)的公平性。
  • CLH 隊(duì)列鎖也是一種基于鏈表的可擴(kuò)展、高性能、公平的自旋鎖,申請線程僅僅在本地變量上自旋,它不斷輪詢前驅(qū)的狀態(tài),假設(shè)發(fā)現(xiàn)前驅(qū)釋放了鎖就結(jié)束自旋。

Node

AQS 以內(nèi)部類 Node 的形式定義了同步隊(duì)列結(jié)點(diǎn)。這就是前文看到的第一個內(nèi)部類。

static final class Node {

    /** 模式定義 */

    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** 線程狀態(tài) */

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 線程等待狀態(tài) */
    volatile int waitStatus;

    /** 前驅(qū)結(jié)點(diǎn) */
    volatile Node prev;
    /** 后置結(jié)點(diǎn) */
    volatile Node next;

    /** 持有的線程對象 */
    volatile Thread thread;

    /** 對于獨(dú)占模式而言,指向下一個處于 CONDITION 等待狀態(tài)的結(jié)點(diǎn);對于共享模式而言,則為 SHARED 結(jié)點(diǎn) */
    Node nextWaiter;

    // ... 省略方法定義
}
image

Node 在 CLH 的基礎(chǔ)上進(jìn)行了變種。CLH 是單向隊(duì)列,其主要特點(diǎn)是自旋檢查前驅(qū)節(jié)點(diǎn)的 locked 狀態(tài)。而 AQS 同步隊(duì)列是雙向隊(duì)列,每個節(jié)點(diǎn)也有狀態(tài) waitStatus,而其并不是一直對前驅(qū)節(jié)點(diǎn)的狀態(tài)自旋判斷,而是自旋一段時間后阻塞讓出 cpu 時間片(上下文切換),等待前驅(qū)節(jié)點(diǎn)主動喚醒后繼節(jié)點(diǎn)。

image

waitStatus 有如下 5 中狀態(tài):

  • CANCELLED = 1
    表示當(dāng)前結(jié)點(diǎn)已取消調(diào)度。當(dāng)超時或被中斷(響應(yīng)中斷的情況下),會觸發(fā)變更為此狀態(tài),進(jìn)入該狀態(tài)后的結(jié)點(diǎn)將不會再變化。
  • SIGNAL = -1
    表示后繼結(jié)點(diǎn)在等待當(dāng)前結(jié)點(diǎn)喚醒。后繼結(jié)點(diǎn)入隊(duì)時,會將前繼結(jié)點(diǎn)的狀態(tài)更新為 SIGNAL。
  • CONDITION = -2
    表示結(jié)點(diǎn)等待在 Condition 上,當(dāng)其他線程調(diào)用了 Condition 的 signal() 方法后,CONDITION 狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
  • PROPAGATE = -3
    共享模式下,前繼結(jié)點(diǎn)不僅會喚醒其后繼結(jié)點(diǎn),同時也可能會喚醒后繼的后繼結(jié)點(diǎn)。
  • INITIAL = 0
    新結(jié)點(diǎn)入隊(duì)時的默認(rèn)狀態(tài)。

從上面的代碼中可以看出,位于 CLH 鏈表中的線程以 2 種模式在等待資源,即 SHARED 和 EXCLUSIVE,其中 SHARED 表示共享模式,而 EXCLUSIVE 表示獨(dú)占模式。共享模式與獨(dú)占模式的主要區(qū)別在于,同一時刻獨(dú)占模式只能有一個線程獲取到資源,而共享模式在同一時刻可以有多個線程獲取到資源。典型的場景就是讀寫鎖,讀操作可以有多個線程同時獲取到讀鎖資源,而寫操作同一時刻只能有一個線程獲取到寫鎖資源,其它線程在嘗試獲取資源時都會被阻塞。

同步隊(duì)列主要行為

AQS 類成員變量 head 和 tail 字段分別指向同步隊(duì)列的頭結(jié)點(diǎn)和尾結(jié)點(diǎn):


    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

其中 head 表示同步隊(duì)列的頭結(jié)點(diǎn),而 tail 則表示同步隊(duì)列的尾結(jié)點(diǎn),具體組織形式如下圖:

image

當(dāng)調(diào)用 AQS 的 acquire 方法獲取資源時,如果資源不足則當(dāng)前線程會被封裝成 Node 結(jié)點(diǎn)添加到同步隊(duì)列的末端(入隊(duì)),頭結(jié)點(diǎn) head 用于記錄當(dāng)前正在持有資源的線程結(jié)點(diǎn),而 head 的后繼結(jié)點(diǎn)就是下一個將要被調(diào)度的線程結(jié)點(diǎn),當(dāng) release 方法被調(diào)用時,該結(jié)點(diǎn)上的線程將被喚醒(出隊(duì)),繼續(xù)獲取資源。

同步隊(duì)列的主要行為是 :入隊(duì)、出隊(duì)

入隊(duì)

獲取資源失敗的線程需要封裝成 Node 節(jié)點(diǎn),接著尾部入隊(duì),在 AQS 中提供 addWaiter 函數(shù)完成 Node 節(jié)點(diǎn)的創(chuàng)建與入隊(duì)。添加節(jié)點(diǎn)的時候,如 CLH 隊(duì)列已經(jīng)存在,通過 CAS 快速將當(dāng)前節(jié)點(diǎn)添加到隊(duì)列尾部,如果添加失敗或隊(duì)列不存在,則初始化同步隊(duì)列。

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

總結(jié):入隊(duì)列,線程獲取鎖失敗,入隊(duì)列將新節(jié)點(diǎn)加到 tail 后面,然后對 tail 進(jìn)行 CAS 操作,將 tail 指針后移到新節(jié)點(diǎn)上。

出隊(duì)

CLH 隊(duì)列中的節(jié)點(diǎn)都是獲取資源失敗的線程節(jié)點(diǎn),當(dāng)持有資源的線程釋放資源時,會將 head.next 指向的線程節(jié)點(diǎn)喚醒(C L H 隊(duì)列的第二個節(jié)點(diǎn)),如果喚醒的線程節(jié)點(diǎn)獲取資源成功,線程節(jié)點(diǎn)清空信息設(shè)置為頭部節(jié)點(diǎn)(新哨兵節(jié)點(diǎn)),原頭部節(jié)點(diǎn)出隊(duì)(原哨兵節(jié)點(diǎn))


  protected final boolean tryRelease(int releases) {
        int c = getState() - releases; 
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 如果 state=0 了,就是可以釋放鎖了
            free = true; 
            setExclusiveOwnerThread(null); // 將拿鎖線程置為 null
        }
        setState(c); // 重置同步器的 state
        return free; // 返回是否成功釋放
  }

  private void unparkSuccessor(Node node) {
    // node 節(jié)點(diǎn)是當(dāng)前釋放鎖的節(jié)點(diǎn),也是同步隊(duì)列的頭節(jié)點(diǎn)
    int ws = node.waitStatus;
    // 如果節(jié)點(diǎn)已經(jīng)被取消了,把節(jié)點(diǎn)的狀態(tài)置為初始化
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 拿出隊(duì)二 s
    Node s = node.next;
    // s 為空,表示 node 的后一個節(jié)點(diǎn)為空
    // s.waitStatus 大于 0,代表 s 節(jié)點(diǎn)已經(jīng)被取消了
    // 遇到以上這兩種情況,就從隊(duì)尾開始,向前遍歷,找到第一個 waitStatus 字段不是被取消的
    if (s == null || s.waitStatus > 0) {
        s = null;
        
        // 結(jié)束條件是前置節(jié)點(diǎn)就是 head 了
        for (Node t = tail; t != null && t != node; t = t.prev)
            // t.waitStatus <= 0 說明 t 當(dāng)前沒有被取消,肯定還在等待被喚醒
            if (t.waitStatus <= 0)
                s = t;
    }
    // 喚醒以上代碼找到的線程
    if (s != null)
        LockSupport.unpark(s.thread);
}

當(dāng)線程被喚醒后,又繼續(xù)執(zhí)行 acquireQueued 方法,進(jìn)入循環(huán)

 /**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            //獲取前驅(qū)節(jié)點(diǎn)
            final Node p = node.predecessor();
            //如果前驅(qū)節(jié)點(diǎn)是首節(jié)點(diǎn),獲取資源(子類實(shí)現(xiàn))
            if (p == head && tryAcquire(arg)) {
                //獲取資源成功,設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn),清空當(dāng)前節(jié)點(diǎn)的信息,把當(dāng)前節(jié)點(diǎn)變成哨兵節(jié)點(diǎn)
                setHead(node);
                //原來首節(jié)點(diǎn)下個節(jié)點(diǎn)指向?yàn)?null
                p.next = null; // help GC
                //返回線程中斷狀態(tài)
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

總結(jié):出隊(duì)列,鎖釋放喚醒 head 的后繼節(jié)點(diǎn),head 的后繼節(jié)點(diǎn)從阻塞中醒來,開始搶鎖,獲取鎖成功,此時 head 指針向后移一個位置,原先 head 的后繼節(jié)點(diǎn)成為新的 head。

最后是一個 同步隊(duì)列的流程概述

image

條件隊(duì)列

一個 AQS 可以對應(yīng)多個條件變量

image

ConditionObject 內(nèi)部維護(hù)著一個單向條件隊(duì)列,不同于 CLH 隊(duì)列,條件隊(duì)列只入隊(duì)執(zhí)行 await 的線程節(jié)點(diǎn),并且加入條件隊(duì)列的節(jié)點(diǎn),不能在 CLH 隊(duì)列, 條件隊(duì)列出隊(duì)的節(jié)點(diǎn),會入隊(duì)到 CLH 隊(duì)列。

當(dāng)某個線程執(zhí)行了 ConditionObject 的 await 函數(shù),阻塞當(dāng)前線程,線程會被封裝成 Node 節(jié)點(diǎn)添加到條件隊(duì)列的末端,其他線程執(zhí)行 ConditionObject 的 signal 函數(shù),會將條件隊(duì)列頭部線程節(jié)點(diǎn)轉(zhuǎn)移到 CLH 隊(duì)列參與競爭資源,具體流程如下圖:

image

一個 Condition 對象就有一個單項(xiàng)的等待任務(wù)隊(duì)列。在一個多線程任務(wù)中我們可以 new 出多個等待任務(wù)隊(duì)列。比如我們 new 出來兩個等待隊(duì)列。

 private Lock lock = new ReentrantLock();
 private Condition FirstCond = lock.newCondition();
 private Condition SecondCond = lock.newCondition();

所以真正的 AQS 任務(wù)中一般是一個任務(wù)隊(duì)列 N 個等待隊(duì)列的,因此我們盡量調(diào)用 signal 而少用 signalAll,因?yàn)樵谥付ǖ膶?shí)例化等待隊(duì)列中只有一個可以拿到鎖的。

設(shè)計模式

從設(shè)計模式的角度講,AbstractQueuedSynchronizer 是個抽象類,所有用到方法的類都要繼承此類的若干方法,對應(yīng)的設(shè)計模式就是模版模式。 這樣就解決了實(shí)現(xiàn)同步器時涉及的大量細(xì)節(jié)問題,能夠極大地減少實(shí)現(xiàn)工作。

若干方法就是上文注釋翻譯中描述的這些方法:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

說明一下各個方法的作用:

  • tryAcquire :嘗試以獨(dú)占模式獲取資源,如果獲取成功則返回 true,否則返回 false。
  • tryRelease :嘗試以獨(dú)占模式釋放資源,如果釋放成功則返回 true,否則返回 false。
  • tryAcquireShared :嘗試以共享模式獲取資源,如果返回正數(shù)則說明獲取成功,且還有可用的剩余資源;如果返回 0 則說明獲取成功,但是沒有可用的剩余資源;如果返回負(fù)數(shù)則說明獲取資源失敗。
  • tryReleaseShared :嘗試以共享模式釋放資源,如果釋放成功則返回 true,否則返回 false。
  • isHeldExclusively :判斷當(dāng)前線程是否正在獨(dú)占資源,如果是則返回 true,否則返回 false。

AbstractQueuedSynchronizer 中的方法實(shí)現(xiàn)按照功能劃分可以分為兩大類,即獲取資源(acquire)和釋放資源(release),同時區(qū)分獨(dú)占模式和共享模式

可以看到具體實(shí)現(xiàn)類需要對資源在不同模式下的獲得和釋放進(jìn)行具體定義。
具體實(shí)現(xiàn)舉例來說可以到 ReentrantReadWriteLock 中的 內(nèi)部類 Sync 看一下。

持鎖的當(dāng)前線程:exclusiveOwnerThread

自實(shí)現(xiàn)

AQS 定義了一套多線程訪問共享資源的同步模板,解決了實(shí)現(xiàn)同步器時涉及的大量細(xì)節(jié)問題,能夠極大地減少實(shí)現(xiàn)工作,現(xiàn)在我們基于 AQS 實(shí)現(xiàn)一個不可重入的獨(dú)占鎖,直接使用 AQS 提供的獨(dú)占式模板,只需明確 state 的語義與實(shí)現(xiàn) tryAcquire 與 tryRelease 函數(shù)(獲取資源與釋放資源)。在這里 state 為 0 表示鎖沒有被線程持有,state 為 1 表示鎖已經(jīng)被某個線程持有,由于是不可重入鎖,所以不需要記錄持有鎖線程的獲取鎖次數(shù)。

不可重入的獨(dú)占鎖代碼如下


public class NonReentrantLock implements Lock {

    /**
     * 
     * @Description 自定義同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        /**
         * 鎖是否被線程持有
         */
        @Override
        protected boolean isHeldExclusively() {
            //0:未持有 1:已持有
            return super.getState() == 1;
        }

        /**
         * 獲取鎖
         */
        @Override
        protected boolean tryAcquire(int arg) {
            if (arg != 1) {
                //獲取鎖操作,是需要把 state 更新為 1,所以 arg 必須是 1
                throw new RuntimeException("arg not is 1");
            }
            if (compareAndSetState(0, arg)) {//cas 更新 state 為 1 成功,代表獲取鎖成功
                //設(shè)置持有鎖線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        /**
         * 釋放鎖
         */
        @Override
        protected boolean tryRelease(int arg) {
            if (arg != 0) {
                //釋放鎖操作,是需要把 state 更新為 0,所以 arg 必須是 0
                throw new RuntimeException("arg not is 0");
            }
            //清空持有鎖線程
            setExclusiveOwnerThread(null);
            //設(shè)置 state 狀態(tài)為 0,此處不用 cas,因?yàn)橹挥蝎@取鎖成功的線程才會執(zhí)行該函數(shù),不需要考慮線程安全問題
            setState(arg);
            return true;
        }

        /**
         * 提供創(chuàng)建條件變量入口
         */
        public ConditionObject createConditionObject() {
            return new ConditionObject();
        }

    }

    private final Sync sync = new Sync();

    /**
     * 獲取鎖
     */
    @Override
    public void lock() {
        //Aqs 獨(dú)占式-獲取資源模板函數(shù)
        sync.acquire(1);
    }
        
    /**
     * 獲取鎖-響應(yīng)中斷
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        //Aqs 獨(dú)占式-獲取資源模板函數(shù)(響應(yīng)線程中斷)
        sync.acquireInterruptibly(1);
    }

    /**
     * 獲取鎖是否成功-不阻塞
     */
    @Override
    public boolean tryLock() {
        //子類實(shí)現(xiàn)
        return sync.tryAcquire(1);
    }
    
    /**
     * 獲取鎖-超時機(jī)制
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        //Aqs 獨(dú)占式-獲取資源模板函數(shù)(超時機(jī)制)
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }
    
    /**
     * 釋放鎖
     */
    @Override
    public void unlock() {
        //Aqs 獨(dú)占式-釋放資源模板函數(shù)
        sync.release(0);
    }
    
    /**
     * 創(chuàng)建條件變量
     */
    @Override
    public Condition newCondition() {
        return sync.createConditionObject();
    }
}

NonReentrantLock 定義了一個內(nèi)部類 Sync,Sync 用來實(shí)現(xiàn)具體的鎖操作,它繼承了 AQS,因?yàn)槭褂玫氖仟?dú)占式模板,所以重寫 tryAcquire 與 tryRelease 函數(shù),另外提供了一個創(chuàng)建條件變量的入口,下面使用自定義的獨(dú)占鎖來同步兩個線程對 j++。

    private static int j = 0;

    public static void main(String[] agrs) throws InterruptedException {
        NonReentrantLock  nonReentrantLock = new NonReentrantLock();

        Runnable runnable = () -> {
            //獲取鎖
            nonReentrantLock.lock();
            for (int i = 0; i < 100000; i++) {
                j++;
            }
            //釋放鎖
            nonReentrantLock.unlock();
        };

        Thread thread = new Thread(runnable);
        Thread threadTwo = new Thread(runnable);

        thread.start();
        threadTwo.start();

        thread.join();
        threadTwo.join();

        System.out.println(j);
    }
    

無論執(zhí)行多少次輸出內(nèi)容都是:
200000

其他

LockSupport 輔助類

LockSupport 是一個線程阻塞工具類,所有的方法都是靜態(tài)方法,可以讓線程在任意位置阻塞,當(dāng)然阻塞之后肯定得有喚醒的方法。

park 是因?yàn)?park 英文意思為停車。我們?nèi)绻?Thread 看成一輛車的話,park 就是讓車停下,unpark 就是讓車啟動然后跑起來。

park/unpark 調(diào)用的是 Unsafe(提供 CAS 操作) 中的 native 代碼。

說明

本文參考了非常多網(wǎng)絡(luò)圖片和文章資料,嚴(yán)格意義上已經(jīng)不算原創(chuàng)文章了,如本文引有的內(nèi)容有侵權(quán),請聯(lián)系刪除。

參考

更多精彩內(nèi)容,請關(guān)注公眾號 “小盒子的技術(shù)分享”

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容