netty系列之(二)——netty服務(wù)端啟動(dòng)分析

一、netty服務(wù)啟動(dòng)分析

EventLoopGroup boss = new NioEventLoopGroup();//類圖,繼承線程池ScheduledExecutorService
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射構(gòu)造NioServerSocketChannel實(shí)例
//backlog指定了內(nèi)核為此套接口排隊(duì)的最大連接個(gè)數(shù),對(duì)于給定的監(jiān)聽套接口,內(nèi)核要維護(hù)兩個(gè)隊(duì)列:未鏈接隊(duì)列和已連接隊(duì)列,根據(jù)TCP三路握手過程中三個(gè)分節(jié)來分隔這兩個(gè)隊(duì)列
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler與childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new MyChannelHandler1());
        ch.pipeline().addLast(new MyChannelHandler2());
        ch.pipeline().addLast(new MyChannelHandler3());
    }
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法實(shí)現(xiàn)
f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                //啟動(dòng)成功
            }
});
f.channel().closeFuture().sync();

圖片.png
創(chuàng)建 ServerBootstrap 實(shí)例
設(shè)置并綁定 Reactor 線程池
設(shè)置并綁定服務(wù)端 Channel
創(chuàng)建并初始化 ChannelPipeline
添加并設(shè)置 ChannelHandler
綁定并啟動(dòng)監(jiān)聽端口

二、netty服務(wù)啟動(dòng)代碼分析

1、創(chuàng)建兩個(gè)EventLoopGroup

  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workerGroup = new NioEventLoopGroup();

bossGroup 為 BOSS 線程組,用于服務(wù)端接受客戶端的連接, workerGroup 為 worker 線程組,用于進(jìn)行 SocketChannel 的網(wǎng)絡(luò)讀寫。當(dāng)然也可以創(chuàng)建一個(gè)線程組,共享使用。

2、創(chuàng)建ServerBootstrap實(shí)例

ServerBootStrap為Netty服務(wù)端的啟動(dòng)引導(dǎo)類,用于幫助用戶快速配置、啟動(dòng)服務(wù)端服務(wù)。提供的方法如下:


圖片.png

ServerBootStrap底層采用裝飾者模式。

3、設(shè)置并綁定Reactor線程池

b.group(bossGroup, workerGroup)

EventLoopGroup 為 Netty 線程池,它實(shí)際上就是 EventLoop 的數(shù)組容器。EventLoop 的職責(zé)是處理所有注冊(cè)到本線程多路復(fù)用器 Selector 上的 Channel,Selector 的輪詢操作由綁定的 EventLoop 線程 run 方法驅(qū)動(dòng),在一個(gè)循環(huán)體內(nèi)循環(huán)執(zhí)行。通俗點(diǎn)講就是一個(gè)死循環(huán),不斷的檢測(cè) I/O 事件、處理 I/O 事件。
這里設(shè)置了兩個(gè)group,這個(gè)其實(shí)有點(diǎn)兒像我們工作一樣。需要兩類型的工人,一個(gè)老板(bossGroup),一個(gè)工人(workerGroup),老板負(fù)責(zé)從外面接活,工人則負(fù)責(zé)死命干活。所以這里 bossGroup 的作用就是不斷地接收新的連接,接收之后就丟給 workerGroup 來處理,workerGroup 負(fù)責(zé)干活就行(負(fù)責(zé)客戶端連接的 IO 操作)。

4、設(shè)置并綁定服務(wù)端Channel

.channel(NioServerSocketChannel.class)

調(diào)用 ServerBootstrap.channel 方法用于設(shè)置服務(wù)端使用的 Channel,傳遞一個(gè) NioServerSocketChannel Class對(duì)象,Netty通過工廠類,利用反射創(chuàng)建NioServerSocketChannel 對(duì)象,如下:

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

//最終調(diào)用構(gòu)造函數(shù)
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

5、添加并設(shè)置ChannelHandler

 .handler(new LoggingServerHandler())
.childHandler(new ChannelInitializer(){
    //省略代碼
})
圖片.png

