用Netty 自己實(shí)現(xiàn)dubbo RPC

用Netty 自己實(shí)現(xiàn)dubbo RPC

RPC 的基本介紹

RPC (Remote Procedure Call) 遠(yuǎn)程過程調(diào)用,是一個(gè)計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而程序員無需額外的為這個(gè)交互編程。也就是說可以達(dá)到兩個(gè)或者多個(gè)應(yīng)用程序部署在不同的服務(wù)器上,他們之間的調(diào)用都像是本地方法調(diào)用一樣。RPC 的調(diào)用如下圖。

image-20200726210649763

常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 語言的rpcx,Apache的thrift,Spring的Spring Cloud.

RPC 調(diào)用的過程

在RPC 中,Client 端叫做服務(wù)消費(fèi)者,Server 叫做服務(wù)提供者。

image-20200726211447144

調(diào)用流程說明

  • 服務(wù)消費(fèi)方(client)以本地調(diào)用方式調(diào)用服務(wù)
  • client stub 接收到調(diào)用后負(fù)責(zé)將方法、參數(shù)等封裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw
  • client stub 將消息進(jìn)行編碼并發(fā)送到服務(wù)端
  • server stub 接收到消息后進(jìn)行解碼
  • server stub 根據(jù)解碼結(jié)果調(diào)用本地的服務(wù)
  • 本地服務(wù)執(zhí)行并將結(jié)果返回給server stub
  • server stub 將返回導(dǎo)入結(jié)果進(jìn)行編碼并發(fā)送給消費(fèi)方
  • client stub 接收到消息并進(jìn)行解碼
  • 服務(wù)消費(fèi)方(client) 得到結(jié)果

其中,RPC 框架的目標(biāo)就是把2-8 這些步驟封裝起來,用戶無需關(guān)心這些細(xì)節(jié),可以像調(diào)用本地方法一樣即可完成遠(yuǎn)程服務(wù)調(diào)用。

自己實(shí)現(xiàn)dubbo RPC

需求說明

  1. dubbo 底層使用了Netty 作為網(wǎng)絡(luò)通信框架,要求用netty 實(shí)現(xiàn)一個(gè)簡單的RPC框架。
  2. 模仿dubbo,消費(fèi)者和提供者約定接口和協(xié)議,消費(fèi)者遠(yuǎn)程調(diào)用提供者的服務(wù),提供者返回一個(gè)字符串,消費(fèi)者打印提供者返回的數(shù)據(jù)。底層網(wǎng)絡(luò)通信給予Netty 4.x

設(shè)計(jì)說明

  1. 創(chuàng)建一個(gè)接口,定義抽象方法。用于消費(fèi)者和提供者之間的約定。
  2. 創(chuàng)建一個(gè)提供者,該類需要監(jiān)聽消費(fèi)者的請求,并按照約定返回?cái)?shù)據(jù)。
  3. 創(chuàng)建一個(gè)消費(fèi)者,該類需要透明的調(diào)用自己不存在的方法,內(nèi)部需要使用netty請求提供者返回?cái)?shù)據(jù)
  4. 開發(fā)的分析圖如下:
image-20200726222733046

代碼實(shí)現(xiàn)

1.定義統(tǒng)一的接口
//這個(gè)是接口,是服務(wù)提供方和 服務(wù)消費(fèi)方都需要
public interface HelloService {

    String hello(String mes);
}
2.服務(wù)的提供方
// 先寫一個(gè)實(shí)現(xiàn)剛剛接口的方法
public class HelloServiceImpl implements HelloService {

    private static int count = 0;
    //當(dāng)有消費(fèi)方調(diào)用該方法時(shí), 就返回一個(gè)結(jié)果
    @Override
    public String hello(String mes) {
        System.out.println("收到客戶端消息=" + mes);
        //根據(jù)mes 返回不同的結(jié)果
        if(mes != null) {
            return "你好客戶端, 我已經(jīng)收到你的消息 [" + mes + "] 第" + (++count) + " 次";
        } else {
            return "你好客戶端, 我已經(jīng)收到你的消息 ";
        }
    }
}
//然后緊接著寫基于netty的服務(wù)端的處理handler以及nettyServer   這里服務(wù)器這邊handler比較簡單
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //獲取客戶端發(fā)送的消息,并調(diào)用服務(wù)
        System.out.println("msg=" + msg);
        //客戶端在調(diào)用服務(wù)器的api 時(shí),我們需要定義一個(gè)協(xié)議
        //比如我們要求 每次發(fā)消息是都必須以某個(gè)字符串開頭 "HelloService#hello#你好"
        if(msg.toString().startsWith(ClientBootstrap.providerName)) {

            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}
