概述
Netty的IO事件分別為讀事件(OP_READ)、寫(xiě)事件(OP_WRITE)、接收事件(OP_ACCEPT)、連接事件(OP_CONNECT)。其中讀、寫(xiě)事件可以發(fā)生在客戶端與服務(wù)端。接收事件只發(fā)生在服務(wù)端,服務(wù)端啟動(dòng)后會(huì)注冊(cè)接收事件監(jiān)聽(tīng)客戶端連接。連接事件只發(fā)生在客戶端,客戶端啟動(dòng)時(shí)會(huì)連接服務(wù)端。Netty任務(wù)分為普通任務(wù)(通過(guò)execute(Runnable task) 執(zhí)行)與定時(shí)任務(wù)(通過(guò)schedule(Runnable task,long delay,TimeUnit unit)執(zhí)行)。無(wú)論是IO事件還是任務(wù),都是通過(guò)NioEventLoop中對(duì)應(yīng)的線程來(lái)進(jìn)行處理。
NioEventLoop創(chuàng)建過(guò)程
在實(shí)例化NioEventLoopGroup時(shí),默認(rèn)會(huì)創(chuàng)建2倍CPU核心數(shù)的NioEventLoop。對(duì)于bossGroup來(lái)說(shuō),雖然會(huì)創(chuàng)建這么多NioEventLoop,但是如果只綁定一個(gè)端口進(jìn)行事件監(jiān)聽(tīng),實(shí)際上只會(huì)用到一個(gè)NioEventLoop,也就是說(shuō)只有一個(gè)線程在循環(huán)處理事件與任務(wù)。
EventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup UML圖:

