netty系列之(一)——netty介紹

IO模型

阻塞式IO.png

非阻塞式IO.png

IO復(fù)用.png

信號驅(qū)動式.png

異步IO.png

圖片.png

上述5種IO模型,前4種模型-阻塞IO、非阻塞IO、IO復(fù)用、信號驅(qū)動IO都是同步I/O模型,因為其中真正的I/O操作(recvfrom)將阻塞進(jìn)程,在內(nèi)核數(shù)據(jù)copy到用戶空間時都是阻塞的。

一、NIO原理

Netty 是基于Java NIO 封裝的網(wǎng)絡(luò)通訊框架,只有充分理解了 Java NIO 才能理解好Netty的底層設(shè)計。Java NIO 由三個核心組件組件:
Buffer:固定數(shù)量的數(shù)據(jù)的容器。在 Java NIO 中,任何時候訪問 NIO 中的數(shù)據(jù),都需要通過緩沖區(qū)(Buffer)進(jìn)行操作。NIO 最常用的緩沖區(qū)則是 ByteBuffer。
Channel:是一個通道,它就像自來水管一樣,網(wǎng)絡(luò)數(shù)據(jù)通過 Channel 這根水管讀取和寫入。傳統(tǒng)的 IO 是基于流進(jìn)行操作的,Channle 和流類似,但又有些不同:

圖片.png

#傳統(tǒng)IO:FileInputStream
public static void method2(){
       InputStream in = null;
       try{
           in = new BufferedInputStream(new FileInputStream("src/nomal_io.txt"));
 
           byte [] buf = new byte[1024];
           int bytesRead = in.read(buf);
           while(bytesRead != -1)
           {
               for(int i=0;i<bytesRead;i++)
                   System.out.print((char)buf[i]);
               bytesRead = in.read(buf);
           }
       }catch (IOException e)
       {
           e.printStackTrace();
       }finally{
           try{
               if(in != null){
                   in.close();
               }
           }catch (IOException e){
               e.printStackTrace();
           }
       }
   }