public class NettyServer {


    public static void startServer(String hostName, int port) {
        startServer0(hostName,port);
    }

    //編寫一個(gè)方法,完成對NettyServer的初始化和啟動(dòng)

    private static void startServer0(String hostname, int port) {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                                      @Override
                                      protected void initChannel(SocketChannel ch) throws Exception {
                                          ChannelPipeline pipeline = ch.pipeline();
                                          pipeline.addLast(new StringDecoder());
                                          pipeline.addLast(new StringEncoder());
                                          pipeline.addLast(new NettyServerHandler()); //業(yè)務(wù)處理器

                                      }
                                  }

                    );

            ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
            System.out.println("服務(wù)提供方開始提供服務(wù)~~");
            channelFuture.channel().closeFuture().sync();

        }catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}
// 最后寫一個(gè)服務(wù)端的啟動(dòng)類
//ServerBootstrap 會(huì)啟動(dòng)一個(gè)服務(wù)提供者,就是 NettyServer
public class ServerBootstrap {
    public static void main(String[] args) {

        //代碼代填..
        NettyServer.startServer("127.0.0.1", 7000);
    }
}
3.服務(wù)的消費(fèi)方
public class NettyClient {

    //創(chuàng)建線程池
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static NettyClientHandler client;
    private int count = 0;

    //編寫方法使用代理模式,獲取一個(gè)代理對象

    public Object getBean(final Class<?> serivceClass, final String providerName) {

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serivceClass}, (proxy, method, args) -> {

                    System.out.println("(proxy, method, args) 進(jìn)入...." + (++count) + " 次");
                    //{}  部分的代碼,客戶端每調(diào)用一次 hello, 就會(huì)進(jìn)入到該代碼
                    if (client == null) {
                        initClient();
                    }

                    //設(shè)置要發(fā)給服務(wù)器端的信息
                    //providerName 協(xié)議頭 args[0] 就是客戶端調(diào)用api hello(???), 參數(shù)
                    client.setPara(providerName + args[0]);

                    //
                    return executor.submit(client).get();

                });
    }

    //初始化客戶端
    private static void initClient() {
        client = new NettyClientHandler();
        //創(chuàng)建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(
                        new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new StringDecoder());
                                pipeline.addLast(new StringEncoder());
                                pipeline.addLast(client);
                            }
                        }
                );

        try {
            bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context;//上下文
    private String result; //返回的結(jié)果
    private String para; //客戶端調(diào)用方法時(shí),傳入的參數(shù)


    //與服務(wù)器的連接創(chuàng)建后,就會(huì)被調(diào)用, 這個(gè)方法是第一個(gè)被調(diào)用(1)
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(" channelActive 被調(diào)用  ");
        context = ctx; //因?yàn)槲覀冊谄渌椒〞?huì)使用到 ctx
    }

    //收到服務(wù)器的數(shù)據(jù)后,調(diào)用方法 (4)
    //
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(" channelRead 被調(diào)用  ");
        result = msg.toString();
        notify(); //喚醒等待的線程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    //被代理對象調(diào)用, 發(fā)送數(shù)據(jù)給服務(wù)器,-> wait -> 等待被喚醒(channelRead) -> 返回結(jié)果 (3)-》5
    @Override
    public synchronized Object call() throws Exception {
        System.out.println(" call1 被調(diào)用  ");
        context.writeAndFlush(para);
        //進(jìn)行wait
        wait(); //等待channelRead 方法獲取到服務(wù)器的結(jié)果后,喚醒
        System.out.println(" call2 被調(diào)用  ");
        return  result; //服務(wù)方返回的結(jié)果

    }
    //(2)
    void setPara(String para) {
        System.out.println(" setPara  ");
        this.para = para;
    }
}
// client 端的啟動(dòng)類
public class ClientBootstrap {


    //這里定義協(xié)議頭
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws  Exception{

        //創(chuàng)建一個(gè)消費(fèi)者
        NettyClient customer = new NettyClient();

        //創(chuàng)建代理對象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        for (;; ) {
            Thread.sleep(2 * 1000);
            //通過代理對象調(diào)用服務(wù)提供者的方法(服務(wù))
            String res = service.hello("你好 dubbo~");
            System.out.println("調(diào)用的結(jié)果 res= " + res);
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容