再談Disruptor:甜蜜區(qū)與使用方法

使用模型

單生產(chǎn)者/多生產(chǎn)者 + 多個(gè)單線程消費(fèi)者

生產(chǎn)者

往RingBuffer里邊寫Event,可以單線程寫,也可以多線程并發(fā)寫。

  • ProducerType.SINGLE:?jiǎn)紊a(chǎn)者,序號(hào)分配無(wú)鎖(最快)
  • ProducerType.MULTI:多生產(chǎn)者并發(fā)寫,序號(hào)分配用 CAS

消費(fèi)者

Disruptor 的消費(fèi)者是長(zhǎng)期運(yùn)行的 EventProcessor 線程,死循環(huán) while(running) 從 RingBuffer取事件處理。Disruptor 構(gòu)造器只接收一個(gè) ThreadFactory,然后為每個(gè) EventProcessor(消費(fèi)者) 調(diào)用一次這個(gè)工廠來(lái)創(chuàng)建獨(dú)立線程。比如:

// 1個(gè) Disruptor,1個(gè) ThreadFactory
disruptor.handleEventsWith(handlerA, handlerB);
// 內(nèi)部創(chuàng)建 2個(gè) EventProcessor線程,ThreadFactory 被調(diào)用 2次

根據(jù)實(shí)際的業(yè)務(wù)邏輯來(lái)選擇消費(fèi)者如何編排(DAG)

  // 并行:A 和 B 同時(shí)處理同一個(gè)事件
  disruptor.handleEventsWith(handlerA, handlerB);

  // 串行:B 等 A 處理完再處理
  disruptor.handleEventsWith(handlerA).then(handlerB);

  // 菱形:A 先處理,然后 B 和 C 并行,最后 D
  disruptor.handleEventsWith(handlerA)
           .then(handlerB, handlerC)
           .then(handlerD);

串行、菱形一般是各個(gè)EventHandler處理不同的業(yè)務(wù)邏輯。并行可能有兩種情況,一個(gè)是確實(shí)是可以并行處理的兩種不同業(yè)務(wù)邏輯,還一個(gè)情況是同一個(gè)業(yè)務(wù)邏輯,為了充分利用多核CPU,handleEventsWith綁定兩個(gè)同樣的handler,disruptor.handleEventsWith(handlerA, handlerA),加速處理消費(fèi)。

Disruptor的甜蜜區(qū),以及io阻塞問(wèn)題

計(jì)算密集型 + 高頻 + 低延遲的消費(fèi)者邏輯是Disruptor的甜蜜區(qū),比如:

  • 訂單匹配引擎
  • 風(fēng)控規(guī)則計(jì)算
  • 復(fù)雜事件處理(CEP)

消費(fèi)者線程模型一般是每個(gè)單線程處理每個(gè)handler的邏輯,消費(fèi)者在專用線程里同步做完所有計(jì)算密集型邏輯,利用 CPU cache親和性和無(wú)鎖隊(duì)列榨干性能。也就是消費(fèi)者線程釘死在單個(gè) CPU 核上,利用 cache affinity和無(wú)鎖隊(duì)列把納秒級(jí)延遲榨到極致。

那如果是io型任務(wù)呢?

首先,EventProcessor 線程一般不能被阻塞,它是 Disruptor 消費(fèi)鏈路的"心臟停跳點(diǎn)":

  • 阻塞了 → RingBuffer 消費(fèi)停滯 → 生產(chǎn)者 next() 遲早阻塞(或覆蓋舊事件)→ 整條管線背壓崩潰
  • 如果是 handleEventsWith(A, B) 并行模式,同一個(gè)消費(fèi)者組里的其他 handler 也會(huì)一起卡

所以,有兩種選擇:

  • 使用異步非阻塞的io,比如netty:

    Channel channel.eventLoop().execute(new Runnable() {
       @Override
       public void run() {
          if (channel.isActive()) {
             channel.writeAndFlush(new TextWebSocketFrame(finalMsg));
          }
       }
    });
    
  • 消費(fèi)者確實(shí)有阻塞 IO
    那就應(yīng)該在 onEvent 里把任務(wù)丟給額外的線程池,讓EventProcessor 保持"非阻塞調(diào)度者"的角色。 但這樣一來(lái),Disruptor 的價(jià)值就大幅縮水了——它退化成了一條"高性能隊(duì)列 + 單線程調(diào)度器"。這時(shí)候用 LinkedBlockingQueue + 單線程消費(fèi)者 也能達(dá)到類似效果,只是延遲和吞吐量稍差。

Log4j2的特例與取舍

log4j2 的 AsyncLogger 是 Disruptor 最典型的"異步解耦"用法,不是計(jì)算密集型。模型如下:

業(yè)務(wù)線程A ──┐
業(yè)務(wù)線程B ──┼──→ Disruptor RingBuffer ──→ 單消費(fèi)者后臺(tái)線程 ──→ 刷盤/發(fā)網(wǎng)絡(luò)
業(yè)務(wù)線程C ──┘        (無(wú)鎖publish)          (阻塞IO)

