聊一聊disruptor-無(wú)鎖并發(fā)框架

前言

Disruptor是英國(guó)外匯交易公司LMAX開發(fā)的一個(gè)高性能隊(duì)列,研發(fā)的初衷是解決內(nèi)存隊(duì)列的延遲問(wèn)題?;贒isruptor開發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單,2010年在QCon演講后,獲得了業(yè)界關(guān)注。
其實(shí)Disruptor與其說(shuō)是一個(gè)框架,不如說(shuō)是一種設(shè)計(jì)思路,這個(gè)設(shè)計(jì)思路對(duì)于存在“并發(fā)、緩沖區(qū)、生產(chǎn)者—消費(fèi)者模型、事務(wù)處理”這些元素的程序來(lái)說(shuō),Disruptor提出了一種大幅提升性能(TPS)的方案。


老司機(jī)帶帶我
  • 來(lái)一張全鏈路流程圖


  • disruptor設(shè)計(jì)理念很超前,解決了傳統(tǒng)隊(duì)列的痛點(diǎn)
    1、false-sharding:CPU偽共享問(wèn)題
    2、無(wú)鎖編程的極致體驗(yàn)-CAS
    3、兩個(gè)獨(dú)立的線程之間高效交換數(shù)據(jù)

一、鎖的代價(jià)

Disruptor論文中講述一個(gè)實(shí)驗(yàn),一個(gè)計(jì)數(shù)器循環(huán)自增5億次

  • 場(chǎng)景1:?jiǎn)尉€程無(wú)鎖時(shí),程序耗時(shí)300ms
  • 場(chǎng)景2:?jiǎn)尉€程有鎖,程序需要耗時(shí)10000ms
  • 場(chǎng)景3:雙線程有鎖,耗時(shí)224000ms
  • .........


Why?

簡(jiǎn)而言之就是多線程鎖競(jìng)爭(zhēng)導(dǎo)致的上下文切換時(shí)間成本遠(yuǎn)遠(yuǎn)大于了線程持有鎖的性能損耗

  • ArrayBlockQueue偽代碼分析
public void put(E e) {  
        final ReentrantLock lock = this.lock;  
        //加鎖  
        lock.lock(); 
        //當(dāng)隊(duì)列滿時(shí),調(diào)用notFull.await()方法,阻塞寫線程。    
        while (count == items.length) {
              notFull.await(); //Condition條件阻塞
        }         
        //把元素 e 插入到隊(duì)尾  
        insert(e); 
        //解鎖 
        lock.unlock();
        //若隊(duì)列為空,激活讀線程
        notEmpty.signal();  
    }  


public E take() throws InterruptedException {  
        final ReentrantLock lock = this.lock;  
       //加鎖
        lock.lock();   
       //當(dāng)隊(duì)列空時(shí),阻塞讀線程
        while (count == 0) {
             notEmpty.await();  
       }           
        //取出隊(duì)頭元素  
        E x = extract();  
       //若隊(duì)列full,激活寫線程
        notFull.signal();  
        return x;  
        //解鎖 
        finally{
            lock.unlock(); 
        } 
           
    }  
  • tail(takeIndex)和head(putIndex)指針都是鎖競(jìng)爭(zhēng)的沖突點(diǎn)
    隊(duì)列的目的就是為生產(chǎn)者和消費(fèi)者提供一個(gè)地方存放要交互的數(shù)據(jù),緩沖上下游的消息,實(shí)際場(chǎng)景中緩沖常常是滿的(生產(chǎn)者比消費(fèi)者快)或者空的(消費(fèi)者比生產(chǎn)者快)。生產(chǎn)者和消費(fèi)者能夠步調(diào)一致的情況非常少見。
  • ArrayBlockQueue是悲觀鎖的一種體現(xiàn),讀寫線程都假設(shè)存在沖突, 多線程并發(fā)場(chǎng)景下,性能很差
  • 接下來(lái)讓我們看看Disruptor的實(shí)現(xiàn)
    disruptor根本就不用鎖,取而代之-CAS,嚴(yán)格意義上說(shuō)仍然是使用鎖, 因?yàn)镃AS本質(zhì)上也是一種樂觀鎖
  • Java悲觀鎖和樂觀鎖邏輯上類似Mysql的鎖

