Java阻塞隊列源碼(1)-ArrayBlockingQueue

Java并發(fā)包下有個BlockingQueue接口,并提供了多種阻塞隊列的實現(xiàn)方式。阻塞隊列通常被用于生產(chǎn)者消費者模型、消息隊列、并行任務等并發(fā)場景,并通過內(nèi)部的鎖和并發(fā)控制實現(xiàn)線程安全。這個系列將分析其中多種實現(xiàn)的源碼,了解阻塞隊列的實現(xiàn)細節(jié),從而能夠根據(jù)使用場景的不同選擇最適合的阻塞隊列實現(xiàn)類。

并發(fā)包下關于阻塞隊列的接口和實現(xiàn)如下所示:

接口與實現(xiàn)類

BlockingQueue接口

從上面的關系圖可以看出,所有的實現(xiàn)類或者接口都派生自BlockingQueue接口,該接口共定義了11個方法,除此之外,它還繼承自接口Queue。

public interface BlockingQueue<E> extends Queue<E> {
    // 非阻塞添加指定元素到隊列中,失敗拋出異常
    boolean add(E e);
    // 非阻塞添加指定元素,失敗返回false
    boolean offer(E e);
    // 阻塞添加
    void put(E e) throws InterruptedException;
    // 阻塞讀取并刪除第一個元素
    E take() throws InterruptedException;
    // 阻塞一段時間內(nèi)添加
    boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
    // 阻塞一段時間讀取并刪除第一個元素
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    // 隊列剩余容量
    int remainingCapacity();
    // 刪除指定元素(if exist)
    boolean remove(Object o);
    public boolean contains(Object o);
    // 刪除當前隊列所有元素,并添加到新集合中
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}

ArrayBlockingQueue類

系列的第一篇文章將分析ArrayBlockingQueue的源碼,首先看下其構(gòu)造函數(shù)。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

// 構(gòu)造函數(shù)共有三個,最終都會調(diào)用該兩參數(shù)的構(gòu)造函數(shù)
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
    // final Object[] items;
    // 保存隊列元素的數(shù)組
        this.items = new Object[capacity];
    // 初始化內(nèi)部鎖,根據(jù)fair決定是否是公平鎖,會影響多個線程讀寫順序   
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

    public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }
// 初始化阻塞隊列,并將傳入的集合添加到隊列
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);
        // ...省略
    }
}

生產(chǎn)者消費者

阻塞隊列的一個使用場景是生產(chǎn)者消費者模式,那么現(xiàn)在舉一個生產(chǎn)者消費者的代碼demo來展示如何使用阻塞隊列,然后進一步分析其中用到的方法的源碼。

class Producer implements Runnable {
    private final BlockingQueue queue;
    Producer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
        // 生產(chǎn)者通過put方法不斷向隊列添加元素
            while (true) { queue.put(produce()); }
        } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

