JUC阻塞隊(duì)列之DelayQueue源碼分析

DelayQueue是一個(gè)支持延時(shí)獲取元素的無界阻塞隊(duì)列。并且隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口。在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中獲取到元素。DelayQueue的應(yīng)用范圍非常廣闊,如可以用它來保存緩存中元素的有效期,也可用它來實(shí)現(xiàn)定時(shí)任務(wù)。

Delayed接口

在分析DelayQueue源碼之前,我們先來看看Delayd接口,其源碼定義如下:

public interface Delayed extends Comparable < Delayed > {

    /**
     * 指定返回對(duì)象的延時(shí)時(shí)間
     * @param  unit [時(shí)間單位]
     * @return      [延時(shí)的剩余,0或者-1表示延時(shí)已經(jīng)過期]
     */
    long getDelay(TimeUnit unit);
}

我們看到,Delayed接口繼承了Comparable接口,即實(shí)現(xiàn)Delayed接口的對(duì)象必須實(shí)現(xiàn)getDelay(TimeUnit unit)方法和compareTo(T o)方法。這里compareTo(T o)方法可以用來實(shí)現(xiàn)元素的排序,可以將延時(shí)時(shí)間長的放到隊(duì)列的末尾。

DelayQueue構(gòu)造函數(shù)

上面分析了Delayed接口,接下來我們分析DelayQueue的構(gòu)造函數(shù)。DelayQueue提供了2種構(gòu)造函數(shù),一個(gè)是無參構(gòu)造函數(shù),一個(gè)是給定集合為參數(shù)的構(gòu)造函數(shù)。其源碼如下:

/**
 * 構(gòu)建一個(gè)空的DelayQueue
 */
public DelayQueue() {}

/**
 * 給定集合c為參數(shù)的構(gòu)造函數(shù)
 * 將集合c中的元素全部放入到DelayQueue中
 */
public DelayQueue(Collection < ? extends E > c) {
    this.addAll(c);
}

addAll方法是AbstractQueue抽象類中的方法,其源碼如下:

public boolean addAll(Collection < ? extends E > c) {
    // 參數(shù)檢測
    if (c == null)
        throw new NullPointerException();
    if (c == this)
        throw new IllegalArgumentException();
    boolean modified = false;
    //遍歷集合c中的元素
    for (E e: c)
        // 調(diào)用DelayQueue中的add方法
        if (add(e))
            modified = true;
    return modified;
}

從上面的源碼中,我們可以看到,AbstractQueue抽象類中addAll方法實(shí)際是調(diào)用DelayQueue類中的add方法來實(shí)現(xiàn)的。

DelayQueue 入列操作

DelayQueue提供了4中入列操作,分別是:

  • add(E e):阻塞的將制定元素添加到延時(shí)隊(duì)列中去,因?yàn)殛?duì)列是無界的因此此方法永不阻塞。
  • offer(E e):阻塞的將制定元素添加到延時(shí)隊(duì)列中去,因?yàn)殛?duì)列是無界的因此此方法永不阻塞。
  • put(E e):阻塞的將制定元素添加到延時(shí)隊(duì)列中去,因?yàn)殛?duì)列是無界的因此此方法永不阻塞。
  • offer(E e, long timeout, TimeUnit unit):阻塞的將制定元素添加到延時(shí)隊(duì)列中去,因?yàn)殛?duì)列是無界的因此此方法永不阻塞。

這里大家可能會(huì)奇怪,為什么這些入列方法的解釋都是一樣的?這個(gè)問題先等下回答,我們先來看看這幾個(gè)入列方法的源碼定義:

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    //獲取可重入鎖
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lock();
    try {
        //調(diào)用PriorityQueue中的offer方法
        q.offer(e);
        //調(diào)用PriorityQueue中的peek方法
        if (q.peek() == e) {
            leader = null;
            available.signal();
        }
        return true;
    } finally {
        //釋放鎖
        lock.unlock();
    }
}

public void put(E e) {
    offer(e);
}

public boolean offer(E e, long timeout, TimeUnit unit) {
    return offer(e);
}

這里我們從源碼中可以看到,add(E e)方法、put(E e)方法和offer(E e,long timeout,TimeUnit unit)方法都是調(diào)用offer(E e)方法來實(shí)現(xiàn)的,這也是為什么這幾個(gè)方法的解釋都是一樣的原因。其中offer(E e)方法的核心又是調(diào)用了PriorityQueue中的offer(E e)方法,PriorityQueue和PriorityBlockingQueue都是以二叉堆的無界隊(duì)列,只不過PriorityQueue不是阻塞的而PriorityBlockingQueue是阻塞的。

DelayQueue出列操作

DelayQueue提供了3中出列操作方法,它們分別是:

  • poll():檢索并刪除此隊(duì)列的開頭,如果此隊(duì)列沒有延遲延遲的元素,則返回null
  • take():檢索并除去此隊(duì)列的頭,如有必要,請(qǐng)等待直到該隊(duì)列上具有過期延遲的元素可用。
  • poll(long timeout, TimeUnit unit):檢索并刪除此隊(duì)列的頭,如有必要,請(qǐng)等待直到該隊(duì)列上具有過期延遲的元素可用,或者或指定的等待時(shí)間到期。