CAS:Compare And Swap/Set 顧名思義比較和交換
CPU級(jí)別的指令,cpu去更新一個(gè)值,但如果跟新過(guò)程中值發(fā)生了變化,操作就失敗,然后重試,直到更新成功!
Disruptor的sequence的自增就是CAS的自旋自增,對(duì)應(yīng)的,ArrayBlockQueue的數(shù)組索引index是互斥自增!

  • 樂觀鎖設(shè)計(jì)思想:假設(shè)沒有沖突


    CAS原理圖
  • 悲觀鎖設(shè)計(jì)思想:假設(shè)存在沖突


    悲觀鎖
  • CAS 比較適宜持有鎖的時(shí)間較短的并發(fā)場(chǎng)景(自增、簡(jiǎn)單更新),
    反之持有鎖時(shí)間較長(zhǎng)的場(chǎng)景如秒殺,下單,會(huì)導(dǎo)致自循環(huán)次數(shù)過(guò)多,線程饑餓程度增加

二、disrupter核心數(shù)據(jù)結(jié)構(gòu)-ringbuffer

  • 隊(duì)列上下游的緩沖容器


  • 首尾相接的環(huán)形數(shù)組



    數(shù)組長(zhǎng)度2^n,通過(guò)位運(yùn)算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問(wèn)題。index是long類型,即使100萬(wàn)QPS的處理速度,也需要30萬(wàn)年才能用完。

  • 環(huán)持續(xù)向 buffer 中寫入數(shù)據(jù),這個(gè)序號(hào)會(huì)一直增長(zhǎng),直到繞過(guò)整個(gè)環(huán)

  • 環(huán)形數(shù)組結(jié)構(gòu)


  • 新產(chǎn)生的sequence只覆蓋,相對(duì)于傳統(tǒng)隊(duì)列不需要頻繁GC

  private E dequeue() {  
        final Object[] items = this.items;  
        E x = (E) items[takeIndex];  
        items[takeIndex] = null;  
        if (++takeIndex == items.length)  
            takeIndex = 0;  
        count--;/當(dāng)前擁有元素個(gè)數(shù)減1  
        if (itrs != null)  
            itrs.elementDequeued();  
        notFull.signal();//有一個(gè)元素取出成功,那肯定隊(duì)列不滿  
        return x;  
    }  

ArrayBloackQueue出隊(duì)takeIndex索引所在元素設(shè)置為NULL,高吞吐量下隊(duì)列會(huì)產(chǎn)生大量GC

  • CAS維護(hù)了一個(gè)sequence,無(wú)鎖自旋增長(zhǎng)
    每個(gè)生產(chǎn)者或者消費(fèi)者線程,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置,申請(qǐng)到之后,直接在該位置寫入或者讀取數(shù)據(jù)。
    假設(shè)兩個(gè)生產(chǎn)者都想申請(qǐng)第7號(hào)slot, 則它們會(huì)同時(shí)執(zhí)行CAS自增,執(zhí)行成功的人得到該序列號(hào)slot=7,另一個(gè)則重試?yán)^續(xù)申請(qǐng)下一個(gè)可用的slot=8,之后根據(jù)mod/size去環(huán)形數(shù)組中尋找自己的位置。
    消費(fèi)者處理邏輯類似。

三、解決沖突—揭秘內(nèi)存屏障

  • 關(guān)鍵字volatile:Java內(nèi)存模型將在寫操作后插入一個(gè)寫屏障指令,在讀操作前插入一個(gè)讀屏障指令


    java內(nèi)存模型

RingBuffer的指針(cursor)屬于一個(gè)volatile變量,同時(shí)也是我們能夠不用鎖操作就能實(shí)現(xiàn)Disruptor的原因之一


生產(chǎn)者對(duì)RingBuffer更新序列號(hào),之后會(huì)對(duì)volatile字段(cursor)的寫操作創(chuàng)建了一個(gè)內(nèi)存屏障,這個(gè)屏障將刷新所有緩存里的值(緩存失效)
消費(fèi)者獲取RingBuffer序列號(hào),涉及到讀沖突的緩存失效,C2在C1之后,C2拿到C1更新過(guò)的序列號(hào)之后,C2才能獲取next序列號(hào)。內(nèi)存屏障保證了他們之前的執(zhí)行順序,消費(fèi)者總能獲取最新的序列號(hào)

  • 內(nèi)存屏障作為另一個(gè)CPU級(jí)的指令,沒有鎖那樣大的開銷,volatile意味著你不用加鎖,就能讓獲得性能的提升

四、disruptor多線程并發(fā)讀寫過(guò)程淺析

讀寫并行簡(jiǎn)圖