class Consumer implements Runnable {
    private final BlockingQueue queue;
    Consumer(BlockingQueue q) { queue = q; }
    public void run() {
        try {
        // 消費者通過take方法不斷向隊列讀取元素
            while (true) { consume(queue.take()); }
        } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
}

class Setup {
// demo中有多個生產(chǎn)者和消費者線程,阻塞隊列能夠保證線程安全    
    void main() {
        BlockingQueue q = new SomeQueueImplementation();
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

put方法與take方法

生產(chǎn)者消費者模型中,首先需要生產(chǎn)者添加元素,否則隊列中沒有元素,消費者無法執(zhí)行,添加元素的操作通過put方法來執(zhí)行,消費元素則通過take方法,下面一一分析。

public void put(E e) throws InterruptedException {
//禁止插入null值,null值用于無可讀元素時的讀取失敗標志    
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
//該獲取鎖的方法,即使在阻塞獲取鎖時(即還沒拿到鎖)也會響應其他線程調(diào)用該線程的interrupt方法,拋出InterruptedException異常
    lock.lockInterruptibly();
    try {
// int count;隊列中元素個數(shù)
// 如果隊列滿了,通過await方法阻塞等待,即使不往下看也能想到take或其他讀取元素的方法中必然有個地方會調(diào)用notFull.signal()方法來喚醒當前線程
        while (count == items.length)
            notFull.await();
        //插入元素e    
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
//該方法用于在putIndex處插入元素,只在擁有鎖的情況下被調(diào)用
private void enqueue(E x) {
    //獲取存儲元素的數(shù)組
    final Object[] items = this.items;
//int putIndex;下一個待添加元素的下標,初始為0
    items[putIndex] = x;
    //防止越界,效果類似于循環(huán)數(shù)組
    if (++putIndex == items.length)
        putIndex = 0;
    //隊列中元素個數(shù)加一    
    count++;
    //喚醒notEmpty.await()
    notEmpty.signal();
}

put方法通過鎖來實現(xiàn)線程安全,同時通過notFullnotEmpty兩個方法來進行線程間控制,take方法也大致如此,下面看下take方法

public E take() throws InterruptedException {
// 獲取鎖,保證讀取的安全性    
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    //判斷隊列是否為空,為空阻塞等待
    //在上面分析的enqueue方法的最后會調(diào)用notEmpty.signal來喚醒    
        while (count == 0)
            notEmpty.await();
    //如果隊列不為空,退出循環(huán),通過dequeue()返回讀取的元素
        return dequeue();
    } finally {
        lock.unlock();
    }
}
// dequeue方法取出takeIndex下標處的元素,只在鎖內(nèi)被調(diào)用
private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
// items index for next take, poll, peek or remove
// int takeIndex;初始化為0
//讀取takeIndex處的元素,并刪除數(shù)組中對其引用    
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    //防止越界
    if (++takeIndex == items.length)
        takeIndex = 0;
    //隊列元素數(shù)量減一
    count--;
//itrs同于與當前正活躍的迭代器共享狀態(tài),如果存在,通知其一起刪除該元素
    if (itrs != null)
        itrs.elementDequeued();
    //喚醒阻塞的put方法
    notFull.signal();
    return x;
}

offer()與poll()方法

上面介紹的一對讀寫方法是無限阻塞的,除非有其他線程調(diào)用signal方法來喚醒當前線程。阻塞隊列中還提供了幾個可以指定阻塞時間的讀寫方法

//offer方法可以指定阻塞時間,除此之外與put不同的是offer方法會返回true或者false
//而put方法正常情況下一直阻塞,二者都會拋出中斷異常
public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
    checkNotNull(e);//禁止插入null元素
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
    //邏輯與Put方法幾乎一樣,除了增加了阻塞時間    
        while (count == items.length) {
            if (nanos <= 0)
                return false;
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(e);
        return true;
    } finally {
        lock.unlock();
    }
}
//poll方法的邏輯也非常簡單,不過多介紹,與take相似
//不同點在與在阻塞時間到達后,poll方法會返回null值
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

同時poll方法和offer方法還有一種重載實現(xiàn),提供了立即返回結(jié)果的功能,代碼如下

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
    //在阻塞的方式中,會通過while循環(huán)和await方法的配合來實現(xiàn)阻塞
    //但在該方法中,會立刻返回結(jié)果    
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}
//邏輯相似,理解返回結(jié)果,不阻塞
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

現(xiàn)在已經(jīng)分析了6個常用方法了,接下來把前面接口中出現(xiàn)的方法依次分析一下,可以看出整個該實現(xiàn)類的源碼比較簡單

add()方法

//add方法直接調(diào)用了繼承的抽象類中的add方法
public boolean add(E e) {
    return super.add(e);
}
//AbstractQueue中定義的add方法
public boolean add(E e) {
//借用了offer方法的實現(xiàn),offer方法的實現(xiàn)在具體實現(xiàn)類中,上面介紹過
//add方法與另外兩個添加元素方法的不同點在于,添加失敗時拋出IllegalStateException
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

remove()方法

remove方法用于在隊列中刪除指定元素,因為是隨機的,與隊列的FIFO的特性不符,性能較差。

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //判斷隊列中是否有元素
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
        //遍歷判斷是否相等,然后調(diào)用removeAt刪除    
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

void removeAt(final int removeIndex) {
    final Object[] items = this.items;
    //如果要刪除的元素正好是正常情況下的那個元素,即takeIndex
    //刪除后修改takeIndex并將count減一
    if (removeIndex == takeIndex) {
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
    } else {//要刪除的不是takeIndex處的元素
        final int putIndex = this.putIndex;
        //將i+1到putIndex處元素向前移動一個位置,并更新putIndex
        for (int i = removeIndex;;) {
            int next = i + 1;
            if (next == items.length)
                next = 0;
            if (next != putIndex) {
                items[i] = items[next];
                i = next;
            } else {
                items[i] = null;
                this.putIndex = i;
                break;
            }
        }
        //隊列元素數(shù)量減一,并刪除迭代器中的該元素
        count--;
        if (itrs != null)
            itrs.removedAt(removeIndex);
    }
    //喚醒阻塞的notFull.await
    notFull.signal();
}

其他方法匯總

除了上面介紹的幾個方法,還有一些比較簡單的方法,統(tǒng)一放在這一小節(jié)介紹,相互之間關聯(lián)較小,可以看做是多個獨立的方法。

//讀取隊列的第一個元素,但不刪除
//加鎖后讀取數(shù)組中takeIndex的下標處
//在多線程中,讀取的操作也都需要通過加鎖操作進行,否則會讀取到不正確的值
public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}
final E itemAt(int i) {
    return (E) items[i];
}

//查看隊列剩余容量
//count變量維護著隊列中元素的個數(shù)
public int remainingCapacity() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return items.length - count;
    } finally {
        lock.unlock();
    }
}
//還有比如size(),clear()等方法,都比較簡單,把代碼粘貼進來就能看懂,不浪費篇幅了

