netty源碼分析之揭開reactor線程的面紗(一)

netty最核心的就是reactor線程,對應(yīng)項目中使用廣泛的NioEventLoop,那么NioEventLoop里面到底在干些什么事?netty是如何保證事件循環(huán)的高效輪詢和任務(wù)的及時執(zhí)行?又是如何來優(yōu)雅地fix掉jdk的nio bug?帶著這些疑問,本篇文章將庖丁解牛,帶你逐步了解netty reactor線程的真相[源碼基于4.1.6.Final]

reactor 線程的啟動

NioEventLoop的run方法是reactor線程的主體,在第一次添加任務(wù)的時候被啟動

NioEventLoop 父類 SingleThreadEventExecutor 的execute方法

@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}

外部線程在往任務(wù)隊列里面添加任務(wù)的時候執(zhí)行 startThread() ,netty會判斷reactor線程有沒有被啟動,如果沒有被啟動,那就啟動線程再往任務(wù)隊列里面添加任務(wù)

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            doStartThread();
        }
    }
}

SingleThreadEventExecutor 在執(zhí)行doStartThread的時候,會調(diào)用內(nèi)部執(zhí)行器executor的execute方法,將調(diào)用NioEventLoop的run方法的過程封裝成一個runnable塞到一個線程中去執(zhí)行

private void doStartThread() {
    ...
    executor.execute(new Runnable() {
        @Override
        public void run() {
            thread = Thread.currentThread();
            ...
                SingleThreadEventExecutor.this.run();
            ...
        }
    }
}

該線程就是executor創(chuàng)建,對應(yīng)netty的reactor線程實體。executor 默認(rèn)是ThreadPerTaskExecutor

默認(rèn)情況下,ThreadPerTaskExecutor 在每次執(zhí)行execute 方法的時候都會通過DefaultThreadFactory創(chuàng)建一個FastThreadLocalThread線程,而這個線程就是netty中的reactor線程實體

ThreadPerTaskExecutor

public void execute(Runnable command) {
    threadFactory.newThread(command).start();
}

關(guān)于為啥是 ThreadPerTaskExecutorDefaultThreadFactory的組合來new一個FastThreadLocalThread,這里就不再詳細(xì)描述,通過下面幾段代碼來簡單說明

標(biāo)準(zhǔn)的netty程序會調(diào)用到NioEventLoopGroup的父類MultithreadEventExecutorGroup的如下代碼

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
}

然后通過newChild的方式傳遞給NioEventLoop

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}

關(guān)于reactor線程的創(chuàng)建和啟動就先講這么多,我們總結(jié)一下:netty的reactor線程在添加一個任務(wù)的時候被創(chuàng)建,該線程實體為 FastThreadLocalThread(這玩意以后會開篇文章重點(diǎn)講講),最后線程執(zhí)行主體為NioEventLooprun方法。

reactor 線程的執(zhí)行

那么下面我們就重點(diǎn)剖析一下 NioEventLoop 的run方法

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }

我們抽取出主干,reactor線程做的事情其實很簡單,用下面一幅圖就可以說明

reactor action

reactor線程大概做的事情分為對三個步驟不斷循環(huán)

1.首先輪詢注冊到reactor線程對用的selector上的所有的channel的IO事件

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
    selector.wakeup();
}

2.處理產(chǎn)生網(wǎng)絡(luò)IO事件的channel

processSelectedKeys();

3.處理任務(wù)隊列

runAllTasks(...);

下面對每個步驟詳細(xì)說明

select操作

select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
      selector.wakeup();
}

wakenUp 表示是否應(yīng)該喚醒正在阻塞的select操作,可以看到netty在進(jìn)行一次新的loop之前,都會將wakeUp 被設(shè)置成false,標(biāo)志新的一輪loop的開始,具體的select操作我們也拆分開來看

1.定時任務(wù)截止事時間快到了,中斷本次輪詢

int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

for (;;) {
    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
    if (timeoutMillis <= 0) {
        if (selectCnt == 0) {
            selector.selectNow();
            selectCnt = 1;
        }
        break;
    }
    ....
}

我們可以看到,NioEventLoop中reactor線程的select操作也是一個for循環(huán),在for循環(huán)第一步中,如果發(fā)現(xiàn)當(dāng)前的定時任務(wù)隊列中有任務(wù)的截止事件快到了(<=0.5ms),就跳出循環(huán)。此外,跳出之前如果發(fā)現(xiàn)目前為止還沒有進(jìn)行過select操作(if (selectCnt == 0)),那么就調(diào)用一次selectNow(),該方法會立即返回,不會阻塞

這里說明一點(diǎn),netty里面定時任務(wù)隊列是按照延遲時間從小到大進(jìn)行排序, delayNanos(currentTimeNanos)方法即取出第一個定時任務(wù)的延遲時間

protected long delayNanos(long currentTimeNanos) {
    ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
    if (scheduledTask == null) {
        return SCHEDULE_PURGE_INTERVAL;
    }
    return scheduledTask.delayNanos(currentTimeNanos);
 }

關(guān)于netty的任務(wù)隊列(包括普通任務(wù),定時任務(wù),tail task)相關(guān)的細(xì)節(jié)后面會另起一片文章,這里不過多展開

2.輪詢過程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢

for (;;) {
    // 1.定時任務(wù)截至事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    ....
}

netty為了保證任務(wù)隊列能夠及時執(zhí)行,在進(jìn)行阻塞select操作的時候會判斷任務(wù)隊列是否為空,如果不為空,就執(zhí)行一次非阻塞select操作,跳出循環(huán)

3.阻塞式select操作

for (;;) {
    // 1.定時任務(wù)截至事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
    ...
    // 3.阻塞式select操作
    int selectedKeys = selector.select(timeoutMillis);
    selectCnt ++;
    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
        break;
    }
    ....
}

