前言
介紹高性能隊(duì)列Disruptor原理以及使用例子。
Disruptor是什么?
Disruptor是外匯和加密貨幣交易所運(yùn)營(yíng)商 LMAX group 建立高性能的金融交易所的結(jié)果。用于解決生產(chǎn)者、消費(fèi)者及其數(shù)據(jù)存儲(chǔ)的設(shè)計(jì)問題的高性能隊(duì)列實(shí)現(xiàn)??梢詫?duì)標(biāo)JDK中的ArrayBlockingQueue。是目前單機(jī)且基于內(nèi)存存儲(chǔ)的最高性能的隊(duì)列實(shí)現(xiàn)。見 與ArrayBlockingQueue性能對(duì)比。
Disruptor高性能秘訣
使用CAS代替鎖
鎖非常昂貴,因?yàn)樗鼈冊(cè)诟?jìng)爭(zhēng)時(shí)需要仲裁。這種仲裁是通過到操作系統(tǒng)內(nèi)核的上下文切換來實(shí)現(xiàn)的,該內(nèi)核將掛起等待鎖的線程,直到它被釋放。系統(tǒng)提供的原子操作CAS(Compare And Swap/Set)是很好的鎖替代方案,Disruptor中同步就是使用的這種。
比如多生產(chǎn)者模式中com.lmax.disruptor.MultiProducerSequencer就是用了Java里sun.misc.Unsafe類基于CAS實(shí)現(xiàn)的API。

等待策略com.lmax.disruptor.BlockingWaitStrategy使用了基于CAS實(shí)現(xiàn)的ReentrantLock。

獨(dú)占緩存行
為了提高效率CPU硬件不會(huì)以字節(jié)或字為單位移動(dòng)內(nèi)存,而是以緩存行,通常大小為 32-256 字節(jié)的緩存行,最常見的緩存行是 64 字節(jié)。這意味著,如果兩個(gè)變量在同一個(gè)緩存行中,并且由不同的線程寫入,那么它們會(huì)出現(xiàn)與單個(gè)變量相同的寫入爭(zhēng)用問題。為了獲得高性能,如果要最小化爭(zhēng)用,那么確保獨(dú)立但同時(shí)寫入的變量不共享相同的緩存行是很重要的。
比如com.lmax.disruptor.RingBuffer中屬性前后都用未賦值的long來獨(dú)占。com.lmax.disruptor.SingleProducerSequencerPad也有相同處理方式。


環(huán)形隊(duì)列
- 使用有界隊(duì)列,減少線程爭(zhēng)用
隊(duì)列相比鏈表在訪問速度上占據(jù)優(yōu)勢(shì),而有界隊(duì)列相比可動(dòng)態(tài)擴(kuò)容的無界隊(duì)列則避免擴(kuò)容產(chǎn)生的同步問題效率更高。Disruptor和JDK中的ArrayBlockingQueue一樣使用有界隊(duì)列。隊(duì)列長(zhǎng)度要設(shè)為2的n次冪,有利于二進(jìn)制計(jì)算。
- 使用環(huán)形數(shù)組,避免生產(chǎn)和消費(fèi)速度差異導(dǎo)致隊(duì)列頭和尾爭(zhēng)用
Disruptor在邏輯上將數(shù)組的的頭尾看成是相連的,即一個(gè)環(huán)形數(shù)組(RingBuffer)。
- Sequence
生產(chǎn)和消費(fèi)都需要維護(hù)自增序列值(Sequence),從0開始。
生產(chǎn)方只維護(hù)一個(gè)代表生產(chǎn)的最后一個(gè)元素的序號(hào)。代表生產(chǎn)的最后一個(gè)元素的序號(hào)。每次向Disruptor發(fā)布一個(gè)元素都調(diào)用Sequenced.next()來獲取下個(gè)位置的寫入權(quán)。
在單生產(chǎn)者模式(SINGLE)由于不存在并發(fā)寫入,則不需要解決同步問題。在多生產(chǎn)者模式(MULTI)就需要借助JDK中基于CAS(Compare And Swap/Set)實(shí)現(xiàn)的API來保證線程安全。
多個(gè)消費(fèi)者各自維護(hù)自己的消費(fèi)序列值(Sequence)保存數(shù)組中。
而環(huán)形通過與運(yùn)算(sequence & indexMask)實(shí)現(xiàn)的,indexMask就是環(huán)形隊(duì)列的長(zhǎng)度-1。以環(huán)形隊(duì)列長(zhǎng)度8為例,第9個(gè)元素Sequence為8,8 & 7 = 0,剛好又回到了數(shù)組第1個(gè)位置。
見com.lmax.disruptor.RingBuffer.elementAt(long sequence)

