工作時(shí)候,有用過Netty寫過網(wǎng)絡(luò)庫(kù)。最近想研究下RPC框架,就想著寫幾篇博客,梳理下Netty的源碼。(研究的源碼版本是4.1.x)
最簡(jiǎn)單的示例
首先還是拿Netty 官網(wǎng)的UserGuide做例子
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
Netty的源碼還是很復(fù)雜的,所以研究時(shí)候有種無處下手的感覺。但是Netty作為一個(gè)NIO的網(wǎng)絡(luò)框架,其底層肯定是使用了java jdk 的nio編程。所以我們可以通過對(duì)比java nio編程 與 Netty源碼,來了解Netty究竟對(duì)java 的nio做了什么樣的封裝。
一個(gè)java nio編程的例子
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}
例子是網(wǎng)上隨便粘的,當(dāng)然簡(jiǎn)化了很多很多東西,這篇博客只會(huì)分析服務(wù)端channel的創(chuàng)建,所以這些暫時(shí)也夠了。
總結(jié)下:
1)創(chuàng)建ServerSocketChannel
2)綁定某個(gè)端口
3)設(shè)置非阻塞
4)監(jiān)聽新的連接
后面分析源碼也會(huì)試著找下Netty對(duì)應(yīng)的源碼在哪
創(chuàng)建一個(gè)NioServerSocketChannel
具體的調(diào)用流程如下圖

private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//省略....
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
//省略...
}
ReflectiveChannelFactory
public T newChannel() {
try {
return clazz.newInstance(); //這里的clazz就是當(dāng)時(shí)從ServerBootstrap.channel方法里傳進(jìn)去的NioServerSocketChannel
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
代碼流程還是很清晰的,總結(jié)下就是 Netty會(huì)在ServerBootstrap里,通過工廠創(chuàng)建一個(gè)ServerChannel
在本例中,工廠是默認(rèn)的 ReflectiveChannelFactory(通過反射創(chuàng)建Channel),創(chuàng)造出的Channel類型是NioServerSocketChannel
NioServerSocketChannel 的初始化
分析完了NioServerSocketChannel接下來看看NioServerSocketChannel究竟在初始化時(shí)候做了啥。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a >#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
1、創(chuàng)建一個(gè)對(duì)應(yīng)的jdk channel
通過 SelectorProvider.openServerSocketChannel() 創(chuàng)建了一個(gè)ServerSocketChannel 這個(gè)ServerSocketChannel就是 上文,nio編程的例子里的jdk channel
2、AbstractNioChannel構(gòu)造方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
1)NioServerSocketChannel最終會(huì)調(diào)用父類AbstractNioChannel的構(gòu)造函數(shù)
2)綁定一個(gè) NioServerSocketChannelConfig
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
1)保存所創(chuàng)建的jdk channel
2)配置configureBlocking 為false
調(diào)用父類 AbstractChannel的構(gòu)造方法
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId(); //綁定一個(gè)id
unsafe = newUnsafe(); //綁定一個(gè)NioSocketChannelUnsafe
pipeline = newChannelPipeline(); //綁定一個(gè)DefaultChannelPipeline
}
這一步非常重要:
1)綁定一個(gè) id
2)綁定一個(gè) NioSocketChannelUnsafe
3)綁定一個(gè)DefaultChannelPipeline
這幾個(gè)組件,在Netty中起著非常重要的作用,后面博客里講到了會(huì)詳細(xì)分析下
總結(jié)下,ServerSocketChannel做了這樣幾件事
1)創(chuàng)建一個(gè)jdk channel,并記錄下來
2)配置jdk channel 的 configureBlocking 為false
3)綁定一個(gè)NioServerSocketChannelConfig
4)綁定id、NioSocketChannelUnsafe、DefaultChannelPipeline 三個(gè)組件
初始化ServerChannel
再回到ServerBootStrap里,在創(chuàng)建了ServerChannel(即我們這的NioServerSocketChannel)后,下面一步,是對(duì)其進(jìn)行初始化
@Override
void init(Channel channel) throws Exception {
//獲取我們?cè)赾lient代碼里配置的channel
final Map<ChannelOption<?>, Object> options = options0();
//最終調(diào)用 channel.config().setOption((ChannelOption<Object>) option, value) 即把配置設(shè)置到ChannelConfig里
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//獲取client里配置的attrs
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//設(shè)置childOptions和childAttrs,最終傳到 ServerBootstrapAcceptor里
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler(); //client里設(shè)置的handler被傳進(jìn)來了
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//currentChildGroup、currentChildHandler、currentChildOptions、currentChildAttrs
// 對(duì)應(yīng)childGroup 我們配置的childHandler 配置的childOptions 配置的childAttrs
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
流程還挺長(zhǎng)的,總結(jié)下:
1)把client端設(shè)置的option緩存到NioServerSocketChannelConfig里
2)把client端設(shè)置的attr設(shè)置到channel里

3)保存配置的childOptions,配置的childAttrs 到ServerBootstrapAcceptor里
4)往NioSocketChannel的pipeline中添加一個(gè)ServerBootstrapAcceptor
注冊(cè) ServerChannel
ServerChannel的注冊(cè)流程比較長(zhǎng),下面一篇博客再單獨(dú)介紹吧。
總結(jié)
這篇博客分析了下Netty的服務(wù)端啟動(dòng)流程,總結(jié)下:
1)Netty會(huì)在ServerBootstrap中創(chuàng)建一個(gè)ServerChannel
2)ServerChannel底層會(huì)綁定一個(gè)jdk 的channel
3)ServerChannel創(chuàng)建時(shí),會(huì)綁定一個(gè)id、NioSocketChannelUnsafe、DefaultChannelPipeline、NioServerSocketChannelConfig
4)初始化ServerChannel時(shí),會(huì)把我們自定義配置的option、attr設(shè)置進(jìn)去
5)最終會(huì)往 NioSocketChannel 的pipeline里添加一個(gè) ServerBootstrapAcceptor
6)注冊(cè)ServerChannel(下面一篇博客介紹)