3_netty_Bootstrap

接著上篇例子看

    // Example server setup: one fluent chain configures the bootstrap, then the server is bound.
    ServerBootstrap b = new ServerBootstrap();
    // group(parent, child) -> channel class -> SO_BACKLOG option -> handler for the server channel
    // -> childHandler (an initializer that builds the pipeline of each SocketChannel it is given).
    b.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .option(ChannelOption.SO_BACKLOG, 100)
              .handler(new LoggingHandler(LogLevel.INFO))
              .childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      public void initChannel(SocketChannel ch) throws Exception {
                          ChannelPipeline p = ch.pipeline();
                          // The SSL handler is only installed when an SslContext was configured.
                          if (sslCtx != null) {
                              p.addLast(sslCtx.newHandler(ch.alloc()));
                          }
                          //p.addLast(new LoggingHandler(LogLevel.INFO));
                          // The business handler goes last in the pipeline.
                          p.addLast(serverHandler);
                      }
                  });

          // Start the server: bind to PORT and call sync() on the returned future
          // (presumably blocks until the bind completes - confirm against ChannelFuture docs).
          ChannelFuture f = b.bind(PORT).sync();
          // Then call sync() on the channel's close future (presumably blocks until the channel closes).
          f.channel().closeFuture().sync();

先看構造方法。

    // No-arg constructor with an empty body; this is the one the example above uses.
    public ServerBootstrap() { }

    /**
     * Copy constructor: delegates to the superclass copy constructor, then copies the
     * child group, the child handler, and the entries of the child option/attribute maps.
     *
     * @param bootstrap the instance whose configuration is copied
     */
    private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        this.childGroup = bootstrap.childGroup;
        this.childHandler = bootstrap.childHandler;
        // Each source map is locked while its entries are copied into this instance's map.
        synchronized (bootstrap.childOptions) {
            this.childOptions.putAll(bootstrap.childOptions);
        }
        synchronized (bootstrap.childAttrs) {
            this.childAttrs.putAll(bootstrap.childAttrs);
        }
    }

提供了兩個構造方法,如例子中,我們用的是空構造。接著調用group方法

 //這里將boss和worker一起傳入
    /**
     * Sets both event loop groups: {@code parentGroup} is handed to the superclass
     * {@code group(...)}, {@code childGroup} is stored on this bootstrap.
     *
     * @param parentGroup group passed to {@code super.group(...)}
     * @param childGroup  group stored as this bootstrap's child group
     * @return this bootstrap, for chaining
     * @throws NullPointerException  if {@code childGroup} is {@code null}
     * @throws IllegalStateException if a child group has already been set
     */
    public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        // The parent group is applied first, before the child group is validated.
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        } else if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        } else {
            this.childGroup = childGroup;
        }
        return this;
    }

調用父類的group方法,傳入bossGroup

    /**
     * Sets the {@link EventLoopGroup} that handles all events for the Channel to be created.
     * The group can only be set once.
     *
     * @param group the event loop group to use
     * @return this bootstrap, for chaining
     * @throws NullPointerException  if {@code group} is {@code null}
     * @throws IllegalStateException if a group has already been set
     */
    public B group(EventLoopGroup group) {
        if (group == null) {
            throw new NullPointerException("group");
        } else if (this.group != null) {
            throw new IllegalStateException("group set already");
        }
        this.group = group;
        @SuppressWarnings("unchecked")
        final B self = (B) this;
        return self;
    }

接著設置channel

    /**
     * Registers the class used to create Channel instances by wrapping it in a
     * {@code ReflectiveChannelFactory} and passing that to {@code channelFactory(...)}.
     * No channel is instantiated here; only the factory is created.
     *
     * @param channelClass the channel class (e.g. NioServerSocketChannel in the example)
     * @return the result of {@code channelFactory(...)}
     * @throws NullPointerException if {@code channelClass} is {@code null}
     */
    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        final ReflectiveChannelFactory<C> factory = new ReflectiveChannelFactory<C>(channelClass);
        return channelFactory(factory);
    }

接著是option方法,這里傳入的參數是ChannelOption.SO_BACKLOG,值為100。這是個socket的標準參數,表示當服務器請求處理線程全滿時,用于臨時存放已完成三次握手的請求的隊列的最大長度。這里還有很多參數可以設置,就不一一列舉了。

    /**
     * Stores or clears a {@link ChannelOption} in the {@code options} map.
     * A {@code null} value removes the option; any other value is put into the map.
     *
     * @param option the option key
     * @param value  the value to store, or {@code null} to remove the option
     * @return this bootstrap, for chaining
     * @throws NullPointerException if {@code option} is {@code null}
     */
    public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        final boolean clearing = value == null;
        // The map is only touched while holding its own monitor.
        synchronized (options) {
            if (clearing) {
                options.remove(option);
            } else {
                options.put(option, value);
            }
        }
        @SuppressWarnings("unchecked")
        final B self = (B) this;
        return self;
    }

