DelayQueue

offer

public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 插入到隊(duì)列里面
        q.offer(e);
        // 如果入隊(duì)的元素排在隊(duì)首,說明該元素優(yōu)先級(jí)最高,也就是最快過期的。
        // 為什么一定要跟入隊(duì)一致呢?
        if (q.peek() == e) {
            leader = null;
            // 喚醒a(bǔ)vailable,表示隊(duì)列里面有東西可用了
            available.signal();
        }
        return true;
    } finally {
        lock.unlock();
    }
}

take

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        // 自旋
        for (;;) {
            // 拿到頭節(jié)點(diǎn)
            E first = q.peek();
            if (first == null)
                // 如果為空,一直等到頭節(jié)點(diǎn)到來為止
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                // 如果已經(jīng)超時(shí),那么出棧頭節(jié)點(diǎn)
                if (delay <= 0)
                    return q.poll();
                // 執(zhí)行到這里說明隊(duì)列不為空,且頭節(jié)點(diǎn)并沒有到期
                first = null; // don't retain ref while waiting
                // 如果leader線程存在,那么除了leader線程外,都無條件無限期等待available被喚醒
                if (leader != null)
                    available.await();
                else {
                    // 如果不存在leader,那么將當(dāng)前線程設(shè)置為leader線程
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        // 且等待頭節(jié)點(diǎn)超時(shí),好再次自旋后返回該頭節(jié)點(diǎn)
                        available.awaitNanos(delay);
                    } finally {
                        // 這里大部分是頭節(jié)點(diǎn)到期,被超時(shí)喚醒。如果leader沒變,那么先將leader置空
                        // 因?yàn)樵俅巫孕髸?huì)返回頭節(jié)點(diǎn),那么leader也就完成任務(wù)了。
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        // 到這里說明上個(gè)leader線程所等待的頭節(jié)點(diǎn)已經(jīng)返回,leader已經(jīng)重置
        // 這個(gè)時(shí)候還有其他線程被await中,leader完成任務(wù)后,需要去喚醒其他線程
        // 被喚醒的線程會(huì)進(jìn)入新的輪詢之中,并將自己設(shè)置為新的leader,并等待頭節(jié)點(diǎn)到期
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

poll

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        E first = q.peek();
        // 如果頭節(jié)點(diǎn)為空或頭節(jié)點(diǎn)還沒有到期,返回null
        if (first == null || first.getDelay(NANOSECONDS) > 0)
            return null;
        else
            return q.poll();
    } finally {
        lock.unlock();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容