并發(fā)系列四:基于兩種案例來認(rèn)識ReentrantLock源碼加鎖過程(公平鎖)

前言

  • 上篇文章咱們證明了synchronized關(guān)鍵字的特性:無鎖、偏向鎖、輕量鎖、重(chong)偏向、重(chong)輕量、重量鎖。可以說synchronized是jvm層面實(shí)現(xiàn)同步的方式。在jdk中,存在一個叫java.util.concurrent的包,簡稱JUC,它是一個jdk層面的并發(fā)包,里面存在了大量與并發(fā)編程相關(guān)的api,其中最代表意義的就是atomic和lock兩種類別,前者是基于樂觀鎖CAS(Compare And Swap)的實(shí)現(xiàn),后者是基于AQS(Abstract Queued Synchronizer)實(shí)現(xiàn)。本文將詳細(xì)講解下AQS原理以及根據(jù)兩個案例來解讀ReentrantLock源碼。
  • 兩個案例:

    1.線程A單獨(dú)加鎖
    2.線程A正在持有鎖的過程中,線程t1來加鎖

一、AQS原理

  • AQS簡稱Abstract Queued Synchronizer,它的核心是基于一個雙向鏈表組成的隊(duì)列(CLH隊(duì)列) + volatile關(guān)鍵字修飾的int類型變量實(shí)現(xiàn)的。(關(guān)于volatile關(guān)鍵字可以參考其他博主的一些總結(jié): 傳送門),大致核心可以以如下圖來呈現(xiàn):

    在這里插入圖片描述

    簡單總結(jié)就是:內(nèi)部使用雙向鏈表維護(hù)了一個隊(duì)列,其中Node數(shù)據(jù)結(jié)構(gòu)為此隊(duì)列的基石,內(nèi)部維護(hù)了prev(指向上一個節(jié)點(diǎn))、next(指向下一個節(jié)點(diǎn))、waitStatus(當(dāng)前node的狀態(tài))、thread(當(dāng)前維護(hù)的線程)四個重要的屬性。其中waitStatus分別有如下取值:

    Node中waitStatus具體取值 含義
    CANCELLED(1) 中斷或取消,此狀態(tài)下的節(jié)點(diǎn)會從隊(duì)列中移除
    SIGNAL(-1) 此狀態(tài)下的節(jié)點(diǎn)一定是在隊(duì)列排隊(duì)中
    CONDITION(-2) 條件阻塞,比如說內(nèi)部因Condition而阻塞的節(jié)點(diǎn)
    PROPAGATE(-3) 表示下一個acquireShared應(yīng)該無條件傳播
    0 默認(rèn)狀態(tài)

    除此之外,隊(duì)列中還維護(hù)了三個屬性,head(指向隊(duì)列中的頭節(jié)點(diǎn))、state(鎖的狀態(tài))、tail(指向隊(duì)列中的尾節(jié)點(diǎn))。其中,state的取值有兩種情況,將以如下表展示出來:

    AQS中state具體取值 含義
    0 表示當(dāng)前鎖沒有被線程持有
    1 表示當(dāng)前鎖正在被線程持有
    大于1 表示當(dāng)前鎖被線程重入了(重入鎖),這里要注意:ReentrantLock重入了幾次,就要釋放幾次鎖