下面顯式設置handler,這里添加了一個日志處理handler,用于記錄所有的event。再接著調用childHandler方法,這里使用new ChannelInitializer<SocketChannel>(),并實現initChannel方法,這里就是初始化channel,里面指定了業務處理器。
再下來就是重點,服務器啟動

// Start the server: bind(PORT) returns a ChannelFuture, on which sync() is then called.
ChannelFuture f = b.bind(PORT).sync();

AbstractBootstrap的bind方法用于創建新的Channel并且綁定

    /**
     * Binds using only a port: wraps {@code inetPort} in an {@link InetSocketAddress}
     * and delegates to {@code bind(SocketAddress)}.
     *
     * @param inetPort the port to bind to
     * @return the future returned by {@code bind(SocketAddress)}
     */
    public ChannelFuture bind(int inetPort) {
        final InetSocketAddress localAddress = new InetSocketAddress(inetPort);
        return bind(localAddress);
    }
    /**
     * Creates a new Channel and binds it to {@code localAddress}.
     * (The excerpt previously listed this method twice; the duplicate definition is removed.)
     *
     * @param localAddress the address to bind to
     * @return the future returned by {@code doBind(localAddress)}
     * @throws NullPointerException if {@code localAddress} is {@code null}
     */
    public ChannelFuture bind(SocketAddress localAddress) {
        // validate() runs before the argument check; its own checks are defined outside this excerpt.
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

// Initializes and registers a channel, then binds it to localAddress via doBind0.
// Returns the failed registration future if registration already failed; otherwise returns
// the promise handed to doBind0. If registration has not finished yet, the bind is deferred
// to a listener on the registration future.
private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register the Channel.
        final ChannelFuture regFuture = initAndRegister();
        // The Channel associated with the registration future.
        final Channel channel = regFuture.channel();
        // Registration already failed: hand the failed future straight back to the caller.
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // isDone() only checks whether registration has completed; it does not wait for it.
        if (regFuture.isDone()) {
            // No failure cause and already done: registration is treated as complete and successful here.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

看看初始化和注冊

// Creates a channel through channelFactory, runs init(channel) on it, and registers it with
// config().group(). Returns the registration future, or an already-failed promise when
// creation or initialization throws.
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // channel(...) installs a ReflectiveChannelFactory, so newChannel() presumably instantiates
            // the configured class reflectively (NioServerSocketChannel in the server example).
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            // NOTE(review): channel may still be null here (see above), so the promise can be built with a
            // null channel - confirm DefaultChannelPromise tolerates that in the Netty version in use.
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        // If registration has already failed, release the channel: a normal close() when it did get
        // registered, otherwise a forcible close through unsafe().
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

    // Prepares a freshly created server channel: applies the configured options and attributes,
    // snapshots the child configuration, and adds a ChannelInitializer to the channel's pipeline
    // that installs the configured handler and, via an event-loop task, a ServerBootstrapAcceptor.
    void init(Channel channel) throws Exception {
        // The SO_BACKLOG value passed to option(...) earlier is expected to be in this map
        // (presumably options0() exposes the same map that option(...) writes to).
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
        // Copy every configured attribute onto the channel.
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        // Per the article, the pipeline (a DefaultChannelPipeline) is created in the channel's
        // constructor - that code is not part of this excerpt.
        ChannelPipeline p = channel.pipeline();

        // Snapshot the child configuration into locals/arrays so the anonymous classes below
        // capture fixed values; each map is copied while holding its own monitor.
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        // Add an initializer to the server channel's pipeline.
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // The handler configured through handler(...) is optional.
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                // Submit a task to the channel's event loop that appends a ServerBootstrapAcceptor
                // (built from the snapshotted child configuration) to the pipeline.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

處理完init后,會調用register方法

// Register the channel with the configured group
// (per the article this is NioEventLoopGroup's register method - the group's type is not shown here).
config().group().register(channel);

initAndRegister方法內:

  1. 通過channelFactory創建了channel(NioServerSocketChannel)
  2. 初始化channel,為channel增加option和attr,并且在其pipeline上添加了handler。
  3. 初始化完后進行注冊。

再回到doBind方法,初始化和注冊channel后返回regFuture。這個regFuture表示注冊結果,regFuture.isDone()用于判斷注冊是否已經完成(它不會阻塞等待);如果尚未完成,則通過監聽器在注冊完成后回調。注冊成功完成后,會調用doBind0方法。

   /**
    * Submits the actual bind to the channel's event loop.
    * When the task runs: if registration succeeded, the channel is bound to
    * {@code localAddress} with {@code promise} and closed on failure; otherwise
    * {@code promise} is failed with the registration's cause.
    *
    * @param regFuture    the registration future whose outcome decides what the task does
    * @param channel      the channel to bind
    * @param localAddress the address to bind to
    * @param promise      the promise completed by the bind (or failed with the registration cause)
    */
   private static void doBind0(
           final ChannelFuture regFuture, final Channel channel,
           final SocketAddress localAddress, final ChannelPromise promise) {

       // This method is invoked before channelRegistered() is triggered. Give user handlers a chance
       // to set up the pipeline in its channelRegistered() implementation.
       final Runnable bindTask = new Runnable() {
           @Override
           public void run() {
               if (!regFuture.isSuccess()) {
                   promise.setFailure(regFuture.cause());
                   return;
               }
               final ChannelFuture bindFuture = channel.bind(localAddress, promise);
               bindFuture.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
           }
       };
       channel.eventLoop().execute(bindTask);
   }

那么總結下服務端綁定流程

  1. 初始化和注冊channel,包括channel的創建
  2. 初始化后返回ChannelFuture,表示注冊的結果
  3. 注冊成功后,調用doBind0,通過channel的eventLoop提交任務,在任務中執行channel.bind(localAddress, promise),最后返回一個ChannelPromise表示綁定的結果。

下面看下客戶端的啟動。

// Example client setup: configure the bootstrap in one fluent chain, then connect.
Bootstrap b = new Bootstrap();
            // group -> channel class -> remote address -> handler (an initializer that adds the
            // EchoClientHandler to the pipeline of the SocketChannel it is given).
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(
                                    new EchoClientHandler()
                            );
                        }
                    });
            // connect() uses the remote address configured above; sync() is called on the returned future
            // (presumably blocks until the connect completes - confirm against ChannelFuture docs).
            ChannelFuture f = b.connect().sync();
            // Then call sync() on the channel's close future (presumably blocks until the channel closes).
            f.channel().closeFuture().sync();

