Netty源碼分析——服務(wù)端channel的創(chuàng)建

工作時(shí)候,有用過Netty寫過網(wǎng)絡(luò)庫(kù)。最近想研究下RPC框架,就想著寫幾篇博客,梳理下Netty的源碼。(研究的源碼版本是4.1.x)

最簡(jiǎn)單的示例

首先還是拿Netty 官網(wǎng)的UserGuide做例子

/**
 * Handles a server-side channel.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
        // Discard the received data silently.
        ((ByteBuf) msg).release(); // (3)
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Discards any incoming data.
 */
public class DiscardServer {
    
    private int port;
    
    public DiscardServer(int port) {
        this.port = port;
    }
    
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
    
            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(port).sync(); // (7)
    
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

Netty的源碼還是很復(fù)雜的,所以研究時(shí)候有種無處下手的感覺。但是Netty作為一個(gè)NIO的網(wǎng)絡(luò)框架,其底層肯定是使用了java jdk 的nio編程。所以我們可以通過對(duì)比java nio編程 與 Netty源碼,來了解Netty究竟對(duì)java 的nio做了什么樣的封裝。

一個(gè)java nio編程的例子

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);

while(true){
    SocketChannel socketChannel =
            serverSocketChannel.accept();

    if(socketChannel != null){
        //do something with socketChannel...
    }
}

例子是網(wǎng)上隨便粘的,當(dāng)然簡(jiǎn)化了很多很多東西,這篇博客只會(huì)分析服務(wù)端channel的創(chuàng)建,所以這些暫時(shí)也夠了。

總結(jié)下:
1)創(chuàng)建ServerSocketChannel
2)綁定某個(gè)端口
3)設(shè)置非阻塞
4)監(jiān)聽新的連接

后面分析源碼也會(huì)試著找下Netty對(duì)應(yīng)的源碼在哪

創(chuàng)建一個(gè)NioServerSocketChannel

具體的調(diào)用流程如下圖

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

     //省略....
}

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
       }
      //省略...
}

ReflectiveChannelFactory

public T newChannel() {
    try {
        return clazz.newInstance();  //這里的clazz就是當(dāng)時(shí)從ServerBootstrap.channel方法里傳進(jìn)去的NioServerSocketChannel
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + clazz, t);
    }
}

代碼流程還是很清晰的,總結(jié)下就是 Netty會(huì)在ServerBootstrap里,通過工廠創(chuàng)建一個(gè)ServerChannel

在本例中,工廠是默認(rèn)的 ReflectiveChannelFactory(通過反射創(chuàng)建Channel),創(chuàng)造出的Channel類型是NioServerSocketChannel

NioServerSocketChannel 的初始化

分析完了NioServerSocketChannel接下來看看NioServerSocketChannel究竟在初始化時(shí)候做了啥。

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a >#2308</a>.
         */
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}   

1、創(chuàng)建一個(gè)對(duì)應(yīng)的jdk channel

通過 SelectorProvider.openServerSocketChannel() 創(chuàng)建了一個(gè)ServerSocketChannel 這個(gè)ServerSocketChannel就是 上文,nio編程的例子里的jdk channel

2、AbstractNioChannel構(gòu)造方法

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

1)NioServerSocketChannel最終會(huì)調(diào)用父類AbstractNioChannel的構(gòu)造函數(shù)
2)綁定一個(gè) NioServerSocketChannelConfig

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

1)保存所創(chuàng)建的jdk channel
2)配置configureBlocking 為false

調(diào)用父類 AbstractChannel的構(gòu)造方法

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId(); //綁定一個(gè)id
    unsafe = newUnsafe();  //綁定一個(gè)NioSocketChannelUnsafe
    pipeline = newChannelPipeline(); //綁定一個(gè)DefaultChannelPipeline
}

這一步非常重要:
1)綁定一個(gè) id
2)綁定一個(gè) NioSocketChannelUnsafe
3)綁定一個(gè)DefaultChannelPipeline
這幾個(gè)組件,在Netty中起著非常重要的作用,后面博客里講到了會(huì)詳細(xì)分析下

總結(jié)下,ServerSocketChannel做了這樣幾件事
1)創(chuàng)建一個(gè)jdk channel,并記錄下來
2)配置jdk channel 的 configureBlocking 為false
3)綁定一個(gè)NioServerSocketChannelConfig
4)綁定id、NioSocketChannelUnsafe、DefaultChannelPipeline 三個(gè)組件

初始化ServerChannel

再回到ServerBootStrap里,在創(chuàng)建了ServerChannel(即我們這的NioServerSocketChannel)后,下面一步,是對(duì)其進(jìn)行初始化

@Override
void init(Channel channel) throws Exception {
    //獲取我們?cè)赾lient代碼里配置的channel
    final Map<ChannelOption<?>, Object> options = options0();
    //最終調(diào)用 channel.config().setOption((ChannelOption<Object>) option, value) 即把配置設(shè)置到ChannelConfig里
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    //獲取client里配置的attrs
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;

    //設(shè)置childOptions和childAttrs,最終傳到 ServerBootstrapAcceptor里
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();  //client里設(shè)置的handler被傳進(jìn)來了
            if (handler != null) {
                pipeline.addLast(handler);
            }

            // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
            // In this case the initChannel(...) method will only be called after this method returns. Because
            // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
            // placed in front of the ServerBootstrapAcceptor.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    //currentChildGroup、currentChildHandler、currentChildOptions、currentChildAttrs
                    // 對(duì)應(yīng)childGroup      我們配置的childHandler        配置的childOptions    配置的childAttrs
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

流程還挺長(zhǎng)的,總結(jié)下:
1)把client端設(shè)置的option緩存到NioServerSocketChannelConfig里
2)把client端設(shè)置的attr設(shè)置到channel里



3)保存配置的childOptions,配置的childAttrs 到ServerBootstrapAcceptor里
4)往NioSocketChannel的pipeline中添加一個(gè)ServerBootstrapAcceptor

注冊(cè) ServerChannel

ServerChannel的注冊(cè)流程比較長(zhǎng),下面一篇博客再單獨(dú)介紹吧。

總結(jié)

這篇博客分析了下Netty的服務(wù)端啟動(dòng)流程,總結(jié)下:
1)Netty會(huì)在ServerBootstrap中創(chuàng)建一個(gè)ServerChannel
2)ServerChannel底層會(huì)綁定一個(gè)jdk 的channel
3)ServerChannel創(chuàng)建時(shí),會(huì)綁定一個(gè)id、NioSocketChannelUnsafe、DefaultChannelPipeline、NioServerSocketChannelConfig
4)初始化ServerChannel時(shí),會(huì)把我們自定義配置的option、attr設(shè)置進(jìn)去
5)最終會(huì)往 NioSocketChannel 的pipeline里添加一個(gè) ServerBootstrapAcceptor
6)注冊(cè)ServerChannel(下面一篇博客介紹)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容