下面我們來一個(gè)一個(gè)分析出列操作的原來。

poll():

poll操作的源碼定義如下:

public E poll() {
   //獲取可重入鎖
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lock();
    try {
       //獲取隊(duì)列中的第一個(gè)元素
        E first = q.peek();
        //若果元素為null,或者頭元素還未過期,則返回false
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
           //調(diào)用PriorityQueue中的出列方法
            return q.poll();
    } finally {
        lock.unlock();
    }
}

該方法與PriorityQueue的poll方法唯一的區(qū)別就是多了if (first == null || first.getDelay(NANOSECONDS) > 0)這個(gè)條件判斷,該條件是表示如果隊(duì)列中沒有元素或者隊(duì)列中的元素未過期,則返回null。

take

take操作源碼定義如下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    //加鎖
    lock.lockInterruptibly();
    try {
        //西循環(huán)
        for (;;) {
            //查看隊(duì)列頭元素
            E first = q.peek();
            //如果隊(duì)列頭元素為null,則表示隊(duì)列中沒有數(shù)據(jù),線程進(jìn)入等待隊(duì)列
            if (first == null)
                available.await();
            else {
                // 獲取first元素剩余的延時(shí)時(shí)間
                long delay = first.getDelay(NANOSECONDS);
                //若果剩余延時(shí)時(shí)間<=0 表示元素已經(jīng)過期,可以從隊(duì)列中獲取元素
                if (delay <= 0)
                    //直接返回頭部元素
                    return q.poll();
                //若果剩余延時(shí)時(shí)間>0,表示元素還未過期,則將first置為null,防止內(nèi)存溢出
                first = null; // don't retain ref while waiting
                //如果leader不為null,則直接進(jìn)入等待隊(duì)列中等待
                if (leader != null)
                    available.await();
                else {
                    //若果leader為null,則把當(dāng)前線程賦值給leader,并超時(shí)等待delay納秒
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            //喚醒線程
            available.signal();
        lock.unlock();
    }
}

take操作比poll操作稍微要復(fù)雜些,但是邏輯還是相對(duì)比較簡單。只是在獲取元素的時(shí)候先檢查元素的剩余延時(shí)時(shí)間,如果剩余延時(shí)時(shí)間<=0,則直接返回隊(duì)列頭元素。如果剩余延時(shí)時(shí)間>0,則判斷l(xiāng)eader是否為null,若果leader不為null,則表示已經(jīng)有線程在等待獲取隊(duì)列的頭部元素,因此直接進(jìn)入等待隊(duì)列中等待。若果leader為null,則表示這是第一個(gè)獲取頭部元素的線程,把當(dāng)前線程賦值給leader,然后超時(shí)等待剩余延時(shí)時(shí)間。在take操作中需要注意的一點(diǎn)是fist=null,因?yàn)槿绻鹒irst不置為null的話會(huì)引起內(nèi)存溢出的異常,這是因?yàn)樵诓l(fā)的時(shí)候,每個(gè)線程都會(huì)持有一份first,因此first不會(huì)被釋放,若果線程數(shù)過多,就會(huì)導(dǎo)致內(nèi)存溢出的異常。

poll(long timeout, TimeUnit unit)

超時(shí)等待獲取隊(duì)列元素的源碼如下:

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();
            if (first == null) {
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return q.poll();
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

這個(gè)出列操作的邏輯和take出列操作的邏輯幾乎一樣,唯一不同的在于take是無時(shí)間限制等待,而改操作是超時(shí)等待。

總結(jié)

DelayQueue的入列和出列操作邏輯相對(duì)比較簡單,就是在獲取元素的時(shí)候,判斷元素是否已經(jīng)過期,若果過期就可以直接獲取,沒有過期的話poll操作是直接返回null,take操作是進(jìn)入等待隊(duì)列中等待。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 轉(zhuǎn)《Java并發(fā)編程的藝術(shù)》 1.什么是阻塞隊(duì)列 阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)...
    沉淪2014閱讀 546評(píng)論 0 0
  • 阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取元素的線...
    端木軒閱讀 1,061評(píng)論 0 2
  • 引言 JDK中除了上文提到的各種并發(fā)容器,還提供了豐富的阻塞隊(duì)列。阻塞隊(duì)列統(tǒng)一實(shí)現(xiàn)了BlockingQueue接口...
    小刀愛編程閱讀 666評(píng)論 0 0
  • 四月伊始 西南方的田野里漸鳴起了蛙聲 伴著幾聲犬吠 慵懶而隨意 邊際的光與黑夜一線分明 微風(fēng)吹動(dòng)燈火隱約搖晃 那溫...
    門前門前一棵葡萄樹閱讀 224評(píng)論 0 1
  • “嗯,我知道了?!蔽业瓚?yīng)道。 誒,這年頭哪有什么好男人啊,都說十個(gè)男人七個(gè)傻,八個(gè)呆,九個(gè)壞,...
    走過錯(cuò)過閱讀 200評(píng)論 0 1

友情鏈接更多精彩內(nèi)容