單機(jī)最快的隊(duì)列Disruptor解析和使用

前言

介紹高性能隊(duì)列Disruptor原理以及使用例子。

Disruptor是什么?

Disruptor是外匯和加密貨幣交易所運(yùn)營(yíng)商 LMAX group 建立高性能的金融交易所的結(jié)果。用于解決生產(chǎn)者、消費(fèi)者及其數(shù)據(jù)存儲(chǔ)的設(shè)計(jì)問題的高性能隊(duì)列實(shí)現(xiàn)??梢詫?duì)標(biāo)JDK中的ArrayBlockingQueue。是目前單機(jī)且基于內(nèi)存存儲(chǔ)的最高性能的隊(duì)列實(shí)現(xiàn)。見 與ArrayBlockingQueue性能對(duì)比

Disruptor高性能秘訣

使用CAS代替鎖

鎖非常昂貴,因?yàn)樗鼈冊(cè)诟?jìng)爭(zhēng)時(shí)需要仲裁。這種仲裁是通過到操作系統(tǒng)內(nèi)核的上下文切換來實(shí)現(xiàn)的,該內(nèi)核將掛起等待鎖的線程,直到它被釋放。系統(tǒng)提供的原子操作CAS(Compare And Swap/Set)是很好的鎖替代方案,Disruptor中同步就是使用的這種。

比如多生產(chǎn)者模式中com.lmax.disruptor.MultiProducerSequencer就是用了Java里sun.misc.Unsafe類基于CAS實(shí)現(xiàn)的API。

image.png

等待策略com.lmax.disruptor.BlockingWaitStrategy使用了基于CAS實(shí)現(xiàn)的ReentrantLock。

image.png

獨(dú)占緩存行

為了提高效率CPU硬件不會(huì)以字節(jié)或字為單位移動(dòng)內(nèi)存,而是以緩存行,通常大小為 32-256 字節(jié)的緩存行,最常見的緩存行是 64 字節(jié)。這意味著,如果兩個(gè)變量在同一個(gè)緩存行中,并且由不同的線程寫入,那么它們會(huì)出現(xiàn)與單個(gè)變量相同的寫入爭(zhēng)用問題。為了獲得高性能,如果要最小化爭(zhēng)用,那么確保獨(dú)立但同時(shí)寫入的變量不共享相同的緩存行是很重要的。

比如com.lmax.disruptor.RingBuffer中屬性前后都用未賦值的long來獨(dú)占。com.lmax.disruptor.SingleProducerSequencerPad也有相同處理方式。

image.png
image.png

環(huán)形隊(duì)列

  • 使用有界隊(duì)列,減少線程爭(zhēng)用

隊(duì)列相比鏈表在訪問速度上占據(jù)優(yōu)勢(shì),而有界隊(duì)列相比可動(dòng)態(tài)擴(kuò)容的無界隊(duì)列則避免擴(kuò)容產(chǎn)生的同步問題效率更高。Disruptor和JDK中的ArrayBlockingQueue一樣使用有界隊(duì)列。隊(duì)列長(zhǎng)度要設(shè)為2的n次冪,有利于二進(jìn)制計(jì)算。

  • 使用環(huán)形數(shù)組,避免生產(chǎn)和消費(fèi)速度差異導(dǎo)致隊(duì)列頭和尾爭(zhēng)用

Disruptor在邏輯上將數(shù)組的的頭尾看成是相連的,即一個(gè)環(huán)形數(shù)組(RingBuffer)。

  • Sequence

生產(chǎn)和消費(fèi)都需要維護(hù)自增序列值(Sequence),從0開始。

生產(chǎn)方只維護(hù)一個(gè)代表生產(chǎn)的最后一個(gè)元素的序號(hào)。代表生產(chǎn)的最后一個(gè)元素的序號(hào)。每次向Disruptor發(fā)布一個(gè)元素都調(diào)用Sequenced.next()來獲取下個(gè)位置的寫入權(quán)。

