前言
Disruptor是英國(guó)外匯交易公司LMAX開發(fā)的一個(gè)高性能隊(duì)列,研發(fā)的初衷是解決內(nèi)存隊(duì)列的延遲問(wèn)題?;贒isruptor開發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單,2010年在QCon演講后,獲得了業(yè)界關(guān)注。
其實(shí)Disruptor與其說(shuō)是一個(gè)框架,不如說(shuō)是一種設(shè)計(jì)思路,這個(gè)設(shè)計(jì)思路對(duì)于存在“并發(fā)、緩沖區(qū)、生產(chǎn)者—消費(fèi)者模型、事務(wù)處理”這些元素的程序來(lái)說(shuō),Disruptor提出了一種大幅提升性能(TPS)的方案。
老司機(jī)帶帶我
-
來(lái)一張全鏈路流程圖
disruptor設(shè)計(jì)理念很超前,解決了傳統(tǒng)隊(duì)列的痛點(diǎn)
1、false-sharding:CPU偽共享問(wèn)題
2、無(wú)鎖編程的極致體驗(yàn)-CAS
3、兩個(gè)獨(dú)立的線程之間高效交換數(shù)據(jù)
一、鎖的代價(jià)
Disruptor論文中講述一個(gè)實(shí)驗(yàn),一個(gè)計(jì)數(shù)器循環(huán)自增5億次
- 場(chǎng)景1:?jiǎn)尉€程無(wú)鎖時(shí),程序耗時(shí)300ms
- 場(chǎng)景2:?jiǎn)尉€程有鎖,程序需要耗時(shí)10000ms
- 場(chǎng)景3:雙線程有鎖,耗時(shí)224000ms
-
.........
Why?
簡(jiǎn)而言之就是多線程鎖競(jìng)爭(zhēng)導(dǎo)致的上下文切換時(shí)間成本遠(yuǎn)遠(yuǎn)大于了線程持有鎖的性能損耗
- ArrayBlockQueue偽代碼分析
public void put(E e) {
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
//當(dāng)隊(duì)列滿時(shí),調(diào)用notFull.await()方法,阻塞寫線程。
while (count == items.length) {
notFull.await(); //Condition條件阻塞
}
//把元素 e 插入到隊(duì)尾
insert(e);
//解鎖
lock.unlock();
//若隊(duì)列為空,激活讀線程
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//加鎖
lock.lock();
//當(dāng)隊(duì)列空時(shí),阻塞讀線程
while (count == 0) {
notEmpty.await();
}
//取出隊(duì)頭元素
E x = extract();
//若隊(duì)列full,激活寫線程
notFull.signal();
return x;
//解鎖
finally{
lock.unlock();
}
}