關(guān)鍵點(diǎn):

  1. 業(yè)務(wù)線程完全不阻塞
    多個(gè)業(yè)務(wù)線程并發(fā) publish日志事件,Disruptor 的無(wú)鎖隊(duì)列保證這是納秒級(jí)操作。業(yè)務(wù)代碼感受不到 IO 延遲。
  2. 阻塞只困在后臺(tái)消費(fèi)者線程
    只有一個(gè)消費(fèi)者線程在做刷盤、網(wǎng)絡(luò)發(fā)送這些慢 IO。它阻塞了也無(wú)所謂,RingBuffer 頂著,業(yè)務(wù)線程繼續(xù)飛。
  3. 避免 GC(最大的隱藏收益)
    log4j2 配合 Disruptor 會(huì)預(yù)分配 RingBuffer 大小的 LogEvent 對(duì)象池。日志事件不是 new 出來(lái)的,而是復(fù)用 RingBuffer里的槽位。對(duì)于高吞吐系統(tǒng),這消除了百萬(wàn)級(jí)對(duì)象的 GC 壓力。

所以 log4j2 不是把 Disruptor 當(dāng)"計(jì)算引擎"用,而是當(dāng)"無(wú)鎖緩沖 + 對(duì)象池 + 線程隔離器"用。

但是,單消費(fèi)者 + 阻塞 IO + 有界 RingBuffer,這三者組合下,如果持續(xù)生產(chǎn)速度 > 消費(fèi)速度,RingBuffer終究會(huì)滿,生產(chǎn)者最終一定會(huì)被背壓。

RingBuffer 能"頂著"的前提是:

  1. 平均消費(fèi)速率 >= 平均生產(chǎn)速率
    刷盤雖然每次有毫秒級(jí)延遲,但批量寫入的吞吐量很高。大多數(shù)情況下消費(fèi)者跟得上,RingBuffer 作為峰值緩沖,吸收瞬時(shí)的 burst,業(yè)務(wù)線程幾乎無(wú)感知。
  2. RingBuffer 滿后的行為取決于 WaitStrategy
    log4j2 默認(rèn)用的是 TimeoutBlockingWaitStrategyBlockingWaitStrategy。當(dāng) RingBuffer 滿時(shí),業(yè)務(wù)線程的 publish()會(huì)阻塞等待(或超時(shí))。所以極端壓力下,業(yè)務(wù)線程還是會(huì)卡。
  3. log4j2是個(gè)日志框架 允許丟棄策略
    當(dāng) RingBuffer 滿時(shí),可以選擇丟棄最新日志而不是阻塞業(yè)務(wù)線程。但這對(duì)交易系統(tǒng)通常就不可接受了。
┌────────────────────────────┬───────────────────────────────────────────┐
│            場(chǎng)景            │                  結(jié)果                      │
├────────────────────────────┼───────────────────────────────────────────┤
│ 消費(fèi)者非阻塞                │最優(yōu),管線永不背壓                           │
├────────────────────────────┼───────────────────────────────────────────┤
│ 消費(fèi)者偶爾阻塞,平均能跟上   │RingBuffer 吸收峰值,大部分時(shí)間業(yè)務(wù)線程不受影響│
├────────────────────────────┼───────────────────────────────────────────┤
│ 消費(fèi)者持續(xù)阻塞,跟不上生產(chǎn)   │ RingBuffer 滿 → 生產(chǎn)者 next()/publish() 阻塞│
│                            │        → 背壓傳遞到業(yè)務(wù)線程                 │
└────────────────────────────┴───────────────────────────────────────────┘

所以嚴(yán)格來(lái)說(shuō),單消費(fèi)者阻塞 IO確實(shí)會(huì)制造背壓風(fēng)險(xiǎn),只是 log4j2 通過(guò)大 RingBuffer 把風(fēng)險(xiǎn)推遲了。

在用 Disruptor 的項(xiàng)目

┌──────────────┬────────────────────────────────────────────────────────┐
│     項(xiàng)目      │                  用途                                  │
├──────────────┼────────────────────────────────────────────────────────┤
│ LMAX         │ 發(fā)源地,外匯交易訂單匹配和風(fēng)控                            │
│ Exchange     │                                                        │
├──────────────┼────────────────────────────────────────────────────────┤
│ Apache       │ AsyncLogger 異步日志                                    │
│ Log4j2       │                                                        │
├──────────────┼────────────────────────────────────────────────────────┤
│ Apache Storm │ 早期版本 Worker 間消息傳輸(后續(xù)版本換了)                │
├──────────────┼────────────────────────────────────────────────────────┤
│ Apache Camel │ SEDA 組件的底層隊(duì)列                                     │
├──────────────┼─────────────────────────────────────────────────────────┤
│ Agrona       │和 Disruptor 同團(tuán)隊(duì)(LMAX),無(wú)鎖數(shù)據(jù)結(jié)構(gòu)庫(kù),很多項(xiàng)目間接依賴│
├──────────────┼─────────────────────────────────────────────────────────┤
│ Hazelcast    │ 部分內(nèi)部并發(fā)結(jié)構(gòu)受其影響                                 │
├──────────────┼─────────────────────────────────────────────────────────┤
│ JCTools      │ 不是用 Disruptor,而是 Martin Thompson(Disruptor         │
│              │ 作者之一)的另一套無(wú)鎖集合,很多框架底層用它替代 JDK 并發(fā)容器 │
└──────────────┴─────────────────────────────────────────────────────────┘

國(guó)內(nèi)項(xiàng)目公開宣稱用的不多,但不少證券/期貨公司的交易柜臺(tái)內(nèi)部會(huì)參考 LMAX 的架構(gòu)用 Disruptor 做委托鏈路。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容