執(zhí)行到這一步,說明netty任務(wù)隊列里面隊列為空,并且所有定時任務(wù)延遲時間還未到(大于0.5ms),于是,在這里進(jìn)行一次阻塞select操作,截止到第一個定時任務(wù)的截止時間

這里,我們可以問自己一個問題,如果第一個定時任務(wù)的延遲非常長,比如一個小時,那么有沒有可能線程一直阻塞在select操作,當(dāng)然有可能!But,只要在這段時間內(nèi),有新任務(wù)加入,該阻塞就會被釋放

外部線程調(diào)用execute方法添加任務(wù)

@Override
public void execute(Runnable task) { 
    ...
    wakeup(inEventLoop); // inEventLoop為false
    ...
}

調(diào)用wakeup方法喚醒selector阻塞

protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

可以看到,在外部線程添加任務(wù)的時候,會調(diào)用wakeup方法來喚醒 selector.select(timeoutMillis)

阻塞select操作結(jié)束之后,netty又做了一系列的狀態(tài)判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有

  • 輪詢到IO事件 (selectedKeys != 0
  • oldWakenUp 參數(shù)為true
  • 任務(wù)隊列里面有任務(wù)(hasTasks
  • 第一個定時任務(wù)即將要被執(zhí)行 (hasScheduledTasks()
  • 用戶主動喚醒(wakenUp.get()

4.解決jdk的nio bug

關(guān)于該bug的描述見 http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055)

該bug會導(dǎo)致Selector一直空輪詢,最終導(dǎo)致cpu 100%,nio server不可用,嚴(yán)格意義上來說,netty沒有解決jdk的bug,而是通過一種方式來巧妙地避開了這個bug,具體做法如下

long currentTimeNanos = System.nanoTime();
for (;;) {
    // 1.定時任務(wù)截止事時間快到了,中斷本次輪詢
    ...
    // 2.輪詢過程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
    ...
    // 3.阻塞式select操作
    selector.select(timeoutMillis);
    // 4.解決jdk的nio bug
    long time = System.nanoTime();
    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
        selectCnt = 1;
    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

        rebuildSelector();
        selector = this.selector;
        selector.selectNow();
        selectCnt = 1;
        break;
    }
    currentTimeNanos = time; 
    ...
 }

netty 會在每次進(jìn)行 selector.select(timeoutMillis) 之前記錄一下開始時間currentTimeNanos,在select之后記錄一下結(jié)束時間,判斷select操作是否至少持續(xù)了timeoutMillis秒(這里將time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time - currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或許更好理解一些),
如果持續(xù)的時間大于等于timeoutMillis,說明就是一次有效的輪詢,重置selectCnt標(biāo)志,否則,表明該阻塞方法并沒有阻塞這么長時間,可能觸發(fā)了jdk的空輪詢bug,當(dāng)空輪詢的次數(shù)超過一個閥值的時候,默認(rèn)是512,就開始重建selector

空輪詢閥值相關(guān)的設(shè)置代碼如下

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
    selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;

下面我們簡單描述一下netty 通過rebuildSelector來fix空輪詢bug的過程,rebuildSelector的操作其實很簡單:new一個新的selector,將之前注冊到老的selector上的的channel重新轉(zhuǎn)移到新的selector上。我們抽取完主要代碼之后的骨架如下

public void rebuildSelector() {
    final Selector oldSelector = selector;
    final Selector newSelector;
    newSelector = openSelector();

    int nChannels = 0;
     try {
        for (;;) {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                     if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                         continue;
                     }
                     int interestOps = key.interestOps();
                     key.cancel();
                     SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                     if (a instanceof AbstractNioChannel) {
                         ((AbstractNioChannel) a).selectionKey = newKey;
                      }
                     nChannels ++;
                }
                break;
        }
    } catch (ConcurrentModificationException e) {
        // Probably due to concurrent modification of the key set.
        continue;
    }
    selector = newSelector;
    oldSelector.close();
}

首先,通過openSelector()方法創(chuàng)建一個新的selector,然后執(zhí)行一個死循環(huán),只要執(zhí)行過程中出現(xiàn)過一次并發(fā)修改selectionKeys異常,就重新開始轉(zhuǎn)移

具體的轉(zhuǎn)移步驟為

  1. 拿到有效的key
  2. 取消該key在舊的selector上的事件注冊
  3. 將該key對應(yīng)的channel注冊到新的selector上
  4. 重新綁定channel和新的key的關(guān)系

轉(zhuǎn)移完成之后,就可以將原有的selector廢棄,后面所有的輪詢都是在新的selector進(jìn)行

最后,我們總結(jié)reactor線程select步驟做的事情:不斷地輪詢是否有IO事件發(fā)生,并且在輪詢的過程中不斷檢查是否有定時任務(wù)和普通任務(wù),保證了netty的任務(wù)隊列中的任務(wù)得到有效執(zhí)行,輪詢過程順帶用一個計數(shù)器避開了了jdk空輪詢的bug,過程清晰明了

由于篇幅原因,下面兩個過程將分別放到一篇文章中去講述,盡請期待

process selected keys

未完待續(xù)

run tasks

未完待續(xù)

最后,通過文章開頭一副圖,我們再次熟悉一下netty的reactor線程做的事兒


reactor action
  1. 輪詢IO事件
  2. 處理輪詢到的事件
  3. 執(zhí)行任務(wù)隊列中的任務(wù)

如果你想從零到一深入學(xué)習(xí) Netty,可以加我微信(備注:簡書專享)。

閃電俠

贈送一份目前正在掘金售賣的小冊一份

掘金小冊
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容