多線程與并發(fā)(七):AQS原理

1 AQS原理

全稱是 AbstractQueuedSynchronizer ,阻塞式鎖和相關同步工具的框架

1.1 特點

  • 用state屬性來表示資源的狀態(tài)(分獨占模式和共享模式),子類需要定義如何維護這個狀態(tài),控制如何獲取鎖和釋放鎖

  • getState 獲取state狀態(tài)

  • setState 設置state 狀態(tài)

  • compareAndState cas機制設置state狀態(tài)

  • 獨占模式是只有一個線程能夠訪問資源,而共享模式可以允許多個線程訪問資源

  • 提供了基于FIFO的等待隊列,類似于Monitor 的EntryList

  • 條件變量來實現(xiàn)等待、喚醒機制,支持多個條件變量,類似于Monitor 的WaitSet

子類主要實現(xiàn)這樣的一些方法(默認拋出UnsupportedOperationException)

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

獲取鎖的姿勢

//如果獲取鎖失敗
if (! tryAcquire(arg)) {
    //入隊,可以選擇阻塞當前線程 park unpark 機制
}

釋放鎖的姿勢

//如果釋放鎖成功
if (tryRelease(arg)) {
    //讓阻塞線程恢復運行
}

2. 自定義鎖

自定義鎖實現(xiàn) Lock 接口,定義一個內部類繼承 AbstractQueuedSynchronizer 類
自定義鎖是一個不可重入鎖

public class AqsTest {
    public static void main(String[] args) {
        MyLock lock = new MyLock();
        new Thread(()->{
            lock.lock();
            try{
                System.out.println("lock" + Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
                System.out.println("unlock" + Thread.currentThread().getName());
                lock.unlock();
            }
        }).start();

        new Thread(()->{
            lock.lock();
            try{
                System.out.println("lock" + Thread.currentThread().getName());
            }finally {
                System.out.println("unlock" + Thread.currentThread().getName());
                lock.unlock();
            }
        }).start();
    }

}

/**
 * 自定義不可重入鎖
 */
class MyLock implements Lock {

    class MySync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0,1)) {
                //表示加上鎖了,設置owner為當前線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {

            //注意兩個方法順序
            setExclusiveOwnerThread(null);
           //state 使用volatile修飾,禁止指令重排
            setState(0); //表示解鎖

            return true;
        }

        /**
         * 是否持有獨占鎖
         * @return
         */
        @Override
        protected boolean isHeldExclusively() {

            return getState() == 1;
        }

        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private MySync sync = new MySync();

    /**
     * 不成功進入等待隊列等待
     */
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
       sync.acquireInterruptibly(1);
    }

    /**
     * 嘗試加鎖,只會嘗試一次
     * @return
     */
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    /**
     * 創(chuàng)建條件變量
     * @return
     */
    @Override
    public Condition newCondition() {
        return null;
    }
}

測試結果:
lockThread-0
unlockThread-0
lockThread-1
unlockThread-1


3. ReentrantLock 原理

ReentrantLock UML類圖


reentrantlock_uml.png

3.1 非公平鎖實現(xiàn)原理

3.1.1 加解鎖流程

加鎖流程

沒有競爭時

非公平鎖-1.png

第一個競爭出現(xiàn)時
Thread-1 執(zhí)行了

  1. CAS 嘗試將state 由 0 改為 1 ,結果失敗
  2. 進入tryAcquire邏輯,這時 state 已經是 1 ,結果仍然失敗
  3. 接下來進入addWaiter邏輯,構造Node隊列
    • 途中黃色三角表示該Node的waitStatus 狀態(tài),其中 0 為默認正常狀態(tài)
    • Node的創(chuàng)建時懶惰的
    • 其中第一個Node成為 Dummy(啞元) 或哨兵,用來占位,并不關聯(lián)線程