- tail(takeIndex)和head(putIndex)指針都是鎖競(jìng)爭(zhēng)的沖突點(diǎn)
隊(duì)列的目的就是為生產(chǎn)者和消費(fèi)者提供一個(gè)地方存放要交互的數(shù)據(jù),緩沖上下游的消息,實(shí)際場(chǎng)景中緩沖常常是滿的(生產(chǎn)者比消費(fèi)者快)或者空的(消費(fèi)者比生產(chǎn)者快)。生產(chǎn)者和消費(fèi)者能夠步調(diào)一致的情況非常少見。 - ArrayBlockQueue是悲觀鎖的一種體現(xiàn),讀寫線程都假設(shè)存在沖突, 多線程并發(fā)場(chǎng)景下,性能很差
- 接下來(lái)讓我們看看Disruptor的實(shí)現(xiàn)
disruptor根本就不用鎖,取而代之-CAS,嚴(yán)格意義上說(shuō)仍然是使用鎖, 因?yàn)镃AS本質(zhì)上也是一種樂觀鎖 - Java悲觀鎖和樂觀鎖邏輯上類似Mysql的鎖
CAS:Compare And Swap/Set 顧名思義比較和交換
CPU級(jí)別的指令,cpu去更新一個(gè)值,但如果跟新過(guò)程中值發(fā)生了變化,操作就失敗,然后重試,直到更新成功!
Disruptor的sequence的自增就是CAS的自旋自增,對(duì)應(yīng)的,ArrayBlockQueue的數(shù)組索引index是互斥自增!
-
樂觀鎖設(shè)計(jì)思想:假設(shè)沒有沖突
CAS原理圖 -
悲觀鎖設(shè)計(jì)思想:假設(shè)存在沖突
悲觀鎖 CAS 比較適宜持有鎖的時(shí)間較短的并發(fā)場(chǎng)景(自增、簡(jiǎn)單更新),
反之持有鎖時(shí)間較長(zhǎng)的場(chǎng)景如秒殺,下單,會(huì)導(dǎo)致自循環(huán)次數(shù)過(guò)多,線程饑餓程度增加
二、disrupter核心數(shù)據(jù)結(jié)構(gòu)-ringbuffer
-
隊(duì)列上下游的緩沖容器
-
首尾相接的環(huán)形數(shù)組
數(shù)組長(zhǎng)度2^n,通過(guò)位運(yùn)算,加快定位的速度。下標(biāo)采取遞增的形式。不用擔(dān)心index溢出的問(wèn)題。index是long類型,即使100萬(wàn)QPS的處理速度,也需要30萬(wàn)年才能用完。
環(huán)持續(xù)向 buffer 中寫入數(shù)據(jù),這個(gè)序號(hào)會(huì)一直增長(zhǎng),直到繞過(guò)整個(gè)環(huán)
-
環(huán)形數(shù)組結(jié)構(gòu)
新產(chǎn)生的sequence只覆蓋,相對(duì)于傳統(tǒng)隊(duì)列不需要頻繁GC
private E dequeue() {
final Object[] items = this.items;
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;/當(dāng)前擁有元素個(gè)數(shù)減1
if (itrs != null)
itrs.elementDequeued();
notFull.signal();//有一個(gè)元素取出成功,那肯定隊(duì)列不滿
return x;
}
ArrayBloackQueue出隊(duì)takeIndex索引所在元素設(shè)置為NULL,高吞吐量下隊(duì)列會(huì)產(chǎn)生大量GC
- CAS維護(hù)了一個(gè)sequence,無(wú)鎖自旋增長(zhǎng)
每個(gè)生產(chǎn)者或者消費(fèi)者線程,會(huì)先申請(qǐng)可以操作的元素在數(shù)組中的位置,申請(qǐng)到之后,直接在該位置寫入或者讀取數(shù)據(jù)。
假設(shè)兩個(gè)生產(chǎn)者都想申請(qǐng)第7號(hào)slot, 則它們會(huì)同時(shí)執(zhí)行CAS自增,執(zhí)行成功的人得到該序列號(hào)slot=7,另一個(gè)則重試?yán)^續(xù)申請(qǐng)下一個(gè)可用的slot=8,之后根據(jù)mod/size去環(huán)形數(shù)組中尋找自己的位置。
消費(fèi)者處理邏輯類似。
三、解決沖突—揭秘內(nèi)存屏障
-
關(guān)鍵字volatile:Java內(nèi)存模型將在寫操作后插入一個(gè)寫屏障指令,在讀操作前插入一個(gè)讀屏障指令
java內(nèi)存模型
RingBuffer的指針(cursor)屬于一個(gè)volatile變量,同時(shí)也是我們能夠不用鎖操作就能實(shí)現(xiàn)Disruptor的原因之一

生產(chǎn)者對(duì)RingBuffer更新序列號(hào),之后會(huì)對(duì)volatile字段(cursor)的寫操作創(chuàng)建了一個(gè)內(nèi)存屏障,這個(gè)屏障將刷新所有緩存里的值(緩存失效)
消費(fèi)者獲取RingBuffer序列號(hào),涉及到讀沖突的緩存失效,C2在C1之后,C2拿到C1更新過(guò)的序列號(hào)之后,C2才能獲取next序列號(hào)。內(nèi)存屏障保證了他們之前的執(zhí)行順序,消費(fèi)者總能獲取最新的序列號(hào)
- 內(nèi)存屏障作為另一個(gè)CPU級(jí)的指令,沒有鎖那樣大的開銷,volatile意味著你不用加鎖,就能讓獲得性能的提升
四、disruptor多線程并發(fā)讀寫過(guò)程淺析
讀寫并行簡(jiǎn)圖