在單生產(chǎn)者模式(SINGLE)由于不存在并發(fā)寫入,則不需要解決同步問題。在多生產(chǎn)者模式(MULTI)就需要借助JDK中基于CAS(Compare And Swap/Set)實(shí)現(xiàn)的API來保證線程安全。

多個(gè)消費(fèi)者各自維護(hù)自己的消費(fèi)序列值(Sequence)保存數(shù)組中。

而環(huán)形通過與運(yùn)算(sequence & indexMask)實(shí)現(xiàn)的,indexMask就是環(huán)形隊(duì)列的長(zhǎng)度-1。以環(huán)形隊(duì)列長(zhǎng)度8為例,第9個(gè)元素Sequence為8,8 & 7 = 0,剛好又回到了數(shù)組第1個(gè)位置。

見com.lmax.disruptor.RingBuffer.elementAt(long sequence)

image.png

預(yù)分配內(nèi)存

環(huán)形隊(duì)列存放的是Event對(duì)象,而且是在Disruptor創(chuàng)建的時(shí)候調(diào)用EventFactory創(chuàng)建并一次將隊(duì)列填滿。Event保存生產(chǎn)者生產(chǎn)的數(shù)據(jù),消費(fèi)也是通過Event獲取,后續(xù)生產(chǎn)則只需要替換掉Event中的屬性值。這種方式避免了重復(fù)創(chuàng)建對(duì)象,降低JVM的GC產(chǎn)頻率。

見com.lmax.disruptor.RingBuffer.fill(EventFactory<E> eventFactory)

image.png

消費(fèi)者8種等待策略

當(dāng)消費(fèi)速度大于生產(chǎn)速度情況下,消費(fèi)者執(zhí)行的等待策略。

策略類名 描述
BlockingWaitStrategy(常用) 使用ReentrantLock,失敗則進(jìn)入等待隊(duì)列等待喚醒重試。當(dāng)吞吐量和低延遲不如CPU資源重要時(shí)使用。
YieldingWaitStrategy(常用) 嘗試100次,全失敗后調(diào)用Thread.yield()讓出CPU。該策略將使用100%的CPU,如果其他線程請(qǐng)求CPU資源,這種策略更容易讓出CPU資源。
SleepingWaitStrategy(常用) 嘗試200次 。前100次直接重試,后100次每次失敗后調(diào)用Thread.yield()讓出CPU,全失敗線程睡眠(默認(rèn)100納秒 )。
BusySpinWaitStrategy 線程一直自旋等待,比較耗CPU。最好是將線程綁定到特定的CPU核心上使用。
LiteBlockingWaitStrategy 與BlockingWaitStrategy類似,區(qū)別在增加了原子變量signalNeeded,如果兩個(gè)線程同時(shí)分別訪問waitFor()和signalAllWhenBlocking(),可以減少ReentrantLock加鎖次數(shù)。
LiteTimeoutBlockingWaitStrategy 與LiteBlockingWaitStrategy類似,區(qū)別在于設(shè)置了阻塞時(shí)間,超過時(shí)間后拋異常。
TimeoutBlockingWaitStrategy 與BlockingWaitStrategy類似,區(qū)別在于設(shè)置了阻塞時(shí)間,超過時(shí)間后拋異常。
PhasedBackoffWaitStrategy 根據(jù)時(shí)間參數(shù)和傳入的等待策略來決定使用哪種等待策略。當(dāng)吞吐量和低延遲不如CPU資源重要時(shí),可以使用此策略。

消費(fèi)者序列

所有消費(fèi)者的消費(fèi)序列(Sequence)都放在一個(gè)數(shù)組中,見com.lmax.disruptor.AbstractSequencer,通過SEQUENCE_UPDATER來更新對(duì)應(yīng)的序列值。

image.png

調(diào)用更新的地方在com.lmax.disruptor.RingBuffer.addGatingSequences(Sequence... gatingSequences)。

消費(fèi)太慢隊(duì)列滿了怎么辦?

