使用模型
單生產(chǎn)者/多生產(chǎn)者 + 多個(gè)單線程消費(fèi)者
生產(chǎn)者
往RingBuffer里邊寫Event,可以單線程寫,也可以多線程并發(fā)寫。
-
ProducerType.SINGLE:?jiǎn)紊a(chǎn)者,序號(hào)分配無(wú)鎖(最快) -
ProducerType.MULTI:多生產(chǎn)者并發(fā)寫,序號(hào)分配用 CAS
消費(fèi)者
Disruptor 的消費(fèi)者是長(zhǎng)期運(yùn)行的 EventProcessor 線程,死循環(huán) while(running) 從 RingBuffer取事件處理。Disruptor 構(gòu)造器只接收一個(gè) ThreadFactory,然后為每個(gè) EventProcessor(消費(fèi)者) 調(diào)用一次這個(gè)工廠來(lái)創(chuàng)建獨(dú)立線程。比如:
// 1個(gè) Disruptor,1個(gè) ThreadFactory
disruptor.handleEventsWith(handlerA, handlerB);
// 內(nèi)部創(chuàng)建 2個(gè) EventProcessor線程,ThreadFactory 被調(diào)用 2次
根據(jù)實(shí)際的業(yè)務(wù)邏輯來(lái)選擇消費(fèi)者如何編排(DAG)
// 并行:A 和 B 同時(shí)處理同一個(gè)事件
disruptor.handleEventsWith(handlerA, handlerB);
// 串行:B 等 A 處理完再處理
disruptor.handleEventsWith(handlerA).then(handlerB);
// 菱形:A 先處理,然后 B 和 C 并行,最后 D
disruptor.handleEventsWith(handlerA)
.then(handlerB, handlerC)
.then(handlerD);
串行、菱形一般是各個(gè)EventHandler處理不同的業(yè)務(wù)邏輯。并行可能有兩種情況,一個(gè)是確實(shí)是可以并行處理的兩種不同業(yè)務(wù)邏輯,還一個(gè)情況是同一個(gè)業(yè)務(wù)邏輯,為了充分利用多核CPU,handleEventsWith綁定兩個(gè)同樣的handler,disruptor.handleEventsWith(handlerA, handlerA),加速處理消費(fèi)。
Disruptor的甜蜜區(qū),以及io阻塞問(wèn)題
計(jì)算密集型 + 高頻 + 低延遲的消費(fèi)者邏輯是Disruptor的甜蜜區(qū),比如:
- 訂單匹配引擎
- 風(fēng)控規(guī)則計(jì)算
- 復(fù)雜事件處理(CEP)
消費(fèi)者線程模型一般是每個(gè)單線程處理每個(gè)handler的邏輯,消費(fèi)者在專用線程里同步做完所有計(jì)算密集型邏輯,利用 CPU cache親和性和無(wú)鎖隊(duì)列榨干性能。也就是消費(fèi)者線程釘死在單個(gè) CPU 核上,利用 cache affinity和無(wú)鎖隊(duì)列把納秒級(jí)延遲榨到極致。
那如果是io型任務(wù)呢?
首先,EventProcessor 線程一般不能被阻塞,它是 Disruptor 消費(fèi)鏈路的"心臟停跳點(diǎn)":
- 阻塞了 → RingBuffer 消費(fèi)停滯 → 生產(chǎn)者 next() 遲早阻塞(或覆蓋舊事件)→ 整條管線背壓崩潰
- 如果是 handleEventsWith(A, B) 并行模式,同一個(gè)消費(fèi)者組里的其他 handler 也會(huì)一起卡
所以,有兩種選擇:
-
使用異步非阻塞的io,比如netty:
Channel channel.eventLoop().execute(new Runnable() { @Override public void run() { if (channel.isActive()) { channel.writeAndFlush(new TextWebSocketFrame(finalMsg)); } } }); 消費(fèi)者確實(shí)有阻塞 IO
那就應(yīng)該在 onEvent 里把任務(wù)丟給額外的線程池,讓EventProcessor 保持"非阻塞調(diào)度者"的角色。 但這樣一來(lái),Disruptor 的價(jià)值就大幅縮水了——它退化成了一條"高性能隊(duì)列 + 單線程調(diào)度器"。這時(shí)候用 LinkedBlockingQueue + 單線程消費(fèi)者 也能達(dá)到類似效果,只是延遲和吞吐量稍差。
Log4j2的特例與取舍
log4j2 的 AsyncLogger 是 Disruptor 最典型的"異步解耦"用法,不是計(jì)算密集型。模型如下:
業(yè)務(wù)線程A ──┐
業(yè)務(wù)線程B ──┼──→ Disruptor RingBuffer ──→ 單消費(fèi)者后臺(tái)線程 ──→ 刷盤/發(fā)網(wǎng)絡(luò)
業(yè)務(wù)線程C ──┘ (無(wú)鎖publish) (阻塞IO)
關(guān)鍵點(diǎn):
- 業(yè)務(wù)線程完全不阻塞
多個(gè)業(yè)務(wù)線程并發(fā) publish日志事件,Disruptor 的無(wú)鎖隊(duì)列保證這是納秒級(jí)操作。業(yè)務(wù)代碼感受不到 IO 延遲。 - 阻塞只困在后臺(tái)消費(fèi)者線程
只有一個(gè)消費(fèi)者線程在做刷盤、網(wǎng)絡(luò)發(fā)送這些慢 IO。它阻塞了也無(wú)所謂,RingBuffer 頂著,業(yè)務(wù)線程繼續(xù)飛。 - 避免 GC(最大的隱藏收益)
log4j2 配合 Disruptor 會(huì)預(yù)分配 RingBuffer 大小的 LogEvent 對(duì)象池。日志事件不是 new 出來(lái)的,而是復(fù)用 RingBuffer里的槽位。對(duì)于高吞吐系統(tǒng),這消除了百萬(wàn)級(jí)對(duì)象的 GC 壓力。
所以 log4j2 不是把 Disruptor 當(dāng)"計(jì)算引擎"用,而是當(dāng)"無(wú)鎖緩沖 + 對(duì)象池 + 線程隔離器"用。
但是,單消費(fèi)者 + 阻塞 IO + 有界 RingBuffer,這三者組合下,如果持續(xù)生產(chǎn)速度 > 消費(fèi)速度,RingBuffer終究會(huì)滿,生產(chǎn)者最終一定會(huì)被背壓。
RingBuffer 能"頂著"的前提是:
- 平均消費(fèi)速率 >= 平均生產(chǎn)速率
刷盤雖然每次有毫秒級(jí)延遲,但批量寫入的吞吐量很高。大多數(shù)情況下消費(fèi)者跟得上,RingBuffer作為峰值緩沖,吸收瞬時(shí)的 burst,業(yè)務(wù)線程幾乎無(wú)感知。 -
RingBuffer滿后的行為取決于WaitStrategy
log4j2 默認(rèn)用的是TimeoutBlockingWaitStrategy或BlockingWaitStrategy。當(dāng)RingBuffer滿時(shí),業(yè)務(wù)線程的 publish()會(huì)阻塞等待(或超時(shí))。所以極端壓力下,業(yè)務(wù)線程還是會(huì)卡。 - log4j2是個(gè)日志框架 允許丟棄策略
當(dāng) RingBuffer 滿時(shí),可以選擇丟棄最新日志而不是阻塞業(yè)務(wù)線程。但這對(duì)交易系統(tǒng)通常就不可接受了。
┌────────────────────────────┬───────────────────────────────────────────┐
│ 場(chǎng)景 │ 結(jié)果 │
├────────────────────────────┼───────────────────────────────────────────┤
│ 消費(fèi)者非阻塞 │最優(yōu),管線永不背壓 │
├────────────────────────────┼───────────────────────────────────────────┤
│ 消費(fèi)者偶爾阻塞,平均能跟上 │RingBuffer 吸收峰值,大部分時(shí)間業(yè)務(wù)線程不受影響│
├────────────────────────────┼───────────────────────────────────────────┤
│ 消費(fèi)者持續(xù)阻塞,跟不上生產(chǎn) │ RingBuffer 滿 → 生產(chǎn)者 next()/publish() 阻塞│
│ │ → 背壓傳遞到業(yè)務(wù)線程 │
└────────────────────────────┴───────────────────────────────────────────┘
所以嚴(yán)格來(lái)說(shuō),單消費(fèi)者阻塞 IO確實(shí)會(huì)制造背壓風(fēng)險(xiǎn),只是 log4j2 通過(guò)大 RingBuffer 把風(fēng)險(xiǎn)推遲了。
在用 Disruptor 的項(xiàng)目
┌──────────────┬────────────────────────────────────────────────────────┐
│ 項(xiàng)目 │ 用途 │
├──────────────┼────────────────────────────────────────────────────────┤
│ LMAX │ 發(fā)源地,外匯交易訂單匹配和風(fēng)控 │
│ Exchange │ │
├──────────────┼────────────────────────────────────────────────────────┤
│ Apache │ AsyncLogger 異步日志 │
│ Log4j2 │ │
├──────────────┼────────────────────────────────────────────────────────┤
│ Apache Storm │ 早期版本 Worker 間消息傳輸(后續(xù)版本換了) │
├──────────────┼────────────────────────────────────────────────────────┤
│ Apache Camel │ SEDA 組件的底層隊(duì)列 │
├──────────────┼─────────────────────────────────────────────────────────┤
│ Agrona │和 Disruptor 同團(tuán)隊(duì)(LMAX),無(wú)鎖數(shù)據(jù)結(jié)構(gòu)庫(kù),很多項(xiàng)目間接依賴│
├──────────────┼─────────────────────────────────────────────────────────┤
│ Hazelcast │ 部分內(nèi)部并發(fā)結(jié)構(gòu)受其影響 │
├──────────────┼─────────────────────────────────────────────────────────┤
│ JCTools │ 不是用 Disruptor,而是 Martin Thompson(Disruptor │
│ │ 作者之一)的另一套無(wú)鎖集合,很多框架底層用它替代 JDK 并發(fā)容器 │
└──────────────┴─────────────────────────────────────────────────────────┘
國(guó)內(nèi)項(xiàng)目公開宣稱用的不多,但不少證券/期貨公司的交易柜臺(tái)內(nèi)部會(huì)參考 LMAX 的架構(gòu)用 Disruptor 做委托鏈路。