【Java并發(fā)編程】Condition源碼解析

概述

對(duì)于Java來說萬物皆對(duì)象,所有的Java對(duì)象的最終父類都是Object,所以它們都擁有一組監(jiān)視器方法,主要包括:wait(),wait(long timeout),notify()和notifyAll(),這些方法與Synchronized關(guān)鍵字配合,可以實(shí)現(xiàn)等待/通知機(jī)制。
Condition也提供了類似Object的監(jiān)控方法,與Lock接口配合能夠?qū)崿F(xiàn)等待/通知機(jī)制,但是這兩者在使用方式和功能特性上有一定的區(qū)別。下面是一個(gè)Object的監(jiān)視器方法與Condition接口的對(duì)比:

對(duì)比項(xiàng) Object監(jiān)視器方法 Condition接口
前置條件 獲取對(duì)象鎖 1.調(diào)用Lock.lock()方法
2.調(diào)用Lock.newCondition()方法獲取Condition對(duì)象
調(diào)用方式 直接調(diào)用。例如Object.wait() 直接調(diào)用。例如condition.await()方法
等待隊(duì)列個(gè)數(shù) 一個(gè) 多個(gè)
當(dāng)前線程釋放鎖,進(jìn)入等待狀態(tài) 支持 支持
當(dāng)前線程釋放鎖,進(jìn)入等待狀態(tài),
在等待狀態(tài)中不響應(yīng)中斷
不支持 支持
當(dāng)前線程釋放鎖,并進(jìn)入超時(shí)等待狀態(tài) 支持 支持
當(dāng)前線程釋放鎖,進(jìn)入等待狀態(tài)到某個(gè)時(shí)間 不支持 支持
喚醒等待隊(duì)列中的一個(gè)線程 支持 支持
喚醒等待隊(duì)列中的所有線程 支持 支持

Condition的使用方式

Condition接口中定義了等待、通知兩種類型的方法,具體如下圖:

Condition等待、通知方法.png

