Java并發(fā)包下有個BlockingQueue接口,并提供了多種阻塞隊列的實現(xiàn)方式。阻塞隊列通常被用于生產(chǎn)者消費者模型、消息隊列、并行任務等并發(fā)場景,并通過內(nèi)部的鎖和并發(fā)控制實現(xiàn)線程安全。這個系列將分析其中多種實現(xiàn)的源碼,了解阻塞隊列的實現(xiàn)細節(jié),從而能夠根據(jù)使用場景的不同選擇最適合的阻塞隊列實現(xiàn)類。
并發(fā)包下關于阻塞隊列的接口和實現(xiàn)如下所示:
BlockingQueue接口
從上面的關系圖可以看出,所有的實現(xiàn)類或者接口都派生自BlockingQueue接口,該接口共定義了11個方法,除此之外,它還繼承自接口Queue。
public interface BlockingQueue<E> extends Queue<E> {
// 非阻塞添加指定元素到隊列中,失敗拋出異常
boolean add(E e);
// 非阻塞添加指定元素,失敗返回false
boolean offer(E e);
// 阻塞添加
void put(E e) throws InterruptedException;
// 阻塞讀取并刪除第一個元素
E take() throws InterruptedException;
// 阻塞一段時間內(nèi)添加
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 阻塞一段時間讀取并刪除第一個元素
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 隊列剩余容量
int remainingCapacity();
// 刪除指定元素(if exist)
boolean remove(Object o);
public boolean contains(Object o);
// 刪除當前隊列所有元素,并添加到新集合中
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
ArrayBlockingQueue類
系列的第一篇文章將分析ArrayBlockingQueue的源碼,首先看下其構(gòu)造函數(shù)。
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 構(gòu)造函數(shù)共有三個,最終都會調(diào)用該兩參數(shù)的構(gòu)造函數(shù)
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// final Object[] items;
// 保存隊列元素的數(shù)組
this.items = new Object[capacity];
// 初始化內(nèi)部鎖,根據(jù)fair決定是否是公平鎖,會影響多個線程讀寫順序
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 初始化阻塞隊列,并將傳入的集合添加到隊列
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
// ...省略
}
}
生產(chǎn)者消費者
阻塞隊列的一個使用場景是生產(chǎn)者消費者模式,那么現(xiàn)在舉一個生產(chǎn)者消費者的代碼demo來展示如何使用阻塞隊列,然后進一步分析其中用到的方法的源碼。
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
// 生產(chǎn)者通過put方法不斷向隊列添加元素
while (true) { queue.put(produce()); }
} catch (InterruptedException ex) { ... handle ...}
}
Object produce() { ... }
}
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
// 消費者通過take方法不斷向隊列讀取元素
while (true) { consume(queue.take()); }
} catch (InterruptedException ex) { ... handle ...}
}
void consume(Object x) { ... }
}
class Setup {
// demo中有多個生產(chǎn)者和消費者線程,阻塞隊列能夠保證線程安全
void main() {
BlockingQueue q = new SomeQueueImplementation();
Producer p = new Producer(q);
Consumer c1 = new Consumer(q);
Consumer c2 = new Consumer(q);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
}
}
put方法與take方法
生產(chǎn)者消費者模型中,首先需要生產(chǎn)者添加元素,否則隊列中沒有元素,消費者無法執(zhí)行,添加元素的操作通過put方法來執(zhí)行,消費元素則通過take方法,下面一一分析。
public void put(E e) throws InterruptedException {
//禁止插入null值,null值用于無可讀元素時的讀取失敗標志
checkNotNull(e);
final ReentrantLock lock = this.lock;
//該獲取鎖的方法,即使在阻塞獲取鎖時(即還沒拿到鎖)也會響應其他線程調(diào)用該線程的interrupt方法,拋出InterruptedException異常
lock.lockInterruptibly();
try {
// int count;隊列中元素個數(shù)
// 如果隊列滿了,通過await方法阻塞等待,即使不往下看也能想到take或其他讀取元素的方法中必然有個地方會調(diào)用notFull.signal()方法來喚醒當前線程
while (count == items.length)
notFull.await();
//插入元素e
enqueue(e);
} finally {
lock.unlock();
}
}
//該方法用于在putIndex處插入元素,只在擁有鎖的情況下被調(diào)用
private void enqueue(E x) {
//獲取存儲元素的數(shù)組
final Object[] items = this.items;
//int putIndex;下一個待添加元素的下標,初始為0
items[putIndex] = x;
//防止越界,效果類似于循環(huán)數(shù)組
if (++putIndex == items.length)
putIndex = 0;
//隊列中元素個數(shù)加一
count++;
//喚醒notEmpty.await()
notEmpty.signal();
}
put方法通過鎖來實現(xiàn)線程安全,同時通過notFull和notEmpty兩個方法來進行線程間控制,take方法也大致如此,下面看下take方法
public E take() throws InterruptedException {
// 獲取鎖,保證讀取的安全性
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//判斷隊列是否為空,為空阻塞等待
//在上面分析的enqueue方法的最后會調(diào)用notEmpty.signal來喚醒
while (count == 0)
notEmpty.await();
//如果隊列不為空,退出循環(huán),通過dequeue()返回讀取的元素
return dequeue();
} finally {
lock.unlock();
}
}
// dequeue方法取出takeIndex下標處的元素,只在鎖內(nèi)被調(diào)用
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
// items index for next take, poll, peek or remove
// int takeIndex;初始化為0
//讀取takeIndex處的元素,并刪除數(shù)組中對其引用
E x = (E) items[takeIndex];
items[takeIndex] = null;
//防止越界
if (++takeIndex == items.length)
takeIndex = 0;
//隊列元素數(shù)量減一
count--;
//itrs同于與當前正活躍的迭代器共享狀態(tài),如果存在,通知其一起刪除該元素
if (itrs != null)
itrs.elementDequeued();
//喚醒阻塞的put方法
notFull.signal();
return x;
}
offer()與poll()方法
上面介紹的一對讀寫方法是無限阻塞的,除非有其他線程調(diào)用signal方法來喚醒當前線程。阻塞隊列中還提供了幾個可以指定阻塞時間的讀寫方法
//offer方法可以指定阻塞時間,除此之外與put不同的是offer方法會返回true或者false
//而put方法正常情況下一直阻塞,二者都會拋出中斷異常
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);//禁止插入null元素
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//邏輯與Put方法幾乎一樣,除了增加了阻塞時間
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
//poll方法的邏輯也非常簡單,不過多介紹,與take相似
//不同點在與在阻塞時間到達后,poll方法會返回null值
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
同時poll方法和offer方法還有一種重載實現(xiàn),提供了立即返回結(jié)果的功能,代碼如下
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//在阻塞的方式中,會通過while循環(huán)和await方法的配合來實現(xiàn)阻塞
//但在該方法中,會立刻返回結(jié)果
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
//邏輯相似,理解返回結(jié)果,不阻塞
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
現(xiàn)在已經(jīng)分析了6個常用方法了,接下來把前面接口中出現(xiàn)的方法依次分析一下,可以看出整個該實現(xiàn)類的源碼比較簡單
add()方法
//add方法直接調(diào)用了繼承的抽象類中的add方法
public boolean add(E e) {
return super.add(e);
}
//AbstractQueue中定義的add方法
public boolean add(E e) {
//借用了offer方法的實現(xiàn),offer方法的實現(xiàn)在具體實現(xiàn)類中,上面介紹過
//add方法與另外兩個添加元素方法的不同點在于,添加失敗時拋出IllegalStateException
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
remove()方法
remove方法用于在隊列中刪除指定元素,因為是隨機的,與隊列的FIFO的特性不符,性能較差。
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//判斷隊列中是否有元素
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//遍歷判斷是否相等,然后調(diào)用removeAt刪除
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
final Object[] items = this.items;
//如果要刪除的元素正好是正常情況下的那個元素,即takeIndex
//刪除后修改takeIndex并將count減一
if (removeIndex == takeIndex) {
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {//要刪除的不是takeIndex處的元素
final int putIndex = this.putIndex;
//將i+1到putIndex處元素向前移動一個位置,并更新putIndex
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
//隊列元素數(shù)量減一,并刪除迭代器中的該元素
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
//喚醒阻塞的notFull.await
notFull.signal();
}
其他方法匯總
除了上面介紹的幾個方法,還有一些比較簡單的方法,統(tǒng)一放在這一小節(jié)介紹,相互之間關聯(lián)較小,可以看做是多個獨立的方法。
//讀取隊列的第一個元素,但不刪除
//加鎖后讀取數(shù)組中takeIndex的下標處
//在多線程中,讀取的操作也都需要通過加鎖操作進行,否則會讀取到不正確的值
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return (E) items[i];
}
//查看隊列剩余容量
//count變量維護著隊列中元素的個數(shù)
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
//還有比如size(),clear()等方法,都比較簡單,把代碼粘貼進來就能看懂,不浪費篇幅了
Itr與Itrs介紹
在上面的代碼分析中,多次出現(xiàn)了如下所示的代碼片段
if (itrs != null)
itrs.elementDequeued();
這一節(jié)就介紹下這個對象是什么,起到了什么作用。Itrs是ArrayBlockingQueue中的一個內(nèi)部類,itrs則為其一個成員變量。初始化時為null,transient Itrs itrs = null;
源碼中關于Itrs的描述截取如下
- Shared data between iterators and their queue, allowing queue modifications to update iterators when elements are removed.
該對象在迭代器即阻塞隊列之間共享了數(shù)據(jù),在隊列刪除元素時會更新迭代器。
構(gòu)造迭代器的方法如下:
public Iterator<E> iterator() {
//這里是Itr不是Itrs
return new Itr();
}
然后進入Itr的構(gòu)造函數(shù)中看下:
Itr() {
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//隊列中沒有元素
if (count == 0) {
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
//在這里會構(gòu)造一個Itrs對象,并賦值給ArrayBlockingQueue中的itrs對象
//而Itrs內(nèi)部類似個鏈表,用于迭代
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
}
} finally {
lock.unlock();
}
}
總結(jié)
ArrayBlockingQueue的源碼還是比較簡單的,所有的讀寫操作通過內(nèi)部的同一把ReentrantLock鎖來控制,在隊列滿或者隊列空時通過兩個Condition來進行通信。隊列通過一個FIFO的環(huán)形數(shù)組來實現(xiàn),維護了takeIndex和putIndex等變量來決定插入和讀取的元素位置。
瞎聊
過去幾個月因為是畢業(yè)季,時間都被我玩掉了,沒有好好的看源碼,也沒有寫博客,今天放假沒事做終于是補了一篇。寫阻塞隊列是因為上一篇介紹線程池原理的源碼中用到了阻塞隊列,所以順著這個思路準備把阻塞隊列的多個實現(xiàn)類的源碼都看一遍。
寫之前其實沒看過ArrayBlockingQueue的源碼,是一邊看一邊寫出來的。寫著寫著發(fā)現(xiàn)比之前幾篇源碼分析簡單多了,但是開弓沒有回頭箭,既然寫了,內(nèi)容不多也發(fā)出來吧。
現(xiàn)在寫博客、公眾號的人很多,大家的標題都起的很夸張、奇特來試圖吸引人,讓讀者有點進去的欲望,增加更多的粉絲。忘記哪一天我突然覺得這不是一個很恰當?shù)睦碛?,有些太功利了,我寫博客應該是為了積累,為了記錄,也許以后自己可以復習;寫博客不是為了寫給別人看,當然有人因此得到幫助或者因此與我交流是一件順帶的好事,沒有也不用強求。所以不必費盡心思去想些新穎的題目,不用關心語言是否幽默生動,想寫啥寫啥~
歡迎關注我的公眾號
歡迎關注我的公眾號,會經(jīng)常分享一些技術文章和生活隨筆~
今天公眾號改名啦,改成飛坤呀