Bootstrap的connect方法

    /**
     * Connects to the remote address previously configured on this bootstrap.
     * {@code validate()} runs first; the address is then resolved and connected through
     * {@code doResolveAndConnect}, using the local address from {@code config}.
     *
     * @return the future returned by {@code doResolveAndConnect}
     * @throws IllegalStateException if no remote address has been set
     */
    public ChannelFuture connect() {
        validate();
        final SocketAddress target = this.remoteAddress;
        if (target != null) {
            return doResolveAndConnect(target, config.localAddress());
        }
        throw new IllegalStateException("remoteAddress not set");
    }

 /**
  * Initializes and registers a channel, then resolves and connects via doResolveAndConnect0.
  * If registration is already done and failed, the registration future itself is returned;
  * if it is not done yet, the resolve/connect step is deferred to a listener on that future.
  */
 private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        // In the client example this is the NioSocketChannel configured through channel(...).
        final Channel channel = regFuture.channel();

        if (regFuture.isDone()) {
            // Registration finished but did not succeed: return the registration future as the result.
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                    // failure.
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

    /**
     * Resolves remoteAddress with the resolver obtained for the channel's event loop, then connects.
     * Connects immediately when the resolver does not support the address or it is already resolved.
     * On a resolution failure the channel is closed and the promise is failed. Any Throwable thrown
     * inside the try block is reported through promise.tryFailure. Always returns the given promise.
     */
    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                               final SocketAddress localAddress, final ChannelPromise promise) {
        try {
            final EventLoop eventLoop = channel.eventLoop();
            final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

            if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
                // Resolver has no idea about what to do with the specified remote address or it's resolved already.
                doConnect(remoteAddress, localAddress, promise);
                return promise;
            }

            final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

            // Fast path: the resolution finished synchronously, so handle its outcome right here.
            if (resolveFuture.isDone()) {
                final Throwable resolveFailureCause = resolveFuture.cause();

                if (resolveFailureCause != null) {
                    // Failed to resolve immediately
                    channel.close();
                    promise.setFailure(resolveFailureCause);
                } else {
                    // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                    doConnect(resolveFuture.getNow(), localAddress, promise);
                }
                return promise;
            }

            // Wait until the name resolution is finished.
            resolveFuture.addListener(new FutureListener<SocketAddress>() {
                @Override
                public void operationComplete(Future<SocketAddress> future) throws Exception {
                    // Same handling as the synchronous case above, run once the resolution completes.
                    if (future.cause() != null) {
                        channel.close();
                        promise.setFailure(future.cause());
                    } else {
                        doConnect(future.getNow(), localAddress, promise);
                    }
                }
            });
        } catch (Throwable cause) {
            // tryFailure (rather than setFailure) is used here for anything thrown above.
            promise.tryFailure(cause);
        }
        return promise;
    }

    /**
     * Submits the actual connect to the event loop of the promise's channel.
     * The task connects with or without a local address depending on whether
     * {@code localAddress} is {@code null}, then adds a close-on-failure listener
     * to {@code connectPromise}.
     *
     * @param remoteAddress  the address to connect to
     * @param localAddress   the local address to use, or {@code null} for none
     * @param connectPromise the promise passed to {@code channel.connect(...)}
     */
    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        final Channel channel = connectPromise.channel();
        final Runnable connectTask = new Runnable() {
            @Override
            public void run() {
                if (localAddress != null) {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        };
        channel.eventLoop().execute(connectTask);
    }

總結下客戶端連接流程

  1. 初始化和注冊Channel
  2. 解析地址
  3. 通過channel的eventLoop來執行channel.connect(remoteAddress, connectPromise)
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容