文章同步更新在個(gè)人公眾號(hào)“梓莘”,歡迎大家關(guān)注,相互交流。
阻塞隊(duì)列
ArrayBlockingQueue 是一個(gè)基于數(shù)組的有界阻塞隊(duì)列,此隊(duì)列基按FIFO原則對(duì)元素進(jìn)行排序
LinkedBlockQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,次隊(duì)列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue
SynchromousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列,每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于LinkedBlockQueue
阻塞隊(duì)列:首先它是一個(gè)隊(duì)列,而一個(gè)阻塞隊(duì)列在數(shù)據(jù)結(jié)構(gòu)中所其的作用大致如圖

當(dāng)阻塞隊(duì)列是空時(shí),從隊(duì)列中獲取元素的操作將會(huì)被阻塞
當(dāng)阻塞隊(duì)列是滿時(shí),往隊(duì)列中添加元素的操作將會(huì)被阻塞
試圖從空的阻塞隊(duì)列中獲取元素的線程將會(huì)被阻塞,直到其他的線程王空的隊(duì)列中插入新的元素。
試圖往滿的阻塞隊(duì)列中添加新元素的線程通用也會(huì)被阻塞,直到其他的線程從隊(duì)列中移除一個(gè)或者多個(gè)元素或者完全清空隊(duì)列后使隊(duì)列重新變得空閑起來并后續(xù)新增。
為什么用 好處是什么?
在多線程領(lǐng)域中:所謂阻塞,在某些情況下會(huì)掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒。
為什么需要BlockingQueue?
好處是我們不需要關(guān)系什么時(shí)候需要阻塞線程,什么時(shí)候需要喚醒線程,因?yàn)檫@一切BlockingQueue都實(shí)現(xiàn)了。
在concurrent包發(fā)布之前,在多線程環(huán)境下,我們每個(gè)線程都必須自己去控制這些細(xì)節(jié),尤其還要兼顧效率和線程安全,而這會(huì)給我們的程序帶來不少的復(fù)雜度。
BlockingQueue種類
- ArrayBlockingQueue:由數(shù)組結(jié)果組成的有界阻塞隊(duì)列
- LinkedBlockingQueue:由鏈表結(jié)構(gòu)組成的有界阻塞隊(duì)列,默認(rèn)大小為Integer.MAX_VALUE
- PriorityBlockingQueue :支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列
- DelayQueue: 使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的延遲無界阻塞隊(duì)列
- SynchronousQueue:不存儲(chǔ)元素的阻塞隊(duì)列,也就是單個(gè)元素的隊(duì)列
- LinkedTransferQueue:由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列
- LinkedBlockingQueue:由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列
阻塞隊(duì)列的核心方法

