對于Netty框架,你是如何進行邏輯分析和環(huán)境客戶端邏輯處理的呢

前言

首先在使用Netty框架的時候需要了解Netty是一個什么東西。

Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅(qū)動的網(wǎng)絡應用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡服務器和客戶端程序。也就是說,Netty 是一個基于NIO的客戶、服務器端編程框架。

使用Netty 可以確保你快速和簡單的開發(fā)出一個網(wǎng)絡應用,例如實現(xiàn)了某種協(xié)議的客戶,服務端應用。Netty相當簡化和流線化了網(wǎng)絡應用的編程開發(fā)過程,例如,TCP和UDP的socket服務開發(fā)。

“快速”和“簡單”并不用產(chǎn)生維護性或性能上的問題。Netty 是一個吸收了多種協(xié)議的實現(xiàn)經(jīng)驗,這些協(xié)議包括FTP,SMTP,HTTP,各種二進制,文本協(xié)議,并經(jīng)過相當精心設計的項目,最終,Netty 成功的找到了一種方式,在保證易于開發(fā)的同時還保證了其應用的性能,穩(wěn)定性和伸縮性。

下面就來看看第一個使用Netty搭建的應用

開發(fā)環(huán)境

首先我們需要準備對應的全套的Netty的jar包,這些可以參考Netty的官方網(wǎng)站

Netty的官網(wǎng)是:http://netty.io

當然也可以訪問第三方的中文網(wǎng)站?

http://ifeve.com/netty5-user-guide/

準備Netty4+

Eclipse或者IDEA

我這里使用的是IDEA

Netty客戶端和服務器端描述

一般情況下,在實際使用的時候更多的關注于服務器端的開發(fā),而很少關注客戶端的開發(fā),因為對于不同的業(yè)務邏輯會有不同的客戶端實現(xiàn),但是對于這些客戶端來說服務器端只有一個。服務器會寫數(shù)據(jù)到客戶端并且處理多個客戶端的并發(fā)連接。從理論上來說,限制程序性能的因素只有系統(tǒng)資源和JVM。為了方便理解,這里舉了個生活例子,在山谷或高山上大聲喊,你會聽見回聲,回聲是山返回的;

在這個例子中,你是客戶端,山是服務器。喊的行為就類似于一個Netty客戶端將數(shù)據(jù)發(fā)送到服務器,聽到回聲就類似于服務器將相同的數(shù)據(jù)返回給你,你離開山谷就斷開了連接,但是你可以返回進行重連服務器并且可以發(fā)送更多的數(shù)據(jù)。

邏輯分析

首先Netty是一個網(wǎng)絡框架,所以說就要涉及到客戶端和服務器端,這樣的話就需要分析一下客戶端和服務器端各自具有什么樣子的作用,根據(jù)具體的作用設計對應的代碼。

客戶端連接到服務器

建立連接后,發(fā)送或接收數(shù)據(jù)

服務器處理所有的客戶端連接

這個是建立一個網(wǎng)絡應用必須的三個步驟。下面就是具體的按照這三個步驟編寫

開始創(chuàng)建應用

首先創(chuàng)建一個服務器端的程序代碼如下

服務器端代碼

public class ServerHelloWorld {

//創(chuàng)建線程組,監(jiān)聽客戶端的請求

private EventLoopGroup acceptorGroup = null;

//處理客戶端相關操作線程組,負責處理與客戶端端的請求操作。

private EventLoopGroup clientGroup = null;

//服務器啟動相關配置信息

private ServerBootstrap bootstrap = null;

public ServerHelloWorld() {

init();

}

//初始化

private void init() {

//初始化線程組

acceptorGroup = new NioEventLoopGroup();

//處理客戶端邏輯

clientGroup = new NioEventLoopGroup();

//初始化配置信息

bootstrap = new ServerBootstrap();

//綁定監(jiān)聽線程組

bootstrap.group(acceptorGroup, clientGroup);

//設置通信模式為NIO模式同步非阻塞

bootstrap.channel(NioServerSocketChannel.class);

//設定緩存區(qū)的大小,單位是字節(jié)

bootstrap.option(ChannelOption.SO_BACKLOG, 1024);

//SO_SNDBUF 表示發(fā)送緩沖區(qū),SO_RCVBUF 表示接收緩存區(qū),SO_KEEPALIVE 表示是否開啟心跳檢查,保證連接有效

bootstrap.option(ChannelOption.SO_SNDBUF, 16 * 1024)

.option(ChannelOption.SO_RCVBUF, 16 * 1024)

.option(ChannelOption.SO_KEEPALIVE, true);

}

/**

* 監(jiān)聽處理邏輯

* @param port 監(jiān)聽端口

* @param acceptorHandlers 處理器

* @return

* @throws InterruptedException

*/

public ChannelFuture doAccept(int port, final ChannelHandler... acceptorHandlers) throws InterruptedException {

/**

* childHandler 是服務端的BootStrap獨有的方法是用于提供處理對象,提供處理對象可以一次性的增加若干個處理邏輯

* 類似責任鏈模式的處理邏輯,也就是說你增加A 和B兩個處理邏輯,在處理邏輯的時候會按照A 和 B 的順序進行依次處理

*

* ChannelInitializer 用于提供處理器的一個模型對象,這個模型對象,其中定義了一個方法initChannel

*

* initChannel 這個方法適用于初始化處理邏輯責任鏈條的。可以保證服務端的BootStrap只初始化一次處理器,盡量提供處理器的重用,

* 減少了反復創(chuàng)建處理器的操作

*/

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(acceptorHandlers);

}

});