預(yù)分配內(nèi)存
環(huán)形隊(duì)列存放的是Event對(duì)象,而且是在Disruptor創(chuàng)建的時(shí)候調(diào)用EventFactory創(chuàng)建并一次將隊(duì)列填滿。Event保存生產(chǎn)者生產(chǎn)的數(shù)據(jù),消費(fèi)也是通過Event獲取,后續(xù)生產(chǎn)則只需要替換掉Event中的屬性值。這種方式避免了重復(fù)創(chuàng)建對(duì)象,降低JVM的GC產(chǎn)頻率。
見com.lmax.disruptor.RingBuffer.fill(EventFactory<E> eventFactory)

消費(fèi)者8種等待策略
當(dāng)消費(fèi)速度大于生產(chǎn)速度情況下,消費(fèi)者執(zhí)行的等待策略。
| 策略類名 | 描述 |
|---|---|
| BlockingWaitStrategy(常用) | 使用ReentrantLock,失敗則進(jìn)入等待隊(duì)列等待喚醒重試。當(dāng)吞吐量和低延遲不如CPU資源重要時(shí)使用。 |
| YieldingWaitStrategy(常用) | 嘗試100次,全失敗后調(diào)用Thread.yield()讓出CPU。該策略將使用100%的CPU,如果其他線程請(qǐng)求CPU資源,這種策略更容易讓出CPU資源。 |
| SleepingWaitStrategy(常用) | 嘗試200次 。前100次直接重試,后100次每次失敗后調(diào)用Thread.yield()讓出CPU,全失敗線程睡眠(默認(rèn)100納秒 )。 |
| BusySpinWaitStrategy | 線程一直自旋等待,比較耗CPU。最好是將線程綁定到特定的CPU核心上使用。 |
| LiteBlockingWaitStrategy | 與BlockingWaitStrategy類似,區(qū)別在增加了原子變量signalNeeded,如果兩個(gè)線程同時(shí)分別訪問waitFor()和signalAllWhenBlocking(),可以減少ReentrantLock加鎖次數(shù)。 |
| LiteTimeoutBlockingWaitStrategy | 與LiteBlockingWaitStrategy類似,區(qū)別在于設(shè)置了阻塞時(shí)間,超過時(shí)間后拋異常。 |
| TimeoutBlockingWaitStrategy | 與BlockingWaitStrategy類似,區(qū)別在于設(shè)置了阻塞時(shí)間,超過時(shí)間后拋異常。 |
| PhasedBackoffWaitStrategy | 根據(jù)時(shí)間參數(shù)和傳入的等待策略來決定使用哪種等待策略。當(dāng)吞吐量和低延遲不如CPU資源重要時(shí),可以使用此策略。 |
消費(fèi)者序列
所有消費(fèi)者的消費(fèi)序列(Sequence)都放在一個(gè)數(shù)組中,見com.lmax.disruptor.AbstractSequencer,通過SEQUENCE_UPDATER來更新對(duì)應(yīng)的序列值。

調(diào)用更新的地方在com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence... gatingSequences)。
消費(fèi)太慢隊(duì)列滿了怎么辦?
生產(chǎn)者線程被阻塞。生產(chǎn)者調(diào)用Sequenced.next()爭(zhēng)奪寫入權(quán)的時(shí)候需要判斷最小的消費(fèi)序列值進(jìn)行比較。如果寫入的位置還未消費(fèi)則會(huì)進(jìn)入循環(huán)不斷獲取最小消費(fèi)序列值進(jìn)行比較。
見包c(diǎn)om.lmax.disruptor下SingleProducerSequencer或MultiProducerSequencer中next(int n)方法。

