LinkedBlockingQueue和ConcurrentLinkedQueue的選擇問題

隊(duì)列是JAVA開發(fā)過程中一種非常常見的數(shù)據(jù)結(jié)構(gòu),尤其是需要再使用生產(chǎn)者-消費(fèi)者的業(yè)務(wù)模型中,Queue常常作為多線程執(zhí)行任務(wù)的數(shù)據(jù)交界點(diǎn),從而保證生產(chǎn)者產(chǎn)生的數(shù)據(jù)能夠依次被消費(fèi)。

阻塞隊(duì)列的選擇

阻塞隊(duì)列的實(shí)現(xiàn)包括ArrayBlockingQueue與LinkedBlockingQueue。相同點(diǎn)不做贅述,區(qū)別有以下幾點(diǎn):
1.初始化時(shí),ArrayBlockingQueue必須指定隊(duì)列最大容量,LinkedBlockingQueue不強(qiáng)制指定,若不指定,默認(rèn)Interger.Max為最大容量。
2.ArrayBlockingQueue內(nèi)部數(shù)據(jù)結(jié)構(gòu)是數(shù)組:Element[],通過putIndex和takeIndex下標(biāo)的循環(huán)移動(dòng)控制隊(duì)首和隊(duì)尾;LinkedBlockingQueue內(nèi)部結(jié)構(gòu)是鏈表:Node<Element>,通過head 和 tail節(jié)點(diǎn)控制隊(duì)首和隊(duì)尾。
3.ArrayBlockingQueue生產(chǎn)與消費(fèi)之間共用一把鎖,而LinkedBlockingQueue生產(chǎn)與消費(fèi)時(shí)用不同的鎖競(jìng)爭(zhēng)。

對(duì)于阻塞隊(duì)列的選擇,一方面考慮吞吐性能,另一方面考慮內(nèi)存占用。

我們可以看到上面說的第三點(diǎn),可以確定的是,在多生產(chǎn)者與多消費(fèi)者的情況下,LinkedBlockingQueue的吞吐性能肯定是要更高的,而且ArrayBlockingQueue在初始化時(shí)直接就申請(qǐng)了一片連續(xù)的內(nèi)存空間。所以在實(shí)際生產(chǎn)使用環(huán)境中,沒有特殊限制考慮,我們?cè)谑褂米枞?duì)列時(shí)往往用LinkedBlockingQueue。

那什么場(chǎng)景下我們會(huì)偏向于使用ArrayBlockingQueue呢?
  • 生產(chǎn)者與消費(fèi)者之間沒有太大競(jìng)爭(zhēng),傾向于單消費(fèi)者,單生產(chǎn)者,且兩者之間沖突較小,這種情況下數(shù)組尋址是明顯要比鏈表去指向next的操作要更快的
  • 基本可以確定隊(duì)列大小,且隊(duì)列大小穩(wěn)定在一定的數(shù)量,這個(gè)時(shí)候數(shù)組占用內(nèi)存是比鏈表小的

阻塞隊(duì)列與非阻塞隊(duì)列的選擇

首先,ConcurrentLinkedQueue相對(duì)阻塞隊(duì)列來說,采用的是CAS無鎖操作,沒有take和put方法,主用poll與offer,無界。有人說,既然此隊(duì)列內(nèi)部進(jìn)隊(duì)和出隊(duì)操作采用的是無鎖,那性能肯定比有鎖的BlockingQueue強(qiáng),那BlockingQueue還有啥用武之地,其實(shí)不然,有些時(shí)候我們就需要線程進(jìn)入阻塞狀態(tài)而非不斷自旋消耗CPU,我們可以歸類以下場(chǎng)景:

  • 數(shù)據(jù)入隊(duì)速度過快,出隊(duì)速度過慢,這個(gè)時(shí)候ConcurrentLinkedQueue如果不借助其他限制手段,隨著時(shí)間的推移,JVM必然會(huì)進(jìn)行頻繁的FULL GC ,嚴(yán)重的情況下甚至?xí)l(fā)生OOM。使用BlockingQueue可以更好的控制內(nèi)存的狀況。
  • 數(shù)據(jù)入隊(duì)速度過慢,出隊(duì)速度過快,這個(gè)時(shí)候消費(fèi)者線程如果一定想要拿到數(shù)據(jù)而不進(jìn)行阻塞,將進(jìn)入大量時(shí)間的自旋狀態(tài),白白浪費(fèi)CPU資源。
  • 入隊(duì)與出隊(duì)速度相仿。
    這時(shí)候要考慮速度,有多少個(gè)線程在同時(shí)做操作,線程操作的頻率如何?
    在大部分場(chǎng)景下,ConcurrentLinkedQueue的性能是要比BlockingQueue要好的,注意是大部分,如果線程之間的競(jìng)爭(zhēng)足夠又高又快,CAS操作的CPU消耗以及線程操作的成功率是極低的,這個(gè)時(shí)候是會(huì)反而不如用鎖競(jìng)爭(zhēng)控制效率來的高。
    我們寫了個(gè)測(cè)試類可以大致看下觀感下,在同樣的環(huán)境下,消費(fèi)者與生產(chǎn)者在不斷對(duì)隊(duì)列進(jìn)行操作,然后不斷增加消費(fèi)者與生產(chǎn)者內(nèi)部線程的數(shù)量。
package algorithm;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;

public class TestBlockingQueue {

    BlockingQueue<Data> linkedBlockingQueue = new LinkedBlockingQueue<>(100);
//    BlockingQueue<Data> linkedBlockingQueue = new ArrayBlockingQueue<>(100);
    LongAdder longAdder = new LongAdder();
    Producer producer;
    Consumer consumer;