Itr與Itrs介紹

在上面的代碼分析中,多次出現(xiàn)了如下所示的代碼片段

if (itrs != null)
    itrs.elementDequeued();

這一節(jié)就介紹下這個對象是什么,起到了什么作用。ItrsArrayBlockingQueue中的一個內(nèi)部類,itrs則為其一個成員變量。初始化時為null,transient Itrs itrs = null;

源碼中關于Itrs的描述截取如下

  • Shared data between iterators and their queue, allowing queue modifications to update iterators when elements are removed.

該對象在迭代器即阻塞隊列之間共享了數(shù)據(jù),在隊列刪除元素時會更新迭代器。

構(gòu)造迭代器的方法如下:

public Iterator<E> iterator() {
    //這里是Itr不是Itrs
    return new Itr();
}

然后進入Itr的構(gòu)造函數(shù)中看下:

Itr() {
    lastRet = NONE;
    final ReentrantLock lock = ArrayBlockingQueue.this.lock;
    lock.lock();
    try {
        //隊列中沒有元素
        if (count == 0) {
            cursor = NONE;
            nextIndex = NONE;
            prevTakeIndex = DETACHED;
        } else {
            final int takeIndex = ArrayBlockingQueue.this.takeIndex;
            prevTakeIndex = takeIndex;
            nextItem = itemAt(nextIndex = takeIndex);
            cursor = incCursor(takeIndex);
            if (itrs == null) {
        //在這里會構(gòu)造一個Itrs對象,并賦值給ArrayBlockingQueue中的itrs對象
        //而Itrs內(nèi)部類似個鏈表,用于迭代   
                itrs = new Itrs(this);
            } else {
                itrs.register(this); // in this order
                itrs.doSomeSweeping(false);
            }
            prevCycles = itrs.cycles;
        }
    } finally {
        lock.unlock();
    }
}

總結(jié)

ArrayBlockingQueue的源碼還是比較簡單的,所有的讀寫操作通過內(nèi)部的同一把ReentrantLock鎖來控制,在隊列滿或者隊列空時通過兩個Condition來進行通信。隊列通過一個FIFO的環(huán)形數(shù)組來實現(xiàn),維護了takeIndexputIndex等變量來決定插入和讀取的元素位置。

瞎聊

過去幾個月因為是畢業(yè)季,時間都被我玩掉了,沒有好好的看源碼,也沒有寫博客,今天放假沒事做終于是補了一篇。寫阻塞隊列是因為上一篇介紹線程池原理的源碼中用到了阻塞隊列,所以順著這個思路準備把阻塞隊列的多個實現(xiàn)類的源碼都看一遍。

寫之前其實沒看過ArrayBlockingQueue的源碼,是一邊看一邊寫出來的。寫著寫著發(fā)現(xiàn)比之前幾篇源碼分析簡單多了,但是開弓沒有回頭箭,既然寫了,內(nèi)容不多也發(fā)出來吧。

現(xiàn)在寫博客、公眾號的人很多,大家的標題都起的很夸張、奇特來試圖吸引人,讓讀者有點進去的欲望,增加更多的粉絲。忘記哪一天我突然覺得這不是一個很恰當?shù)睦碛?,有些太功利了,我寫博客應該是為了積累,為了記錄,也許以后自己可以復習;寫博客不是為了寫給別人看,當然有人因此得到幫助或者因此與我交流是一件順帶的好事,沒有也不用強求。所以不必費盡心思去想些新穎的題目,不用關心語言是否幽默生動,想寫啥寫啥~

歡迎關注我的公眾號

歡迎關注我的公眾號,會經(jīng)常分享一些技術文章和生活隨筆~

今天公眾號改名啦,改成飛坤呀

飛坤呀
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容