用Netty 自己實(shí)現(xiàn)dubbo RPC
RPC 的基本介紹
RPC (Remote Procedure Call) 遠(yuǎn)程過程調(diào)用,是一個(gè)計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而程序員無需額外的為這個(gè)交互編程。也就是說可以達(dá)到兩個(gè)或者多個(gè)應(yīng)用程序部署在不同的服務(wù)器上,他們之間的調(diào)用都像是本地方法調(diào)用一樣。RPC 的調(diào)用如下圖。

image-20200726210649763
常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 語言的rpcx,Apache的thrift,Spring的Spring Cloud.
RPC 調(diào)用的過程
在RPC 中,Client 端叫做服務(wù)消費(fèi)者,Server 叫做服務(wù)提供者。

image-20200726211447144
調(diào)用流程說明
- 服務(wù)消費(fèi)方(client)以本地調(diào)用方式調(diào)用服務(wù)
- client stub 接收到調(diào)用后負(fù)責(zé)將方法、參數(shù)等封裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw
- client stub 將消息進(jìn)行編碼并發(fā)送到服務(wù)端
- server stub 接收到消息后進(jìn)行解碼
- server stub 根據(jù)解碼結(jié)果調(diào)用本地的服務(wù)
- 本地服務(wù)執(zhí)行并將結(jié)果返回給server stub
- server stub 將返回導(dǎo)入結(jié)果進(jìn)行編碼并發(fā)送給消費(fèi)方
- client stub 接收到消息并進(jìn)行解碼
- 服務(wù)消費(fèi)方(client) 得到結(jié)果
其中,RPC 框架的目標(biāo)就是把2-8 這些步驟封裝起來,用戶無需關(guān)心這些細(xì)節(jié),可以像調(diào)用本地方法一樣即可完成遠(yuǎn)程服務(wù)調(diào)用。
自己實(shí)現(xiàn)dubbo RPC
需求說明
- dubbo 底層使用了Netty 作為網(wǎng)絡(luò)通信框架,要求用netty 實(shí)現(xiàn)一個(gè)簡單的RPC框架。
- 模仿dubbo,消費(fèi)者和提供者約定接口和協(xié)議,消費(fèi)者遠(yuǎn)程調(diào)用提供者的服務(wù),提供者返回一個(gè)字符串,消費(fèi)者打印提供者返回的數(shù)據(jù)。底層網(wǎng)絡(luò)通信給予Netty 4.x
設(shè)計(jì)說明
- 創(chuàng)建一個(gè)接口,定義抽象方法。用于消費(fèi)者和提供者之間的約定。
- 創(chuàng)建一個(gè)提供者,該類需要監(jiān)聽消費(fèi)者的請求,并按照約定返回?cái)?shù)據(jù)。
- 創(chuàng)建一個(gè)消費(fèi)者,該類需要透明的調(diào)用自己不存在的方法,內(nèi)部需要使用netty請求提供者返回?cái)?shù)據(jù)
- 開發(fā)的分析圖如下:

image-20200726222733046
代碼實(shí)現(xiàn)
1.定義統(tǒng)一的接口
//這個(gè)是接口,是服務(wù)提供方和 服務(wù)消費(fèi)方都需要
public interface HelloService {
String hello(String mes);
}
2.服務(wù)的提供方
// 先寫一個(gè)實(shí)現(xiàn)剛剛接口的方法
public class HelloServiceImpl implements HelloService {
private static int count = 0;
//當(dāng)有消費(fèi)方調(diào)用該方法時(shí), 就返回一個(gè)結(jié)果
@Override
public String hello(String mes) {
System.out.println("收到客戶端消息=" + mes);
//根據(jù)mes 返回不同的結(jié)果
if(mes != null) {
return "你好客戶端, 我已經(jīng)收到你的消息 [" + mes + "] 第" + (++count) + " 次";
} else {
return "你好客戶端, 我已經(jīng)收到你的消息 ";
}
}
}
//然后緊接著寫基于netty的服務(wù)端的處理handler以及nettyServer 這里服務(wù)器這邊handler比較簡單
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//獲取客戶端發(fā)送的消息,并調(diào)用服務(wù)
System.out.println("msg=" + msg);
//客戶端在調(diào)用服務(wù)器的api 時(shí),我們需要定義一個(gè)協(xié)議
//比如我們要求 每次發(fā)消息是都必須以某個(gè)字符串開頭 "HelloService#hello#你好"
if(msg.toString().startsWith(ClientBootstrap.providerName)) {
String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
public class NettyServer {
public static void startServer(String hostName, int port) {
startServer0(hostName,port);
}
//編寫一個(gè)方法,完成對NettyServer的初始化和啟動(dòng)
private static void startServer0(String hostname, int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler()); //業(yè)務(wù)處理器
}
}
);
ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
System.out.println("服務(wù)提供方開始提供服務(wù)~~");
channelFuture.channel().closeFuture().sync();
}catch (Exception e) {
e.printStackTrace();
}
finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// 最后寫一個(gè)服務(wù)端的啟動(dòng)類
//ServerBootstrap 會(huì)啟動(dòng)一個(gè)服務(wù)提供者,就是 NettyServer
public class ServerBootstrap {
public static void main(String[] args) {
//代碼代填..
NettyServer.startServer("127.0.0.1", 7000);
}
}
3.服務(wù)的消費(fèi)方
public class NettyClient {
//創(chuàng)建線程池
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
private int count = 0;
//編寫方法使用代理模式,獲取一個(gè)代理對象
public Object getBean(final Class<?> serivceClass, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class<?>[]{serivceClass}, (proxy, method, args) -> {
System.out.println("(proxy, method, args) 進(jìn)入...." + (++count) + " 次");
//{} 部分的代碼,客戶端每調(diào)用一次 hello, 就會(huì)進(jìn)入到該代碼
if (client == null) {
initClient();
}
//設(shè)置要發(fā)給服務(wù)器端的信息
//providerName 協(xié)議頭 args[0] 就是客戶端調(diào)用api hello(???), 參數(shù)
client.setPara(providerName + args[0]);
//
return executor.submit(client).get();
});
}
//初始化客戶端
private static void initClient() {
client = new NettyClientHandler();
//創(chuàng)建EventLoopGroup
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(client);
}
}
);
try {
bootstrap.connect("127.0.0.1", 7000).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;//上下文
private String result; //返回的結(jié)果
private String para; //客戶端調(diào)用方法時(shí),傳入的參數(shù)
//與服務(wù)器的連接創(chuàng)建后,就會(huì)被調(diào)用, 這個(gè)方法是第一個(gè)被調(diào)用(1)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println(" channelActive 被調(diào)用 ");
context = ctx; //因?yàn)槲覀冊谄渌椒〞?huì)使用到 ctx
}
//收到服務(wù)器的數(shù)據(jù)后,調(diào)用方法 (4)
//
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(" channelRead 被調(diào)用 ");
result = msg.toString();
notify(); //喚醒等待的線程
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
//被代理對象調(diào)用, 發(fā)送數(shù)據(jù)給服務(wù)器,-> wait -> 等待被喚醒(channelRead) -> 返回結(jié)果 (3)-》5
@Override
public synchronized Object call() throws Exception {
System.out.println(" call1 被調(diào)用 ");
context.writeAndFlush(para);
//進(jìn)行wait
wait(); //等待channelRead 方法獲取到服務(wù)器的結(jié)果后,喚醒
System.out.println(" call2 被調(diào)用 ");
return result; //服務(wù)方返回的結(jié)果
}
//(2)
void setPara(String para) {
System.out.println(" setPara ");
this.para = para;
}
}
// client 端的啟動(dòng)類
public class ClientBootstrap {
//這里定義協(xié)議頭
public static final String providerName = "HelloService#hello#";
public static void main(String[] args) throws Exception{
//創(chuàng)建一個(gè)消費(fèi)者
NettyClient customer = new NettyClient();
//創(chuàng)建代理對象
HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);
for (;; ) {
Thread.sleep(2 * 1000);
//通過代理對象調(diào)用服務(wù)提供者的方法(服務(wù))
String res = service.hello("你好 dubbo~");
System.out.println("調(diào)用的結(jié)果 res= " + res);
}
}
}