AbstractQueuedSynchronizer源碼解析

1. 簡介

AbstractQueuedSynchronizer(抽象隊列同步器)簡稱AQS,是一個用來構(gòu)建同步組件的基礎(chǔ)框架,JDK中java.util.concurrent這個包的許多同步組件都是基于AQS來構(gòu)建的,如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock等。AQS運用了模板方法的設(shè)計模式,使我們只需重寫AQS的tryAcquiretryRelease等方法來快速構(gòu)造一個同步組件,上面提到的同步組件都是用這種方式創(chuàng)建出來的。比如獨占鎖這種工具我們只需重寫以下3個方法:

  • tryAcquire:嘗試獲取同步狀態(tài)
  • tryRelease:嘗試釋放同步狀態(tài)
  • isHeldExclusively:是否為獨占狀態(tài)

2. 實現(xiàn)

類繼承關(guān)系

AbstractOwnableSynchronizer這個類提供了設(shè)置和獲取獨占同步狀態(tài)的線程的方法

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {}
public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    private transient Thread exclusiveOwnerThread;

    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

等待隊列

隊列同步器的原理是維持一個同步狀態(tài)和一個FIFO等待隊列,獲取到同步狀態(tài)的線程會存在于隊列頭結(jié)點,而獲取同步狀態(tài)失敗的線程會插入到隊列尾部并且不斷自旋檢查是否可以獲取同步狀態(tài),嘗試獲取失敗后會判斷是否需要阻塞,阻塞直到釋放同步狀態(tài)的線程按順序喚醒阻塞線程直到獲取到同步狀態(tài)退出。

AQS等待隊列

結(jié)點類型

static final class Node {
    // 結(jié)點位于共享模式
    static final Node SHARED = new Node();
    // 結(jié)點位于獨占模式
    static final Node EXCLUSIVE = null;

    // 以下4個狀態(tài)是同步隊列中結(jié)點線程的等待狀態(tài)