#NIO
public static void method1(){
        RandomAccessFile aFile = null;
        try{
            aFile = new RandomAccessFile("src/nio.txt","rw");
            FileChannel fileChannel = aFile.getChannel();
            ByteBuffer buf = ByteBuffer.allocate(1024);
 
            int bytesRead = fileChannel.read(buf);
            System.out.println(bytesRead);
 
            while(bytesRead != -1)
            {
                buf.flip();
                while(buf.hasRemaining())
                {
                    System.out.print((char)buf.get());
                }
 
                buf.compact();
                bytesRead = fileChannel.read(buf);
            }
        }catch (IOException e){
            e.printStackTrace();
        }finally{
            try{
                if(aFile != null){
                    aFile.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
//使用Buffer一般遵循下面幾個步驟:
//分配空間(ByteBuffer buf = ByteBuffer.allocate(1024); 還有一種allocateDirector后面再陳述)
//寫入數(shù)據(jù)到Buffer(int bytesRead = fileChannel.read(buf);)
//調(diào)用filp()方法( buf.flip();)
//從Buffer中讀取數(shù)據(jù)(System.out.print((char)buf.get());)
//調(diào)用clear()方法或者compact()方法

Channel 必須要配合 Buffer 一起使用,通過從 Channel 讀取數(shù)據(jù)到 Buffer 中或者從 Buffer 寫入數(shù)據(jù)到 Channel 中,如下:

圖片.png

Selector
多路復(fù)用器 Selector,它是 Java NIO 編程的基礎(chǔ),Selector 提供了詢問Channel是否已經(jīng)準(zhǔn)備好執(zhí)行每個 I/O 操作的能力。簡單來講,Selector 會不斷地輪詢注冊在其上的 Channel,如果某個 Channel 上面發(fā)生了讀或者寫事件,這個 Channel 就處于就緒狀態(tài),會被 Selector 輪詢出來,然后通過 SelectionKey 可以獲取就緒 Channel 的集合,進(jìn)行后續(xù)的 I/O 操作。
image.png

  • Acceptor為服務(wù)端Channel注冊Selector,監(jiān)聽accept事件
  • 當(dāng)客戶端連接后,觸發(fā)accept事件
  • 服務(wù)器構(gòu)建對應(yīng)的客戶端Channel,并在其上注冊Selector,監(jiān)聽讀寫事件
  • 當(dāng)發(fā)生讀寫事件后,進(jìn)行相應(yīng)的讀寫處理

TCP服務(wù)端實例-NIO實現(xiàn)

NIO客戶端代碼(連接)
//獲取socket通道
SocketChannel channel = SocketChannel.open();        
channel.configureBlocking(false);
//獲得通道管理器
selector=Selector.open();        
channel.connect(new InetSocketAddress(serverIp, port));
//為該通道注冊SelectionKey.OP_CONNECT事件
channel.register(selector, SelectionKey.OP_CONNECT);

NIO客戶端代碼(監(jiān)聽)
while(true){
    //選擇注冊過的io操作的事件(第一次為SelectionKey.OP_CONNECT)
   selector.select();
   while(SelectionKey key : selector.selectedKeys()){
       if(key.isConnectable()){
           SocketChannel channel=(SocketChannel)key.channel();
           if(channel.isConnectionPending()){
               channel.finishConnect();//如果正在連接,則完成連接
           }
           channel.register(selector, SelectionKey.OP_READ);
       }else if(key.isReadable()){ //有可讀數(shù)據(jù)事件。
           SocketChannel channel = (SocketChannel)key.channel();
           ByteBuffer buffer = ByteBuffer.allocate(10);
           channel.read(buffer);
           byte[] data = buffer.array();
           String message = new String(data);
           System.out.println("recevie message from server:, size:"
               + buffer.position() + " msg: " + message);
       }
   }
}

NIO服務(wù)端代碼(連接)
//獲取一個ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
//獲取通道管理器
selector = Selector.open();
//將通道管理器與通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

NIO服務(wù)端代碼(監(jiān)聽)
while(true){
    //當(dāng)有注冊的事件到達(dá)時,方法返回,否則阻塞。
   selector.select();
   for(SelectionKey key : selector.selectedKeys()){
       if(key.isAcceptable()){
           ServerSocketChannel server =
                (ServerSocketChannel)key.channel();
           SocketChannel channel = server.accept();
           channel.write(ByteBuffer.wrap(
            new String("send message to client").getBytes()));
           //在與客戶端連接成功后,為客戶端通道注冊SelectionKey.OP_READ事件。
           channel.register(selector, SelectionKey.OP_READ);
       }else if(key.isReadable()){//有可讀數(shù)據(jù)事件
           SocketChannel channel = (SocketChannel)key.channel();
           ByteBuffer buffer = ByteBuffer.allocate(10);
           int read = channel.read(buffer);
           byte[] data = buffer.array();
           String message = new String(data);
           System.out.println("receive message from client, size:"
               + buffer.position() + " msg: " + message);
       }
   }
}

二、netty

1、netty特點

  • 一個高性能、異步事件驅(qū)動的NIO框架,它提供了對TCP、UDP和文件傳輸?shù)闹С?/li>
  • 使用更高效的socket底層,對epoll空輪詢引起的cpu占用飆升在內(nèi)部進(jìn)行了處理,避免了直接使用NIO的陷阱,簡化了NIO的處理方式。
  • 采用多種decoder/encoder 支持,對TCP粘包/分包進(jìn)行自動化處理
  • 可使用接受/處理線程池,提高連接效率,對重連、心跳檢測的簡單支持
  • 可配置IO線程數(shù)、TCP參數(shù), TCP接收和發(fā)送緩沖區(qū)使用直接內(nèi)存代替堆內(nèi)存,通過內(nèi)存池的方式循環(huán)利用ByteBuf
  • 通過引用計數(shù)器及時申請釋放不再引用的對象,降低了GC頻率
  • 使用單線程串行化的方式,高效的Reactor線程模型
  • 大量使用了volitale、使用了CAS和原子類、線程安全類的使用、讀寫鎖的使用

2、netty線程模型

netty基于Reactor模型,是對NIO模型的一種改進(jìn)。

  • 單線程Reactor模型


    image.png

    這個模型和上面的NIO流程很類似,只是將消息相關(guān)處理獨立到了Handler中去了!雖然上面說到NIO一個線程就可以支持所有的IO處理。但是瓶頸也是顯而易見的!我們看一個客戶端的情況,如果這個客戶端多次進(jìn)行請求,如果在Handler中的處理速度較慢,那么后續(xù)的客戶端請求都會被積壓,導(dǎo)致響應(yīng)變慢!所以引入了Reactor多線程模型!

  • 多線程Reactor模型


    image.png

    Reactor多線程模型就是將Handler中的IO操作和非IO操作分開,操作IO的線程稱為IO線程,非IO操作的線程稱為工作線程!這樣的話,客戶端的請求會直接被丟到線程池中,客戶端發(fā)送請求就不會堵塞!

3、netty核心組件

netty服務(wù)端 代碼示例

EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
//EventLoopGroup繼承線程池ScheduledExecutorService
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);//利用反射構(gòu)造NioServerSocketChannel實例
bootstrap.option(ChannelOption.SO_BACKLOG, 2048);//backlog指定了內(nèi)核為此套接口排隊的最大連接個數(shù)
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.handler(new LoggingServerHandler());//handler與childHandler不同
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new MyChannelHandler1());
        ch.pipeline().addLast(new MyChannelHandler2());
        ch.pipeline().addLast(new MyChannelHandler3());
    }
});
ChannelFuture f = bootstrap.bind(port).sync();//bind方法實現(xiàn)
f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                //啟動成功
            }
});
f.channel().closeFuture().sync();

