java io 模型一共分為4種
- 傳統(tǒng)IO
- 偽異步IO
- NIO
- AIO
介紹
- netty是一個(gè)高性能,異步事件驅(qū)動(dòng)的NIO框架,它提供了對(duì)TCP、UDP和file傳輸?shù)闹С?netty所有的IO操作都為異步非阻塞,通過(guò)Future-Listener機(jī)制,用戶可以方便的主動(dòng)獲取或者通過(guò)通知機(jī)制獲得IO操作結(jié)果。
能做什么
- 開(kāi)發(fā)異步、非阻塞的TCP、UDP網(wǎng)絡(luò)應(yīng)用程序
- 開(kāi)發(fā)異步文件傳輸應(yīng)用程序
- 開(kāi)發(fā)異步HTTP服務(wù)端和客戶端應(yīng)用程序
- 提供對(duì)多種編解碼框架的集成,包括谷歌的Protobuf、Jboss marshalling、Java序列化、壓縮編解碼、XML解碼、字符串編解碼等,這些編解碼框架可以被用戶直接使用。
- 提供形式多樣的編解碼基礎(chǔ)類(lèi)庫(kù),可以非常方便的實(shí)現(xiàn)私有協(xié)議棧編解碼框架的二次定制和開(kāi)發(fā)。
- 基于職責(zé)鏈模式的Pipeline-Handler機(jī)制,用戶可以非常方便的對(duì)網(wǎng)絡(luò)事件進(jìn)行攔截和定制。
- 所有的IO操作都是異步的,用戶可以通過(guò)Future-Listener機(jī)制主動(dòng)Get結(jié)果或者由IO線程操作完成之后主動(dòng)Notify結(jié)果,用戶的業(yè)務(wù)線程不需要同步等待。
- IP黑白名單控制 。
- 打印消息碼流。
- 流量控制和整形。
- 性能統(tǒng)計(jì)。
- 基于鏈路空閑事件檢測(cè)的心跳檢測(cè)。
EventLoop,EventLoopGroup
- EventLoop目的是為Channel處理IO操作,一個(gè)EventLoop可以為多個(gè)Channel服務(wù),EventLoopGroup會(huì)包含多個(gè)EventLoop。
BootStrap,ServerBootstrap
- 一個(gè)Netty應(yīng)用通常由一個(gè)Bootstrap開(kāi)始,它主要作用是配置整個(gè)Netty程序,串聯(lián)起各個(gè)組件。
ChannelInitializer
- 當(dāng)一個(gè)鏈接建立時(shí),我們需要知道怎么來(lái)接收或者發(fā)送數(shù)據(jù),當(dāng)然,我們有各種各樣的Handler實(shí)現(xiàn)來(lái)處理它,那么ChannelInitializer便是用來(lái)配置這些Handler,它會(huì)提供一個(gè)ChannelPipeline,并把Handler加入到ChannelPipeline。
Handler
- 為了支持各種協(xié)議和處理數(shù)據(jù)的方式,便誕生了Handler組件。Handler主要用來(lái)處理各種事件,這里的事件很廣泛,比如可以是連接、數(shù)據(jù)接收、異常、數(shù)據(jù)轉(zhuǎn)換等。
ChannelInboundHandler
- 一個(gè)最常用的Handler。這個(gè)Handler的作用就是處理接收到數(shù)據(jù)時(shí)的事件,也就是說(shuō),我們的業(yè)務(wù)邏輯一般就是寫(xiě)在這個(gè)Handler里面的,ChannelInboundHandler就是用來(lái)處理我們的核心業(yè)務(wù)邏輯
Future
- 在Netty中所有的IO操作都是異步的,因此,你不能立刻得知消息是否被正確處理,但是我們可以過(guò)一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽(tīng),具體的實(shí)現(xiàn)就是通過(guò)Future和ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽(tīng),當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽(tīng)會(huì)自動(dòng)觸發(fā)??傊?,所有的操作都會(huì)返回一個(gè)ChannelFuture。
DEMO
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
/**
* @author hejian
*/
public class MyService {
private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
public static void main(String[] args) throws Exception {
new MyService().start();
}
private static int _BOOSEVENTNUM = 1;
private static int _PORT = 7878;
public void start() throws Exception {
EventLoopGroup boss = new NioEventLoopGroup(_BOOSEVENTNUM);
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 構(gòu)建
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new StringDecoder(), new StringEncoder(), new Myhandler());
//心跳機(jī)制
}
});
ChannelFuture future = bootstrap.bind(_PORT).sync();
System.out.println("start success is port " + _PORT);
future.channel().closeFuture().sync();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
class Myhandler extends SimpleChannelInboundHandler<String> {
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("服務(wù)端收到:" + msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");
ctx.writeAndFlush("Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
super.channelActive(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
System.out.println("5 秒沒(méi)有接收到客戶端的信息了,關(guān)閉不活躍鏈接通道");
//throw new Exception("idle exception");
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
- client
package com.java.netty;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
public class MyClient {
public static void main(String[] args) throws Exception{
new MyClient().start();
}
public void start() throws Exception {
EventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("ping", new IdleStateHandler(0, new Random().nextInt(10), 0, TimeUnit.SECONDS));
//解碼器
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new MyClientHandler());
}
});
Channel ch = bootstrap.connect("127.0.0.1", 7878).sync().channel();
//等待窗口數(shù)據(jù)寫(xiě)入
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
for (;;) {
String line = in.readLine();
if (null == line) {
continue;
}
ch.writeAndFlush(line + "\r\n");
}
} finally {
worker.shutdownGracefully();
}
}
}
/**
* 業(yè)務(wù)處理器
* @author hejian
*
*/
class MyClientHandler extends SimpleChannelInboundHandler<String> {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(new Random().nextInt(10) + "客戶端心跳信息",
CharsetUtil.UTF_8));
@Override
protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(">>>>>>>> client msg " + msg);
//回寫(xiě)service 應(yīng)答數(shù)據(jù)
ctx.writeAndFlush("client is ask");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// if(currentTime <= TRY_TIMES){
// System.out.println("currentTime:"+currentTime);
// currentTime++;
ctx.channel().writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
// }
}
}
}
}