Disruptor開發(fā)步驟
- 創(chuàng)建Event、EventFactory、EventHandler和ExceptionHandler類
Event是環(huán)形隊(duì)列(RingBuffer)中的元素,是生產(chǎn)者數(shù)據(jù)的載體;EventFactory是定義Event創(chuàng)建方式的工廠類;EventHandler則是Event的處理器,定義如何消費(fèi)Event中的數(shù)據(jù)。
另外有必要定義一個(gè)消費(fèi)異常處理器ExceptionHandler,它是和EventHandler綁定的。當(dāng)EventHandler.onEvent()執(zhí)行拋出異常時(shí)會(huì)執(zhí)行對(duì)應(yīng)的異?;卣{(diào)方法。
- 實(shí)例化Disruptor
創(chuàng)建Disruptor需要指定5個(gè)參數(shù)eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。
EventFactory是上面定義的Event工廠類;
ringBufferSize是環(huán)形隊(duì)列的長(zhǎng)度,這個(gè)值要是2的N次方;
threadFactory是定義消費(fèi)者線程創(chuàng)建方式的工廠類;
producerType是指明生產(chǎn)者是一個(gè)(SINGLE)還是多個(gè)(MULTI)。默認(rèn)是MULTI,會(huì)使用CAS(Compare And Swap/Set)保證線程安全。如果指定為SINGLE,則不使用沒必要的CAS,使單線程處理更高效。
waitStrategy指明消費(fèi)者等待生產(chǎn)時(shí)的策略。
- 設(shè)置消費(fèi)者
指明EventHandler并綁定ExceptionHandler。指定多個(gè)EventHandler時(shí),會(huì)為每個(gè)EventHandler分配一個(gè)線程,一個(gè)Event會(huì)被多個(gè)并行EventHandler處理。
也可以指明多個(gè)WorkHandler,每個(gè)WorkHandler分配一個(gè)線程并行消費(fèi)隊(duì)列中的Event,一個(gè)Event只會(huì)被一個(gè)WorkHandler處理。
- 創(chuàng)建/實(shí)例化EventTranslator
EventTranslator定義生產(chǎn)者數(shù)據(jù)轉(zhuǎn)換為Event的方式,不同數(shù)量參數(shù)有不同的接口用來實(shí)現(xiàn)。
- 最后用Disruptor.publishEvent() 來發(fā)布元素指明EventTranslator和參數(shù)
例子程序
- 先引入Maven依賴
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>
- Event
/**
* 事件
*
* @param <T>發(fā)布的數(shù)據(jù)類型
*/
public class MyEvent<T> {
private T data;
public T getData() {
return data;
}
public MyEvent<T> setData(T data) {
this.data = data;
return this;
}
}
- EventFactory
import com.lmax.disruptor.EventFactory;
/**
* 創(chuàng)建事件的工廠
*
* @param <T>發(fā)布的數(shù)據(jù)類型
*/
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {
@Override
public MyEvent<T> newInstance() {
return new MyEvent<>();
}
}
- EventHandler
import com.lmax.disruptor.EventHandler;
/**
* 事件消費(fèi)方法
*
* @param <T>發(fā)布的數(shù)據(jù)類型
*/
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {
@Override
public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {
System.out.println(Thread.currentThread().getName() + "MyEventHandler消費(fèi):" + tMyEvent.getData());
}
}
- ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;
/**
* 消費(fèi)者異常處理器
*
* @param <T>發(fā)布的數(shù)據(jù)類型
*/
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {
@Override
public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {
System.out.println("handleEventException");
}
@Override
public void handleOnStartException(Throwable ex) {
System.out.println("handleOnStartException");
}
@Override
public void handleOnShutdownException(Throwable ex) {
System.out.println("handleOnShutdownException");
}
}
單消費(fèi)者
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
/**
* 單消費(fèi)者
*/
public class SingleConsumerSample {
public static void main(String[] args) {
// 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
int ringBufferSize = 1024;
// 創(chuàng)建事件(Event)對(duì)象的工廠
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 創(chuàng)建消費(fèi)者線程工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 指定一個(gè)處理器
MyEventHandler<String> eventHandler = new MyEventHandler<>();
disruptor.handleEventsWith(eventHandler);
// 處理器異常處理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
disruptor.start();
// 通過事件轉(zhuǎn)換器(EventTranslator)來指明如何將發(fā)布的數(shù)據(jù)轉(zhuǎn)換到事件對(duì)象(Event)中
// 這里是一個(gè)參數(shù)的轉(zhuǎn)換器,另外還有兩個(gè)(EventTranslatorTwoArg)、三個(gè)(EventTranslatorThreeArg)
// 和多個(gè)(EventTranslatorVararg)參數(shù)的轉(zhuǎn)換器可以使用,參數(shù)類型可以不一樣
EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
new EventTranslatorOneArg<MyEvent<String>, String>() {
@Override
public void translateTo(MyEvent<String> event, long sequence, String arg0) {
event.setData(arg0);
}
};
// 發(fā)布
for (int i = 0; i < 10; i++) {
disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i);
}
disruptor.shutdown();
}
}
單消費(fèi)者Lambda寫法
這種只是迎合Java8 Lambda語法特性,代碼更簡(jiǎn)潔。
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
public class LambdaSample {
public static void main(String[] args) {
// 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
int ringBufferSize = 1024;
// 創(chuàng)建消費(fèi)者線程工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 指定一個(gè)處理器
EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler消費(fèi):" + event.getData());
disruptor.handleEventsWith(eventHandler);
// 處理器異常處理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
disruptor.start();
// 通過事件轉(zhuǎn)換器(EventTranslator)來指明如何將發(fā)布的數(shù)據(jù)轉(zhuǎn)換到事件對(duì)象(Event)中
// 一個(gè)參數(shù)的轉(zhuǎn)換器
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg ");
// 兩個(gè)參數(shù)的轉(zhuǎn)換器
disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg ", 1);
// 三個(gè)參數(shù)的轉(zhuǎn)換器
disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
, "Three arg ", 1, false);
// 多個(gè)參數(shù)的轉(zhuǎn)換器
disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {
List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
event.setData("Var arg " + String.join(",", paramList));
}, "param1", "param2", "param3");
disruptor.shutdown();
}
}
多消費(fèi)者重復(fù)消費(fèi)元素
關(guān)鍵只在于指定多個(gè)EventHandler,并且EventHandler還可以分別綁定不同的ExceptionHandler。
每個(gè)EventHandler分配一個(gè)線程,一個(gè)Event會(huì)被每個(gè)EventHandler處理,適合兩個(gè)不同的業(yè)務(wù)都需要處理同一個(gè)元素的情況,類似廣播模式。
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
/**
* 一個(gè)元素多個(gè)消費(fèi)者重復(fù)消費(fèi)
*/
public class RepetitionConsumerSample {
public static void main(String[] args) {
// 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
int ringBufferSize = 1024;
// 創(chuàng)建事件(Event)對(duì)象的工廠
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 創(chuàng)建消費(fèi)者線程工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 這里指定了2個(gè)消費(fèi)者,那就會(huì)產(chǎn)生2個(gè)消費(fèi)線程,一個(gè)事件會(huì)被消費(fèi)2次
EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler消費(fèi):" + event.getData());
EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
System.out.println(Thread.currentThread().getName() + "MyEventHandler——2消費(fèi):" + event.getData());
disruptor.handleEventsWith(eventHandler, eventHandler2);
// 分別指定異常處理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
}
disruptor.shutdown();
}
}
多消費(fèi)者
關(guān)鍵只在于定義WorkHandler,然后實(shí)例化多個(gè)來消費(fèi)。
每個(gè)WorkHandler分配一個(gè)線程,一個(gè)元素只會(huì)被一個(gè)WorkHandler處理。
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
public class MultiConsumerSample {
public static void main(String[] args) {
// 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
int ringBufferSize = 1024;
// 創(chuàng)建事件(Event)對(duì)象的工廠
MyEventFactory<String> eventFactory = new MyEventFactory<>();
// 創(chuàng)建消費(fèi)者線程工廠
ThreadFactory threadFactory = Executors.defaultThreadFactory();
// 等待策略
WaitStrategy waitStrategy = new SleepingWaitStrategy();
Disruptor<MyEvent<String>> disruptor =
new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);
// 處理器異常處理器
ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
disruptor.setDefaultExceptionHandler(exceptionHandler);
// 設(shè)置2個(gè)消費(fèi)者,2個(gè)線程,一個(gè)Event只被一個(gè)消費(fèi)者消費(fèi)
WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
System.out.println(Thread.currentThread().getName() + "WorkHandler消費(fèi):" + tMyEvent.getData());
disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);
disruptor.start();
for (int i = 0; i < 10; i++) {
disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
}
disruptor.shutdown();
}
}
參考鏈接
GitHub Disruptor Getting Started