多個(gè)生產(chǎn)者的情況下,會(huì)遇到“多個(gè)線程重復(fù)寫同一個(gè)元素”的問(wèn)題,解決方法是,每個(gè)線程獲取不同的一段數(shù)組空間進(jìn)行操作,這個(gè)通過(guò)CAS很容易達(dá)到。只需要在分配元素的時(shí)候,通過(guò)CAS無(wú)腦自增即可判斷。
Disruptor在多個(gè)生產(chǎn)者的情況下,引入了一個(gè)與Ring Buffer大小相同的buffer:AvailableBuffer。當(dāng)某個(gè)位置寫入成功的時(shí)候,便把Availble Buffer相應(yīng)的位置置位,標(biāo)記為寫入成功。讀取的時(shí)候,會(huì)遍歷available Buffer,來(lái)判斷元素是否已經(jīng)就緒。
消費(fèi)者保持一個(gè)自己的序列,每次累加后nextSequence,去獲取可訪問(wèn)的最大序列。對(duì)于一個(gè)生產(chǎn)者,就是nextSequence到RingBuffer當(dāng)前游標(biāo)的序列。對(duì)于多個(gè)生產(chǎn)者,就是nextSequence到RingBuffer當(dāng)前游標(biāo)之間,最大的連續(xù)的序列集。
消費(fèi)端部分源碼分析

public long waitFor(final long sequence){
        checkAlert();
        //獲取最大的可消費(fèi)的序列,依賴等待策略,策略設(shè)計(jì)模式的一種體現(xiàn)
        long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
        if (availableSequence < sequence)  {
            return availableSequence;
        }
        return sequencer.getHighestPublishedSequence(sequence,availableSequence);
    }

1、讀寫不存在沖突:消費(fèi)者讀取到序號(hào) x 位置元素都被生產(chǎn)者寫入成功,消費(fèi)者消費(fèi)這一段區(qū)間數(shù)據(jù)。
2、讀寫存在沖突:消費(fèi)者讀取到序號(hào)x位置生產(chǎn)者正在寫入,也就是下圖availble Buffer中標(biāo)記為-1的位置,則消費(fèi)者返回該序號(hào)x,并執(zhí)行一段等待策略

  • 常見的等待策略
    BlockingWaitStrategy:通過(guò)線程阻塞的方式,等待生產(chǎn)者喚醒
    BusySpinWaitStrategy:線程一直自旋等待,比較耗CPU
    YieldingWaitStrategy: 自旋 + yield + 自旋(折中方案)
    等等
  • 多個(gè)生產(chǎn)者情況下,消費(fèi)者消費(fèi)過(guò)程示意圖


  • disruptor寫線程源碼片段分析
do
    {
        current = cursor.get();
        next = current + n;

        if (!hasAvailableCapacity(gatingSequences, n, current))
        {
            throw InsufficientCapacityException.INSTANCE;
        }
    }
    while (!cursor.compareAndSet(current, next));
    //next 類比于ArrayBlockQueue的數(shù)組索引index
    return next;

多線程環(huán)境下,多個(gè)生產(chǎn)者通過(guò)do/while循環(huán)的條件CAS,來(lái)判斷每次申請(qǐng)的空間是否已經(jīng)被其他生產(chǎn)者占據(jù)。假如已經(jīng)被占據(jù),該函數(shù)會(huì)返回失敗,While循環(huán)重新執(zhí)行,申請(qǐng)寫入空間。

  • 多個(gè)生產(chǎn)者情況下,生產(chǎn)者生產(chǎn)過(guò)程示意圖


五、聊一聊緩存?zhèn)喂蚕?/h1>