生產(chǎn)者線程被阻塞。生產(chǎn)者調(diào)用Sequenced.next()爭(zhēng)奪寫入權(quán)的時(shí)候需要判斷最小的消費(fèi)序列值進(jìn)行比較。如果寫入的位置還未消費(fèi)則會(huì)進(jìn)入循環(huán)不斷獲取最小消費(fèi)序列值進(jìn)行比較。

見包c(diǎn)om.lmax.disruptor下SingleProducerSequencer或MultiProducerSequencer中next(int n)方法。

image.png

Disruptor開發(fā)步驟

  • 創(chuàng)建Event、EventFactory、EventHandler和ExceptionHandler類

Event是環(huán)形隊(duì)列(RingBuffer)中的元素,是生產(chǎn)者數(shù)據(jù)的載體;EventFactory是定義Event創(chuàng)建方式的工廠類;EventHandler則是Event的處理器,定義如何消費(fèi)Event中的數(shù)據(jù)。

另外有必要定義一個(gè)消費(fèi)異常處理器ExceptionHandler,它是和EventHandler綁定的。當(dāng)EventHandler.onEvent()執(zhí)行拋出異常時(shí)會(huì)執(zhí)行對(duì)應(yīng)的異?;卣{(diào)方法。

  • 實(shí)例化Disruptor

創(chuàng)建Disruptor需要指定5個(gè)參數(shù)eventFactory、ringBufferSize、threadFactory、producerType、waitStrategy。

EventFactory是上面定義的Event工廠類;

ringBufferSize是環(huán)形隊(duì)列的長(zhǎng)度,這個(gè)值要是2的N次方;

threadFactory是定義消費(fèi)者線程創(chuàng)建方式的工廠類;

producerType是指明生產(chǎn)者是一個(gè)(SINGLE)還是多個(gè)(MULTI)。默認(rèn)是MULTI,會(huì)使用CAS(Compare And Swap/Set)保證線程安全。如果指定為SINGLE,則不使用沒必要的CAS,使單線程處理更高效。

waitStrategy指明消費(fèi)者等待生產(chǎn)時(shí)的策略。

  • 設(shè)置消費(fèi)者

指明EventHandler并綁定ExceptionHandler。指定多個(gè)EventHandler時(shí),會(huì)為每個(gè)EventHandler分配一個(gè)線程,一個(gè)Event會(huì)被多個(gè)并行EventHandler處理。

也可以指明多個(gè)WorkHandler,每個(gè)WorkHandler分配一個(gè)線程并行消費(fèi)隊(duì)列中的Event,一個(gè)Event只會(huì)被一個(gè)WorkHandler處理。

  • 創(chuàng)建/實(shí)例化EventTranslator

EventTranslator定義生產(chǎn)者數(shù)據(jù)轉(zhuǎn)換為Event的方式,不同數(shù)量參數(shù)有不同的接口用來實(shí)現(xiàn)。

  • 最后用Disruptor.publishEvent() 來發(fā)布元素指明EventTranslator和參數(shù)

例子程序

  • 先引入Maven依賴
<dependency>
  <groupId>com.lmax</groupId>
  <artifactId>disruptor</artifactId>
  <version>3.4.4</version>
</dependency>
  • Event
/**
 * 事件
 *
 * @param <T>發(fā)布的數(shù)據(jù)類型
 */
public class MyEvent<T> {

    private T data;

    public T getData() {
        return data;
    }

    public MyEvent<T> setData(T data) {
        this.data = data;
        return this;
    }
}
  • EventFactory
import com.lmax.disruptor.EventFactory;

/**
 * 創(chuàng)建事件的工廠
 *
 * @param <T>發(fā)布的數(shù)據(jù)類型
 */
public class MyEventFactory<T> implements EventFactory<MyEvent<T>> {

    @Override
    public MyEvent<T> newInstance() {
        return new MyEvent<>();
    }
}
  • EventHandler
import com.lmax.disruptor.EventHandler;

/**
 * 事件消費(fèi)方法
 *
 * @param <T>發(fā)布的數(shù)據(jù)類型
 */
public class MyEventHandler<T> implements EventHandler<MyEvent<T>> {

    @Override
    public void onEvent(MyEvent<T> tMyEvent, long l, boolean b) throws Exception {
        System.out.println(Thread.currentThread().getName() + "MyEventHandler消費(fèi):" + tMyEvent.getData());
    }
}
  • ExceptionHandler
import com.lmax.disruptor.ExceptionHandler;

/**
 * 消費(fèi)者異常處理器
 *
 * @param <T>發(fā)布的數(shù)據(jù)類型
 */
public class MyExceptionHandler<T> implements ExceptionHandler<MyEvent<T>> {

    @Override
    public void handleEventException(Throwable ex, long sequence, MyEvent<T> event) {
        System.out.println("handleEventException");
    }

    @Override
    public void handleOnStartException(Throwable ex) {
        System.out.println("handleOnStartException");
    }

    @Override
    public void handleOnShutdownException(Throwable ex) {
        System.out.println("handleOnShutdownException");
    }
}

單消費(fèi)者

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 單消費(fèi)者
 */
public class SingleConsumerSample {

    public static void main(String[] args) {
        // 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
        int ringBufferSize = 1024;
        // 創(chuàng)建事件(Event)對(duì)象的工廠
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 創(chuàng)建消費(fèi)者線程工廠
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一個(gè)處理器
        MyEventHandler<String> eventHandler = new MyEventHandler<>();
        disruptor.handleEventsWith(eventHandler);
        // 處理器異常處理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通過事件轉(zhuǎn)換器(EventTranslator)來指明如何將發(fā)布的數(shù)據(jù)轉(zhuǎn)換到事件對(duì)象(Event)中
        // 這里是一個(gè)參數(shù)的轉(zhuǎn)換器,另外還有兩個(gè)(EventTranslatorTwoArg)、三個(gè)(EventTranslatorThreeArg)
        // 和多個(gè)(EventTranslatorVararg)參數(shù)的轉(zhuǎn)換器可以使用,參數(shù)類型可以不一樣
        EventTranslatorOneArg<MyEvent<String>, String> eventTranslatorOneArg =
                new EventTranslatorOneArg<MyEvent<String>, String>() {
                    @Override
                    public void translateTo(MyEvent<String> event, long sequence, String arg0) {
                        event.setData(arg0);
                    }
                };

        // 發(fā)布
        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent(eventTranslatorOneArg, "One arg " + i);
        }

        disruptor.shutdown();
    }
}

