一、功能概述
前兩篇我們分別介紹了EventLoopGroup和EventLoop在netty中的作用。但是僅僅知道這些,可能對netty如何完成一整個網(wǎng)絡(luò)事件監(jiān)控到任務(wù)分發(fā)處理還是有些模糊。本篇我們要分析一下netty的啟動流程。在我們使用netty編程的時候,我們的使用ServerBootstrap和Bootstrap來實現(xiàn)服務(wù)端和客戶端的啟動。我們先來看一下這兩個類的相關(guān)類圖:

netty定義了抽象類AbstractBootstrap,然后在此基礎(chǔ)上實現(xiàn)了ServerBootstrap和Bootstrap分別作為服務(wù)端和客戶端的啟動類。本篇,我們以ServerBootstrap為例,分析一下服務(wù)端的啟動流程。
我們根據(jù)下面的測試代碼分析一下具體的流程,這也是我們創(chuàng)建netty服務(wù)端的基本流程:
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {.
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
二、ServerBootstrap啟動的過程
我們看一下上面測試代碼的內(nèi)容,先是創(chuàng)建了兩個EventLoopGroup的對象,bossGroup、workerGroup。然后又創(chuàng)建了一個ServerBootstrap對象,將這兩個EventLoopGroup注冊到ServerBootstrap中,然后設(shè)置一系列的參數(shù),這些方法其實都是簡單地設(shè)置ServerBootstrap的一些屬性。設(shè)置完這些屬性調(diào)用bind()方法實現(xiàn)端口的綁定,也就是整個netty服務(wù)端啟動的核心過程。我們來分析一下bind()方法的的過程:
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
bind()方法在父類AbstractBootstrap中,這里我們有理由懷疑,這部分有用到了模板方法模式。首先調(diào)用了validate()方法,來驗證參數(shù)的完整性,我們來看一下都驗證了那些內(nèi)容:
/**
* Validate all the parameters. Sub-classes may override this, but should
* call the super method in that case.
*/
public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
這里驗證了group和channelFactory這兩個屬性。在我們調(diào)用b.group(bossGroup, workerGroup)的時候,bossGroup就是validate()方法中驗證的對象。AbstractBootstrap這個抽象類只持有了這一個EventLoopGroup對象,而workerGroup是我們ServerBootstrap擴展出來的。我們可以看到AbstractBootstrap.validate的方法注釋中寫道,子類可以重寫這個方法,但是必須要調(diào)用super method。所以我們看一下ServerBootstrap有么有重寫這個方法:
@Override
public ServerBootstrap validate() {
super.validate();
if (childHandler == null) {
throw new IllegalStateException("childHandler not set");
}
if (childGroup == null) {
logger.warn("childGroup is not set. Using parentGroup instead.");
childGroup = config.group();
}
return this;
}
果不其然,ServerBootstrap也重寫了這個方法,這里驗證了childGroup,也就是workerGroup。也驗證了childHandler。這個childHandler我們后面在分析。
做完驗證之后,調(diào)用了doBind(final SocketAddress localAddress)方法。我們看一下這個方法的內(nèi)容。這個方法中,先是調(diào)用initAndRegister()創(chuàng)建并且注冊Channel,并返回ChannelFuture。這個方法我們后面在看,我們先把doBind方法的主要流程理清楚。接下來,判斷創(chuàng)建的Channel是否注冊完成,如果完成就直接調(diào)用doBind0(regFuture, channel, localAddress, promise),否則就添加一個監(jiān)聽器,等到Channel是否注冊完成后再調(diào)用doBind0(regFuture, channel, localAddress, promise)。
doBind0(regFuture, channel, localAddress, promise)做了什么呢?其實也很簡單,獲取channel綁定的eventLoop事件處理器,然后提交一個任務(wù),任務(wù)的內(nèi)容就是調(diào)用channel的bind(SocketAddress localAddress, ChannelPromise promise)方法。其實這不是真正的將我們的服務(wù)綁定到指定端口的方法,這里的bind只是執(zhí)行了一個生名周期中的回調(diào)方法而已,調(diào)用所有注冊到ChannelPipeline當中的ChannelOutboundHandler對象的bind方法。
那么真正的將我們的服務(wù)綁定到指定端口的操作在哪里呢?我們目前還有initAndRegister()沒有分析,所以,真正的綁定端口的操作,一定在這個方法里!我們看一下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
首先,通過我們設(shè)置的channelFactory來創(chuàng)建一個Channel。然后調(diào)用init方法來初始化Channel。我們可以看一下init方法。
abstract void init(Channel channel) throws Exception
是一個抽象方法,交給子類去實現(xiàn),這印證了我們剛開始猜測的·AbstractBootstrap.bind·是一個模板方法。我們先不看ServerBootstrap是怎么初始化channel的。我們還是先看一下initAndRegister()整體的流程。在初始化完channel后,通過調(diào)用config().group().register(channel)將channel注冊到EventLoopGroup中,這個EventLoopGroup到底是哪個EventLoopGroup呢?其實是我們傳入的bossGroup。這個過程做了什么操作呢?我們以傳入的NioServerSocketChannel為例:結(jié)合我們前兩偏分析的NioEventLoopGroup、NioEventLoop,我們可以知道,這個操作就是把NioServerSocketChannel持有的java的SelectableChannel注冊到NioEventLoop持有的java的Selector中去,讓Selector來監(jiān)聽IO事件。注冊完之后initAndRegister()的處理過程就結(jié)束了。我們回過頭去看ServerBootstrap是怎么實現(xiàn)abstract void init(Channel channel) throws Exception的:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
首先一通操作,將ServerBootstrap設(shè)置的屬性都應(yīng)用在channel上。這個不是我們分析的重點。接下來的一段代碼看似很長,但是就只做了一件事,就是將channel的ChannelPipeline中添加一個事件處理器:ServerBootstrapAcceptor。ServerBootstrapAcceptor本身繼承了ChannelInboundHandlerAdapter。那么ServerBootstrapAcceptor做了什么呢?我們看一下它的channelRead方法:
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
我們可以看到就是將傳入的對象轉(zhuǎn)成Channel對象,然后將這個channel注冊到childGroup中。到此整個bind的過程就結(jié)束了,這是什么操作?怎么這么迷呢?
到這里,把前兩篇分析的NioEventLoop、NioEventLoopGroup再串起來一起看。
首先,在啟動的過程中,ServerBootstrap創(chuàng)建了NioServerSocketChannel,并且把NioServerSocketChannel持有的SelectableChannel注冊到bossGroup中的NioEventLoop持有的Selector中。NioEventLoop一直在做select操作,監(jiān)聽SelectableChannel的IO事件。當監(jiān)聽到處理的事件之后,會根據(jù)SelectionKey被attach的對象來調(diào)用不同的processSelectedKey的重載方法處理。(NioEventLoop的邏輯)。那我們在回過頭來看一看,在啟動的時候,Channel注冊到EventLoop返回的SelectionKey被attach的是什么:
//AbstractNioChannel的doRegister方法
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
我們可以看到attach的對象就是自身,也就是AbstractNioChannel類型的對象。所以接下來會調(diào)用processSelectedKey(SelectionKey k, AbstractNioChannel ch)進行處理,而我們的NioServerSocketChannel感興趣的事件只有ACCEPT事件,所以方法最終會走到下面這個邏輯:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
這里的unsafe是NioMessageUnsafe的對象。我們看看read方法究竟干了什么:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
read方法首先是調(diào)用doReadMessages來把數(shù)據(jù)讀到readBuf中。而doReadMessages就是調(diào)用ServerSocketChannel的accept方法獲取和客戶端通信的SocketChannel對象。然后將這個對象交給pipeline去處理,調(diào)用pipeline.fireChannelRead()方法。這意味著什么呢?
這意味著這個事件會流轉(zhuǎn)到我們上面提到的ServerBootstrapAcceptor處理。也就是會把和客戶端通信的SocketChannel注冊到我們注冊的workerGroup當中。由workerGroup去監(jiān)聽SocketChannel后續(xù)的事件并且處理相關(guān)的事件!到這里,大家是不是對ServerBootstrap啟動時做的事情比較清晰了?
三、復(fù)盤
我們本篇講了ServerBootstrap在啟動過程中做的事情。ServerBootstrap接受了兩個EventLoopGroup的參數(shù),我們這里分別叫它bossGroup和workerGroup。兩個EventLoopGroup通過ServerBootstrapAcceptor為橋梁建立起了聯(lián)系。bossGroup負責監(jiān)聽ACCEPT事件,監(jiān)聽到ACCEPT事件之后,將用于和客戶端通信的SocketChannel注冊到workerGroup,由workerGroup監(jiān)聽后續(xù)的讀事件并且做業(yè)務(wù)邏輯處理。這里我們可以看到,其實bossGroup做的事情是非常少的,業(yè)務(wù)邏輯最后都會交給workerGroup去處理。所以在我們創(chuàng)建bossGroup,我們不需要指定太多的線程。反而workerGroup要創(chuàng)建跟多的線程去處理業(yè)務(wù)邏輯。
講到這里,我們再來理一下netty的線程模型:
首先,對于EventLoop我們可以看一下,我們可以用于生產(chǎn)的EventLoop的實現(xiàn)類都是繼承了SingleThreadEventLoop的。所以一個EventLoop的對象只會綁定一個線程。一個EventLoopGroup可以管理多個EventLoop。而一個Channel只會被綁定到一個EventLoop上。

接下來,我想提一下Reactor模型。什么是Reactor模型?簡單來說,Reactor模型就是IO多路復(fù)用+線程池。網(wǎng)絡(luò)模型使用IO多路復(fù)用,處理任務(wù)使用線程池。而Reactor模型又分為單Reactor單線程、單Reactor多線程、多Reactor多線程。我們使用netty的時候,可以通過控制bossGroup和workerGroup的數(shù)量來靈活的實現(xiàn)上述三種Reactor模式。而且我們可以通過bossGroup和workerGroup使用同一個對象來實現(xiàn)IO和事件處理使用同一個EventLoopGroup。ServerBootstrap也提供了只有一個參數(shù)的group方法實現(xiàn)了這個功能。
除此之外,假如我們調(diào)用ServerBootstrap的group方法時,傳的參數(shù)是EventLoop對象可不可以呢?完全是可以的,因為EventLoop繼承了EventLoopGroup。這兩個接口有相同的功能窗口。這也是為什么EventLoop要繼承EventLoopGroup的另一個原因。使EventLoop可以脫離EventLoopGroup單獨使用!