netty 入門(mén)雜談

java io 模型一共分為4種

  • 傳統(tǒng)IO
  • 偽異步IO
  • NIO
  • AIO

介紹
  • netty是一個(gè)高性能,異步事件驅(qū)動(dòng)的NIO框架,它提供了對(duì)TCP、UDP和file傳輸?shù)闹С?netty所有的IO操作都為異步非阻塞,通過(guò)Future-Listener機(jī)制,用戶可以方便的主動(dòng)獲取或者通過(guò)通知機(jī)制獲得IO操作結(jié)果。
能做什么
  • 開(kāi)發(fā)異步、非阻塞的TCP、UDP網(wǎng)絡(luò)應(yīng)用程序
  • 開(kāi)發(fā)異步文件傳輸應(yīng)用程序
  • 開(kāi)發(fā)異步HTTP服務(wù)端和客戶端應(yīng)用程序
  • 提供對(duì)多種編解碼框架的集成,包括谷歌的Protobuf、Jboss marshalling、Java序列化、壓縮編解碼、XML解碼、字符串編解碼等,這些編解碼框架可以被用戶直接使用。
  • 提供形式多樣的編解碼基礎(chǔ)類(lèi)庫(kù),可以非常方便的實(shí)現(xiàn)私有協(xié)議棧編解碼框架的二次定制和開(kāi)發(fā)。
  • 基于職責(zé)鏈模式的Pipeline-Handler機(jī)制,用戶可以非常方便的對(duì)網(wǎng)絡(luò)事件進(jìn)行攔截和定制。
  • 所有的IO操作都是異步的,用戶可以通過(guò)Future-Listener機(jī)制主動(dòng)Get結(jié)果或者由IO線程操作完成之后主動(dòng)Notify結(jié)果,用戶的業(yè)務(wù)線程不需要同步等待。
  • IP黑白名單控制 。
  • 打印消息碼流。
  • 流量控制和整形。
  • 性能統(tǒng)計(jì)。
  • 基于鏈路空閑事件檢測(cè)的心跳檢測(cè)。
EventLoop,EventLoopGroup
  • EventLoop目的是為Channel處理IO操作,一個(gè)EventLoop可以為多個(gè)Channel服務(wù),EventLoopGroup會(huì)包含多個(gè)EventLoop。
BootStrap,ServerBootstrap
  • 一個(gè)Netty應(yīng)用通常由一個(gè)Bootstrap開(kāi)始,它主要作用是配置整個(gè)Netty程序,串聯(lián)起各個(gè)組件。
ChannelInitializer
  • 當(dāng)一個(gè)鏈接建立時(shí),我們需要知道怎么來(lái)接收或者發(fā)送數(shù)據(jù),當(dāng)然,我們有各種各樣的Handler實(shí)現(xiàn)來(lái)處理它,那么ChannelInitializer便是用來(lái)配置這些Handler,它會(huì)提供一個(gè)ChannelPipeline,并把Handler加入到ChannelPipeline。
Handler
  • 為了支持各種協(xié)議和處理數(shù)據(jù)的方式,便誕生了Handler組件。Handler主要用來(lái)處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。
ChannelInboundHandler
  • 一個(gè)最常用的Handler。這個(gè)Handler的作用就是處理接收到數(shù)據(jù)時(shí)的事件,也就是說(shuō),我們的業(yè)務(wù)邏輯一般就是寫(xiě)在這個(gè)Handler里面的,ChannelInboundHandler就是用來(lái)處理我們的核心業(yè)務(wù)邏輯
Future
  • 在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過(guò)一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽(tīng),具體的實(shí)現(xiàn)就是通過(guò)Future和ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽(tīng),當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽(tīng)會(huì)自動(dòng)觸發(fā)??傊?,所有的操作都會(huì)返回一個(gè)ChannelFuture。

DEMO

import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * @author hejian
 */
public class MyService {
    
    private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();  
    
    public static void main(String[] args) throws Exception {
        new MyService().start();
    }

    private static int _BOOSEVENTNUM = 1;
    
    private static int _PORT = 7878;

    public void start() throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup(_BOOSEVENTNUM);
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            // 構(gòu)建
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); 
                            ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new Myhandler());
                            //心跳機(jī)制
                        }
                    });
            
            ChannelFuture future = bootstrap.bind(_PORT).sync();
            System.out.println("start success is port " + _PORT);
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
        
    }
}

class Myhandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("服務(wù)端收到:" + msg);
        
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
        ctx.writeAndFlush("Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
        super.channelActive(ctx);
    }
    
    @Override  
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
        if (evt instanceof IdleStateEvent) {  
            IdleState state = ((IdleStateEvent) evt).state();  
            if (state == IdleState.READER_IDLE) {  
                System.out.println("5 秒沒(méi)有接收到客戶端的信息了,關(guān)閉不活躍鏈接通道");  
                //throw new Exception("idle exception");  
                ctx.channel().close();  
            }  
        } else {  
            super.userEventTriggered(ctx, evt);  
        }  
    }  

}
  • client
package com.java.netty;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;

public class MyClient {
    
    public static void main(String[] args) throws Exception{
        new MyClient().start();
    }
    
    
    public void start() throws Exception {
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("ping", new IdleStateHandler(0, new Random().nextInt(10), 0, TimeUnit.SECONDS));  
                    //解碼器
                    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());
                    pipeline.addLast("handler", new MyClientHandler());
                }
            }); 
            Channel ch = bootstrap.connect("127.0.0.1", 7878).sync().channel();
            
            //等待窗口數(shù)據(jù)寫(xiě)入
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            for (;;) {
                String line = in.readLine();
                if (null == line) {
                    continue;
                }
                ch.writeAndFlush(line + "\r\n");
            }
        } finally {
            worker.shutdownGracefully();
        }
    }
}

/**
 * 業(yè)務(wù)處理器
 * @author hejian
 *
 */
class MyClientHandler extends SimpleChannelInboundHandler<String> {
    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new Random().nextInt(10) + "客戶端心跳信息",  
            CharsetUtil.UTF_8));  

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(">>>>>>>> client msg " +  msg);
        //回寫(xiě)service 應(yīng)答數(shù)據(jù)
        ctx.writeAndFlush("client is ask");
    }
    
     @Override  
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
            if (evt instanceof IdleStateEvent) {  
                IdleStateEvent event = (IdleStateEvent) evt;  
                if (event.state() == IdleState.WRITER_IDLE) {  
                   // if(currentTime <= TRY_TIMES){  
                      //  System.out.println("currentTime:"+currentTime);  
                       // currentTime++;  
                        ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());  
                   // }  
                }  
            }  
        }  
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容