class MyChannelHandler1 extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
    }
}

Channel
Channel 是 Netty 網(wǎng)絡(luò)操作抽象類,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,還包括了 Netty 框架相關(guān)的一些功能。
EventLoop
Netty 基于事件驅(qū)動模型,使用不同的事件來通知我們狀態(tài)的改變或者操作狀態(tài)的改變。它定義了在整個連接的生命周期里當(dāng)有事件發(fā)生的時候處理的核心抽象。
Channel 為Netty 網(wǎng)絡(luò)操作抽象類,EventLoop 主要是為Channel 處理 I/O 操作,兩者配合參與 I/O 操作。

圖片.png

上圖為Channel、EventLoop、Thread、EventLoopGroup之間的關(guān)系。一個 EventLoop 在它的生命周期內(nèi)只能與一個Thread綁定,一個 EventLoop 可被分配至一個或多個 Channel ,輪流處理。

ChannelFuture
Netty 為異步非阻塞,即所有的 I/O 操作都為異步的,因此,我們不能立刻得知消息是否已經(jīng)被處理了。Netty 提供了 ChannelFuture 接口,通過該接口的 addListener() 方法注冊一個 ChannelFutureListener,當(dāng)操作執(zhí)行成功或者失敗時,監(jiān)聽就會自動觸發(fā)返回結(jié)果。

ChannelFuture f = bootstrap.bind(port).sync();
f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                //啟動成功
            }
});
f.channel().closeFuture().sync();

ChannelHandler
ChannelHandler 為 Netty 中最核心的組件,它充當(dāng)了所有處理入站和出站數(shù)據(jù)的應(yīng)用程序邏輯的容器。ChannelHandler 主要用來處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。
ChannelHandler 有兩個核心子類 ChannelInboundHandler 和 ChannelOutboundHandler,其中 ChannelInboundHandler 用于接收、處理入站數(shù)據(jù)和事件,而 ChannelOutboundHandler 則相反。

ChannelPipeline
ChannelPipeline 為 ChannelHandler 鏈,提供了一個容器并定義了用于沿著鏈傳播入站和出站事件流的 API。一個數(shù)據(jù)或者事件可能會被多個 Handler 處理,在這個過程中,數(shù)據(jù)或者事件經(jīng)流 ChannelPipeline,由 ChannelHandler 處理。在這個處理過程中,一個 ChannelHandler 接收數(shù)據(jù)后處理完成后交給下一個 ChannelHandler,或者什么都不做直接交給下一個 ChannelHandler。

圖片.png

當(dāng)一個數(shù)據(jù)流進(jìn)入 ChannlePipeline 時,它會從 ChannelPipeline 頭部開始傳給第一個 ChannelInboundHandler ,當(dāng)?shù)谝粋€處理完后再傳給下一個,一直傳遞到管道的尾部。與之相對應(yīng)的是,當(dāng)數(shù)據(jù)被寫出時,它會從管道的尾部開始,先經(jīng)過管道尾部的 “最后” 一個ChannelOutboundHandler,當(dāng)它處理完成后會傳遞給前一個 ChannelOutboundHandler 。

附錄


TCP.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容