    // 取消狀態(tài),表示線程取消獲取同步狀態(tài),可能是因為中斷或超時取消
    static final int CANCELLED =  1;
    // 需要喚醒等待狀態(tài)為SIGNAL的后繼結(jié)點
    static final int SIGNAL    = -1;
    // 等待某個條件
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    // 等待狀態(tài),值為0或以上4個狀態(tài)
    volatile int waitStatus;
    // 當(dāng)前結(jié)點的前驅(qū)結(jié)點
    volatile Node prev;
     // 當(dāng)前結(jié)點的后驅(qū)結(jié)點
    volatile Node next;
    // 被結(jié)點包裝的線程
    volatile Thread thread;

    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 獲取前驅(qū)結(jié)點
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
// 等待隊列頭結(jié)點
private transient volatile Node head;
// 等待隊列尾結(jié)點
private transient volatile Node tail;
// 同步狀態(tài)
private volatile int state;
// 獲取同步狀態(tài)
protected final int getState() {
    return state;
}
// 設(shè)置同步狀態(tài)
protected final void setState(int newState) {
    state = newState;
}
// 以CAS的方式設(shè)置同步狀態(tài)
protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

acquire方法

阻塞獲取獨占式同步狀態(tài)。源碼中的tryAcquire方法需要在子類中根據(jù)需求重寫。

acquire過程
public final void acquire(int arg) {
    // 調(diào)用tryAcquire方法嘗試獲取同步狀態(tài),獲取成功直接返回,獲取失敗調(diào)用addWaiter方法將當(dāng)前線程封裝為獨占模式的等待結(jié)點加入等待隊列,接著調(diào)用acquireQueued方法自旋獲取同步狀態(tài)。
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}


private Node addWaiter(Node mode) {
    // 創(chuàng)建一個指定模式的包含當(dāng)前線程的等待結(jié)點
    Node node = new Node(Thread.currentThread(), mode);
    // 當(dāng)尾結(jié)點不為空,即隊列不為空,使用CAS方法將新建結(jié)點添加到隊尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }

    // 隊列為空或在剛才入隊時出現(xiàn)CAS沖突執(zhí)行enq方法
    enq(node);
    // 返回插入的新結(jié)點
    return node;
}

// 該方法使用循環(huán)重試保證node一定會入隊成功
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 隊列為空,先新建一個空結(jié)點作為頭結(jié)點,使用CAS方式插入
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 否則隊列不為空使用CAS方法添加結(jié)點node到隊尾,失敗則重試
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取當(dāng)前結(jié)點的前驅(qū)結(jié)點
            final Node p = node.predecessor();
            // 如果前驅(qū)結(jié)點為頭結(jié)點表示當(dāng)前結(jié)點為等待隊列中的第一個有資格獲取同步狀態(tài)的結(jié)點
            // 接著嘗試獲取同步狀態(tài)
            if (p == head && tryAcquire(arg)) {
                // 設(shè)置頭結(jié)點為當(dāng)前結(jié)點
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 獲取同步狀態(tài)失敗調(diào)用shouldParkAfterFailedAcquire方法判斷是否需要阻塞,
            // 如果需要則調(diào)用parkAndCheckInterrupt方法阻塞線程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅(qū)結(jié)點等待狀態(tài)
    int ws = pred.waitStatus;
    // 如果前驅(qū)結(jié)點等待狀態(tài)為SIGNAL則表示當(dāng)前線程需要阻塞,直接返回true
    if (ws == Node.SIGNAL)
        return true;
    // 如果前驅(qū)結(jié)點等待狀態(tài)為CANCELLED,說明前驅(qū)結(jié)點取消獲取鎖,向前遍歷鏈表,跳過等待狀態(tài)為CANCELLED的結(jié)點直到找到一個等待狀態(tài)不為CANCELLED的結(jié)點
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 否則將前驅(qū)結(jié)點的等待狀態(tài)使用CAS方式設(shè)置為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private final boolean parkAndCheckInterrupt() {
    // 調(diào)用LockSupport.park方法阻塞當(dāng)前線程,當(dāng)釋放同步狀態(tài)的線程執(zhí)行LockSupport.unpark方法喚醒當(dāng)前線程或其他線程執(zhí)行中斷方法中斷當(dāng)前線程則會從LockSupport.park中返回
    LockSupport.park(this);
    // 返回中斷狀態(tài)并清除
    return Thread.interrupted();
}

release方法

釋放獨占式同步狀態(tài)。源碼中的tryRelease方法需要在子類中根據(jù)需求重寫。

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果頭結(jié)點不為空,并且等待狀態(tài)不為初始狀態(tài)0則喚醒后繼結(jié)點
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 喚醒后繼結(jié)點
private void unparkSuccessor(Node node) {

    int ws = node.waitStatus;
    // 如果當(dāng)前結(jié)點狀態(tài)小于0則使用CAS方式將其修改為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 獲取后繼結(jié)點
    Node s = node.next;
    // 如果后繼結(jié)點為空或者取消獲取同步狀態(tài)則后續(xù)第一個等待狀態(tài)不為CANCELLED的結(jié)點
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從尾結(jié)點開始向前遍歷到node結(jié)點,遍歷過程中記錄最新的等待狀態(tài)小于等于0的結(jié)點,該最新結(jié)點里的線程為需要喚醒的線程
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒線程
        LockSupport.unpark(s.thread);
}

acquireInterruptibly方法

上面講到的acquire方法是阻塞式獲取同步狀態(tài),不支持獲取過程中響應(yīng)中斷拋出中斷異常,如果需要支持阻塞獲取同步狀態(tài)過程中可以響應(yīng)中斷拋出中斷異??梢允褂?code>acquireInterruptibly方法

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 判斷是否中斷,是則拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 嘗試獲取同步狀態(tài),成功則返回,失敗調(diào)用doAcquireInterruptibly方法
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

// 這個方法跟上面的acquireQueued方法基本一樣,就是在被喚醒后檢測到中斷狀態(tài)后會拋出中斷異常
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 檢測到中斷,拋出中斷異常
                throw new InterruptedException();
        }
    } finally {
        // 當(dāng)failed為true時,表示檢查到中斷拋出異常,需要執(zhí)行cancelAcquire方法取消獲取同步狀態(tài)
        if (failed)
            cancelAcquire(node);
    }
}

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 向前遍歷,跳過等待狀態(tài)為CANCELLED的結(jié)點,直到找到一個等待狀態(tài)不為CANCELLED的結(jié)點
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;
    //設(shè)置當(dāng)前結(jié)點等待狀態(tài)為CANCELLED
    node.waitStatus = Node.CANCELLED;

    // 如果node為尾結(jié)點則移除尾結(jié)點
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        // 否則如果node前驅(qū)結(jié)點不為head結(jié)點
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            、、
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {// 如果node前驅(qū)結(jié)點為head
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

tryAcquireNanos方法

該方法通過設(shè)置超時時間nanosTimeout,當(dāng)在指定超時時間內(nèi)未獲取到同步狀態(tài)將返回false,在獲取過程中支持響應(yīng)中斷并拋出異常。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 判斷是否中斷,是則拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 嘗試獲取同步狀態(tài),失敗則調(diào)用doAcquireNanos方法超時獲取
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 計算截止時間deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    // 創(chuàng)建獨占模式下的結(jié)點并入隊
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 當(dāng)前驅(qū)結(jié)點為head則嘗試獲取同步狀態(tài)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 計算還有多長時間到達截止時間
            nanosTimeout = deadline - System.nanoTime();
            // 如果超時時間小于0則已到達截止時間,返回false
            if (nanosTimeout <= 0L)
                return false;
            // 判斷是否要阻塞,并且超時時間大于自旋超時時間的閾值則阻塞,因為小于spinForTimeoutThreshold沒必要阻塞,因為喚醒的時間可能超過該閾值
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            // 檢查到中斷則拋出中斷異常
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容