1. 引言
生產(chǎn)者、消費者模式是如此的重要,是理解java 多線程并發(fā)的核心知識點,不少同學(xué)面試時,常規(guī)操作是當(dāng)著面試官的面,手寫一個生產(chǎn)者消費者模式。
2. 實現(xiàn)方式
通常情況下,有5種方式來實現(xiàn)
1. synchronized + wait() + notify() 方式
2. 可重入鎖ReentrantLock (配合Condition)
3. BlockingQueue 阻塞隊列方式
4. 信號量Semaphore 方式
5. 管道輸入輸出流PipedInputStream和PipedOutputStream 方式
這里僅介紹,面試比較常見的 第2,第3種方式
3. 可重入鎖ReentrantLock 方式
3.1 可重入鎖概念:
ReentrantLock類實現(xiàn)Lock接口,并在方法訪問共享資源時為其提供同步。操作共享資源的代碼會鎖定當(dāng)前工作線程并阻止嘗試鎖定共享資源的所有其他線程。
顧名思義,ReentrantLock允許線程不止一次地鎖定資源。當(dāng)線程首次進入鎖定狀態(tài)時,保持計數(shù)(hold count)設(shè)置為1。解鎖之前,線程可以再次重新進入鎖定狀態(tài),并且將計數(shù)增加1。
對于每個解鎖請求,保持計數(shù)減1,當(dāng)保持計數(shù)為0時,資源解鎖。
可重入鎖還提供公平參數(shù),可以實現(xiàn)所謂公平鎖和非公平鎖。
1、公平鎖能:新老線程一律需要排隊使用鎖。
2、非公平鎖:老的線程排隊使用鎖;但是無法保證新線程搶占已經(jīng)在排隊的線程的鎖。
ReentrantLock 可重入鎖的方式,需要配合Condition使用,Condition 與Lock的主要區(qū)別在于是否能夠響應(yīng)中斷。
public class ProducerAndConsumerByLock {
private static int count = 0;
private int maxNum = 3;
ReentrantLock lock = new ReentrantLock();
Condition producerCondition = lock.newCondition();
Condition consumerCondition = lock.newCondition();
public static void main(String[] args) {
ProducerAndConsumerByLock test = new ProducerAndConsumerByLock();
new Thread(test.new Producer()).start();
new Thread(test.new Producer()).start();
new Thread(test.new Consumer()).start();
new Thread(test.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//獲取鎖
lock.lock();
try {
// while (getCount() == maxNum) {
// producerCondition.await();
// System.out.println("生產(chǎn)能力達到上限,進入等待狀態(tài)");
// }
count++;
System.out.println(Thread.currentThread().getName()
+ "生產(chǎn)者生產(chǎn),目前總共有" + count);
//喚醒消費者
consumerCondition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
//獲取鎖
lock.unlock();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
Thread.sleep(700);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock();
try {
// while (count == 0) {
// consumerCondition.await();
// }
count--;
System.out.println(Thread.currentThread().getName()
+ "消費者消費,目前總共有" + count);
//喚醒生產(chǎn)者
producerCondition.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
}
}
輸入結(jié)果:
Thread-1生產(chǎn)者生產(chǎn),目前總共有1
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-2消費者消費,目前總共有1
Thread-3消費者消費,目前總共有0
Thread-0生產(chǎn)者生產(chǎn),目前總共有1
Thread-1生產(chǎn)者生產(chǎn),目前總共有2
Thread-3消費者消費,目前總共有1
Thread-2消費者消費,目前總共有0
Thread-1生產(chǎn)者生產(chǎn),目前總共有1
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-0生產(chǎn)者生產(chǎn),目前總共有4
Thread-3消費者消費,目前總共有3
Thread-2消費者消費,目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-0生產(chǎn)者生產(chǎn),目前總共有4
Thread-3消費者消費,目前總共有3
Thread-2消費者消費,目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-0生產(chǎn)者生產(chǎn),目前總共有4
Thread-2消費者消費,目前總共有3
Thread-1生產(chǎn)者生產(chǎn),目前總共有4
Thread-3消費者消費,目前總共有3
Thread-0生產(chǎn)者生產(chǎn),目前總共有4
Thread-0生產(chǎn)者生產(chǎn),目前總共有5
Thread-1生產(chǎn)者生產(chǎn),目前總共有6
Thread-3消費者消費,目前總共有5
Thread-2消費者消費,目前總共有4
Thread-0生產(chǎn)者生產(chǎn),目前總共有5
Thread-1生產(chǎn)者生產(chǎn),目前總共有6
Thread-3消費者消費,目前總共有5
Thread-2消費者消費,目前總共有4
Thread-0生產(chǎn)者生產(chǎn),目前總共有5
Thread-1生產(chǎn)者生產(chǎn),目前總共有6
Thread-2消費者消費,目前總共有5
Thread-3消費者消費,目前總共有4
Thread-3消費者消費,目前總共有3
Thread-2消費者消費,目前總共有2
Thread-3消費者消費,目前總共有1
Thread-2消費者消費,目前總共有0
Process finished with exit code 0
不少人使用注釋掉的while 語句來判斷當(dāng)前生成者產(chǎn)能是否已滿。 不過這種while循環(huán)本身也屬于一個獨立線程,會有沖突的可能,放在演示代碼里,輸出順序可能會混亂。
打開注釋后,輸出如下
Thread-0生產(chǎn)者生產(chǎn),目前總共有1
Thread-1生產(chǎn)者生產(chǎn),目前總共有2
Thread-2消費者消費,目前總共有1
Thread-3消費者消費,目前總共有0
Thread-1生產(chǎn)者生產(chǎn),目前總共有1
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-3消費者消費,目前總共有1
Thread-2消費者消費,目前總共有0
Thread-1生產(chǎn)者生產(chǎn),目前總共有1
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-2消費者消費,目前總共有2
Thread-3消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-2消費者消費,目前總共有2
Thread-3消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-2消費者消費,目前總共有2
Thread-3消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-3消費者消費,目前總共有2
Thread-2消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-2消費者消費,目前總共有2
Thread-3消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-3消費者消費,目前總共有2
Thread-2消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-1生產(chǎn)者生產(chǎn),目前總共有3
Thread-3消費者消費,目前總共有2
Thread-2消費者消費,目前總共有1
生產(chǎn)能力達到上限,進入等待狀態(tài)
Thread-0生產(chǎn)者生產(chǎn),目前總共有2
Thread-3消費者消費,目前總共有1
Thread-2消費者消費,目前總共有0
Process finished with exit code 0
4. 阻塞隊列方式
主要是利用LinkedBlockingDeque 或者ArrayBlockingQueue 之類的阻塞隊列。
其take() 與 put() API 是阻塞性質(zhì)的。
public class ProducerAndConsumerByQueue {
private BlockingQueue<Toy> blockingDeque = new LinkedBlockingDeque<>(10);
public static void main(String[] args) {
ProducerAndConsumerByQueue producerAndConsumer = new ProducerAndConsumerByQueue();
new Thread(producerAndConsumer.new Producer()).start();
new Thread(producerAndConsumer.new Consumer()).start();
}
class Producer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i ++ ) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
Toy toy = new Toy(i + "");
try {
blockingDeque.put(toy);
System.out.println( "生產(chǎn)者" + Thread.currentThread().getName() + " 生產(chǎn)玩具" + toy.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i ++ ) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Toy toy = blockingDeque.take();
System.out.println("消費者" + Thread.currentThread().getName() + " 消費玩具" + toy.getName());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Toy {
private String name;
public Toy(String name) {
this.name = name;
}
public String getName() {
return "toy " + name;
}
}
}
輸出結(jié)果
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 0
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 1
消費者Thread-1 消費玩具toy 0
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 2
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 3
消費者Thread-1 消費玩具toy 1
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 4
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 5
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 6
消費者Thread-1 消費玩具toy 2
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 7
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 8
消費者Thread-1 消費玩具toy 3
生產(chǎn)者Thread-0 生產(chǎn)玩具toy 9
消費者Thread-1 消費玩具toy 4
消費者Thread-1 消費玩具toy 5
消費者Thread-1 消費玩具toy 6
消費者Thread-1 消費玩具toy 7
消費者Thread-1 消費玩具toy 8
消費者Thread-1 消費玩具toy 9
Process finished with exit code 0