    static class Data {
        String msg;

        public Data(String msg) {
            this.msg = msg;
        }
    }

    TestBlockingQueue(int size) {
        producer = new Producer(size);
        consumer = new Consumer(size);
    }

    public void startTest() {
        producer.startProduce();
        consumer.startConsume();
    }

    public long stopTestAndReturn() {
        producer.stopProduce();
        consumer.stopConsume();
        return longAdder.longValue();
    }


    class Producer{

        ExecutorService service;
        List<ProduceWorker> workers = new LinkedList<>();
        Producer(int concurrentNum) {
            service = Executors.newFixedThreadPool(concurrentNum);
            for(int i = 0; i < concurrentNum; i++) {
                workers.add(new ProduceWorker());
            }
        }

        public void startProduce() {
            workers.forEach(worker -> service.execute(worker));
        }

        public void stopProduce() {
            service.shutdownNow();
        }

        class ProduceWorker implements Runnable {

            @Override
            public void run() {
                for(;;) {
                    if(!Thread.currentThread().isInterrupted()) {
                        try {
                            linkedBlockingQueue.put(new Data(randomString(10)));
                        } catch (InterruptedException e) {
//                        e.printStackTrace();
                        }
                    } else {
                        break;
                    }
                }
            }
        }

    }

    class Consumer{
        ExecutorService service;

        List<ConsumeWork> workers = new LinkedList<>();

        Consumer(int concurrentNum) {
            service = Executors.newFixedThreadPool(concurrentNum);
            for(int i = 0; i < concurrentNum; i++) {
                workers.add(new ConsumeWork());
            }
        }

        class ConsumeWork implements Runnable {

            @Override
            public void run() {
                for(;;) {
                    if(!Thread.currentThread().isInterrupted()) {
                        try {
                            Data data = linkedBlockingQueue.take();
                            if (null != data) {
                                longAdder.increment();
                            }
                        } catch (InterruptedException e) {
//                        e.printStackTrace();
                        }
                    } else {
                        break;
                    }
                }
            }
        }

        public void startConsume() {
            workers.forEach(worker -> service.execute(worker));
        }

        public void stopConsume() {
            service.shutdownNow();
        }
    }


    public static void main(String[] args) {
        long timeStart = System.currentTimeMillis();
        System.out.println(timeStart);
        TestBlockingQueue testQueue = new TestBlockingQueue(4);
        testQueue.startTest();
        try {
            Thread.currentThread().sleep(5000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long result = testQueue.stopTestAndReturn();
        System.out.println("最終結(jié)果為; " + result);
        long timeEnd = System.currentTimeMillis();
        System.out.println(timeEnd);
        //計(jì)算每s吞吐
        double average = (result / (timeEnd -timeStart)) * 1000;
        System.out.println("1每秒吞吐: " + average);
    }

    static String randomString(int strLength) {
        Random rnd = ThreadLocalRandom.current();
        StringBuilder ret = new StringBuilder();
        for (int i = 0; i < strLength; i++) {
            boolean isChar = (rnd.nextInt(2) % 2 == 0);// 輸出字母還是數(shù)字
            if (isChar) { // 字符串
                int choice = rnd.nextInt(2) % 2 == 0 ? 65 : 97; // 取得大寫字母還是小寫字母
                ret.append((char) (choice + rnd.nextInt(26)));
            } else { // 數(shù)字
                ret.append(Integer.toString(rnd.nextInt(10)));
            }
        }
        return ret.toString();
    }
}

ConcurrentQueue無法用put和take方法,需要用poll和offer,其他代碼一致,不同的地方在于

                        Data data = concurrentLinkedQueue.poll();
                        if (null != data) {
                            longAdder.increment();
                        }
concurrentLinkedQueue.offer(new Data(randomString(10)));

可以先試試同時(shí)有4個(gè)生產(chǎn)者和消費(fèi)者在不斷進(jìn)行隊(duì)列操作,然后再試試1000個(gè)生產(chǎn)者與消費(fèi)者在不斷進(jìn)行隊(duì)列操作。

BlockingQueue-4

![ConcurrentQueue-4](https://upload-images.jianshu.io/upload_images/14745967-9f2a71o z66c7c3f219.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
這個(gè)情況下可以看出無鎖操作是遠(yuǎn)高于有鎖操作的
BlockingQueue-100

ConcurrentQueue-100

負(fù)載過高的情況下,CAS效率低下,反而不如有鎖操作

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 五 相關(guān)文章 JAVA并發(fā)編程與高并發(fā)解決方案 - 并發(fā)編程 一 ...
    chuIllusions丶閱讀 2,052評(píng)論 0 7
  • 線程池用過哪些?線程池有哪些參數(shù)?這幾個(gè)常用線程池的用法和實(shí)際場(chǎng)景?線程池是為了解決大量的請(qǐng)求造成的服務(wù)器大量創(chuàng)建...
    帥可兒妞閱讀 417評(píng)論 0 0
  • 引用:https://www.cnblogs.com/KingIceMou/p/8075343.html 前言:在...
    dlihasa閱讀 565評(píng)論 0 3
  • 前言:在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的...
    freezml閱讀 223評(píng)論 0 0
  • 母親今天開始去上班了,家中終于只剩下我一個(gè)人。下班回來,聽著樓下的老爺爺豆腐腦的叫賣聲,好像還是那時(shí)手里拿著兩塊錢...
    汝顧念她閱讀 343評(píng)論 0 0

友情鏈接更多精彩內(nèi)容