單消費(fèi)者Lambda寫法

這種只是迎合Java8 Lambda語法特性,代碼更簡(jiǎn)潔。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class LambdaSample {


    public static void main(String[] args) {
        // 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
        int ringBufferSize = 1024;
        // 創(chuàng)建消費(fèi)者線程工廠
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(MyEvent::new, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 指定一個(gè)處理器
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler消費(fèi):" + event.getData());
        disruptor.handleEventsWith(eventHandler);
        // 處理器異常處理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        disruptor.start();

        // 通過事件轉(zhuǎn)換器(EventTranslator)來指明如何將發(fā)布的數(shù)據(jù)轉(zhuǎn)換到事件對(duì)象(Event)中
        // 一個(gè)參數(shù)的轉(zhuǎn)換器
        disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg ");
        // 兩個(gè)參數(shù)的轉(zhuǎn)換器
        disruptor.publishEvent((event, sequence, pA, pB) -> event.setData(pA + pB), "Two arg ", 1);
        // 三個(gè)參數(shù)的轉(zhuǎn)換器
        disruptor.publishEvent((event, sequence, pA, pB, pC) -> event.setData(pA + pB + pC)
                , "Three arg ", 1, false);
        // 多個(gè)參數(shù)的轉(zhuǎn)換器
        disruptor.getRingBuffer().publishEvent((event, sequence, params) -> {
            List<String> paramList = Arrays.stream(params).map(Object::toString).collect(Collectors.toList());
            event.setData("Var arg " + String.join(",", paramList));
        }, "param1", "param2", "param3");

        disruptor.shutdown();
    }
}

多消費(fèi)者重復(fù)消費(fèi)元素

關(guān)鍵只在于指定多個(gè)EventHandler,并且EventHandler還可以分別綁定不同的ExceptionHandler。

每個(gè)EventHandler分配一個(gè)線程,一個(gè)Event會(huì)被每個(gè)EventHandler處理,適合兩個(gè)不同的業(yè)務(wù)都需要處理同一個(gè)元素的情況,類似廣播模式。

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

/**
 * 一個(gè)元素多個(gè)消費(fèi)者重復(fù)消費(fèi)
 */
public class RepetitionConsumerSample {

    public static void main(String[] args) {
        // 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
        int ringBufferSize = 1024;
        // 創(chuàng)建事件(Event)對(duì)象的工廠
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 創(chuàng)建消費(fèi)者線程工廠
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);


        // 這里指定了2個(gè)消費(fèi)者,那就會(huì)產(chǎn)生2個(gè)消費(fèi)線程,一個(gè)事件會(huì)被消費(fèi)2次
        EventHandler<MyEvent<String>> eventHandler = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler消費(fèi):" + event.getData());
        EventHandler<MyEvent<String>> eventHandler2 = (event, sequence, endOfBatch) ->
                System.out.println(Thread.currentThread().getName() + "MyEventHandler——2消費(fèi):" + event.getData());
        disruptor.handleEventsWith(eventHandler, eventHandler2);
        // 分別指定異常處理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.handleExceptionsFor(eventHandler).with(exceptionHandler);
        disruptor.handleExceptionsFor(eventHandler2).with(exceptionHandler);

        disruptor.start();

        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
        }

        disruptor.shutdown();
    }
}