多個(gè)生產(chǎn)者的情況下,會(huì)遇到“多個(gè)線程重復(fù)寫同一個(gè)元素”的問(wèn)題,解決方法是,每個(gè)線程獲取不同的一段數(shù)組空間進(jìn)行操作,這個(gè)通過(guò)CAS很容易達(dá)到。只需要在分配元素的時(shí)候,通過(guò)CAS無(wú)腦自增即可判斷。
Disruptor在多個(gè)生產(chǎn)者的情況下,引入了一個(gè)與Ring Buffer大小相同的buffer:AvailableBuffer。當(dāng)某個(gè)位置寫入成功的時(shí)候,便把Availble Buffer相應(yīng)的位置置位,標(biāo)記為寫入成功。讀取的時(shí)候,會(huì)遍歷available Buffer,來(lái)判斷元素是否已經(jīng)就緒。
消費(fèi)者保持一個(gè)自己的序列,每次累加后nextSequence,去獲取可訪問(wèn)的最大序列。對(duì)于一個(gè)生產(chǎn)者,就是nextSequence到RingBuffer當(dāng)前游標(biāo)的序列。對(duì)于多個(gè)生產(chǎn)者,就是nextSequence到RingBuffer當(dāng)前游標(biāo)之間,最大的連續(xù)的序列集。
消費(fèi)端部分源碼分析
public long waitFor(final long sequence){
checkAlert();
//獲取最大的可消費(fèi)的序列,依賴等待策略,策略設(shè)計(jì)模式的一種體現(xiàn)
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence) {
return availableSequence;
}
return sequencer.getHighestPublishedSequence(sequence,availableSequence);
}
1、讀寫不存在沖突:消費(fèi)者讀取到序號(hào) x 位置元素都被生產(chǎn)者寫入成功,消費(fèi)者消費(fèi)這一段區(qū)間數(shù)據(jù)。
2、讀寫存在沖突:消費(fèi)者讀取到序號(hào)x位置生產(chǎn)者正在寫入,也就是下圖availble Buffer中標(biāo)記為-1的位置,則消費(fèi)者返回該序號(hào)x,并執(zhí)行一段等待策略
- 常見的等待策略
BlockingWaitStrategy:通過(guò)線程阻塞的方式,等待生產(chǎn)者喚醒
BusySpinWaitStrategy:線程一直自旋等待,比較耗CPU
YieldingWaitStrategy: 自旋 + yield + 自旋(折中方案)
等等 -
多個(gè)生產(chǎn)者情況下,消費(fèi)者消費(fèi)過(guò)程示意圖
- disruptor寫線程源碼片段分析
do
{
current = cursor.get();
next = current + n;
if (!hasAvailableCapacity(gatingSequences, n, current))
{
throw InsufficientCapacityException.INSTANCE;
}
}
while (!cursor.compareAndSet(current, next));
//next 類比于ArrayBlockQueue的數(shù)組索引index
return next;
多線程環(huán)境下,多個(gè)生產(chǎn)者通過(guò)do/while循環(huán)的條件CAS,來(lái)判斷每次申請(qǐng)的空間是否已經(jīng)被其他生產(chǎn)者占據(jù)。假如已經(jīng)被占據(jù),該函數(shù)會(huì)返回失敗,While循環(huán)重新執(zhí)行,申請(qǐng)寫入空間。
-
多個(gè)生產(chǎn)者情況下,生產(chǎn)者生產(chǎn)過(guò)程示意圖
五、聊一聊緩存?zhèn)喂蚕?/h1>
計(jì)算機(jī)系統(tǒng)中為了解決主內(nèi)存與CPU運(yùn)行速度的差距,在CPU與主內(nèi)存之間添加(Cache)CPU硬件級(jí)別緩存系統(tǒng)中是以緩存行(cache line)為單位存儲(chǔ)的,當(dāng)多線程修改互相獨(dú)立的變量時(shí),如果這些變量共享同一個(gè)緩存行,就會(huì)無(wú)意中影響彼此的性能,這就是偽共享,多線程環(huán)境下會(huì)導(dǎo)致緩存命中率很低!
-
偽共享問(wèn)題的產(chǎn)生
如上圖變量x,y同時(shí)被放到了CPU的一級(jí)和二級(jí)緩存,當(dāng)線程1使用CPU1對(duì)變量x進(jìn)行更新時(shí)候,首先會(huì)修改cpu1的一級(jí)緩存變量x所在緩存行,這時(shí)候緩存一致性協(xié)議會(huì)導(dǎo)致cpu2中變量x對(duì)應(yīng)的緩存行失效,那么線程2寫入變量x的時(shí)候就只能去二級(jí)緩存去查找,這就破壞了一級(jí)緩存,而一級(jí)緩存比二級(jí)緩存更快。更壞的情況下如果cpu只有一級(jí)緩存,那么會(huì)導(dǎo)致頻繁的直接訪問(wèn)主內(nèi)存,增大系統(tǒng)開銷。
-
ArrayBlockingQueue有三個(gè)成員變量
takeIndex:出隊(duì)元素下標(biāo)
putIndex:入隊(duì)元素的下標(biāo)
count:隊(duì)列中元素的數(shù)量
這三個(gè)變量很容易放到一個(gè)緩存行中,但是之間修改沒有太多的關(guān)聯(lián)。所以每次修改,都會(huì)使之前緩存的數(shù)據(jù)失效,需要從主存中重新讀取,從而不能完全達(dá)到共享的效果。
ArrayBlockingQueue偽共享示意圖
- 連續(xù)內(nèi)存塊巧妙規(guī)避偽共享問(wèn)題
緩存行以64個(gè)字節(jié)為單位(,long 類型是 8 字節(jié),因此在一個(gè)緩存行中可以存 8 個(gè) long 類型的變量,如果你訪問(wèn)一個(gè) long 數(shù)組,當(dāng)數(shù)組中的一個(gè)值被加載到緩存中,它會(huì)額外加載另外 7 個(gè),以致你能非常快地遍歷這個(gè)數(shù)組(緩存共享的免費(fèi)福利)
Long[] arr = new Long[64 * 1024 * 1024];
long start = System.nanoTime();
for (int i = 0; i < arr.length; i++) {
arr[i] *= 3;
}
System.out.println(System.nanoTime() - start);
long start2 = System.nanoTime();
for (int i = 0; i < arr.length; i += 8) {
arr[i] *= 3;
}
System.out.println(System.nanoTime() - start2);
- 表面上看,循環(huán)二工作量為第循環(huán)一的1/8;但是執(zhí)行時(shí)間是相差不大的,因?yàn)槊?8個(gè)Long占用8*8=64字節(jié),正好一個(gè)cache,也就是說(shuō)這兩個(gè)循環(huán)訪問(wèn)內(nèi)存的次數(shù)是一致的,導(dǎo)致耗時(shí)相差不大(訪問(wèn)cache時(shí)間rt忽略不計(jì))
- disruptor怎么解決偽共享問(wèn)題
其中一個(gè)解決思路,就是讓不同線程操作的對(duì)象處于不同的緩存行即可即緩存行填充(Padding),使一個(gè)對(duì)象占用的內(nèi)存大小剛好為64bytes或它的整數(shù)倍,這樣就保證了一個(gè)緩存行里不會(huì)有多個(gè)對(duì)象,這其實(shí)是一種以空間換時(shí)間的方案。 - disruptor sequence偽代碼
//在序號(hào)實(shí)際value變量(long型)左邊填充7個(gè)long變量
class LhsPadding
{
protected long p1, p2, p3, p4, p5, p6, p7;
}
class Value extends LhsPadding
{
protected volatile long value;
}
//在序號(hào)實(shí)際value變量(long型)右邊填充7個(gè)long變量
class RhsPadding extends Value
{
protected long p9, p10, p11, p12, p13, p14, p15;
}
public class Sequence extends RhsPadding
Sequence實(shí)際value變量的左右均被填充了7個(gè)long型變量,其自身也是long型變量,一個(gè)long型變量占據(jù)8個(gè)字節(jié),所以序號(hào)與他上一個(gè)/下一個(gè)序號(hào)之間的最小內(nèi)存距離為:158=120byte,加上對(duì)象頭的8個(gè)字節(jié),可以確保sequence大小128byte=264byte(有的CPU緩存行是128byte)
這樣直接的代價(jià)就是增大的15倍的內(nèi)存消耗空間,這樣的設(shè)計(jì)導(dǎo)致不可能有兩個(gè)cursor出現(xiàn)在同一個(gè)cpu cache line中, 就解決了”偽共享”問(wèn)題!
六、demo應(yīng)用
- 定義事件,Event就是通過(guò) Disruptor 進(jìn)行交換的數(shù)據(jù)類型(事件監(jiān)聽模式)
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
}
- 定義事件工廠
{
public LongEvent newInstance()
{
return new LongEvent();
}
}
- 事件的具體實(shí)現(xiàn)
public class LongEventHandler implements EventHandler<LongEvent>
{
//sequence是上圖環(huán)形數(shù)組中的序列號(hào)
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
- 啟動(dòng)disruptor
EventFactory<LongEvent> eventFactory = new LongEventFactory();
ExecutorService executor = Executors.newSingleThreadExecutor();
int ringBufferSize = 1024 * 1024; // RingBuffer 大小,必須是 2 的 N 次方;
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory,
ringBufferSize, executor, ProducerType.SINGLE,
new YieldingWaitStrategy());
EventHandler<LongEvent> eventHandler = new LongEventHandler();
disruptor.handleEventsWith(eventHandler);
disruptor.start();
- 發(fā)布事件
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
long sequence = ringBuffer.next();//請(qǐng)求下一個(gè)事件序號(hào);
try {
LongEvent event = ringBuffer.get(sequence);//獲取該序號(hào)對(duì)應(yīng)的事件對(duì)象;
long data = getEventData();//獲取要通過(guò)事件傳遞的業(yè)務(wù)數(shù)據(jù);
event.set(data);
} finally{
ringBuffer.publish(sequence);//發(fā)布事件;
}
六 應(yīng)用場(chǎng)景
- wacai-zipkin鏈路跟蹤服務(wù)端kafka日志處理
- Log4J2
三克油
fudata技術(shù)沙龍還需要伙計(jì)們繼續(xù)努力添磚加瓦!