二、案例1:線程A單獨(dú)加鎖

  • 代碼如下:

    public class SimpleThreadLock {
    
        static ReentrantLock lock = new ReentrantLock(true);
    
        public static void main(String[] args) throws InterruptedException {
            Thread a = new Thread(() -> {
                try {
                    lock.lock();
                    System.out.println("Get lock");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "線程a");
    
            a.start();
            a.join();
            System.out.println("end");
        }
    }
    

    代碼也比較簡單,就是在主線程中創(chuàng)建了一個線程,并且內(nèi)部去使用ReentrantLock加鎖,獲取到鎖后就打印出Get lock這句話,當(dāng)t1線程執(zhí)行完后再繼續(xù)執(zhí)行主線程的邏輯。這里就不一步步演示斷點(diǎn)了,直接上源碼。

  • 這里先說明下ReentrantLock重載的兩個構(gòu)造方法

    // 默認(rèn)非公平鎖
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    // 若傳入true則是公平鎖
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    因?yàn)樵蹅儌魅肓?strong>true進(jìn)去,所以此時,它是一把公平鎖。

  • lock.lock()方法,因?yàn)樵蹅冎付耸褂霉芥i,所以最終會進(jìn)入ReentrantLock內(nèi)部維護(hù)的FairSync類的lock方法

    // FairSync類下的lock方法
    final void lock() {
        acquire(1);
    }
    

    于是,我們需要找到acquire方法,此方法為AQS(父類AbstractQueuedSynchronizer)的方法,所以最終會進(jìn)入如下這么一段代碼:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    這段代碼,看似很精簡,但是它做的事真的太多了。濃縮的才是精華呀!好了,咱們不偏題,繼續(xù)按照咱們的主題:線程A單獨(dú)加鎖。不過要繼續(xù)往下看,還是要加深下acquire方法的含義,我們必須要tryAcquire方法返回false,才能繼續(xù)走if條件中后面的邏輯,以及if條件內(nèi)部的邏輯。于是,我們直接看tryAcquire方法源碼:

  • tryAcquire方法

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    tryAcquire方法是一個protected方法,內(nèi)部直接拋出了一個異常,還記得咱們是從哪個類掉用到父類AbstractQueuedSynchronizer的acquire方法的?沒錯,就是FairSync類。那么咱們就直接定位到FairSync類的tryAcquire方法唄。

    protected final boolean tryAcquire(int acquires) {
        // 拿到當(dāng)前線程,也就是線程A
        final Thread current = Thread.currentThread();
    
        // 拿到當(dāng)前aqs的state變量,我們沒有修改過它,
        // 默認(rèn)為0
        int c = getState();
        if (c == 0) {
            // 進(jìn)入此邏輯,此邏輯跟acquire方法有點(diǎn)類似
            // 必須要hasQueuedPredecessors()方法返回false
            // 才能繼續(xù)往下執(zhí)行,于是我們把hasQueuedPredecessors的源碼也貼出來
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
  • hasQueuedPredecessors方法源碼

    public final boolean hasQueuedPredecessors() {
        // 拿到aqs中的tail
        Node t = tail; 
        // 拿到aqs中的head
        Node h = head;
        Node s;
    
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    此方法涵蓋的情景比較多,但是就當(dāng)前情景而言,它很容易理解,在當(dāng)前情形中,我們壓根沒操作過tail和head那么h 肯定等于 t,所以此方法返回false,返回false后,我們回到FairSync類的tryAcquire方法,

    protected final boolean tryAcquire(int acquires) {
        // .... 上半部分代碼省略
        if (c == 0) {
            // 在當(dāng)前情景下,hasQueuedPredecessors返回的是false
            // 也就是說會繼續(xù)走if后面的邏輯,
            // if后面的邏輯就是執(zhí)行CAS操作,
            // 將state屬性從0設(shè)置成1
            // 由于此時只有一個線程在執(zhí)行,
            // 這個cas操作一定是成功的
            // cas成功后就會執(zhí)行setExclusiveOwnerThread代碼,這段代碼很有用
            // 它是一個賦值的操作,也就是記錄
            // 當(dāng)前擁有鎖的線程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // .... 下半部分else if邏輯也省略了
        return false;
    }
    

    通過上述代碼中的注釋,我們可以發(fā)現(xiàn),線程A加鎖成功后會返回true,至此,tryAcquire的返回值為true。還記的我們是從哪個方法進(jìn)來的嗎?是的,是從父類AbstractQueuedSynchronizer的acquire方法進(jìn)來的,上面總結(jié)到了,只有當(dāng)tryAcquire返回false,才會繼續(xù)往下執(zhí)行。至此,線程A單獨(dú)加鎖的案例就結(jié)束了。通過這么一個單線程加鎖的案例,如果你認(rèn)為AQS很簡單的話,那就大錯特錯了,單線程加鎖的案例中,我們僅使用到了AQS中的state變量,CLH隊(duì)列卻始終沒有涉及到,而且從加鎖到加鎖結(jié)束的整個過程,我們連一個Node類型的數(shù)據(jù)結(jié)構(gòu)都沒有看到過。那Node類型的數(shù)據(jù)結(jié)構(gòu)什么時候會被用到呢?我們來看下一個案例線程A正在持有鎖的過程中,線程t1來加鎖

三、案例2:線程A正在持有鎖的過程中,線程t1來加鎖

  • 同樣的,咱們改造下代碼:

    public class TwoThreadLock {
    
        static ReentrantLock lock = new ReentrantLock(true);
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                try {
                    lock.lock();
                    System.out.println("Thread a get lock");
                    TimeUnit.SECONDS.sleep(60);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "線程a").start();
    
            Thread t1 = new Thread(() -> {
                try {
                    lock.lock();
                    System.out.println("Thread t1 get lock");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "線程t1");
    
            t1.start();
            t1.join();
    
            System.out.println("end");
        }
    }
    

    上段代碼,毫無疑問,線程t1在調(diào)用lock.lock()方法時,就阻塞到那里了,要等線程a睡60s后才會繼續(xù)執(zhí)行,那么這里面到底做了哪些事呢?我們來一起研究下。

  • 同案例1,使用的是公平鎖,最終肯定會調(diào)用到tryAcquire方法去,咱們這次就一次性的把tryAcquire方法給講清楚

    protected final boolean tryAcquire(int acquires) {
        // 拿到當(dāng)前線程,也就是線程t1
        final Thread current = Thread.currentThread();
    
        // 拿到當(dāng)前aqs的state變量,此時的c是多少呢?
        // 沒錯,是1,因?yàn)殒i已經(jīng)被線程A占有了,此時的
        // state為1。于是它會走else if邏輯
        int c = getState();
        if (c == 0) {
            // 進(jìn)入此邏輯,此邏輯跟acquire方法有點(diǎn)類似
            // 必須要hasQueuedPredecessors()方法返回false
            // 才能繼續(xù)往下執(zhí)行,于是我們把hasQueuedPredecessors的源碼也貼出來
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 走了else if邏輯,它也發(fā)現(xiàn)當(dāng)前持有鎖的線程不是自己呀,于是直接return false
        // 這里順帶解釋下這個else if的邏輯,這個else if
        // 就是判斷當(dāng)前調(diào)用lock方法的線程是不是和當(dāng)前持有
        // 鎖的線程一樣,如果是一樣的,則將state + 1并賦值給nextc屬性
        // 這就表示了ReentrantLock支持重入性
        // 那么什么時候會出現(xiàn)nextc屬性小于0的情況呢?
        // nextc是一個int類型,當(dāng)超過了它的存儲返回后
        // 會出現(xiàn)小于0的情況 ===> 也就是說ReentrantLock
        // 的重入次數(shù)最大為支持int類型最大值
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    通過上述代碼塊中的注釋可知,線程t1的加鎖流程并沒有這么順利,在tryAcquire方法中返回了false,那這代表了什么呢?是的,它代表著線程t1可以繼續(xù)走acquire后面的邏輯了,咱們繼續(xù)把a(bǔ)cquire方法貼出來:

    public final void acquire(int arg) {
        // 在案例2的情況下,tryAcquire方法返回了false
        // 于是會執(zhí)行后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        // 當(dāng)acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回了true才會執(zhí)行內(nèi)部的selfInterrupt()方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    于是,咱們先了解下addWaiter(Node.EXCLUSIVE)方法,它的源碼如下:

    private Node addWaiter(Node mode) {
        // 此時的mode是由上述代碼塊傳入的,
        // 它的值為Node.EXCLUSIVE ===> 這是一個空節(jié)點(diǎn),
        // 值為null,
        // 創(chuàng)建了一個node節(jié)點(diǎn), 內(nèi)部維護(hù)了當(dāng)前線程(線程t1),并且它的next節(jié)點(diǎn)為null(有Node的構(gòu)造方法可知)
        Node node = new Node(Thread.currentThread(), mode);
        // 拿到aqs隊(duì)列中的tail屬性,
        // 此時肯定為null啊(aqs隊(duì)列都沒初始化,哪來的隊(duì)尾節(jié)點(diǎn))
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        
        // 此時pred為null,即不會走上面的if邏輯,于是執(zhí)行enq方法,記?。捍藭r傳入enq方法時的形參為新new出來的Node
        // 內(nèi)部維護(hù)的是當(dāng)前線程(線程t1)
        enq(node);
        return node;
    }
    

    上面代碼塊的注釋也說了,最終會執(zhí)行到enq方法,enq方干啥的呢?猜一下?是的,它就是初始aqs隊(duì)列的。我們來看一下它的源碼:

    /**
     形參node內(nèi)部維護(hù)的線程為t2, 并且它的next屬性指向?yàn)閚ull
     */
    private Node enq(final Node node) {
        // 此處寫了一個死循環(huán),也就是常說的自旋鎖
        for (;;) {
            // 自旋的過程中
            // 第一次自旋:
            //  拿到隊(duì)尾元素, 此時隊(duì)列都沒有,肯定為null
            //  發(fā)現(xiàn)隊(duì)列中的tail指向的是null,于是初始化tail節(jié)點(diǎn),并讓aqs中的head指向了tail,
            //  至此,aqs簡易版本的隊(duì)列就出來啦,
            //  head和tail指向同一個node,并且此node內(nèi)部
            //  維護(hù)的thread、prev、next、waitStatus全是默認(rèn)值
            // 由于是if else邏輯,所以初始化tail屬性后,就會進(jìn)行第二次自旋
            // 第二次自旋:
            //  再次拿到tail, 由于第一次自旋把tail給初始化了,所以此時拿到的tail不為null, 于是走了else邏輯
            //  在else中,主要操作的是形參node, 還記得形參node是什么嗎? ==> 維護(hù)當(dāng)前線程(線程t1)的node節(jié)點(diǎn),
            //  此時會將node的上一個節(jié)點(diǎn)指向t節(jié)點(diǎn)
            //  同時進(jìn)行cas操作,將node節(jié)點(diǎn)變成tail
            //  當(dāng)cas成功后,再設(shè)置t的next指向node
            //  最終返回這個t.
            //  此時此刻這個t是什么樣的數(shù)據(jù)結(jié)構(gòu)呢?
            //  此時的這個t就是隊(duì)列中的head節(jié)點(diǎn)了,
            //  并且它的next為node(維護(hù)線程t1)
            //  所以此時此刻隊(duì)列中現(xiàn)在有兩個元素了
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    代碼中的注釋描述了enq的過程,我專門畫了一個圖來描述aqs隊(duì)列產(chǎn)生的過程,幫助理解:


    在這里插入圖片描述

    enq初始化aqs隊(duì)列的過程后,就執(zhí)行到了addWaiter方法的出口了

    private Node addWaiter(Node mode) {
        // ....上述代碼省略
        // enq初始化隊(duì)列后,會將node進(jìn)行返回
        // 這個node就是維護(hù)線程t1的node,它已經(jīng)是
        // 隊(duì)列中的隊(duì)列了
        enq(node);
        return node;
    }
    

    addWaiter方法執(zhí)行完了之后,將繼續(xù)執(zhí)行acquire方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    此時應(yīng)該接著執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)了,由于addWaiter方法已經(jīng)執(zhí)行完成,返回的是擁有當(dāng)前線程的node,同時它也是當(dāng)前隊(duì)列中的隊(duì)尾。我們來查看下acquireQueued的源碼:

    /**
     node形參為維護(hù)當(dāng)前線程(t1)的節(jié)點(diǎn),
     同時arg為1
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 此處又自旋了
            for (;;) {
                // 獲取到當(dāng)前節(jié)點(diǎn)的上一個節(jié)點(diǎn),在
                // 當(dāng)前案例下,它是head節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 第一次自旋:
                //   做判斷,發(fā)現(xiàn)上一個節(jié)點(diǎn)是head節(jié)點(diǎn)
                //   于是繼續(xù)執(zhí)行加鎖方法tryAcquire
                //   因?yàn)樵诋?dāng)前案例下,線程a睡眠了60s
                //   肯定還是加鎖失敗的,加鎖失敗后,
                //   則走下面的邏輯,這里就是為了當(dāng)前
                //   節(jié)點(diǎn)繼續(xù)上鎖、因?yàn)橛锌赡芮懊娴?            //   節(jié)點(diǎn)已經(jīng)釋放鎖了,或者說被park
                //   的線程被unpark了,要繼續(xù)自旋,
                //   嘗試獲取鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 判斷當(dāng)前這個節(jié)點(diǎn)是否需要park
                // 什么是park?就是使用unsafe類來阻塞指定的線程,
                // 在shouldParkAfterFailedAcquire方法中
                // 傳入的是當(dāng)前節(jié)點(diǎn)和上一個節(jié)點(diǎn),
                // 大致邏輯為:
                //   1. 判斷當(dāng)前節(jié)點(diǎn)的上一個節(jié)點(diǎn)(即p)的waitStatus是不是SIGNAL(-1)狀態(tài),如果是則返回true
                //     SIGNAL代表什么呢?上面的表格中有說到
                //     SIGNAL代表這個Node是處于排隊(duì)狀態(tài)
                //     因此可以得出一個結(jié)論:如果上一個節(jié)點(diǎn)也處于排隊(duì)狀態(tài)
                //     那么我就返回true,進(jìn)而執(zhí)行parkAndCheckInterrupt方法,parkAndCheckInterrupt方法就是讓park當(dāng)前線程,讓當(dāng)前線程進(jìn)入阻塞狀態(tài),自旋再此暫停
                //   2. 如果p節(jié)點(diǎn)的waitStatus為負(fù)數(shù),即不是中斷或者取消狀態(tài)
                //      那么它會將p的waitStatus置為-1.并返回false
                //      進(jìn)而進(jìn)入第二次自旋,當(dāng)進(jìn)入第二次自旋時,若上面還未獲取鎖成功,那么當(dāng)前線程就會被park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    所以,當(dāng)線程t2在執(zhí)行到此方法時,發(fā)現(xiàn)head即線程a對應(yīng)的node的waitStatus為0,于是會自旋一次將head的waitStatus置為-1,然后再繼續(xù)自旋,此時自己嘗試加鎖又失敗了,此時就會進(jìn)入park狀態(tài)。所以就在acquireQueued方法處阻塞了,等待線程a釋放鎖后喚醒線程t1。至此案例2的加鎖過程也結(jié)束了

四、總結(jié)

  • 本次只是基于兩個簡單的案例來認(rèn)識ReentrantLock加鎖流程的源碼,其中還有很多其他的case沒有涉及到。這兩種案例算是認(rèn)識ReentrantLock加鎖源碼的入門吧。下篇博客將介紹下基于這兩種案例的解鎖過程。
  • ReentrantLock加鎖流程涉及到每個方法的詳細(xì)步驟可查看在github中的總結(jié):傳送門
  • I am a slow walker, but I never walk backwards.
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容