計(jì)算機(jī)系統(tǒng)中為了解決主內(nèi)存與CPU運(yùn)行速度的差距,在CPU與主內(nèi)存之間添加(Cache)CPU硬件級(jí)別緩存系統(tǒng)中是以緩存行(cache line)為單位存儲(chǔ)的,當(dāng)多線程修改互相獨(dú)立的變量時(shí),如果這些變量共享同一個(gè)緩存行,就會(huì)無(wú)意中影響彼此的性能,這就是偽共享,多線程環(huán)境下會(huì)導(dǎo)致緩存命中率很低!

  • 偽共享問(wèn)題的產(chǎn)生



    如上圖變量x,y同時(shí)被放到了CPU的一級(jí)和二級(jí)緩存,當(dāng)線程1使用CPU1對(duì)變量x進(jìn)行更新時(shí)候,首先會(huì)修改cpu1的一級(jí)緩存變量x所在緩存行,這時(shí)候緩存一致性協(xié)議會(huì)導(dǎo)致cpu2中變量x對(duì)應(yīng)的緩存行失效,那么線程2寫入變量x的時(shí)候就只能去二級(jí)緩存去查找,這就破壞了一級(jí)緩存,而一級(jí)緩存比二級(jí)緩存更快。更壞的情況下如果cpu只有一級(jí)緩存,那么會(huì)導(dǎo)致頻繁的直接訪問(wèn)主內(nèi)存,增大系統(tǒng)開銷。

  • ArrayBlockingQueue有三個(gè)成員變量
    takeIndex:出隊(duì)元素下標(biāo)
    putIndex:入隊(duì)元素的下標(biāo)
    count:隊(duì)列中元素的數(shù)量
    這三個(gè)變量很容易放到一個(gè)緩存行中,但是之間修改沒有太多的關(guān)聯(lián)。所以每次修改,都會(huì)使之前緩存的數(shù)據(jù)失效,需要從主存中重新讀取,從而不能完全達(dá)到共享的效果。


    ArrayBlockingQueue偽共享示意圖
  • 連續(xù)內(nèi)存塊巧妙規(guī)避偽共享問(wèn)題
    緩存行以64個(gè)字節(jié)為單位(,long 類型是 8 字節(jié),因此在一個(gè)緩存行中可以存 8 個(gè) long 類型的變量,如果你訪問(wèn)一個(gè) long 數(shù)組,當(dāng)數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)額外加載另外 7 個(gè),以致你能非常快地遍歷這個(gè)數(shù)組(緩存共享的免費(fèi)福利)
    Long[] arr = new Long[64 * 1024 * 1024];
    long start = System.nanoTime();
    for (int i = 0; i < arr.length; i++) {
        arr[i] *= 3;
    }
    System.out.println(System.nanoTime() - start);
    long start2 = System.nanoTime();
    for (int i = 0; i < arr.length; i += 8) {
        arr[i] *= 3;
    }
    System.out.println(System.nanoTime() - start2);
  • 表面上看,循環(huán)二工作量為第循環(huán)一的1/8;但是執(zhí)行時(shí)間是相差不大的,因?yàn)槊?8個(gè)Long占用8*8=64字節(jié),正好一個(gè)cache,也就是說(shuō)這兩個(gè)循環(huán)訪問(wèn)內(nèi)存的次數(shù)是一致的,導(dǎo)致耗時(shí)相差不大(訪問(wèn)cache時(shí)間rt忽略不計(jì))
  • disruptor怎么解決偽共享問(wèn)題
    其中一個(gè)解決思路,就是讓不同線程操作的對(duì)象處于不同的緩存行即可即緩存行填充(Padding),使一個(gè)對(duì)象占用的內(nèi)存大小剛好為64bytes或它的整數(shù)倍,這樣就保證了一個(gè)緩存行里不會(huì)有多個(gè)對(duì)象,這其實(shí)是一種以空間換時(shí)間的方案。
  • disruptor sequence偽代碼
 //在序號(hào)實(shí)際value變量(long型)左邊填充7個(gè)long變量
 class LhsPadding
{
    protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
    protected volatile long value;
}
//在序號(hào)實(shí)際value變量(long型)右邊填充7個(gè)long變量
class RhsPadding extends Value
{
    protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding

Sequence實(shí)際value變量的左右均被填充了7個(gè)long型變量,其自身也是long型變量,一個(gè)long型變量占據(jù)8個(gè)字節(jié),所以序號(hào)與他上一個(gè)/下一個(gè)序號(hào)之間的最小內(nèi)存距離為:158=120byte,加上對(duì)象頭的8個(gè)字節(jié),可以確保sequence大小128byte=264byte(有的CPU緩存行是128byte)
這樣直接的代價(jià)就是增大的15倍的內(nèi)存消耗空間,這樣的設(shè)計(jì)導(dǎo)致不可能有兩個(gè)cursor出現(xiàn)在同一個(gè)cpu cache line中, 就解決了”偽共享”問(wèn)題!

六、demo應(yīng)用

  • 定義事件,Event就是通過(guò) Disruptor 進(jìn)行交換的數(shù)據(jù)類型(事件監(jiān)聽模式)
public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
}
  • 定義事件工廠
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}
  • 事件的具體實(shí)現(xiàn)
public class LongEventHandler implements EventHandler<LongEvent>
{
    //sequence是上圖環(huán)形數(shù)組中的序列號(hào)
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}
  • 啟動(dòng)disruptor
EventFactory<LongEvent> eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方;
        
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
                ringBufferSize, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy());
        
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);
        
disruptor.start();
  • 發(fā)布事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//請(qǐng)求下一個(gè)事件序號(hào);
    
try {
    LongEvent event = ringBuffer.get(sequence);//獲取該序號(hào)對(duì)應(yīng)的事件對(duì)象;
    long data = getEventData();//獲取要通過(guò)事件傳遞的業(yè)務(wù)數(shù)據(jù);
    event.set(data);
} finally{
    ringBuffer.publish(sequence);//發(fā)布事件;
}

六 應(yīng)用場(chǎng)景

  • wacai-zipkin鏈路跟蹤服務(wù)端kafka日志處理
  • Log4J2

三克油

fudata技術(shù)沙龍還需要伙計(jì)們繼續(xù)努力添磚加瓦!

狗年大吉-汪汪汪
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容