我們知道Condition是由Lock.newCondition()創(chuàng)建來的,也就是說condition是依賴于Lock對(duì)象的。在調(diào)用上圖的方法時(shí),必須先獲取到Condition對(duì)象關(guān)聯(lián)的鎖。Condition的使用方式如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest {

    public static final Lock lock = new ReentrantLock();
    public static final Condition condition = lock.newCondition();

    public static void conditionWait() throws InterruptedException{
        lock.lock();
        try {
            System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 調(diào)用await!");
            condition.await();
            System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 被喚醒,重新獲得了鎖!");
        } finally {
            lock.unlock();
            System.out.println(System.currentTimeMillis() + "  :" + Thread.currentThread().getName()+" 釋放了鎖!");
        }
    }

    public static void conditionSignal() throws InterruptedException{
        lock.lock();
        try {
            System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 調(diào)用signal方法!");
            condition.signal();
            System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 調(diào)用了signal方法!");
        } finally {
            lock.unlock();
            System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 釋放了鎖!");
        }
    }

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 開始執(zhí)行!");
                    conditionWait();
                    System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 結(jié)束執(zhí)行!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 開始執(zhí)行!");
                    conditionSignal();
                    System.out.println(System.currentTimeMillis() + "  :" +Thread.currentThread().getName()+" 結(jié)束執(zhí)行!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }


}


一般都會(huì)將Condition對(duì)象作為成員變量。當(dāng)調(diào)用await()方法后,當(dāng)前線程會(huì)釋放鎖進(jìn)入等待狀態(tài),而其他線程在獲取鎖,進(jìn)行自己的業(yè)務(wù)邏輯后調(diào)用了Condition對(duì)象的Signal()方法,通知當(dāng)前線程后,當(dāng)前線程才從await()方法返回,并在返回前已經(jīng)獲取了鎖。
關(guān)于Condition方法的描述如下:

  1. void await() 當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知(signal/signalAll)或中斷(其他線程調(diào)用interrupt()方法),當(dāng)前線程將進(jìn)入運(yùn)行狀態(tài)且從await()方法返回,此時(shí)當(dāng)前線程已經(jīng)獲取到了對(duì)應(yīng)鎖。
  2. void awaitUninterruptibly() 當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知,但是該方法不響應(yīng)中斷
  3. long awaitNanos(long nanosTimeout) 當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知、中斷或者超時(shí)。返回值表示剩余時(shí)間,如果在nanosTimeout納秒之前被喚醒,那么返回值為(nanosTimeout - 實(shí)際耗時(shí))。返回值為0或負(fù)值,則表示已經(jīng)超時(shí)了。
  4. boolean awaitUntil(Date deadline) 當(dāng)前線程進(jìn)入等待狀態(tài),直到被通知、中斷或者某個(gè)時(shí)間。如果沒有到達(dá)指定時(shí)間就被通知,則返回true;否則返回false。
  5. void signal() 喚醒一個(gè)等待在Condition上的線程,該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖;
  6. void signalAll() 喚醒所有等待在Condition上的線程,能夠從等待方法返回的線程必須獲得與Condition相關(guān)聯(lián)的鎖;

Condition實(shí)現(xiàn)分析

上面我們說到Condition是由Lock.newCondition()方法創(chuàng)建出來的,而查看ReentrantLock中的源碼,可以看到,newCondition()方法實(shí)際上會(huì)new一個(gè)ConditionObject對(duì)象。具體如下:

final ConditionObject newCondition() {
        return new ConditionObject();
    }

ConditionObject對(duì)象是AQS的一個(gè)內(nèi)部類,之前說到Condition是依賴于Lock來使用的,那么ConditionObject是AQS的內(nèi)部類也順理成章了。每一個(gè)Condition對(duì)象都維護(hù)者一個(gè)隊(duì)列,即等待隊(duì)列,該隊(duì)列是Condition實(shí)現(xiàn)等待/通知機(jī)制的關(guān)鍵。等待隊(duì)列是一個(gè)FIFO的隊(duì)列,在等待隊(duì)列的每一個(gè)節(jié)點(diǎn)都包含了一個(gè)線程引用,如果一個(gè)線程調(diào)用condition.await()方法,那么該線程將會(huì)釋放鎖、構(gòu)造成節(jié)點(diǎn)加入到等待隊(duì)列并進(jìn)入等待狀態(tài)。這里說到的節(jié)點(diǎn)Node其實(shí)與之前AQS中提到的Node是同一個(gè)內(nèi)部類AbstractQueuedSynchronizer.Node。
這里我們還需注意ConditionObject中包含兩個(gè)成員變量:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

分別表示Condition的頭尾指針,還有Node中還有一個(gè)屬性是需要大家注意的Node nextWaiter,nextWaiter表示等待隊(duì)列中的后繼節(jié)點(diǎn),而Node中關(guān)于同步隊(duì)列的相關(guān)屬性卻有兩個(gè):prev和next。那么由此我們可以判斷等待隊(duì)列是一個(gè)單向隊(duì)列,每個(gè)節(jié)點(diǎn)只保存其后一個(gè)節(jié)點(diǎn)的引用。而等待隊(duì)列的基本結(jié)構(gòu)則如下圖:

Condition等待隊(duì)列結(jié)構(gòu).png

如上圖所示,Condition擁有首節(jié)點(diǎn)的引用,而新增節(jié)點(diǎn)只需要將原尾節(jié)點(diǎn)的nextWaiter指向它,并更新尾節(jié)點(diǎn)即可。需要注意的是節(jié)點(diǎn)更新的過程是沒有使用CAS方法的,原因是調(diào)用await 方法的線程必定獲取了鎖。我們可以不止一次的調(diào)用lock.newCondition方法,這說明AQS中不止維護(hù)了一個(gè)等待隊(duì)列。object監(jiān)視器上只能擁有一個(gè)同步隊(duì)列和一個(gè)等待隊(duì)列,而AQS卻擁有一個(gè)同步隊(duì)列,多個(gè)等待隊(duì)列。具體如下圖:

AQS一個(gè)同步隊(duì)列多個(gè)等待隊(duì)列.png

如上圖所示,Condition的實(shí)現(xiàn)是同步器的內(nèi)部類,因此每個(gè)Condition實(shí)例都能夠訪問同步器提供的方法,相當(dāng)于每個(gè)Condition都擁有同步器的引用。

condition.await()方法

廢話不多說,直接擼源碼:

public final void await() throws InterruptedException {
    //如果線程被中斷,那么拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
    //將線程構(gòu)建成Node節(jié)點(diǎn),并加入等待隊(duì)列
    Node node = addConditionWaiter();
    //釋放當(dāng)前線程所占用的鎖,并喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //當(dāng)前線程進(jìn)入等待狀態(tài)
        LockSupport.park(this);
        //判斷是否被中斷
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //自旋等待獲取同步狀態(tài)
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    //處理被中斷狀態(tài)
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

當(dāng)前線程調(diào)用condition.await()方法后,會(huì)使得當(dāng)前線程釋放鎖并進(jìn)入等待隊(duì)列中,直到被signal/signalAll方法喚醒后會(huì)使當(dāng)前線程從等待隊(duì)列移至同步隊(duì)列中去,知道獲取鎖后返回,或者在等待過程中被中斷做中斷處理。那么這中間的細(xì)節(jié)是如何處理的呢?當(dāng)前線程是如何加入等待隊(duì)列中的?又是怎么釋放鎖的呢?釋放之后await方法如何退出呢?這些我們都還不清楚,下面我們來仔細(xì)分析下源碼中調(diào)用的幾個(gè)方法。

private Node addConditionWaiter() {
    //獲取尾節(jié)點(diǎn)指針
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    //如果尾節(jié)點(diǎn)不為null,并且尾節(jié)點(diǎn)等待狀態(tài)不是CONDITION,那么刪除等待隊(duì)列中所有非CONDITION狀態(tài)的節(jié)點(diǎn)
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        //重新獲取尾節(jié)點(diǎn)
        t = lastWaiter;
    }
    //將當(dāng)前線程構(gòu)建成節(jié)點(diǎn)
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //如果尾節(jié)點(diǎn)為空,則將頭結(jié)點(diǎn)指針指向當(dāng)前節(jié)點(diǎn),否則將尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)指向當(dāng)前節(jié)點(diǎn)
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    //然后將尾節(jié)點(diǎn)指針指向當(dāng)前節(jié)點(diǎn)
    lastWaiter = node;
    return node;
}

從上面這段代碼可以看到,該方法將當(dāng)前線程構(gòu)建成節(jié)點(diǎn),判斷頭結(jié)點(diǎn)firstWaiter是否為空,如果為空,則將firstWaiter指向當(dāng)前節(jié)點(diǎn),如果不為空,則更新尾節(jié)點(diǎn)。這就解決了如何加入等待隊(duì)列的問題,下面由fullRelease方法來釋放鎖,具體源碼如下;

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        //嘗試釋放鎖,并喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
        if (release(savedState)) {
            //成功則返回同步狀態(tài)
            failed = false;
            return savedState;
        } else {    
            //不成功拋出異常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}


這段代碼就很容易理解了,調(diào)用AQS中的release()方法釋放鎖,并喚醒同步隊(duì)列中頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)引用的線程,如果釋放成功則正常返回,釋放失敗則拋出異常。然后在回到await()方法的源碼中,發(fā)現(xiàn)以上方法調(diào)用完后有這么一段邏輯:


while (!isOnSyncQueue(node)) {
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
}


final boolean isOnSyncQueue(Node node) {
    //如果當(dāng)前節(jié)點(diǎn)為等待狀態(tài),或前置節(jié)點(diǎn)為空,那么返回false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)next不為空,這說明在同步隊(duì)列中,返回true
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    //再同步隊(duì)列中尋找當(dāng)前節(jié)點(diǎn),找到返回true,未找到返回false
    return findNodeFromTail(node);
}

很顯然要想退出await方法,需要先跳出該循環(huán)。而從代碼中可以看出跳出循環(huán)的方法兩種:1、!isOnSyncQueue(node)返回false;2、(interruptMode = checkInterruptWhileWaiting(node)) != 0等于true。從上面的源碼可以看出isOnSyncQueue(node)方法,用來判斷當(dāng)前節(jié)點(diǎn)是否在同步隊(duì)列中,即另外線程調(diào)用signal/signalAll方法。第二個(gè)條件判斷當(dāng)前線程是否被中斷。
總結(jié)為:退出await方法的前提條件是當(dāng)前線程被中斷或其他線程調(diào)用signal/signalAll方法將當(dāng)前線程移動(dòng)到同步隊(duì)列中。當(dāng)跳出while循環(huán)后,會(huì)繼續(xù)調(diào)用acquireQueued(node, savedState)方法,自旋獲取同步狀態(tài),直到成功,這樣說明了要跳出await方法必須要獲得鎖。到這里我們已經(jīng)解決了上面提出的疑問,對(duì)await方法也理解的更加透徹了。下面是await方法的示意圖:

condition.await()示意圖.png

signal/signalAll

調(diào)用Condition的signal()和signalAll()方法,將會(huì)喚醒等待隊(duì)列中等待時(shí)間最長(zhǎng)的節(jié)點(diǎn)(即首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移動(dòng)到同步隊(duì)列中。下面先看下Signal()方法的源碼:

public final void signal() {
    //先判斷當(dāng)前線程是否獲取到了鎖,沒有的話,拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //喚醒等待隊(duì)列中的頭結(jié)點(diǎn)
        doSignal(first);
}

從上面代碼可以看出,首先會(huì)判斷當(dāng)前線程是否獲取到了鎖,如果沒有獲取到,則會(huì)拋出異常。如果獲取到了鎖,那么先拿到等待隊(duì)列的頭指針引用的節(jié)點(diǎn),之后喚醒等待隊(duì)列中的頭結(jié)點(diǎn),具體細(xì)節(jié)在doSignal(first)方法中,具體看下源碼;

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //將頭結(jié)點(diǎn)從等待隊(duì)列中移除
        first.nextWaiter = null;
        //對(duì)頭結(jié)點(diǎn)做處理的部分在transferForSignal(first)中
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

    final boolean transferForSignal(Node node) {
        使用CAS將等待狀態(tài)改為0,如果失敗返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //將節(jié)點(diǎn)移入同步隊(duì)列
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果該節(jié)點(diǎn)等待狀態(tài)>0或者嘗試修改等待狀態(tài)為SIGNAL失敗,則喚醒該節(jié)點(diǎn)對(duì)應(yīng)的線程,返回true
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

閱讀源碼,能夠發(fā)現(xiàn),doSignal主要做了一下幾件事:1、將頭結(jié)點(diǎn)從等待隊(duì)列移除;2、將頭結(jié)點(diǎn)狀態(tài)由CONDITION改為0,即初始狀態(tài);3、將節(jié)點(diǎn)從同步隊(duì)列尾部插入;4、喚醒該節(jié)點(diǎn)。由此我們可以得出結(jié)論:調(diào)用Condition.signal()方法的前提是當(dāng)前線程已經(jīng)獲取到了鎖,該方法會(huì)將等待隊(duì)列中的頭結(jié)點(diǎn)移除并從同步隊(duì)列的尾節(jié)點(diǎn)插入,并喚醒當(dāng)前節(jié)點(diǎn)對(duì)應(yīng)的線程。

condition.signal()方法示意圖.png

signalAll()

signalAll()方法與signal()方法的區(qū)別僅僅體現(xiàn)在doSignal和doSignalAll方法上,我們看下doSignalAll方法的源碼:

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

可以看到這里的區(qū)別就是,doSignalAll會(huì)將等待隊(duì)列中的所有節(jié)點(diǎn)都移動(dòng)到同步隊(duì)列中,并喚醒全部對(duì)應(yīng)節(jié)點(diǎn)的線程。

總結(jié)

下面是我自己總結(jié)的關(guān)于condition.await方法和signal方法的運(yùn)行流程圖:

condition.await流程圖.png
condition.signal流程圖.png

如果有什么問題的話,歡迎大家留言指正,謝謝!

注:本文參考《Java并發(fā)編程的藝術(shù)》

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容