NioEventLoopGroup無(wú)參構(gòu)造方法最終會(huì)調(diào)到下面的構(gòu)造方法:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
/**
* nThreads:創(chuàng)建線程的數(shù)量,如果不傳,后續(xù)會(huì)默認(rèn)為2倍CPU核心數(shù)
* executor:默認(rèn)為null,在NioEventLoopGroup父類中會(huì)進(jìn)行初始化
* selectorProvider:用于創(chuàng)建Java NIO的Selector對(duì)象
* selectStrategyFactory:IO多路復(fù)用器策略工廠,值為DefaultSelectStrategyFactory
* RejectedExecutionHandlers.reject():拒絕策略,當(dāng)線程池任務(wù)隊(duì)列滿了后在往其中添加任務(wù)會(huì)觸發(fā)該拒絕策略
*/
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
NioEventLoopGroup構(gòu)造方法中會(huì)調(diào)用其父類MultithreadEventLoopGroup的構(gòu)造方法,該構(gòu)造方法會(huì)初始化默認(rèn)的線程數(shù)量,常量DEFAULT_EVENT_LOOP_THREADS值為2倍CPU核心數(shù):
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
MultithreadEventLoopGroup構(gòu)造方法中會(huì)調(diào)用其父類MultithreadEventExecutorGroup的構(gòu)造方法,該構(gòu)造方法會(huì)初始化線程執(zhí)行選擇器工廠,常量DefaultEventExecutorChooserFactory.INSTANCE值為EventExecutorChooserFactory:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
最終會(huì)調(diào)用MultithreadEventExecutorGroup如下構(gòu)造方法,在該方法中會(huì)循環(huán)創(chuàng)建NioEventLoop:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
// 創(chuàng)建線程執(zhí)行器
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 創(chuàng)建EventExecutor數(shù)組,用來(lái)保存NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
// ...
// 創(chuàng)建NioEventLoop
children[i] = newChild(executor, args);
// ...
}
// 創(chuàng)建線程選擇器
chooser = chooserFactory.newChooser(children);
// ...
}
創(chuàng)建線程執(zhí)行器
線程執(zhí)行器ThreadPerTaskExecutor#execute方法內(nèi)部使用ThreadFactory來(lái)創(chuàng)建并啟動(dòng)線程,其中ThreadFactory就是調(diào)用其構(gòu)造方法傳入的DefaultThreadFactory,DefaultThreadFactory#newThread方法會(huì)創(chuàng)建線程,并設(shè)置線程屬性,如線程名稱等:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
// 使用線程工廠創(chuàng)建并啟動(dòng)線程
threadFactory.newThread(command).start();
}
}
創(chuàng)建NioEventLoop
調(diào)用NioEventLoopGroup#newChild方法進(jìn)行NioEventLoop的創(chuàng)建:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
方法內(nèi)部會(huì)調(diào)用NioEventLoop的構(gòu)造方法進(jìn)行創(chuàng)建:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 調(diào)用父類構(gòu)造方法,創(chuàng)建任務(wù)隊(duì)列以及初始化父類屬性
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// 創(chuàng)建IO多路復(fù)用器
selector = openSelector();
selectStrategy = strategy;
}
該構(gòu)造方法中有兩個(gè)比較重要的操作:一是調(diào)用父類構(gòu)造方法創(chuàng)建任務(wù)隊(duì)列等,二是調(diào)用openSelector方法創(chuàng)建IO多路復(fù)用器,我們先看openSelector方法:
private Selector openSelector() {
final Selector selector;
try {
// 創(chuàng)建Java Nio的Selector對(duì)象
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 是否禁用對(duì)Java Selector的優(yōu)化,如果禁用則標(biāo)識(shí)不優(yōu)化直接返回Java的Selector,默認(rèn)為false
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
// 創(chuàng)建SelectedSelectionKeySet,底層是數(shù)組實(shí)現(xiàn),用于替換Java Selector底層selectedKeys的數(shù)據(jù)結(jié)構(gòu)
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 通過(guò)反射獲取到Selector的實(shí)現(xiàn)類
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
// ...
}
}
});
// ...
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 獲取selectedKeys、publicSelectedKeys屬性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
// 通過(guò)反射將新的key結(jié)構(gòu)替換原生結(jié)構(gòu)
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
// ...
}
}
});
// 保存selectedKeys屬性,后續(xù)處理IO事件信息時(shí)可以直接通過(guò)該屬性獲取事件信息
selectedKeys = selectedKeySet;
return selector;
}
該方法會(huì)創(chuàng)建Java Selector,并通過(guò)反射替換其對(duì)應(yīng)的selectedKeys、publicSelectedKeys屬性。這里Netty對(duì)原生Selector數(shù)據(jù)結(jié)構(gòu)進(jìn)行了優(yōu)化,由原本HashSet實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu)替換為了Netty基于數(shù)組實(shí)現(xiàn)的數(shù)據(jù)結(jié)構(gòu):SelectedSelectionKeySet。
接下來(lái)繼續(xù)跟進(jìn),該構(gòu)造方法中會(huì)調(diào)用父類的構(gòu)造方法,對(duì)父類屬性進(jìn)行初始化,先看下NioEventLoop UML圖:

NioEventLoop父類為SingleThreadEventLoop,在其父類構(gòu)造方法中會(huì)創(chuàng)建一個(gè)MpscQueue類型的隊(duì)列:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 用來(lái)存放收尾工作的任務(wù)隊(duì)列
tailTasks = newTaskQueue(maxPendingTasks);
}
SingleThreadEventLoop構(gòu)造方法中會(huì)調(diào)用其父類SingleThreadEventExecutor的構(gòu)造方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
// 最終會(huì)調(diào)用AbstractEventExecutor構(gòu)造函數(shù)保存parent,此處parent即為NioEventLoopGroup
super(parent);
// 默認(rèn)false,當(dāng)且僅當(dāng)調(diào)用addTask(Runnable)將喚醒執(zhí)行線程
this.addTaskWakesUp = addTaskWakesUp;
// 最大等待任務(wù)數(shù),默認(rèn)值為Integer.MAX_VALUE
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 保存線程執(zhí)行器
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// 創(chuàng)建MpscQueue類型任務(wù)隊(duì)列
taskQueue = newTaskQueue(this.maxPendingTasks);
// 保存拒絕策略
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
NioEventLoop線程啟動(dòng)
服務(wù)端NioEventLoop中的線程啟動(dòng)是在channel注冊(cè)時(shí)觸發(fā)的,我們?cè)賮?lái)回顧下:
#AbstractChannel#AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
// 判斷當(dāng)前線程是否為IO線程,因?yàn)楫?dāng)前線程為主線程,且此時(shí)IO線程還未創(chuàng)建,所以會(huì)走到else方法
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 異步執(zhí)行任務(wù)
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ...
}
}
}
NioEventLoop#execute方法實(shí)現(xiàn)在其父類SingleThreadEventExecutor中:
#SingleThreadEventExecutor
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 當(dāng)前線程為主線程,且此時(shí)IO線程還未創(chuàng)建,inEventLoop方法返回false
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
// 啟動(dòng)線程
startThread();
// 添加任務(wù)
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
// 喚醒線程
wakeup(inEventLoop);
}
}
execute方法會(huì)主要分為三步,第一步:調(diào)用startThread方法創(chuàng)建并啟動(dòng)線程;第二步:將任務(wù)添加到任務(wù)隊(duì)列中;第三步:?jiǎn)拘丫€程執(zhí)行任務(wù)。先看下startThread方法:
private void startThread() {
// 線程是否尚未啟動(dòng)
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
// 將ST_NOT_STARTED設(shè)置為ST_STARTED
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
// 啟動(dòng)線程
doStartThread();
}
}
}
startThread方法最終會(huì)調(diào)用doStartThread方法來(lái)啟動(dòng)線程:
private void doStartThread() {
assert thread == null;
// 使用線程執(zhí)行器創(chuàng)建并啟動(dòng)線程
executor.execute(new Runnable() {
@Override
public void run() {
// 保存創(chuàng)建的線程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 處理IO事件與異步任務(wù)
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ...
}
}
});
}
調(diào)用executor#execute方法創(chuàng)建并啟動(dòng)線程,這里的executor類型為:ThreadPerTaskExecutor,是實(shí)例化NioEventLoopGroup時(shí),在其父類MultithreadEventExecutorGroup中創(chuàng)建的,ThreadPerTaskExecutor#execute方法內(nèi)部通過(guò)線程工廠創(chuàng)建并啟動(dòng)線程。線程啟動(dòng)后主要做兩件事情,一:保存創(chuàng)建的線程,二:處理IO事件與異步任務(wù)(后面會(huì)講到)。
線程啟動(dòng)后,會(huì)調(diào)用addTask方法,將任務(wù)放到任務(wù)隊(duì)列中,等待線程處理:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
addTask方法會(huì)嘗試將任務(wù)放入taskQueue中,如果放入失敗則會(huì)觸發(fā)拒絕策略。
任務(wù)添加到taskQueue中后,會(huì)調(diào)用wakeup方法喚醒線程,因?yàn)榇藭r(shí)線程可能因?yàn)闆](méi)有任務(wù)而進(jìn)入到阻塞狀態(tài),使用wakeup方法可以將線程從阻塞中喚醒,處理任務(wù)。
IO事件與任務(wù)處理流程
NioEventLoop中的線程啟動(dòng)后,會(huì)一直循環(huán)處理IO事件與異步任務(wù):
SingleThreadEventExecutor.this.run();
該方法會(huì)調(diào)用其子類NioEventLoop的run方法,看下NioEventLoop的run方法:
protected void run() {
for (;;) {
try {
// 檢測(cè)IO事件
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio控制IO事件與非IO事件執(zhí)行時(shí)間占比
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 處理IO事件
processSelectedKeys();
} finally {
// 處理普通任務(wù)與定時(shí)任務(wù)
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 根據(jù)比率計(jì)算非IO事件任務(wù)處理事件
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
IO事件與任務(wù)處理主要分為三步,第一:檢測(cè)IO事件,第二:處理IO事件,第三:處理普通任務(wù)與定時(shí)任務(wù)。
檢測(cè)IO事件
通過(guò)SelectStrategy#calculateStrategy方法計(jì)算走什么策略,SelectStrategy是在創(chuàng)建NioEventLoop時(shí)通過(guò)IO多路復(fù)用器策略工廠DefaultSelectStrategyFactory進(jìn)行創(chuàng)建的:
#DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
hasTasks值為true,標(biāo)識(shí)taskQueue或者tailQueue中有任務(wù),則調(diào)用IntSupplier#get方法,該方法內(nèi)部會(huì)調(diào)用Selector#selectNow方法,selectNow方法是一個(gè)非阻塞方法,不管有沒(méi)有IO事件都會(huì)立即返回。如果任務(wù)隊(duì)列中沒(méi)有任務(wù),則直接返回SelectStrategy.SELECT。
如果SelectStrategy#calculateStrategy方法返回SelectStrategy.SELECT,則會(huì)嘗試將wakenUp屬性設(shè)置為false,并調(diào)用select方法。因?yàn)镾elector#wakeup方法是一個(gè)比較耗時(shí)的操作,而用戶線程和IO線程都有可能操作該屬性,因此使用原子操作防止多個(gè)線程重復(fù)喚醒。接著看下select方法:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 計(jì)算最近一次要執(zhí)行的定時(shí)任務(wù)的最后期限,如果定時(shí)任務(wù)隊(duì)列中沒(méi)有任務(wù)則返回當(dāng)前事件+1s
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 計(jì)算定時(shí)任務(wù)將要執(zhí)行的事件與當(dāng)前時(shí)間的時(shí)間差
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
// 如果時(shí)間差<0.5s,且沒(méi)有調(diào)用select方法阻塞過(guò),則調(diào)用selectNow方法,然后退出循環(huán)
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果任務(wù)隊(duì)列有任務(wù),且嘗試設(shè)置wakenUp屬性為true成功,則調(diào)用selectNow方法,然后退出循環(huán)
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 阻塞等待IO事件
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 如果有其中一種情況則退出循環(huán):有IO事件、外部線程喚醒、任務(wù)對(duì)立有任務(wù)、定時(shí)任務(wù)隊(duì)列有任務(wù)
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// 重建selector,解決JDK空輪詢bug
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
// ...
}
}
IO事件處理
processSelectedKeys方法,用來(lái)處理所有IO事件:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
Netty默認(rèn)會(huì)開(kāi)啟對(duì)Selector的優(yōu)化,所以會(huì)進(jìn)入processSelectedKeysOptimized方法處理IO事件:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
// 獲取事件
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null;
// 獲取attachment
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 處理IO事件
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// ...
}
}
最終會(huì)調(diào)用processSelectedKey方法進(jìn)行IO事件的處理:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// ...
try {
int readyOps = k.readyOps();
// 處理OP_CONNECT事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 處理OP_WRITE事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 處理OP_READ、OP_ACCEPT事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
方法內(nèi)部會(huì)調(diào)用Unsafe類的方法進(jìn)行處理,以O(shè)P_ACCEPT事件為例,我們看下其read方法,OP_ACCEPT事件的read方法是在AbstractNioMessageChannel類的內(nèi)部類NioMessageUnsafe中:
#AbstractNioMessageChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 處理OP_ACCEPT事件
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 觸發(fā)channelRead事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// 觸發(fā)channelReadComplete事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
NioMessageUnsafe#read方法主要做了兩件事,一是:調(diào)用NioServerSocketChannel#doReadMessages方法處理事件,二是:調(diào)用ChannelPipeline發(fā)送channelRead、channelReadComplete事件。
NioServerSocketChannel#doReadMessages方法中會(huì)調(diào)用Java的ServerSocketChannel方法建立連接:
#NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
到此IO事件處理流程就結(jié)束了,真正的事件處理還是由不同的Unsafe類調(diào)用對(duì)應(yīng)的channel中的方法來(lái)進(jìn)行處理。
任務(wù)處理
runAllTasks方法用來(lái)處理普通任務(wù)與定時(shí)任務(wù):
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// 從定時(shí)任務(wù)隊(duì)列中獲取可調(diào)度的任務(wù)放入到taskQueue中
fetchedAll = fetchFromScheduledTaskQueue();
// 從taskQueue中取出任務(wù),執(zhí)行其run方法
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
runAllTasks方法還是比較容易理解,方法主要做了兩件事,一是:將可調(diào)度的定時(shí)任務(wù)從scheduledTaskQueue隊(duì)列放入到taskQueue隊(duì)列中,然后循環(huán)取出taskQueue中的任務(wù),執(zhí)行其run方法。