/**

* bind 方法 用來綁定處理端口,ServerBootstrap可以綁定多個監(jiān)聽端口。多次調(diào)用bind方法即可

*

* sync 方法開始啟動監(jiān)聽邏輯,返回ChannelFuture 返回結果是監(jiān)聽成功后的未來結果,可以使用

* 這個ChannelFuture實現(xiàn)后續(xù)的服務器與客戶端的交互所以要獲取這個ChannelFuture對象

*/

ChannelFuture future = bootstrap.bind(port).sync();

//ChannelFuture future = bootstrap.bind(port).sync();

//ChannelFuture future = bootstrap.bind(port).sync();

return future;

}

/**

* 回收方法

* shutdownGracefully 是一個安全關閉的方法,可以保證不放棄任何一個以接收的客戶端請求

*/

public void release(){

this.acceptorGroup.shutdownGracefully();

this.clientGroup.shutdownGracefully();

}

public static void main(String[] args) {

ChannelFuture future = null;

ServerHelloWorld server = null;

try{

server = new ServerHelloWorld();

//建立連接

future = server.doAccept(8081, new ServerHandler());

System.out.println("server started.");

//關閉連接,回收資源

future.channel().closeFuture();

} catch (InterruptedException e) {

e.printStackTrace();

}finally {

if (null != future){

try {

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

if (null!=server){

server.release();

}

}

}

}


對于服務器端的程序來說,創(chuàng)建了兩個線程組EventLoopGroup,這里需要通過原理圖來詳細的說明一下。

首先對于上面的圖來說我們可以看到,在服務器端創(chuàng)建了兩個線程組,這個兩個線程組一個用來接收請求,一個用來處理業(yè)務邏輯,之前提到Netty是使用NIO來設計的,也就是說是同步非阻塞的一種IO。對于NIO、AIO、BIO等來說這些概念在后面的更新中會有總結到,現(xiàn)在就是簡單的提供這樣的一個概念。

在建立服務器端的時候,會看到一個ServerBootstrap 對象,這個表示服務器端,當然在客戶端有一個與這個類對等的類BootStrap類,這兩個類為鏈接的建立提供了很多的配置項信息。

服務端業(yè)務處理邏輯

**

*

* @ChannelHandler.Sharable 這個注解表示當前是一個可以分享的處理器,,服務注冊此Handler,可以分享給多個客戶端使用

* 如果不使用這個注解的話,每次客戶端請求時,必須為客戶端重新創(chuàng)建一個新的處理器Handler對象

*

* bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

* @Override

* protected void initChannel(SocketChannel ch) throws Exception {

* ch.pipeline().addLast(new XXXHandler());

* }

* });

* 建議自己開發(fā)的時候就會使的Handler是可共享的

*

* 如果Handler是一個可分享的,一定避免定義一個可以寫的實例變量。不安全

*

*/

@ChannelHandler.Sharable