非公平鎖-3 (1).png
  1. 當前線程進入acquireQueued 邏輯
  • acquireQueued 會在一個死循環(huán)中不斷嘗試獲得鎖,失敗后進入park 阻塞
  • 如果自己是緊鄰著 head (排第二位),那么再次tryAcquire 嘗試獲取鎖,當然這時state 仍為1,失敗
  • 進入 shouldParkAfterFailedAcquire(p, node) 邏輯,將前驅node ,即head 的waitStatus 改為 -1 ,這次返回false (-1 表示有責任喚醒它的后繼節(jié)點)
非公平鎖-4.png
  • shouldParkAfterFailedAcquire(p, node) 執(zhí)行完畢后回到acquireQueued, 再次tryAcquire 嘗試獲得鎖,當然這時的 state 仍為1,失敗
  • 當再次進入 shouldParkAfterFailedAcquire 時,這時因為其前驅node 的waitStatus 已經是 -1,這次返回true
  • 進入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
非公平鎖-5.png
  • 再次有多個線程經歷上述過程競爭失敗,變成這個樣子
非公平鎖-6.png

解鎖流程

1.Thread-0 釋放鎖,進入tryRelease 流程,如果成功

  • 設置 execlusiveOwnerThread 為null
  • state = 0
非公平鎖-解鎖1.png

2.當前隊列不為null , 并且 head 的waitStatus = -1,進入unparkSuccessor 流程

  • 找到隊列中離head 最近的一個Node (沒取消的),unpark 恢復其運行,本例中為Thread-1

3.回到Thread-1 的acquireQueue流程

非公平鎖-解鎖2.png

如果加鎖成功(沒有競爭),會設置

  • exclusiveOwnerThread 為Thread-1 ,state = 1;
  • head 指向剛剛Thread-1 所在的Node,該Node清空Thread
  • 原本的head因為從鏈表斷開,而可被垃圾回收

4.如果這時候有其他線程來競爭(非公平的體現(xiàn)),例如這時Thread-4 來了

如果不巧被Thread-4占了先機

  • Thread-4 被設置為exclusiveOwnerThread , state = 1
  • Thread-1 再次進入acquireQueued 流程,獲取鎖失敗,重新進入park 阻塞
源碼

先從構造器看,默認為非公平鎖實現(xiàn)

public ReentrantLock() {
    sync = new NonfairSync();
}

NonfairSync 繼承自 Sync , Sync 繼承自AQS

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    } 
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();

        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // Methods relayed from outer class

        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }

        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }

        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); //獲取前驅節(jié)點
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 * @return the new node
 */
private Node addWaiter(Node mode) {
    //創(chuàng)建一個節(jié)點,waitStatus = null
    Node node = new Node(Thread.currentThread(), mode);
    //如果當前隊列已經有節(jié)點了,直接將該節(jié)點加入到AQS隊列尾部
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
   //如果當前隊列為空,要先初始化,new Node()
    enq(node);
    return node;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

3.1.2 可重入原理

具體分析見源碼

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;


    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}


abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();

     
        if (c == 0) {
            //當前線程沒有獲得鎖,嘗試修改狀態(tài)0->1
            if (compareAndSetState(0, acquires)) {
               //修改狀態(tài)成功,表示獲得鎖。設置owner為當前線程
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //如果已經獲得了鎖,線程還是當前線程,表示發(fā)生了鎖重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //設置當前的state, 同一個線程鎖重入state++
            setState(nextc);
            return true;
        }
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //c==0時重新設置 owner 返回true,否則只更新state值,返回false
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    //是否獨占鎖
    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don't need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }
   ...
}

3.1.3 可打斷原理

不可打斷模式

在此模式下,即使它被打斷,仍會駐留在AQS 隊列中,等獲得鎖后方能繼續(xù)運行(是繼續(xù)運行,只是打斷標記被設置為true)

private final boolean parkAndCheckInterrupt() {
    // 如果打斷標記已經是true,則park 會失效
    LockSupport.park(this);
    // interrupted 會清除打斷標記
    return Thread.interrupted();
}

