AQS(AbstractQueuedSynchronizer)源碼解析

前言

java.util.concurrent包(之后簡稱JUC包)中,提供了大量的同步與并發(fā)的工具類,是多線程編程的“利器”。其中l(wèi)ocks包下,包含了多種鎖,如ReentrantLock獨占鎖、ReentrantReadWriteLock讀寫鎖、Semaphore信號量(共享鎖)等,而這些鎖有一個共同的基礎類:AbstractQueuedSynchronizer。

原文地址:http://www.wangjialong.cc/2018/04/06/aqs_info/#more

AQS簡介

AQS是一個抽象類,不可以被實例化,它的設計之初就是為了讓子類通過繼承來實現(xiàn)多樣的功能的。它內(nèi)部提供了一個FIFO的等待隊列,用于多個線程等待一個事件(鎖)。它有一個重要的狀態(tài)標志——state,該屬性是一個int值,表示對象的當前狀態(tài)(如0表示lock,1表示unlock)。AQS提供了三個protected final的方法來改變state的值,分別是:getState、setState(int)、compareAndSetState(int, int)。根據(jù)修飾符,它們是不可以被子類重寫的,但可以在子類中進行調(diào)用,這也就意味著子類可以根據(jù)自己的邏輯來決定如何使用state值。

/ 同一類中 同一包中 同包內(nèi)的子類 不同包內(nèi)的子類 全局
public + + + + +
protected + + + +
default(無修飾符) + + +
private +

java的修飾符作用域如下:

/ 同一類中 同一包中 同包內(nèi)的子類 不同包內(nèi)的子類 全局
public + + + + +
protected + + + +
default(無修飾符) + + +
private +

表中 + 表示可以訪問, 空白表示無法訪問

AQS的子類應當被定義為內(nèi)部類,作為內(nèi)部的helper對象。事實上,這也是juc種鎖的做法,如ReentrantLock,便是通過內(nèi)部的Sync對象來繼承AQS的。AQS中定義了一些未實現(xiàn)的方法(拋出UnsupportedOperationException異常)

  • tryAcquire(int) 嘗試獲取state
  • tryRelease(int) 嘗試釋放state
  • tryAcquireShared(int) 共享的方式嘗試獲取
  • tryReleaseShared(int) 共享的方式嘗試釋放
  • isHeldExclusively() 判斷當前是否為獨占鎖

這些方法是子類需要實現(xiàn)的,可以選擇實現(xiàn)其中的一部分。根據(jù)實現(xiàn)方式的不同,可以分為兩種:獨占鎖和共享鎖。其中JUC中鎖的分類為:

  • 獨占鎖:ReentrantLock、ReentrantReadWriteLock.WriteLock
  • 共享鎖:ReentrantReadWriteLock.ReadLock、CountDownLatch、CyclicBarrier、Semaphore

其實現(xiàn)方式為:

  • 獨占鎖實現(xiàn)的是tryAcquire(int)、tryRelease(int)
  • 共享鎖實現(xiàn)的是tryAcquireShared(int)、tryReleaseShared(int)

如獨占鎖的實現(xiàn)方式是:

Acquire:
     while (!tryAcquire(arg)) {
        //將當前線程加入FIFO隊列中;
        //自旋或阻塞當前線程;
     }

Release:
     if (tryRelease(arg))
        //喚醒隊列中的第一個線程(head);

AQS中還提供了一個內(nèi)部類ConditionObject,它實現(xiàn)了Condition接口,可以用于await/signal。采用CLH隊列的算法,喚醒當前線程的下一個節(jié)點對應的線程,而signalAll喚醒所有線程。

總的來說,AQS提供了三個功能:

  1. 實現(xiàn)獨占鎖
  2. 實現(xiàn)共享鎖
  3. 實現(xiàn)Condition模型

源碼解析

Node解析

AQS內(nèi)部定義了一個static final的內(nèi)部類Node,用于實現(xiàn)等待隊列CLH,滿足FIFO結構,其隊列結構如下所示:

<pre>
+------+ prev +-----+ +-----+
head | | <---- | | <---- | | tail
| | ----> | | ----> | |
+------+ next +-----+ +-----+
</pre>

隊列為一個雙向鏈表結構,保存了head、tail兩個指針,分別指向鏈表頭部、尾部。當需要添加節(jié)點時,直接在tail位置添加,而dequeue操作直接對head節(jié)點進行。Node中定義如下常量:

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();

/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;

/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

以上常量分別用于設置如下屬性的值:

volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;

Node類型的常量SHARED、EXCLUSIVE用于設置nextWaiter,用于表示當前節(jié)點是共享的,還是互斥的,分別用于共享鎖和獨占鎖。int類型的常量CANCELLED、SIGNAL、CONDITION、PROPAGATE用于設置waitStatus,用于在ConditionObject中使用,可以實現(xiàn)await/signal模型。

Node有三個構造函數(shù):

//不存放任何線程,用于生成哨兵節(jié)點
Node() ;
//用于鎖
Node(Thread thread, Node mode) ;
//用于Condition
Node(Thread thread, int waitStatus) ;

AQS屬性

AQS使用內(nèi)部類Node,構造一個雙向鏈表,用作FIFO隊列;除此之外,AQS還存放一個int類型的屬性state,用于表示當前的同步狀態(tài)。

//鏈表頭節(jié)點
private transient volatile Node head;
//鏈表尾節(jié)點
private transient volatile Node tail;
//同步狀態(tài)
private volatile int state;