public class ServerHandler extends ChannelHandlerAdapter {

/**

* 業(yè)務處理邏輯 用于處理讀取數(shù)據(jù)請求的邏輯。它里面的方法和參數(shù)如下

* @param ctx 上下文對象,其中包含于客戶端建立連接的所有資源,比如說對應的Channel

* @param msg 讀取到的數(shù)據(jù),默認類型是bytebuf 這個ByteBuf 是對ByteBuffer的一個封裝。簡化了操作

* @throws Exception

*/

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//強制類型轉換

ByteBuf readBuffer = (ByteBuf) msg;

// 創(chuàng)建一個字節(jié)數(shù)組,用于保存緩存中的數(shù)據(jù)。readableBytes 在原生的ByteBuffer也有同樣的功能的方法

byte[] tempDatas = new byte[readBuffer.readableBytes()];

//讀取到對應的數(shù)據(jù) 可以直接讀取,這個不需要考慮復位的問題,

readBuffer.readBytes(tempDatas);

String message = new String(tempDatas,"UTF-8");

System.out.println("from client :"+message);

if ("exit".equals(message)){

//如果客戶端斷開連接,則關閉上下文

ctx.close();

return;

}

String line = "server message to client!";

//寫操作自動釋放緩存,避免內(nèi)存溢出的問題。

ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));

/**

* 如果調(diào)用的是write方法,不會刷新緩存,緩存中的數(shù)據(jù)不會發(fā)送到客戶端,必須調(diào)用flush方法進行強制刷出

* ctx.write(msg);

* ctx.flush();

*/

}

/**

* 異常處理邏輯

* ChannelHandlerContext關閉代表當前與客戶端的連接處理邏輯,當客戶端異常退出的時候這個異常也會執(zhí)行

* @param ctx

* @param cause

* @throws Exception

*/

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.out.println("server exceptionCaught method run …… ");

ctx.close();

}

}

客戶端代碼

public class ClientHelloWorld {

//處理請求線程組

private EventLoopGroup group = null;

//服務啟動相關配置信息

private Bootstrap bootstrap = null;

public ClientHelloWorld(){

init();

}

private void init(){

group = new NioEventLoopGroup();

bootstrap = new Bootstrap();

//定義線程組

bootstrap.group(group);

bootstrap.channel(NioSocketChannel.class);

}

public ChannelFuture doRequest(String host, int port, final ChannelHandler ... handlers) throws InterruptedException {

this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {

/**

* 客戶端的Handler沒有childHandler方法,只有Handler方法

* 這個方法與服務器的方法是類似的。

* 客戶端必須綁定處理器,也就說必須調(diào)用Handler方法

* @param ch

* @throws Exception

*/

@Override

protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast(handlers);

}

});

//建立連接

ChannelFuture future = this.bootstrap.connect(host,port).sync();

return future;

}

public void release(){

this.group.shutdownGracefully();

}

public static void main(String[] args) {

ClientHelloWorld client = null;

ChannelFuture future = null;

try{

client = new ClientHelloWorld();

future = client.doRequest("localhost",8081,new ClientHandler());

Scanner scanner = null;

while (true){

scanner = new Scanner(System.in);

System.out.println("enter message send to server (enter exit close client)");

String line = scanner.nextLine();

if ("exit".equals(line)){

future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8"))).addListener(ChannelFutureListener.CLOSE);

break;

}

future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));

TimeUnit.SECONDS.sleep(1);

}

} catch (Exception e) {

e.printStackTrace();

}finally {

if (null!=client){

client.release();

}

if (null!=future){

try {

future.channel().closeFuture().sync();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

}

客戶端邏輯處理邏輯

public class ClientHandler extends ChannelHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

try {

//強制類型轉換

ByteBuf readBuffer = (ByteBuf) msg;

// 創(chuàng)建一個字節(jié)數(shù)組,用于保存緩存中的數(shù)據(jù)。readableBytes 在原生的ByteBuffer也有同樣的功能的方法

byte[] tempDatas = new byte[readBuffer.readableBytes()];

//讀取到對應的數(shù)據(jù) 可以直接讀取,這個不需要考慮復位的問題,

readBuffer.readBytes(tempDatas);

System.out.println("from server " + new String(tempDatas, "UTF-8"));

}finally {

//用于避免內(nèi)存溢出

ReferenceCountUtil.release(msg);

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

System.out.println("client exceptionCaught method run …… ");

ctx.close();

}

}

總結

Netty是一個比較高效的網(wǎng)絡應用框架,在很多的項目中都使用到了Netty。彌補了原生的的IO的很多的缺陷,但是也有很多不足的地方,需要更具具體的使用情況進行開發(fā)。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容