前言
在微服務當?shù)赖慕裉?,分布式系統(tǒng)越來越重要,實現(xiàn)服務化首先就要考慮服務之間的通信問題。這里面涉及序列化、反序列化、尋址、連接等等問題。不過,有了 RPC 框架,我們就無需苦惱。
一、什么是 RPC?
RPC(Remote Procedure Call)— 遠程過程調(diào)用,是一個計算機通信協(xié)議。該協(xié)議允許運行于一臺計算機的程序調(diào)用另一臺計算機的程序,而程序員無需額外地為這個交互作用編程。
值得注意是,兩個或多個應用程序都分布在不同的服務器上,它們之間的調(diào)用都像是本地方法調(diào)用一樣。

RPC框架有很多,比較知名的如阿里的 Dubbo、google 的 gRPC、Go 語言的 rpcx、Apache 的 thrift。當然了,還有Spring Cloud,不過對于 Spring Cloud 來說,RPC 只是它的一個功能模塊。
如果要實現(xiàn)一個基本功能、簡單的 RPC,要涉及哪些東西呢?
- 動態(tài)代理
- 反射
- 序列化、反序列化
- 網(wǎng)絡通信
- 編解碼
- 服務發(fā)現(xiàn)與注冊
- 心跳與鏈路檢測
- ......
下面,我們一起通過代碼來分析,怎么把技術(shù)點串到一起,實現(xiàn)我們自己的 RPC。
二、環(huán)境準備
在開始之前,筆者先介紹一下所用到的軟件環(huán)境。
SpringBoot、Netty、zookeeper、zkclient、fastjson
- SpringBoot 項目的基礎(chǔ)框架
- Netty 通信服務器
- zookeeper 服務發(fā)現(xiàn)與注冊
- zkclient zookeeper客戶端
- fastjson 序列化、反序列化
三、RPC 生產(chǎn)者
1、服務接口API
整個 RPC 系統(tǒng),我們分為生成者和消費者。首先他們有一個共同的服務接口 API。在這里,我們搞一個操作用戶信息的 service 接口。
public interface InfoUserService {
List<InfoUser> insertInfoUser(InfoUser infoUser);
InfoUser getInfoUserById(String id);
void deleteInfoUserById(String id);
String getNameById(String id);
Map<String,InfoUser> getAllUser();
}
2、服務類實現(xiàn)
作為生產(chǎn)者,它當然要有實現(xiàn)類,我們創(chuàng)建InfoUserServiceImpl實現(xiàn)類,并用注解把它標注為 RPC 的服務,然后注冊到 Srping 的 Bean 容器中。在這里,我們把infoUserMap當做數(shù)據(jù)庫,存儲用戶信息。
package com.viewscenes.netsupervisor.service.impl;
@RpcService
public class InfoUserServiceImpl implements InfoUserService {
Logger logger = LoggerFactory.getLogger(this.getClass());
//當做數(shù)據(jù)庫,存儲用戶信息
Map<String,InfoUser> infoUserMap = new HashMap<>();
public List<InfoUser> insertInfoUser(InfoUser infoUser) {
logger.info("新增用戶信息:{}", JSONObject.toJSONString(infoUser));
infoUserMap.put(infoUser.getId(),infoUser);
return getInfoUserList();
}
public InfoUser getInfoUserById(String id) {
InfoUser infoUser = infoUserMap.get(id);
logger.info("查詢用戶ID:{}",id);
return infoUser;
}
public List<InfoUser> getInfoUserList() {
List<InfoUser> userList = new ArrayList<>();
Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, InfoUser> next = iterator.next();
userList.add(next.getValue());
}
logger.info("返回用戶信息記錄數(shù):{}",userList.size());
return userList;
}
public void deleteInfoUserById(String id) {
logger.info("刪除用戶信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
}
public String getNameById(String id){
logger.info("根據(jù)ID查詢用戶名稱:{}",id);
return infoUserMap.get(id).getName();
}
public Map<String,InfoUser> getAllUser(){
logger.info("查詢所有用戶信息{}",infoUserMap.keySet().size());
return infoUserMap;
}
}
注解@RpcService定義如下:
package com.viewscenes.netsupervisor.annotation;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}
3、請求信息和返回信息
所有的請求信息和返回信息,我們用兩個 JavaBean 來表示。其中的重點是,返回信息要帶有請求信息的ID。
package com.viewscenes.netsupervisor.entity;
public class Request {
private String id;
private String className;// 類名
private String methodName;// 函數(shù)名稱
private Class<?>[] parameterTypes;// 參數(shù)類型
private Object[] parameters;// 參數(shù)列表
get/set ...
}
package com.viewscenes.netsupervisor.entity;
public class Response {
private String requestId;
private int code;
private String error_msg;
private Object data;
get/set ...
}
4、Netty 服務端
Netty 作為高性能的 NIO 通信框架,在很多 RPC 框架中都有它的身影。我們也采用它當做通信服務器。說到這,我們先看個配置文件,重點有兩個,zookeeper 的注冊地址和 Netty 通信服務器的地址。
#TOMCAT端口
server.port=8001
#zookeeper注冊地址
registry.address=192.168.174.10:2181
#RPC服務提供者地址
rpc.server.address=192.168.210.81:18868
為了方便管理,我們把它也注冊成 Bean,同時實現(xiàn) ApplicationContextAware 接口,把上面 @RpcService 注解的服務類撈出來,緩存起來,供消費者調(diào)用。同時,作為服務器,還要對客戶端的鏈路進行心跳檢測,超過60秒未讀寫數(shù)據(jù),關(guān)閉此連接。
package com.viewscenes.netsupervisor.netty.server;
import com.viewscenes.netsupervisor.annotation.RpcService;
import com.viewscenes.netsupervisor.netty.codec.json.JSONDecoder;
import com.viewscenes.netsupervisor.netty.codec.json.JSONEncoder;
import com.viewscenes.netsupervisor.registry.ServiceRegistry;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);
private Map<String, Object> serviceMap = new HashMap<>();
@Value("${rpc.server.address}")
private String serverAddress;
@Autowired
ServiceRegistry registry;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
for(Object serviceBean:beans.values()){
Class<?> clazz = serviceBean.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> inter : interfaces){
String interfaceName = inter.getName();
logger.info("加載服務類: {}", interfaceName);
serviceMap.put(interfaceName, serviceBean);
}
}
logger.info("已加載全部服務接口:{}", serviceMap);
}
public void afterPropertiesSet() throws Exception {
start();
}
public void start(){
final NettyServerHandler handler = new NettyServerHandler(serviceMap);
new Thread(() -> {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG,1024).
childOption(ChannelOption.SO_KEEPALIVE,true).
childOption(ChannelOption.TCP_NODELAY,true).
childHandler(new ChannelInitializer<SocketChannel>() {
//創(chuàng)建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設(shè)置到ChannelPipeline中,用于處理網(wǎng)絡IO事件
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new JSONEncoder());
pipeline.addLast(new JSONDecoder());
pipeline.addLast(new HeartBeatHandler());
pipeline.addLast(handler);
}
});
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture cf = bootstrap.bind(host,port).sync();
logger.info("RPC 服務器啟動.監(jiān)聽端口:"+port);
registry.register(serverAddress);
//等待服務端監(jiān)聽端口關(guān)閉
cf.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}).start();
}
}
setApplicationContext方法,將被 @RpcService 注解的服務類,存儲在 serviceMap 中。start方法,啟動 Netty 服務端。new IdleStateHandler(0, 0, 60)檢測心跳機制,表示 60s 內(nèi)如果沒有接收到客戶端的讀寫請求,將走ChannelInboundHandlerAdapter.userEventTriggered方法。于是我們自定義HeartBeatHandler心跳處理器,來重寫userEventTriggered方法,將連接關(guān)閉。
package com.viewscenes.netsupervisor.netty.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 用于檢測channel的心跳handler
* 繼承ChannelInboundHandlerAdapter,從而不需要實現(xiàn)channelRead0 方法
* @author K. L. Mao
* @create 2019/2/22
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
logger.info("客戶端已超過60秒未讀寫數(shù)據(jù),關(guān)閉連接.{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
}
}
}
在處理器中的構(gòu)造函數(shù)中,我們先把服務 Bean 的 serviceMap 傳進來,所有的處理要基于這個 serviceMap 才能找到對應的實現(xiàn)類。在channelRead中,獲取請求方法的信息,然后通過反射調(diào)用方法獲取返回值。
package com.viewscenes.netsupervisor.netty.server;
import com.alibaba.fastjson.JSON;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Method;
import java.util.Map;
/**
* @program: rpc-provider
* @description: ${description}
* @author: shiqizhen
* @create: 2018-11-30 17:27
**/
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final Map<String, Object> serviceMap;
public NettyServerHandler(Map<String, Object> serviceMap) {
this.serviceMap = serviceMap;
}
public void channelActive(ChannelHandlerContext ctx) {
logger.info("客戶端連接成功!"+ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("客戶端斷開連接!{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Request request = JSON.parseObject(msg.toString(),Request.class);
if ("heartBeat".equals(request.getMethodName())) {
logger.info("客戶端心跳信息..."+ctx.channel().remoteAddress());
}else{
logger.info("RPC客戶端請求接口:"+request.getClassName()+" 方法名:"+request.getMethodName());
Response response = new Response();
response.setRequestId(request.getId());
try {
Object result = this.handler(request);
response.setData(result);
} catch (Throwable e) {
e.printStackTrace();
response.setCode(1);
response.setError_msg(e.toString());
logger.error("RPC Server handle request error",e);
}
ctx.writeAndFlush(response);
}
}
/**
* 通過反射,執(zhí)行本地方法
* @param request
* @return
* @throws Throwable
*/
private Object handler(Request request) throws Throwable{
String className = request.getClassName();
Object serviceBean = serviceMap.get(className);
if (serviceBean!=null){
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
}else{
throw new Exception("未找到服務接口,請檢查配置!:"+className+"#"+request.getMethodName());
}
}
/**
* 獲取參數(shù)列表
* @param parameterTypes
* @param parameters
* @return
*/
private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
if (parameters==null || parameters.length==0){
return parameters;
}else{
Object[] new_parameters = new Object[parameters.length];
for(int i=0;i<parameters.length;i++){
new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
}
return new_parameters;
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.info(cause.getMessage());
ctx.close();
}
}
4、服務注冊
我們啟動了 Netty 通信服務器,并且把服務實現(xiàn)類加載到緩存,等待請求時調(diào)用。這一步,我們要進行服務注冊。為了簡單化處理,我們只注冊通信服務器的監(jiān)聽地址即可。
在上面代碼中,bind 之后我們執(zhí)行了registry.register(serverAddress);它的作用就是,將 Netty 監(jiān)聽的 IP 端口注冊到 zookeeper。
package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${registry.address}")
private String registryAddress;
private static final String ZK_REGISTRY_PATH = "/rpc";
public void register(String data) {
if (data != null) {
ZkClient client = connectServer();
if (client != null) {
AddRootNode(client);
createNode(client, data);
}
}
}
//連接zookeeper
private ZkClient connectServer() {
ZkClient client = new ZkClient(registryAddress,20000,20000);
return client;
}
//創(chuàng)建根目錄/rpc
private void AddRootNode(ZkClient client){
boolean exists = client.exists(ZK_REGISTRY_PATH);
if (!exists){
client.createPersistent(ZK_REGISTRY_PATH);
logger.info("創(chuàng)建zookeeper主節(jié)點 {}",ZK_REGISTRY_PATH);
}
}
//在/rpc根目錄下,創(chuàng)建臨時順序子節(jié)點
private void createNode(ZkClient client, String data) {
String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("創(chuàng)建zookeeper數(shù)據(jù)節(jié)點 ({} => {})", path, data);
}
}
有一點需要注意,子節(jié)點必須是臨時節(jié)點。這樣,生產(chǎn)者端停掉之后,才能通知到消費者,把此服務從服務列表中剔除。到此為止,生產(chǎn)者端已經(jīng)完成。我們看一下它的啟動日志:
加載服務類: com.viewscenes.netsupervisor.service.InfoUserService
已加載全部服務接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服務器啟動.監(jiān)聽端口:18868
Starting ZkClient event thread.
Socket connection established to node1/192.168.174.10:2181, initiating session
Session establishment complete on server node1/192.168.174.10:2181, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
創(chuàng)建zookeeper主節(jié)點 /rpc
創(chuàng)建zookeeper數(shù)據(jù)節(jié)點 (/rpc/provider0000000000 => 192.168.210.81:18868)
四、RPC 消費者
首先,我們需要把生產(chǎn)者端的服務接口API,即InfoUserService。以相同的目錄放到消費者端。路徑不同,調(diào)用會找不到的哦(實際項目上,是通過依賴 jar 包實現(xiàn)的)。
1、代理
RPC的目標其中有一條,程序員無需額外地為這個交互作用編程。所以,我們在調(diào)用的時候,就像調(diào)用本地方法一樣。就像下面這樣:
@Controller
public class IndexController {
@Autowired
private InfoUserService userService;
@RequestMapping("getById")
@ResponseBody
public InfoUser getById(String id){
logger.info("根據(jù)ID查詢用戶信息:{}",id);
return userService.getInfoUserById(id);
}
}
那么,問題來了。消費者端并沒有此接口的實現(xiàn),怎么調(diào)用到的呢?這里,首先就是代理。這里用的是 Spring 的工廠 Bean 機制創(chuàng)建的代理對象(JDK 動態(tài)代理),類似于 MyBatis 中的 Mapper 接口的調(diào)用。
首先,創(chuàng)建代理類(必須實現(xiàn)InvocationHandler):
package com.viewscenes.netsupervisor.configurer.rpc;
@Component
public class RpcFactory implements InvocationHandler {
@Autowired
private NettyClient client;
Logger logger = LoggerFactory.getLogger(this.getClass());
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterTypes(method.getParameterTypes());
request.setId(IdUtil.getId());
Object result = client.send(request);
Class<?> returnType = method.getReturnType();
Response response = JSON.parseObject(result.toString(), Response.class);
if (response.getCode()==1){
throw new Exception(response.getError_msg());
}
if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
return response.getData();
}else if (Collection.class.isAssignableFrom(returnType)){
return JSONArray.parseArray(response.getData().toString(),Object.class);
}else if(Map.class.isAssignableFrom(returnType)){
return JSON.parseObject(response.getData().toString(),Map.class);
}else{
Object data = response.getData();
return JSONObject.parseObject(data.toString(), returnType);
}
}
}
這個代理類的invoke方法,會將客戶端的Request發(fā)送給Netty 服務端,并接收服務端的返回值。
定義一個 RPC 工廠 Bean:
package com.viewscenes.netsupervisor.configurer.rpc;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanFactory;
import org.springframework.context.support.AbstractApplicationContext;
import java.lang.reflect.Proxy;
/**
* Created by MACHENIKE on 2018-12-03.
*/
public class RpcFactoryBean<T> implements FactoryBean<T> {
private Class<T> rpcInterface;
@Autowired
private RpcFactory<T> factory;
/**
* {@link AbstractApplicationContext#registerBeanPostProcessors(org.springframework.beans.factory.config.ConfigurableListableBeanFactory)}
* 會通過 rpcInterface 實例化 RpcFactoryBean
* @param rpcInterface
*/
public RpcFactoryBean(Class<T> rpcInterface) {
this.rpcInterface = rpcInterface;
}
/**
* 把 Bean 的定義 GenericBeanDefinition 放到了容器之后,就需要初始化這些 Bean,
* 而 Bean 的初始化時機有2個:
* 1、在程序第一個主動調(diào)用 getBean 的時候
* 2、在完成容器初始化的時候會初始化 lazy-init 配置為 false 的Bean(默認為false)
* 在這里,由于 RpcFactoryBean 未設(shè)置懶加載,故初始化的時機是第二種。上面兩種初始化的過程都是一樣的,
* 都會調(diào)用 {@link AbstractBeanFactory#doGetBean(java.lang.String, java.lang.Class, java.lang.Object[], boolean) 方法,
* 里面有個方法 getObjectForBeanInstance,會判斷當前的 Bean 是否實現(xiàn)了 {@link FactoryBean}。
* 如果該 Bean 未實現(xiàn) FactoryBean 接口,則直接返回該Bean實例;
* 如果該 Bean 實現(xiàn)了 FactoryBean 接口,則會返回的實例是 getObject() 返回值。
* @return
* @throws Exception
*/
public T getObject() throws Exception {
return getRpc();
}
public Class<?> getObjectType() {
return this.rpcInterface;
}
public boolean isSingleton() {
return true;
}
/**
* JDK 動態(tài)代理,當調(diào)用 rpcInterface 接口的方法時,會走 factory 的 invoke 方法
* @param <T>
* @return
*/
public <T> T getRpc() {
return (T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory);
}
}
該類實現(xiàn)了FactoryBean接口,表示這個類是工廠 Bean,它在 Spring 容器存放的實例不是類本身,而是getObject的返回值。這里聲明了一個參數(shù)構(gòu)造器,目的是在程序啟動過程中,調(diào)用AbstractApplicationContext.refresh方法,refresh方法里面會走registerBeanPostProcessors方法,該方法會通過反射,把rpcInterface傳過來實例化 RpcFactoryBean。
接下來,我們就要定義一個路徑掃描類,來掃描指定路徑下的接口,生成 Bean 的定義 BeanDefinition,并放進容器。
package com.viewscenes.netsupervisor.configurer.rpc;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.annotation.ClassPathBeanDefinitionScanner;
import org.springframework.core.type.classreading.MetadataReader;
import org.springframework.core.type.classreading.MetadataReaderFactory;
import org.springframework.core.type.filter.AnnotationTypeFilter;
import org.springframework.core.type.filter.TypeFilter;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Set;
/**
* Created by MACHENIKE on 2018-12-03.
*/
public class ClassPathRpcScanner extends ClassPathBeanDefinitionScanner{
public ClassPathRpcScanner(BeanDefinitionRegistry registry) {
super(registry);
}
public Set<BeanDefinitionHolder> doScan(String... basePackages) {
// 獲取指定路徑下的 beanDefinitions
Set<BeanDefinitionHolder> beanDefinitions = super.doScan(basePackages);
if (beanDefinitions.isEmpty()) {
logger.warn("No RPC mapper was found in '"
+ Arrays.toString(basePackages)
+ "' package. Please check your configuration.");
} else {
// 對 beanDefinitions 進行注冊
processBeanDefinitions(beanDefinitions);
}
return beanDefinitions;
}
/**
* 方法會根據(jù)配置的屬性生成對應的過濾器,然后這些過濾器在掃描的時候會起作用。
*/
public void registerFilters() {
// default include filter that accepts all classes 接收所有接口
addIncludeFilter((metadataReader, metadataReaderFactory) ->
true);
// exclude package-info.java
addExcludeFilter((metadataReader, metadataReaderFactory) -> {
String className = metadataReader.getClassMetadata()
.getClassName();
return className.endsWith("package-info");
});
}
private void processBeanDefinitions(
Set<BeanDefinitionHolder> beanDefinitions) {
GenericBeanDefinition definition;
for (BeanDefinitionHolder holder : beanDefinitions) {
definition = (GenericBeanDefinition) holder.getBeanDefinition();
definition.getConstructorArgumentValues().addGenericArgumentValue(definition.getBeanClassName());
// 設(shè)置接口的 beanClass 都為 RpcFactoryBean<?>,因為 RpcFactoryBean 實現(xiàn)了 FactoryBean 接口,這樣初始化 Bean 時就會調(diào)用 getObject 方法
definition.setBeanClass(RpcFactoryBean.class);
// 設(shè)置BeanDefinition自動注入類型,這樣就能被 Spring 管理了
definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE);
System.out.println(holder);
}
}
@Override
protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent();
}
}
doScan方法,是掃描出指定路徑下的BeanDefinitionHolder,然后執(zhí)行processBeanDefinitions(beanDefinitions),對BeanDefinitionHolder進行注冊(放入容器管理)。
registerFilters方法的目的是,根據(jù)配置的屬性生成對應的過濾器,然后這些過濾器在掃描的時候會起作用。本案例中由于沒有配置任何屬性,故生成接收所有接口的過濾器。
接下來就是要將掃描器ClassPathRpcScanner放入配置類,讓其生效了。
package com.viewscenes.netsupervisor.configurer.rpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
/**
* Created by MACHENIKE on 2018-12-03.
*/
@Component
public class RpcScannerConfigurer implements BeanDefinitionRegistryPostProcessor {
// 這個可以存在配置文件
String basePackage = "com.viewscenes.netsupervisor.service";
/**
* 掃描指定路徑下的接口,生成 Bean 的定義 GenericBeanDefinition,并放到了容器
* @param beanDefinitionRegistry
* @throws BeansException
*/
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry beanDefinitionRegistry) throws BeansException {
ClassPathRpcScanner scanner = new ClassPathRpcScanner(beanDefinitionRegistry);
scanner.registerFilters();
scanner.scan(StringUtils.tokenizeToStringArray(this.basePackage, ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS));
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory configurableListableBeanFactory) throws BeansException {
}
}
該配置類實現(xiàn)BeanDefinitionRegistryPostProcessor,重寫postProcessBeanDefinitionRegistry方法。然后引入ClassPathRpcScanner,調(diào)用其registerFilters和scan方法進行BeanDefinition的注冊。
注冊完成之后,只是把 Bean 的定義BeanDefinition放到了容器,還沒有初始化這些 Bean,而 Bean 的初始化時機有2個:
- 1、在程序第一個主動調(diào)用
getBean的時候 - 2、在完成容器初始化的時候會初始化 lazy-init 配置為 false 的Bean(默認為false)
由于RpcFactoryBean未設(shè)置懶加載,故容器初始化完成的時候就會初始化RpcFactoryBean。初始化的過程中,會調(diào)用AbstractBeanFactory.getBean方法,這就涉及到 Spring IOC 中,創(chuàng)建 Bean 的過程,直接上源碼:
protected <T> T doGetBean(
final String name, final Class<T> requiredType, final Object[] args, boolean typeCheckOnly)
throws BeansException {
// 轉(zhuǎn)化beanName,因為 FactoryBean 是以“&”前綴的,需要去掉
final String beanName = transformedBeanName(name);
Object bean;
// 創(chuàng)建單例的 Bean
Object sharedInstance = getSingleton(beanName);
if (sharedInstance != null && args == null) {
if (logger.isDebugEnabled()) {
if (isSingletonCurrentlyInCreation(beanName)) {
logger.debug("Returning eagerly cached instance of singleton bean '" + beanName +
"' that is not fully initialized yet - a consequence of a circular reference");
}
else {
logger.debug("Returning cached instance of singleton bean '" + beanName + "'");
}
}
// 根據(jù)實例信息獲取真正的實例,因為 FactotyBean 的實例不是 sharedInstance,而是其 getObect() 的返回值
bean = getObjectForBeanInstance(sharedInstance, name, beanName, null);
}
......
// 校驗類型與實例是否匹配
if (requiredType != null && bean != null && !requiredType.isInstance(bean)) {
try {
return getTypeConverter().convertIfNecessary(bean, requiredType);
}
catch (TypeMismatchException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to convert bean '" + name + "' to required type '" +
ClassUtils.getQualifiedName(requiredType) + "'", ex);
}
throw new BeanNotOfRequiredTypeException(name, requiredType, bean.getClass());
}
}
return (T) bean;
}
doGetBean里面有個方法 getObjectForBeanInstance,會判斷當前的 Bean 是否實現(xiàn)了FactoryBean。
如果該 Bean 未實現(xiàn)FactoryBean接口,則直接返回該 Bean 實例;
如果該 Bean 實現(xiàn)了FactoryBean接口,則會返回的實例是getObject()返回值。
protected Object getObjectForBeanInstance(
Object beanInstance, String name, String beanName, RootBeanDefinition mbd) {
// 非 FactoryBean 不能以“&”為前綴
if (BeanFactoryUtils.isFactoryDereference(name) && !(beanInstance instanceof FactoryBean)) {
throw new BeanIsNotAFactoryException(transformedBeanName(name), beanInstance.getClass());
}
// 驗證是否是 FactoryBean,如果不是,直接返回實例
if (!(beanInstance instanceof FactoryBean) || BeanFactoryUtils.isFactoryDereference(name)) {
return beanInstance;
}
Object object = null;
if (mbd == null) {
object = getCachedObjectForFactoryBean(beanName);
}
if (object == null) {
// Return bean instance from factory.
FactoryBean<?> factory = (FactoryBean<?>) beanInstance;
// Caches object obtained from FactoryBean if it is a singleton.
if (mbd == null && containsBeanDefinition(beanName)) {
mbd = getMergedLocalBeanDefinition(beanName);
}
boolean synthetic = (mbd != null && mbd.isSynthetic());
// 返回 FactoryBean 實例
object = getObjectFromFactoryBean(factory, beanName, !synthetic);
}
return object;
}
由于我們的RpcFactoryBean實現(xiàn)了FactoryBean,故其初始化的過程中,返回的實例是getObject()的返回值。我們可以看到,getObject()的實現(xiàn)使用了 JDK 動態(tài)代理(T) Proxy.newProxyInstance(rpcInterface.getClassLoader(), new Class[] { rpcInterface },factory),返回值為被代理對象rpcInterface的實例。于是,當rpcInterface接口調(diào)用其方法時,就會走RpcFactory.invoke方法。在這里,封裝請求信息,然后調(diào)用 Netty 的客戶端方法發(fā)送消息。然后根據(jù)方法返回值類型,轉(zhuǎn)成相應的對象返回。
2、服務發(fā)現(xiàn)
在生產(chǎn)者端,我們把服務 IP、端口都注冊到 zookeeper 中,所以這里,我們要去拿到服務地址,然后通過 Netty 連接。重要的是,還要對根目錄進行監(jiān)聽子節(jié)點數(shù)據(jù)變化,這樣隨著生產(chǎn)者的上線和下線,消費者端可以及時感知。
package com.viewscenes.netsupervisor.connection;
@Component
public class ServiceDiscovery {
@Value("${registry.address}")
private String registryAddress;
@Autowired
ConnectManage connectManage;
// 服務地址列表
private volatile List<String> addressList = new ArrayList<>();
private static final String ZK_REGISTRY_PATH = "/rpc";
private ZkClient client;
Logger logger = LoggerFactory.getLogger(this.getClass());
@PostConstruct
public void init(){
client = connectServer();
if (client != null) {
watchNode(client);
}
}
//連接zookeeper
private ZkClient connectServer() {
ZkClient client = new ZkClient(registryAddress,30000,30000);
return client;
}
//監(jiān)聽子節(jié)點變化(子節(jié)點的增加和刪除)
private void watchNode(final ZkClient client) {
List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
logger.info("監(jiān)聽到子節(jié)點變化{}",JSONObject.toJSONString(nodes));
addressList.clear();
getNodeData(nodes);
updateConnectedServer();
});
getNodeData(nodeList);
logger.info("已發(fā)現(xiàn)服務列表...{}", JSONObject.toJSONString(addressList));
updateConnectedServer();
}
//連接生產(chǎn)者端服務
private void updateConnectedServer(){
connectManage.updateConnectServer(addressList);
}
private void getNodeData(List<String> nodes){
logger.info("/rpc子節(jié)點數(shù)據(jù)為:{}", JSONObject.toJSONString(nodes));
for(String node:nodes){
String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
addressList.add(address);
}
}
}
其中,connectManage.updateConnectServer(addressList);就是根據(jù)服務地址,去連接生產(chǎn)者端的 Netty 服務。然后創(chuàng)建一個 Channel 列表,在發(fā)送消息的時候,從中選取一個 Channel 和生產(chǎn)者端進行通信。
3、Netty 客戶端
Netty 客戶端有兩個方法比較重要,一個是doConnect:根據(jù)IP、端口連接服務器,返回Channel,加入到連接管理器;一個是send:用 Channel 發(fā)送請求數(shù)據(jù)。同時,作為客戶端,空閑的時候還要往服務端發(fā)送心跳信息。
package com.viewscenes.netsupervisor.netty.client;
import com.alibaba.fastjson.JSONArray;
import com.viewscenes.netsupervisor.connection.ConnectManage;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import com.viewscenes.netsupervisor.netty.codec.json.JSONDecoder;
import com.viewscenes.netsupervisor.netty.codec.json.JSONEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by MACHENIKE on 2018-12-03.
*/
@Component
public class NettyClient {
Logger logger = LoggerFactory.getLogger(this.getClass());
private EventLoopGroup group = new NioEventLoopGroup(1);
private Bootstrap bootstrap = new Bootstrap();
@Autowired
NettyClientHandler clientHandler;
@Autowired
ConnectManage connectManage;
public NettyClient(){
bootstrap.group(group).
channel(NioSocketChannel.class).
option(ChannelOption.TCP_NODELAY, true).
option(ChannelOption.SO_KEEPALIVE,true).
handler(new ChannelInitializer<SocketChannel>() {
//創(chuàng)建NIOSocketChannel成功后,在進行初始化時,將它的ChannelHandler設(shè)置到ChannelPipeline中,用于處理網(wǎng)絡IO事件
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 30));
pipeline.addLast(new JSONEncoder());
pipeline.addLast(new JSONDecoder());
pipeline.addLast(new HeartBeatHandler());
pipeline.addLast(clientHandler);
}
});
}
@PreDestroy
public void destroy(){
logger.info("RPC客戶端退出,釋放資源!");
group.shutdownGracefully();
}
public Object send(Request request) throws InterruptedException{
Channel channel = connectManage.chooseChannel();
if (channel!=null && channel.isActive()) {
SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
Object result = queue.take();
return JSONArray.toJSONString(result);
}else{
Response res = new Response();
res.setCode(1);
res.setError_msg("未正確連接到服務器.請檢查相關(guān)配置信息!");
return JSONArray.toJSONString(res);
}
}
public Channel doConnect(SocketAddress address) throws InterruptedException {
ChannelFuture future = bootstrap.connect(address);
Channel channel = future.sync().channel();
return channel;
}
}
這里,我們依然定義了一個心跳機制處理器HeartBeatHandler,目的是在new IdleStateHandler(0, 0, 30)約定的30s內(nèi)客戶端未與服務端發(fā)生通信,為了告訴服務端該客戶端依然正常工作(因為服務端心跳檢測是60s),則客戶端需要發(fā)送心跳包給服務端。
package com.viewscenes.netsupervisor.netty.client;
import com.viewscenes.netsupervisor.entity.Request;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 用于檢測channel的心跳handler
* 繼承ChannelInboundHandlerAdapter,從而不需要實現(xiàn)channelRead0 方法
* @author K. L. Mao
* @create 2019/2/22
*/
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.info("已超過30秒未與RPC服務器進行讀寫操作!將發(fā)送心跳消息...");
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
Request request = new Request();
request.setMethodName("heartBeat");
ctx.channel().writeAndFlush(request);
}
}
}
}
我們必須重點關(guān)注send方法,它是在代理對象invoke方法調(diào)用到的。首先從連接器中輪詢選擇一個 Channel,然后發(fā)送數(shù)據(jù)。但是,Netty 是異步操作,我們還要轉(zhuǎn)為同步,就是說要等待生產(chǎn)者端返回數(shù)據(jù)才往下執(zhí)行。筆者在這里用的是同步隊列SynchronousQueue,它的take方法會阻塞在這里,直到里面有數(shù)據(jù)可讀。然后在處理器中,拿到返回信息寫到隊列中,take方法返回。
package com.viewscenes.netsupervisor.netty.client;
import com.alibaba.fastjson.JSON;
import com.viewscenes.netsupervisor.connection.ConnectManage;
import com.viewscenes.netsupervisor.entity.Request;
import com.viewscenes.netsupervisor.entity.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
/**
* Created by MACHENIKE on 2018-12-03.
*/
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
NettyClient client;
@Autowired
ConnectManage connectManage;
Logger logger = LoggerFactory.getLogger(this.getClass());
private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
public void channelActive(ChannelHandlerContext ctx) {
logger.info("已連接到RPC服務器.{}",ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
logger.info("與RPC服務器斷開連接."+address);
ctx.channel().close();
connectManage.removeChannel(ctx.channel());
}
/**
* 接收服務端返回信息
* @param ctx
* @param msg
* @throws Exception
*/
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
Response response = JSON.parseObject(msg.toString(),Response.class);
String requestId = response.getRequestId();
SynchronousQueue<Object> queue = queueMap.get(requestId);
queue.put(response);
queueMap.remove(requestId);
}
public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
SynchronousQueue<Object> queue = new SynchronousQueue<>();
queueMap.put(request.getId(), queue);
channel.writeAndFlush(request);
return queue;
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
logger.info("RPC通信服務器發(fā)生異常.{}",cause);
ctx.channel().close();
}
}
至此,消費者端也基本完成。同樣的,我們先看一下啟動日志:
Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.174.10:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.174.10:2181, initiating session
Session establishment complete on server 192.168.174.10:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子節(jié)點數(shù)據(jù)為:["provider0000000015"]
已發(fā)現(xiàn)服務列表...["192.168.210.81:18868"]
加入Channel到連接管理器./192.168.100.74:18868
已連接到RPC服務器./192.168.210.81:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)
五、總結(jié)
本文簡單介紹了 RPC 的整個流程,如果你正在學習 RPC 的相關(guān)知識,可以根據(jù)文中的例子,自己實現(xiàn)一遍。相信寫完之后,你會對 RPC 會有更深一些的認識。
生產(chǎn)者端流程:
- 加載服務,并緩存
- 啟動通訊服務器(Netty)
- 服務注冊(把通訊地址放入 zookeeper,也可以把加載到的服務也放進去)
- 反射,本地調(diào)用
消費者端流程:
- 代理服務接口
- 服務發(fā)現(xiàn)(連接 zookeeper,拿到服務地址列表)
- 遠程調(diào)用(輪詢生產(chǎn)者服務列表,發(fā)送消息)
消費端調(diào)用服務流程:由于消費端使用了 JDK 動態(tài)代理(默認是使用 javassist 生成字節(jié)碼包做的代理),代理了服務接口,于是當調(diào)用服務接口時,會走代理類的invoke方法,invoke方法會將接口信息通過 Netty 發(fā)送給服務端,服務端首先通過接口名找到其實現(xiàn)類(內(nèi)部保存了映射關(guān)系),然后通過反射執(zhí)行本地實現(xiàn)類的方法,最后將返回結(jié)果通過 Netty 發(fā)送給消費端。