多消費(fèi)者

關(guān)鍵只在于定義WorkHandler,然后實(shí)例化多個(gè)來消費(fèi)。

每個(gè)WorkHandler分配一個(gè)線程,一個(gè)元素只會(huì)被一個(gè)WorkHandler處理。

import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

import static com.lmax.disruptor.dsl.ProducerType.SINGLE;

public class MultiConsumerSample {

    public static void main(String[] args) {
        // 環(huán)形數(shù)組長(zhǎng)度,必須是2的n次冪
        int ringBufferSize = 1024;
        // 創(chuàng)建事件(Event)對(duì)象的工廠
        MyEventFactory<String> eventFactory = new MyEventFactory<>();
        // 創(chuàng)建消費(fèi)者線程工廠
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // 等待策略
        WaitStrategy waitStrategy = new SleepingWaitStrategy();
        Disruptor<MyEvent<String>> disruptor =
                new Disruptor<>(eventFactory, ringBufferSize, threadFactory, SINGLE, waitStrategy);

        // 處理器異常處理器
        ExceptionHandler<MyEvent<String>> exceptionHandler = new MyExceptionHandler<>();
        disruptor.setDefaultExceptionHandler(exceptionHandler);

        // 設(shè)置2個(gè)消費(fèi)者,2個(gè)線程,一個(gè)Event只被一個(gè)消費(fèi)者消費(fèi)
        WorkHandler<MyEvent<String>> workHandler = tMyEvent ->
                System.out.println(Thread.currentThread().getName() + "WorkHandler消費(fèi):" + tMyEvent.getData());
        disruptor.handleEventsWithWorkerPool(workHandler, workHandler2);

        disruptor.start();

        for (int i = 0; i < 10; i++) {
            disruptor.publishEvent((event, sequence, param) -> event.setData(param), "One arg " + i);
        }

        disruptor.shutdown();
    }
}

參考鏈接

Disruptor 主頁(yè)

Disruptor 技術(shù)文檔

GitHub Disruptor

GitHub Disruptor Getting Started

Maven Repository Disruptor Framework

LMAX 官網(wǎng)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容