head節(jié)點是一個哨兵節(jié)點,不存放實際的“線程”節(jié)點(使用Node的無參構造函數(shù))。tail指向鏈表的最后一個節(jié)點,當新增節(jié)點時,將新節(jié)點作為當前tail的下一個節(jié)點,通過CAS設置成功后,將新節(jié)點設為新的tail節(jié)點即可。新增節(jié)點的源碼如下:

    private Node enq(final Node node) {
        for (;;) { //死循環(huán)
            Node t = tail;
            if (t == null) { // 空鏈表,head、tail都為空
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq操作是一個無限循環(huán)的操作,最終總會成功,但根據(jù)代碼可知,AQS應不是starvation free的,因為某個線程可能會持續(xù)的enq失敗。AQS提供了形如doAcquireNanos方法,但其超時返回false操作是在addWaiter方法(內(nèi)部調(diào)用enq)之后,也無法回避enq的starvation。在此順便說一下,AQS也是無法保證fair的,也就是說先入隊列的線程不一定先獲取到鎖。節(jié)點的CAS是通過Unsafe來實現(xiàn)的,在state中統(tǒng)一說明。

state表示AQS當前的同步狀態(tài),如0表示lock,1表示unlock狀態(tài)。對state的操作,提供了三個方法。

    //讀取當前state
    protected final int getState() {
        return state;
    }

    //直接寫入,不考慮當前值
    protected final void setState(int newState) {
        state = newState;
    }

    //保證讀-寫的原子性
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

可以看到compareAndSetState使用的是unsafe對象的compareAndSwapInt方法,傳入this指針,state屬性的偏移地址,期待值expect,更新值update,可以實現(xiàn)CAS操作。state屬性的偏移地址獲取方式如下:

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
static {
    try {
        stateOffset = unsafe.objectFieldOffset
                        (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
    } catch (Exception ex) { throw new Error(ex); }
}

實際上,AQS的head、tail節(jié)點,內(nèi)部類Node的waitStatus、next屬性均使用unsafe對象,通過偏移地址來進行CAS操作。Unsafe是sun.misc包下的類,在Java API中沒有官方文檔,因為它是用于實現(xiàn)Java庫的,Java中有一個功能類似的類,可以實現(xiàn)對象屬性的CAS操作,可以參考我的另一篇博客AtomicXFieldUpdater,屬性原子修改的外部工具類,關于Unsafe的使用,可以參考Guide to Unsafe

AQS還有一個屬性static final long spinForTimeoutThreshold = 1000L;,用于表示自旋的時間,小于1000納秒的采用自旋鎖,大于1000納秒,使用LockSupport.park方法,將線程掛起。

重要方法分析

AQS是用于實現(xiàn)獨占鎖或共享鎖的,對于一個鎖來說,最重要的就是lock和unlock操作了,對應到AQS中,為acquire、release方法,由于AQS需要和子類進行“合作”,因此需要子類的定義來進行聯(lián)合分析,為簡單起見,使用AQS官方文檔中的示例,定義獨占鎖如下:

class Mutex implements Lock, java.io.Serializable {

   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Reports whether in locked state
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // Provides a Condition
     Condition newCondition() { return new ConditionObject(); }

     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

可以看到,lock方法調(diào)用內(nèi)部類的acquire方法,也就是AQS的acquire方法。unlock方法調(diào)用release方法。
下面對兩個流程進行分析

acquire

acquire是獨占鎖的獲取鎖的方法,其源碼如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire方法非常簡單,如果tryAcquire失敗(返回false),則調(diào)用acquireQueued方法,將當前線程加入到等待隊列中,并中斷當前線程,等待喚醒。

tryAcquire由子類實現(xiàn),下面先分析acquireQueued方法。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //若當前節(jié)點為鏈表第一個節(jié)點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //park當前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

acquireQueued在addWaiter之后,再次嘗試獲取鎖,與tryAcquire不同的是,返回true表示未獲取成功,false表示獲取成功。通過判斷當前節(jié)點是否為隊列第一個節(jié)點,來決定是否獲取成功,acquireQueued方法相當于提供了一個默認方法,會被子類的tryAcquire方法屏蔽掉(若tryAcquire返回true的話)。

tryAcquire會調(diào)用子類Mutex.Sync的實現(xiàn),其代碼如下:

    // 如果state為0,則獲取到鎖
    public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
    }

由此可見,AQS提供了一個模板,子類需要實現(xiàn)其tryAcquire方法,實現(xiàn)具體的獲取鎖邏輯(通過對state的讀、寫),子類lock方法直接調(diào)用AQS的acquire方法即可。

release方法

Mutex的unlock方法調(diào)用了release方法,在AQS中定義,源碼如下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

還是同樣的配方,release方法調(diào)用子類實現(xiàn)的tryRelease,返回true后,表示獲取成功,之后判斷頭節(jié)點,由于鎖的實現(xiàn)中,waitStatus必定為0,所以不會執(zhí)行unpark操作,unpark用于Condition模型中。tryRelease方法的源碼如下:

    // 將state設置為0,解鎖
    protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

由源碼可知,tryRelease只需要將state設置為0即可,因為調(diào)用unlock方法的必定是之前調(diào)用lock成功的,因此當前state必定為1,為安全起見,使用getState判斷是否為0,若為0,說明線程出錯。state設置時,不需要調(diào)用CAS方法,只需要setState即可,保證write對于其他線程可見即可(通過volatile內(nèi)存屏障保證)。

總結

AQS提供了一個框架,在其上可以構建豐富的線程同步工具類,JUC包中ReadWriteLock、CountDownLatch都是基于AQS實現(xiàn)的,AQS在JUC包中的地位相當重要。其類圖如下:

image

盜圖使用,詳見“JUC鎖”01之 框架

AQS提供了三大功能:獨占鎖、共享鎖、ConditionObject。子類在實現(xiàn)中,可以實現(xiàn)其一部分方法。其編程思想值得借鑒,通過超類實現(xiàn)基本的處理流程,將其中部分抽成未實現(xiàn)方法,默認拋出異常,由子類實現(xiàn),這種解耦方式,最大化的減少了代碼的重復,且便于子類在實現(xiàn)中個性化自己的處理邏輯。

很久沒寫博客了,準備以AQS入手,深入分析一下JUC包,flag就這么立起來了,希望可以實現(xiàn)吧~~

參考資料

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容