第一篇已經(jīng)分析了CIM-client功能和設(shè)計(jì)。其中也提到了client需要向router注冊(cè),獲取可用的服務(wù)器(負(fù)載均衡),線(xiàn)上運(yùn)維(統(tǒng)計(jì)在線(xiàn)人數(shù),模糊查找)。那么這篇注重看看router的設(shè)計(jì)和實(shí)現(xiàn)。
cim github地址: https://github.com/crossoverJie/cim
1. 協(xié)議
1.1 請(qǐng)求協(xié)議
請(qǐng)求協(xié)議的類(lèi)圖結(jié)構(gòu)如下:

image.png
其中基類(lèi)
BaseRequest的實(shí)現(xiàn)(在client篇里面也談到過(guò))如下:
public class BaseRequest {
//請(qǐng)求序列號(hào)
private String reqNo;
// 請(qǐng)求時(shí)間戳
private int timeStamp;
}
-
ChatReqVO增加了userId,msg字段,用來(lái)表示抽象聊天的數(shù)據(jù)請(qǐng)求。 -
LoginReqVO增加了userId,userName字段,用來(lái)表示登陸的數(shù)據(jù)請(qǐng)求。 -
P2PReqVO增加了userId,receiveUserId,msg字段,用來(lái)表示私聊的數(shù)據(jù)請(qǐng)求。
1.2 響應(yīng)協(xié)議
-
CIMServerResVO包含了ip,cimServerPort,httpPort這三個(gè)字段,用來(lái)表示獲取某個(gè)服務(wù)的ip+prot數(shù)據(jù)請(qǐng)求。 -
RegisterInfoResVO包含了userId,userName用來(lái)表示用戶(hù)注冊(cè)的某個(gè)服務(wù)。
2. 程序運(yùn)行流程
public class RouteApplication implements CommandLineRunner{
private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class);
public static void main(String[] args) {
SpringApplication.run(RouteApplication.class, args);
LOGGER.info("啟動(dòng) route 成功");
}
@Override
public void run(String... args) throws Exception {
//監(jiān)聽(tīng)服務(wù)
Thread thread = new Thread(new ServerListListener());
thread.setName("zk-listener");
thread.start() ;
}
}
- 標(biāo)準(zhǔn)的Springboot應(yīng)用啟動(dòng),并在容器啟動(dòng)后,啟動(dòng)一個(gè)線(xiàn)程去向ZK注冊(cè)監(jiān)聽(tīng)器。
public class ServerListListener implements Runnable{
private static Logger logger = LoggerFactory.getLogger(ServerListListener.class);
private ZKit zkUtil;
private AppConfiguration appConfiguration ;
public ServerListListener() {
zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
}
@Override
public void run() {
//注冊(cè)監(jiān)聽(tīng)服務(wù)
zkUtil.subscribeEvent(appConfiguration.getZkRoot());
}
}
// 當(dāng)獲取ZK中root節(jié)點(diǎn)發(fā)生變更(增刪改)后更新本地ServerCache
public void subscribeEvent(String path) {
zkClient.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
logger.info("清除/更新本地緩存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());
//更新所有緩存/先刪除 再新增
serverCache.updateCache(currentChilds) ;
}
});
}
ServerCache 保存的是ZK根目錄下的所有注冊(cè)的服務(wù)器ip,這樣的目的在于每次都緩存了所有的服務(wù)節(jié)點(diǎn),而不用每次都向ZK請(qǐng)求,減少網(wǎng)絡(luò)請(qǐng)求次數(shù)。
3. 對(duì)外提供的Http服務(wù)
router就是對(duì)外提供的http服務(wù),下面介紹它對(duì)外提供服務(wù)的具體實(shí)現(xiàn)
3.1 注冊(cè)服務(wù)
//提供的http接口
@ApiOperation("注冊(cè)賬號(hào)")
@RequestMapping(value = "registerAccount", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<RegisterInfoResVO> registerAccount(@RequestBody RegisterInfoReqVO registerInfoReqVO) throws Exception {
BaseResponse<RegisterInfoResVO> res = new BaseResponse();
long userId = System.currentTimeMillis();
RegisterInfoResVO info = new RegisterInfoResVO(userId, registerInfoReqVO.getUserName());
info = accountService.register(info);
res.setDataBody(info);
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
public RegisterInfoResVO register(RegisterInfoResVO info) {
String key = ACCOUNT_PREFIX + info.getUserId();
String name = redisTemplate.opsForValue().get(info.getUserName());
if (null == name) {
//為了方便查詢(xún),冗余一份
redisTemplate.opsForValue().set(key, info.getUserName());
redisTemplate.opsForValue().set(info.getUserName(), key);
} else {
//已經(jīng)存在
long userId = Long.parseLong(name.split(":")[1]);
info.setUserId(userId);
info.setUserName(info.getUserName());
}
return info;
}
3.2 獲取所有的在線(xiàn)用戶(hù)
@ApiOperation("獲取所有在線(xiàn)用戶(hù)")
@RequestMapping(value = "onlineUser", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception {
BaseResponse<Set<CIMUserInfo>> res = new BaseResponse();
Set<CIMUserInfo> cimUserInfos = userInfoCacheService.onlineUser();
res.setDataBody(cimUserInfos) ;
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
public Set<CIMUserInfo> onlineUser() {
Set<CIMUserInfo> set = null ;
Set<String> members = redisTemplate.opsForSet().members(LOGIN_STATUS_PREFIX);
for (String member : members) {
if (set == null){
set = new HashSet<>(64) ;
}
//通過(guò)usrid獲取到UserInfo
CIMUserInfo cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;
set.add(cimUserInfo) ;
}
return set;
}
通過(guò)LOGIN_STATUS_PREFIX記錄所有的登陸用戶(hù),因此獲取到這個(gè)Set集合就行。
3.3 登陸并獲取到可用的一個(gè)服務(wù)節(jié)點(diǎn)
@ApiOperation("登錄并獲取服務(wù)器")
@RequestMapping(value = "login", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<CIMServerResVO> login(@RequestBody LoginReqVO loginReqVO) throws Exception {
BaseResponse<CIMServerResVO> res = new BaseResponse();
//登錄校驗(yàn),如果登陸成功,則保存登陸狀態(tài)
StatusEnum status = accountService.login(loginReqVO);
if (status == StatusEnum.SUCCESS) {
String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
String[] serverInfo = server.split(":");
//下面講到一致性hash算法
CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));
//保存路由信息,即把(userid,server)對(duì)應(yīng)起來(lái)
accountService.saveRouteInfo(loginReqVO,server);
res.setDataBody(vo);
}
res.setCode(status.getCode());
res.setMessage(status.getMessage());
return res;
}
- 這里選擇服務(wù)器的形式有3中,如下圖:
image.png
其中,LoopHandle就是對(duì)輪詢(xún)的形式獲取到服務(wù)節(jié)點(diǎn),RandomHandle就是隨機(jī)獲取到服務(wù)節(jié)點(diǎn),ConsistentHashHandle就是通過(guò)一致性hash獲取到服務(wù)節(jié)點(diǎn)。關(guān)于一致性hash,有兩種實(shí)現(xiàn)形式,如下:
public class TreeMapConsistentHash extends AbstractConsistentHash {
//通過(guò)treemap來(lái)實(shí)現(xiàn)
private TreeMap<Long,String> treeMap = new TreeMap<Long, String>() ;
/**
* 虛擬節(jié)點(diǎn)數(shù)量
*/
private static final int VIRTUAL_NODE_SIZE = 2 ;
//加入虛擬節(jié)點(diǎn)
@Override
public void add(long key, String value) {
for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
Long hash = super.hash("vir" + key + i);
treeMap.put(hash,value);
}
treeMap.put(key, value);
}
@Override
public String getFirstNodeValue(String value) {
long hash = super.hash(value);
System.out.println("value=" + value + " hash = " + hash);
//返回大于等于value的視圖SortedMap
SortedMap<Long, String> last = treeMap.tailMap(hash);
if (!last.isEmpty()) {
//返回第一個(gè)key大于value的對(duì)應(yīng)map里面保存的server
return last.get(last.firstKey());
}
//如果沒(méi)有,則返回第一個(gè)
return treeMap.firstEntry().getValue();
}
}
//用Node數(shù)組實(shí)現(xiàn)的
public class SortArrayMapConsistentHash extends AbstractConsistentHash {
private SortArrayMap sortArrayMap = new SortArrayMap();
/**
* 虛擬節(jié)點(diǎn)數(shù)量
*/
private static final int VIRTUAL_NODE_SIZE = 2 ;
@Override
public void add(long key, String value) {
for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
Long hash = super.hash("vir" + key + i);
sortArrayMap.add(hash,value);
}
sortArrayMap.add(key, value);
}
//Arrays的sort
@Override
public void sort() {
sortArrayMap.sort();
}
//順時(shí)針找第一個(gè)比給定key大的server
@Override
public String getFirstNodeValue(String value) {
long hash = super.hash(value);
System.out.println("value=" + value + " hash = " + hash);
return sortArrayMap.firstNodeValue(hash);
}
}
- 上面是兩種一種一致性hash的實(shí)現(xiàn),都是加入虛擬節(jié)點(diǎn),然后把尋找第一個(gè)比給定value大的節(jié)點(diǎn),返回該虛擬節(jié)點(diǎn)對(duì)應(yīng)的value就行。
3.4 用戶(hù)下線(xiàn)
@ApiOperation("客戶(hù)端下線(xiàn)")
@RequestMapping(value = "offLine", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
LOGGER.info("下線(xiàn)用戶(hù)[{}]", cimUserInfo.toString());
accountService.offLine(groupReqVO.getUserId());
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
@Override
public void offLine(Long userId) throws Exception {
//刪除路由
redisTemplate.delete(ROUTE_PREFIX + userId) ;
//刪除登錄狀態(tài)
userInfoCacheService.removeLoginStatus(userId);
}
- 下線(xiàn)就是刪除,刪除登錄狀態(tài)就行。
3.5 群聊
public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
LOGGER.info("msg=[{}]", groupReqVO.toString());
//獲取所有的推送列表
Map<Long, CIMServerResVO> serverResVOMap = accountService.loadRouteRelated();
for (Map.Entry<Long, CIMServerResVO> cimServerResVOEntry : serverResVOMap.entrySet()) {
Long userId = cimServerResVOEntry.getKey();
CIMServerResVO value = cimServerResVOEntry.getValue();
if (userId.equals(groupReqVO.getUserId())){
//過(guò)濾掉自己
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
LOGGER.warn("過(guò)濾掉了發(fā)送者 userId={}",cimUserInfo.toString());
continue;
}
//推送消息
String url = "http://" + value.getIp() + ":" + value.getHttpPort() + "/sendMsg" ;
ChatReqVO chatVO = new ChatReqVO(userId,groupReqVO.getMsg()) ;
accountService.pushMsg(url,groupReqVO.getUserId(),chatVO);
}
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
return res;
}
//掃描以ROUTE_PREFIX開(kāi)始的字符串,這個(gè)字符串對(duì)應(yīng)著每個(gè)用戶(hù)使用的server
public Map<Long, CIMServerResVO> loadRouteRelated() {
Map<Long, CIMServerResVO> routes = new HashMap<>(64);
RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
ScanOptions options = ScanOptions.scanOptions()
.match(ROUTE_PREFIX + "*")
.build();
Cursor<byte[]> scan = connection.scan(options);
while (scan.hasNext()) {
byte[] next = scan.next();
String key = new String(next, StandardCharsets.UTF_8);
LOGGER.info("key={}", key);
parseServerInfo(routes, key);
}
try {
scan.close();
} catch (IOException e) {
LOGGER.error("IOException",e);
}
return routes;
}
- 上面就是找到找到每個(gè)節(jié)點(diǎn)的對(duì)應(yīng)的服務(wù)節(jié)點(diǎn)
public void pushMsg(String url, long sendUserId, ChatReqVO groupReqVO) throws Exception {
CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("msg", cimUserInfo.getUserName() + ":【" + groupReqVO.getMsg() + "】");
jsonObject.put("userId", groupReqVO.getUserId());
RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.build();
Response response = okHttpClient.newCall(request).execute();
try {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
}finally {
response.body().close();
}
}
- 發(fā)送到具體的Server,讓sever負(fù)責(zé)廣播它收到的群聊消息。
3.6 私聊
@ApiOperation("私聊 API")
@RequestMapping(value = "p2pRoute", method = RequestMethod.POST)
@ResponseBody()
public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws Exception {
BaseResponse<NULLBody> res = new BaseResponse();
try {
//獲取接收消息用戶(hù)的路由信息
CIMServerResVO cimServerResVO = accountService.loadRouteRelatedByUserId(p2pRequest.getReceiveUserId());
//推送消息
String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort() + "/sendMsg" ;
//p2pRequest.getReceiveUserId()==>消息接收者的 userID
ChatReqVO chatVO = new ChatReqVO(p2pRequest.getReceiveUserId(),p2pRequest.getMsg()) ;
accountService.pushMsg(url,p2pRequest.getUserId(),chatVO);
res.setCode(StatusEnum.SUCCESS.getCode());
res.setMessage(StatusEnum.SUCCESS.getMessage());
}catch (CIMException e){
res.setCode(e.getErrorCode());
res.setMessage(e.getErrorMessage());
}
return res;
}
- 與群聊很相似,只不過(guò)這里是一個(gè)用戶(hù)。都是定位具體的sever,然后再根據(jù)登陸在server的消息,發(fā)送消息到具體的channel就可以。
總結(jié)
至此,router分析也完成了,router的實(shí)現(xiàn)主要是獲取到server,同過(guò)http調(diào)用,從而發(fā)送消息。這個(gè)過(guò)程中,也涉及到負(fù)載均衡,在用戶(hù)注冊(cè)的時(shí)候,盡可能均衡分布。這里用了輪詢(xún),隨機(jī),一致性hash這三種負(fù)載均衡算法。而且后面也方便拓展。
