概述
blockQueue 作為線程容器、阻塞隊(duì)列,多用于生產(chǎn)者、消費(fèi)者的關(guān)系模式中,保障并發(fā)編程線程同步,線程池中被用于當(dāng)作存儲任務(wù)的隊(duì)列,還可以保證線程執(zhí)行的有序性。
常用方法
生產(chǎn)
- add(obj):往隊(duì)列里面增加一個(gè)對象,如果隊(duì)列沒有空間拋出異常,反之返回true。
- offer(obj): 往隊(duì)列增加一個(gè)對象,返回true/false
- put(obj): 往隊(duì)列增加一個(gè)對象,如果沒有空間,則會阻塞改線程,直到有空間.
消費(fèi)
- poll(time):取出排在首位的對象,如果在一定時(shí)間內(nèi)沒有返回,則會返回null
- take():取出排在首位的對象,如果隊(duì)列中沒有數(shù)據(jù),則會阻塞該線程直到有數(shù)據(jù)。
查詢
- contains(obj):查詢是否存在某個(gè)元素,返回true/false
- peek():返回隊(duì)列頭部的元素,無則返回null
特點(diǎn)
- 容量有限,可以限定隊(duì)列的長度,如果沒有主動顯示隊(duì)列長度的情況下,默認(rèn)長度為Integer.MAX_VALUE
- 內(nèi)存一致性,遵循h(huán)append-before原則,即寫操作總是先于后面的讀操作。參考資料 happend-before
- 因?yàn)槠淅^承Collection接口,所以可以使用集合的接口,但某些接口并不保證立即執(zhí)行,因?yàn)槠鋬?nèi)部維護(hù)著內(nèi)部鎖(ReentrantLock),所以只有在獲取鎖的情況下才會執(zhí)行對應(yīng)的代碼,以remove()源碼為例:
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
每次操作都會去獲取鎖,如果鎖被其他操作暫用,沒有獲取到鎖,則只能去排隊(duì),所以上面代碼并不會立即執(zhí)行。
常用分類
前言
創(chuàng)建隊(duì)列時(shí),可以添加fair參數(shù),用于聲明內(nèi)部鎖是否是公平鎖,公平鎖用于決定隊(duì)列里面的任務(wù)是否會按照順訊執(zhí)行。
公平鎖:
顯式聲明為公平鎖的任務(wù)執(zhí)行完全按照隊(duì)列的順序執(zhí)行,新的任務(wù)進(jìn)來會存放在隊(duì)尾。
非公平鎖:
隊(duì)列里面的任務(wù)可以按照順序執(zhí)行,但是新的任務(wù)可能會與隊(duì)列爭搶CPU資源,不保證隊(duì)列外的順序。
- ArrayBlockingQueue,創(chuàng)建固定大小的隊(duì)列,內(nèi)部維護(hù)一個(gè)數(shù)組,遵循FIFO原則
- LinkedBlockingQueue,可以自定義隊(duì)列長度,無指定的情況下默認(rèn)為Integer.MAX_VALUE,內(nèi)部維護(hù)著一個(gè)鏈表,遵循著FIFO原則
- PriorityBlockingQueue,類似ArrayBlockingQueue,內(nèi)部維護(hù)一個(gè)數(shù)組,但并不按照FIFO原則,其內(nèi)部有個(gè)compare屬性決定隊(duì)列任務(wù)的執(zhí)行順序。
- SynchronousQueue,特殊的隊(duì)列,內(nèi)部無存儲空間維護(hù)隊(duì)列,只有當(dāng)生產(chǎn)者和消費(fèi)者同時(shí)存在時(shí),才會執(zhí)行,類似與管道。
例子
- 生產(chǎn)者與消費(fèi)者案例,一個(gè)生產(chǎn)者和多個(gè)消費(fèi)者。
public class BlockQueueDemo {
/**
* 生產(chǎn)者
*/
static class Productor implements Runnable{
private BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Productor(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for(int i=0;i<100;i++){
try {
Thread.sleep(200);
blockingQueue.put(i);
System.out.println("生產(chǎn)者產(chǎn)品了產(chǎn)品"+i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 消費(fèi)者
*/
static class Consumer implements Runnable{
public BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Consumer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
while(true){
try {
String name = Thread.currentThread().getName();
Integer queueData = blockingQueue.take();
System.out.println("消費(fèi)者"+name+"消費(fèi)了產(chǎn)品"+queueData);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
* 一個(gè)生產(chǎn)者對應(yīng)多個(gè)消費(fèi)者,采用BlockQueue作為緩沖區(qū)
* @param args
*/
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque();
Productor productor = new Productor(blockingQueue);
Consumer consumer = new Consumer(blockingQueue);
Consumer consumer2 = new Consumer(blockingQueue);
new Thread(productor).start();
new Thread(consumer).start();
new Thread(consumer2).start();
}
}
執(zhí)行結(jié)果:
消費(fèi)者Thread-2消費(fèi)了產(chǎn)品21
生產(chǎn)者產(chǎn)品了產(chǎn)品22
消費(fèi)者Thread-1消費(fèi)了產(chǎn)品22
生產(chǎn)者產(chǎn)品了產(chǎn)品23
消費(fèi)者Thread-2消費(fèi)了產(chǎn)品23
生產(chǎn)者產(chǎn)品了產(chǎn)品24
消費(fèi)者Thread-1消費(fèi)了產(chǎn)品24
生產(chǎn)者產(chǎn)品了產(chǎn)品25
消費(fèi)者Thread-2消費(fèi)了產(chǎn)品25
生產(chǎn)者產(chǎn)品了產(chǎn)品26
消費(fèi)者Thread-1消費(fèi)了產(chǎn)品26
生產(chǎn)者產(chǎn)品了產(chǎn)品27
消費(fèi)者Thread-2消費(fèi)了產(chǎn)品27