ServerBootstrap 中的 Handler(childHandler()) 是 NioServerSocketChannel 使用的,所有連接該監(jiān)聽端口的客戶端都會(huì)執(zhí)行它,父類 AbstractBootstrap 中的 Handler 是一個(gè)工廠類,它為每一個(gè)新接入的客戶端都創(chuàng)建一個(gè)新的 Handler。
handler在server初始化它時(shí)就會(huì)執(zhí)行,而childHandler會(huì)在客戶端成功connect后才執(zhí)行,這是兩者的區(qū)別。

6、綁定端口,啟動(dòng)服務(wù)

ChannelFuture future = b.bind(port).sync();

主要步驟:
負(fù)責(zé)創(chuàng)建服務(wù)端的NioServerSocketChannel實(shí)例;
為NioServerSocketChannel的pipeline添加handler;
注冊(cè)NioServerSocketChannel到selector;

二、源碼詳解

AbstractBootstrap類doBind方法,綁定端口入口

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //初始化與注冊(cè)
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }

private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
}

initAndRegister方法,創(chuàng)建和初始化channel

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel(); //創(chuàng)建服務(wù)端Channel
            init(channel);//初始化Channel
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel); //注冊(cè)selector
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
 }

#channelFactory.newChannel()通過反射創(chuàng)建實(shí)例
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
//clazz由AbstractBootstrap.channel方法傳入,bootstrap.channel(NioServerSocketChannel.class);
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

#AbstractBootstrap
public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

#NioServerSocketChannel構(gòu)造函數(shù)
public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);//調(diào)用AbstractNioChannel構(gòu)造函數(shù)
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

#AbstractNioChannel構(gòu)造函數(shù)
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);//非阻塞方式
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

#init方法,初始化channel參數(shù),添加handler
   void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {//服務(wù)端Channel屬性
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();//傳入hander
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                // We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
                // In this case the initChannel(...) method will only be called after this method returns. Because
                // of this we need to ensure we add our handler in a delayed fashion so all the users handler are
                // placed in front of the ServerBootstrapAcceptor.
        //默認(rèn)添加的ServerBootstrapAcceptor的hander,連接接入器處理新鏈接接入時(shí),初始化Options和Attrs
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

register方法,注冊(cè)selector

#unsafe類register方法
//注冊(cè)到Reactor線程的多路復(fù)用器上監(jiān)聽新客戶端的接入
public final void register(final ChannelPromise promise) {
            if (eventLoop.inEventLoop()) {//是否在當(dāng)前eventLoop中
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {//不在當(dāng)前eventLoop中,異步執(zhí)行
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    promise.setFailure(t);
                }
            }
}

#unsafe類register0方法
private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!ensureOpen(promise)) {
                    return;
                }
                doRegister();
                registered = true;
                promise.setSuccess();
                pipeline.fireChannelRegistered();
                if (isActive()) {
                    pipeline.fireChannelActive();
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                if (!promise.tryFailure(t)) {
                    logger.warn(
                            "Tried to fail the registration promise, but it is complete already. " +
                                    "Swallowing the cause of the registration failure:", t);
                }
            }
        }

#AbstractNioChannel類doRegister方法
protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
               //獲取selectionKey ,通過SelectionKey的interestOps(int ops)方法可以修改監(jiān)聽操作位,注冊(cè)O(shè)P_ACCEPT(16)到多路復(fù)用器上
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

附錄:
ChannelOption參數(shù)說明

1、ChannelOption.SO_BACKLOG
    ChannelOption.SO_BACKLOG對(duì)應(yīng)的是tcp/ip協(xié)議listen函數(shù)中的backlog參數(shù),函數(shù)listen(int socketfd,int backlog)用來初始化服務(wù)端可連接隊(duì)列,
    服務(wù)端處理客戶端連接請(qǐng)求是順序處理的,所以同一時(shí)間只能處理一個(gè)客戶端連接,多個(gè)客戶端來的時(shí)候,服務(wù)端將不能處理的客戶端連接請(qǐng)求放在隊(duì)列中等待處理,backlog參數(shù)指定了隊(duì)列的大小