- 拋出異常:當(dāng)阻塞隊(duì)列滿時(shí),再往隊(duì)列里add插入元素會(huì)拋出java.lang.IllegalStateException: Queue full
當(dāng)阻塞隊(duì)列空時(shí),再?gòu)年?duì)列里remove移除元素會(huì)拋NoSuchElementException
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
System.out.println(blockingQueue.element());
//java.lang.IllegalStateException: Queue full
//System.out.println(blockingQueue.add("d"));
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
System.out.println(blockingQueue.remove());
//java.util.NoSuchElementException
//System.out.println(blockingQueue.remove());
}
- 特殊值:插入方法,成功true失敗false
移除方法,成功返回出隊(duì)列的元素,隊(duì)列里面沒有就返回null
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
//false
System.out.println(blockingQueue.offer("d"));
System.out.println(blockingQueue.peek());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
//null
System.out.println(blockingQueue.poll());
}
- 一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),生產(chǎn)者線程繼續(xù)往隊(duì)列中put元素,隊(duì)列會(huì)一直阻塞生產(chǎn)線程直到put數(shù)據(jù)或者響應(yīng)中斷請(qǐng)求
當(dāng)隊(duì)列空時(shí),消費(fèi)者線程視圖從隊(duì)列里take元素,隊(duì)列會(huì)一直阻塞消費(fèi)者線程直到隊(duì)列可用
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
//blockingQueue.put("d");
blockingQueue.take();
blockingQueue.take();
blockingQueue.take();
//blockingQueue.take();
}
- 超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞生產(chǎn)者線程一定時(shí)間,超過限時(shí)后生產(chǎn)者線程會(huì)退出
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
blockingQueue.offer("a",2L, TimeUnit.SECONDS);
blockingQueue.offer("b",2L, TimeUnit.SECONDS);
blockingQueue.offer("c",2L, TimeUnit.SECONDS);
blockingQueue.offer("d",2L, TimeUnit.SECONDS);
}
SynchronousQueue
SynchronousQueue與其他BlockingQueue不同,SynchronousQueue是一個(gè)不存儲(chǔ)元素的BlockingQueue,每一個(gè)put操作必須要等待一個(gè)take操作,否則不能繼續(xù)添加元素,反之亦然。
package com.zixin;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
/**
* @ClassName SynchronousQueueDemo
* @Description
* @Author zishen
* @Date 2019/12/31 9:10
* @Version 1.0
* AA put 1
* BB take 1
* AA put 2
* BB take 2
* AA put 3
* BB take 3
**/
public class SynchronousQueueDemo {
public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(()->{
try {
System.out.println(Thread.currentThread().getName()+" put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName()+" put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName()+" put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
},"AA").start();
new Thread(()->{
try {
Thread.sleep(5);
System.out.println(Thread.currentThread().getName()+" take 1");
blockingQueue.take();
Thread.sleep(5);
System.out.println(Thread.currentThread().getName()+" take 2");
blockingQueue.take();
Thread.sleep(5);
System.out.println(Thread.currentThread().getName()+" take 3");
blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"BB").start();
}
}
線程消費(fèi)之生產(chǎn)者消費(fèi)者
package com.zixin;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 資源類
*/
class ShareData{
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void increment()throws Exception{
lock.lock();
try{
//1、判斷
while(number !=0){
//等待 不能生產(chǎn)
condition.await();
}
//2、干活
number++;
System.out.println(Thread.currentThread().getName()+" "+number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void decrement()throws Exception{
lock.lock();
try{
//1、判斷 要使用while 如果用if可能會(huì)產(chǎn)生虛假喚起
while(number ==0){
//等待 不能生產(chǎn)
condition.await();
}
//2、干活
number--;
System.out.println(Thread.currentThread().getName()+" "+number);
condition.signalAll();
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
/**
* @ClassName ProduceConsumerTraditionDemo
* @Description 初始值為0的變量。兩個(gè)線程對(duì)其交替操作,一個(gè)加1一個(gè)減1 來5輪
* 1、線程 操作 資源類
* 2、判斷 干活 通知
* 3、防止虛假喚醒操作
* @Author zixin
* @Date 2019/12/31 10:35
* @Version 1.0
**/
public class ProduceConsumerTraditionDemo {
public static void main(String[] args) {
ShareData shareData = new ShareData();
new Thread(()->{
for (int i=0;i<5;i++){
try {
shareData.increment();
} catch (Exception e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(()->{
for (int i=0;i<5;i++){
try {
shareData.decrement();
} catch (Exception e) {
e.printStackTrace();
}
}
},"BB").start();
}
}
synchronized和lock的區(qū)別
原始構(gòu)成:
synchronized是關(guān)鍵字屬于JVM層面
monitorenter(底層是通過monitor對(duì)象來完成,其實(shí)wait/notify等方法也依賴于monitor,對(duì)象只有在同步塊或方法中才能調(diào)wait/notify方法)
monitorexit
lock是具體類(java.util.concurrent.locks.Lock)是api層面的鎖使用方法:
snchronized不需要用戶去手動(dòng)釋放鎖,當(dāng)synchronized代碼執(zhí)行完成后系統(tǒng)會(huì)自動(dòng)讓線程釋放對(duì)鎖的占用
ReentrantLock則需要用戶去手動(dòng)釋放鎖若么有主動(dòng)釋放鎖,就有可能導(dǎo)致出現(xiàn)死鎖現(xiàn)象
需要lock()和unlock()方法配合try/finally語句塊來完成等待是否可中斷:
synchronized不可中斷,除非拋出異?;蛘_\(yùn)行完成。
ReentrantLock可中斷:
1、設(shè)置超時(shí)方法tryLock(long timeout,TimeUnit unit)
2、LockInterruptibly()放代碼塊中,調(diào)用interrupt()方法可中斷加鎖是否公平:
synchronized非公平鎖
ReentrantLock兩者都可以,默認(rèn)公平鎖,構(gòu)造方法可以傳入boolean值true為公平鎖,false為非公平鎖鎖綁定多個(gè)條件Condition
synchronized沒有
ReentrantLock用來實(shí)現(xiàn)介紹喚醒需要喚醒的線程組,可以精確喚醒,而不是像synchronized要么喚醒一個(gè)線程要么喚醒全部線程