隊(duì)列是JAVA開發(fā)過程中一種非常常見的數(shù)據(jù)結(jié)構(gòu),尤其是需要再使用生產(chǎn)者-消費(fèi)者的業(yè)務(wù)模型中,Queue常常作為多線程執(zhí)行任務(wù)的數(shù)據(jù)交界點(diǎn),從而保證生產(chǎn)者產(chǎn)生的數(shù)據(jù)能夠依次被消費(fèi)。
阻塞隊(duì)列的選擇
阻塞隊(duì)列的實(shí)現(xiàn)包括ArrayBlockingQueue與LinkedBlockingQueue。相同點(diǎn)不做贅述,區(qū)別有以下幾點(diǎn):
1.初始化時(shí),ArrayBlockingQueue必須指定隊(duì)列最大容量,LinkedBlockingQueue不強(qiáng)制指定,若不指定,默認(rèn)Interger.Max為最大容量。
2.ArrayBlockingQueue內(nèi)部數(shù)據(jù)結(jié)構(gòu)是數(shù)組:Element[],通過putIndex和takeIndex下標(biāo)的循環(huán)移動(dòng)控制隊(duì)首和隊(duì)尾;LinkedBlockingQueue內(nèi)部結(jié)構(gòu)是鏈表:Node<Element>,通過head 和 tail節(jié)點(diǎn)控制隊(duì)首和隊(duì)尾。
3.ArrayBlockingQueue生產(chǎn)與消費(fèi)之間共用一把鎖,而LinkedBlockingQueue生產(chǎn)與消費(fèi)時(shí)用不同的鎖競(jìng)爭(zhēng)。
對(duì)于阻塞隊(duì)列的選擇,一方面考慮吞吐性能,另一方面考慮內(nèi)存占用。
我們可以看到上面說的第三點(diǎn),可以確定的是,在多生產(chǎn)者與多消費(fèi)者的情況下,LinkedBlockingQueue的吞吐性能肯定是要更高的,而且ArrayBlockingQueue在初始化時(shí)直接就申請(qǐng)了一片連續(xù)的內(nèi)存空間。所以在實(shí)際生產(chǎn)使用環(huán)境中,沒有特殊限制考慮,我們?cè)谑褂米枞?duì)列時(shí)往往用LinkedBlockingQueue。
那什么場(chǎng)景下我們會(huì)偏向于使用ArrayBlockingQueue呢?
- 生產(chǎn)者與消費(fèi)者之間沒有太大競(jìng)爭(zhēng),傾向于單消費(fèi)者,單生產(chǎn)者,且兩者之間沖突較小,這種情況下數(shù)組尋址是明顯要比鏈表去指向next的操作要更快的
- 基本可以確定隊(duì)列大小,且隊(duì)列大小穩(wěn)定在一定的數(shù)量,這個(gè)時(shí)候數(shù)組占用內(nèi)存是比鏈表小的
阻塞隊(duì)列與非阻塞隊(duì)列的選擇
首先,ConcurrentLinkedQueue相對(duì)阻塞隊(duì)列來說,采用的是CAS無鎖操作,沒有take和put方法,主用poll與offer,無界。有人說,既然此隊(duì)列內(nèi)部進(jìn)隊(duì)和出隊(duì)操作采用的是無鎖,那性能肯定比有鎖的BlockingQueue強(qiáng),那BlockingQueue還有啥用武之地,其實(shí)不然,有些時(shí)候我們就需要線程進(jìn)入阻塞狀態(tài)而非不斷自旋消耗CPU,我們可以歸類以下場(chǎng)景:
- 數(shù)據(jù)入隊(duì)速度過快,出隊(duì)速度過慢,這個(gè)時(shí)候ConcurrentLinkedQueue如果不借助其他限制手段,隨著時(shí)間的推移,JVM必然會(huì)進(jìn)行頻繁的FULL GC ,嚴(yán)重的情況下甚至?xí)l(fā)生OOM。使用BlockingQueue可以更好的控制內(nèi)存的狀況。
- 數(shù)據(jù)入隊(duì)速度過慢,出隊(duì)速度過快,這個(gè)時(shí)候消費(fèi)者線程如果一定想要拿到數(shù)據(jù)而不進(jìn)行阻塞,將進(jìn)入大量時(shí)間的自旋狀態(tài),白白浪費(fèi)CPU資源。
- 入隊(duì)與出隊(duì)速度相仿。
這時(shí)候要考慮速度,有多少個(gè)線程在同時(shí)做操作,線程操作的頻率如何?
在大部分場(chǎng)景下,ConcurrentLinkedQueue的性能是要比BlockingQueue要好的,注意是大部分,如果線程之間的競(jìng)爭(zhēng)足夠又高又快,CAS操作的CPU消耗以及線程操作的成功率是極低的,這個(gè)時(shí)候是會(huì)反而不如用鎖競(jìng)爭(zhēng)控制效率來的高。
我們寫了個(gè)測(cè)試類可以大致看下觀感下,在同樣的環(huán)境下,消費(fèi)者與生產(chǎn)者在不斷對(duì)隊(duì)列進(jìn)行操作,然后不斷增加消費(fèi)者與生產(chǎn)者內(nèi)部線程的數(shù)量。
package algorithm;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
public class TestBlockingQueue {
BlockingQueue<Data> linkedBlockingQueue = new LinkedBlockingQueue<>(100);
// BlockingQueue<Data> linkedBlockingQueue = new ArrayBlockingQueue<>(100);
LongAdder longAdder = new LongAdder();
Producer producer;
Consumer consumer;
static class Data {
String msg;
public Data(String msg) {
this.msg = msg;
}
}
TestBlockingQueue(int size) {
producer = new Producer(size);
consumer = new Consumer(size);
}
public void startTest() {
producer.startProduce();
consumer.startConsume();
}
public long stopTestAndReturn() {
producer.stopProduce();
consumer.stopConsume();
return longAdder.longValue();
}
class Producer{
ExecutorService service;
List<ProduceWorker> workers = new LinkedList<>();
Producer(int concurrentNum) {
service = Executors.newFixedThreadPool(concurrentNum);
for(int i = 0; i < concurrentNum; i++) {
workers.add(new ProduceWorker());
}
}
public void startProduce() {
workers.forEach(worker -> service.execute(worker));
}
public void stopProduce() {
service.shutdownNow();
}
class ProduceWorker implements Runnable {
@Override
public void run() {
for(;;) {
if(!Thread.currentThread().isInterrupted()) {
try {
linkedBlockingQueue.put(new Data(randomString(10)));
} catch (InterruptedException e) {
// e.printStackTrace();
}
} else {
break;
}
}
}
}
}
class Consumer{
ExecutorService service;
List<ConsumeWork> workers = new LinkedList<>();
Consumer(int concurrentNum) {
service = Executors.newFixedThreadPool(concurrentNum);
for(int i = 0; i < concurrentNum; i++) {
workers.add(new ConsumeWork());
}
}
class ConsumeWork implements Runnable {
@Override
public void run() {
for(;;) {
if(!Thread.currentThread().isInterrupted()) {
try {
Data data = linkedBlockingQueue.take();
if (null != data) {
longAdder.increment();
}
} catch (InterruptedException e) {
// e.printStackTrace();
}
} else {
break;
}
}
}
}
public void startConsume() {
workers.forEach(worker -> service.execute(worker));
}
public void stopConsume() {
service.shutdownNow();
}
}
public static void main(String[] args) {
long timeStart = System.currentTimeMillis();
System.out.println(timeStart);
TestBlockingQueue testQueue = new TestBlockingQueue(4);
testQueue.startTest();
try {
Thread.currentThread().sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
long result = testQueue.stopTestAndReturn();
System.out.println("最終結(jié)果為; " + result);
long timeEnd = System.currentTimeMillis();
System.out.println(timeEnd);
//計(jì)算每s吞吐
double average = (result / (timeEnd -timeStart)) * 1000;
System.out.println("1每秒吞吐: " + average);
}
static String randomString(int strLength) {
Random rnd = ThreadLocalRandom.current();
StringBuilder ret = new StringBuilder();
for (int i = 0; i < strLength; i++) {
boolean isChar = (rnd.nextInt(2) % 2 == 0);// 輸出字母還是數(shù)字
if (isChar) { // 字符串
int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97; // 取得大寫字母還是小寫字母
ret.append((char) (choice + rnd.nextInt(26)));
} else { // 數(shù)字
ret.append(Integer.toString(rnd.nextInt(10)));
}
}
return ret.toString();
}
}
ConcurrentQueue無法用put和take方法,需要用poll和offer,其他代碼一致,不同的地方在于
Data data = concurrentLinkedQueue.poll();
if (null != data) {
longAdder.increment();
}
concurrentLinkedQueue.offer(new Data(randomString(10)));
可以先試試同時(shí)有4個(gè)生產(chǎn)者和消費(fèi)者在不斷進(jìn)行隊(duì)列操作,然后再試試1000個(gè)生產(chǎn)者與消費(fèi)者在不斷進(jìn)行隊(duì)列操作。


這個(gè)情況下可以看出無鎖操作是遠(yuǎn)高于有鎖操作的


負(fù)載過高的情況下,CAS效率低下,反而不如有鎖操作