2、ChannelOption.SO_REUSEADDR
    ChanneOption.SO_REUSEADDR對(duì)應(yīng)于套接字選項(xiàng)中的SO_REUSEADDR,這個(gè)參數(shù)表示允許重復(fù)使用本地地址和端口,
    比如,某個(gè)服務(wù)器進(jìn)程占用了TCP的80端口進(jìn)行監(jiān)聽,此時(shí)再次監(jiān)聽該端口就會(huì)返回錯(cuò)誤,使用該參數(shù)就可以解決問題,該參數(shù)允許共用該端口,這個(gè)在服務(wù)器程序中比較常使用,
    比如某個(gè)進(jìn)程非正常退出,該程序占用的端口可能要被占用一段時(shí)間才能允許其他進(jìn)程使用,而且程序死掉以后,內(nèi)核一需要一定的時(shí)間才能夠釋放此端口,不設(shè)置SO_REUSEADDR
    就無法正常使用該端口。
3、ChannelOption.SO_KEEPALIVE
    Channeloption.SO_KEEPALIVE參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_KEEPALIVE,該參數(shù)用于設(shè)置TCP連接,當(dāng)設(shè)置該選項(xiàng)以后,連接會(huì)測(cè)試鏈接的狀態(tài),這個(gè)選項(xiàng)用于可能長(zhǎng)時(shí)間沒有數(shù)據(jù)交流的
    連接。當(dāng)設(shè)置該選項(xiàng)以后,如果在兩小時(shí)內(nèi)沒有數(shù)據(jù)的通信時(shí),TCP會(huì)自動(dòng)發(fā)送一個(gè)活動(dòng)探測(cè)數(shù)據(jù)報(bào)文。
4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
    ChannelOption.SO_SNDBUF參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_SNDBUF,ChannelOption.SO_RCVBUF參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_RCVBUF這兩個(gè)參數(shù)用于操作接收緩沖區(qū)和發(fā)送緩沖區(qū)
    的大小,接收緩沖區(qū)用于保存網(wǎng)絡(luò)協(xié)議站內(nèi)收到的數(shù)據(jù),直到應(yīng)用程序讀取成功,發(fā)送緩沖區(qū)用于保存發(fā)送數(shù)據(jù),直到發(fā)送成功。
5、ChannelOption.SO_LINGER
    ChannelOption.SO_LINGER參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的SO_LINGER,Linux內(nèi)核默認(rèn)的處理方式是當(dāng)用戶調(diào)用close()方法的時(shí)候,函數(shù)返回,在可能的情況下,盡量發(fā)送數(shù)據(jù),不一定保證
    會(huì)發(fā)生剩余的數(shù)據(jù),造成了數(shù)據(jù)的不確定性,使用SO_LINGER可以阻塞close()的調(diào)用時(shí)間,直到數(shù)據(jù)完全發(fā)送
6、ChannelOption.TCP_NODELAY
    ChannelOption.TCP_NODELAY參數(shù)對(duì)應(yīng)于套接字選項(xiàng)中的TCP_NODELAY,該參數(shù)的使用與Nagle算法有關(guān)
    Nagle算法是將小的數(shù)據(jù)包組裝為更大的幀然后進(jìn)行發(fā)送,而不是輸入一次發(fā)送一次,因此在數(shù)據(jù)包不足的時(shí)候會(huì)等待其他數(shù)據(jù)的到了,組裝成大的數(shù)據(jù)包進(jìn)行發(fā)送,雖然該方式有效提高網(wǎng)絡(luò)的有效
    負(fù)載,但是卻造成了延時(shí),而該參數(shù)的作用就是禁止使用Nagle算法,使用于小數(shù)據(jù)即時(shí)傳輸,于TCP_NODELAY相對(duì)應(yīng)的是TCP_CORK,該選項(xiàng)是需要等到發(fā)送的數(shù)據(jù)量最大的時(shí)候,一次性發(fā)送
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容