//未獲得到鎖時,調用該方法
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                //還是需要獲得鎖后,才能返回打斷狀態(tài)
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果是因為interrupt 被喚醒,返回打斷狀態(tài)為true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果打斷狀態(tài)為true
        selfInterrupt();
}

static void selfInterrupt() {
    // 重新產生一次中斷
    Thread.currentThread().interrupt();
}

可打斷模式

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        // 如果沒有獲得鎖,進入
        doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //在park過程中如果被interrupt 會進入此
                // 這時拋出異常,而不會再次進入 for(;;)
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4. 公平鎖實現(xiàn)原理

非公平鎖

final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //如果還沒有獲得鎖
        if (c == 0) {
            //嘗試用CAS獲得鎖,這里體現(xiàn)非公平性:不去檢查AQS隊列
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果已經獲得了鎖,線程還是當前線程,表示發(fā)生了鎖重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        //獲取失敗,回到調用處
        return false;
    }

公平鎖實現(xiàn)

// 與非公平鎖的主要區(qū)別在于tryAcquire 方法的實現(xiàn)
protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 先檢查AQS隊列中是否有前驅節(jié)點
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

5. 條件變量的實現(xiàn)原理

每個條件變量其實就對應著一個等待隊列,實現(xiàn)類是ConditionObject

await 流程

開始 Thread-0持有鎖,調用await,進入 ConditionObject 的 addConditionWaiter 流程創(chuàng)建新的Node狀態(tài)為-2(Node.CONDITION),關聯(lián) Thread-0,加入等待隊列的尾部


條件變量.jpg
public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
       //調用ConditionObject.await() 先進入addConditionWaiter()流程
        Node node = addConditionWaiter();
       //接下來進入AQS的fullyRelease 流程,釋放同步器上的鎖
       //為什么這里要調用 fullyRelease(node)? 有可能一個線程重入好幾次
        int savedState = fullyRelease(node);
        int interruptMode = 0;
       //進入一個循環(huán),判斷
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //給當前線程創(chuàng)建一個新節(jié)點,連接到鏈表上
            //waitStatus == Node.CONDITION
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

當前線程添加到 ConditionObject隊列中后,當前線程要釋放鎖

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        //釋放鎖成功,返回當前狀態(tài);若釋放失敗,設置當前節(jié)點的狀態(tài)為Node.CANCELLED
        //release(savedState) 還會喚醒等待隊列中的下一個節(jié)點
        //unpark AQS隊列的下一個節(jié)點,競爭鎖,假設沒有其他線程競爭,Thread-1競爭鎖成功
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

signal 流程

假設Thread-1 要來喚醒 Thread-0

signal.png

進入ConditionObject 的doSignal()流程


/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 */
public final void signal() {
    //如果當前線程沒有持有鎖,調用signal()拋異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //取出等待隊列的頭結點
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

/**
 * Removes and transfers nodes until hit non-cancelled one or
 * null. Split out from signal in part to encourage compilers
 * to inline the case of no waiters.
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

執(zhí)行 transferForSignal 流程,將該Node 加入 AQS隊列尾部,將Thread-0 的waitStatus 改為 0;將Thread-3的 waitStatus 改為 -1

 /**
  * Transfers a node from a condition queue onto sync queue.
  * Returns true if successful.
  * @param node the node
  * @return true if successfully transferred (else the node was
  * cancelled before signal)
  */
//將這個節(jié)點從 condition隊列轉移到 sync隊列,如果成功則返回true
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    //首先嘗試cas操作將waitStatus 改為0,若失敗,直接返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    //拼接將該節(jié)點到 aqs 隊尾。調用 enq()返回該節(jié)點的前置節(jié)點
   
    Node p = enq(node);
    int ws = p.waitStatus;
   
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true
}

Node節(jié)點定義

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;

        /**
         * Returns true if node is waiting in shared mode.
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容