CIM-router功能和設(shè)計(jì)分析

第一篇已經(jīng)分析了CIM-client功能和設(shè)計(jì)。其中也提到了client需要向router注冊(cè),獲取可用的服務(wù)器(負(fù)載均衡),線(xiàn)上運(yùn)維(統(tǒng)計(jì)在線(xiàn)人數(shù),模糊查找)。那么這篇注重看看router的設(shè)計(jì)和實(shí)現(xiàn)。
cim github地址: https://github.com/crossoverJie/cim

1. 協(xié)議
1.1 請(qǐng)求協(xié)議

請(qǐng)求協(xié)議的類(lèi)圖結(jié)構(gòu)如下:

image.png

其中基類(lèi)BaseRequest的實(shí)現(xiàn)(在client篇里面也談到過(guò))如下:

public class BaseRequest {
  //請(qǐng)求序列號(hào)
    private String reqNo;
// 請(qǐng)求時(shí)間戳
    private int timeStamp;
}
  • ChatReqVO增加了userId,msg字段,用來(lái)表示抽象聊天的數(shù)據(jù)請(qǐng)求。
  • LoginReqVO 增加了userId,userName字段,用來(lái)表示登陸的數(shù)據(jù)請(qǐng)求。
  • P2PReqVO 增加了userId,receiveUserId,msg字段,用來(lái)表示私聊的數(shù)據(jù)請(qǐng)求。
1.2 響應(yīng)協(xié)議
  • CIMServerResVO 包含了ip,cimServerPort,httpPort這三個(gè)字段,用來(lái)表示獲取某個(gè)服務(wù)的ip+prot數(shù)據(jù)請(qǐng)求。
  • RegisterInfoResVO包含了userId,userName用來(lái)表示用戶(hù)注冊(cè)的某個(gè)服務(wù)。
2. 程序運(yùn)行流程
public class RouteApplication implements CommandLineRunner{

    private final static Logger LOGGER = LoggerFactory.getLogger(RouteApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(RouteApplication.class, args);
        LOGGER.info("啟動(dòng) route 成功");
    }

    @Override
    public void run(String... args) throws Exception {

        //監(jiān)聽(tīng)服務(wù)
        Thread thread = new Thread(new ServerListListener());
        thread.setName("zk-listener");
        thread.start() ;
    }
}
  • 標(biāo)準(zhǔn)的Springboot應(yīng)用啟動(dòng),并在容器啟動(dòng)后,啟動(dòng)一個(gè)線(xiàn)程去向ZK注冊(cè)監(jiān)聽(tīng)器。
public class ServerListListener implements Runnable{

    private static Logger logger = LoggerFactory.getLogger(ServerListListener.class);

    private ZKit zkUtil;

    private AppConfiguration appConfiguration ;


    public ServerListListener() {
        zkUtil = SpringBeanFactory.getBean(ZKit.class) ;
        appConfiguration = SpringBeanFactory.getBean(AppConfiguration.class) ;
    }

    @Override
    public void run() {
        //注冊(cè)監(jiān)聽(tīng)服務(wù)
        zkUtil.subscribeEvent(appConfiguration.getZkRoot());

    }
}
// 當(dāng)獲取ZK中root節(jié)點(diǎn)發(fā)生變更(增刪改)后更新本地ServerCache
public void subscribeEvent(String path) {
        zkClient.subscribeChildChanges(path, new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                logger.info("清除/更新本地緩存 parentPath=【{}】,currentChilds=【{}】", parentPath,currentChilds.toString());

                //更新所有緩存/先刪除 再新增
                serverCache.updateCache(currentChilds) ;
            }
        });


    }

ServerCache 保存的是ZK根目錄下的所有注冊(cè)的服務(wù)器ip,這樣的目的在于每次都緩存了所有的服務(wù)節(jié)點(diǎn),而不用每次都向ZK請(qǐng)求,減少網(wǎng)絡(luò)請(qǐng)求次數(shù)。

3. 對(duì)外提供的Http服務(wù)

router就是對(duì)外提供的http服務(wù),下面介紹它對(duì)外提供服務(wù)的具體實(shí)現(xiàn)

3.1 注冊(cè)服務(wù)
//提供的http接口
@ApiOperation("注冊(cè)賬號(hào)")
    @RequestMapping(value = "registerAccount", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<RegisterInfoResVO> registerAccount(@RequestBody RegisterInfoReqVO registerInfoReqVO) throws Exception {
        BaseResponse<RegisterInfoResVO> res = new BaseResponse();

        long userId = System.currentTimeMillis();
        RegisterInfoResVO info = new RegisterInfoResVO(userId, registerInfoReqVO.getUserName());
        info = accountService.register(info);

        res.setDataBody(info);
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }

 public RegisterInfoResVO register(RegisterInfoResVO info) {
        String key = ACCOUNT_PREFIX + info.getUserId();

        String name = redisTemplate.opsForValue().get(info.getUserName());
        if (null == name) {
            //為了方便查詢(xún),冗余一份
            redisTemplate.opsForValue().set(key, info.getUserName());
            redisTemplate.opsForValue().set(info.getUserName(), key);
        } else {
          //已經(jīng)存在
            long userId = Long.parseLong(name.split(":")[1]);
            info.setUserId(userId);
            info.setUserName(info.getUserName());
        }

        return info;
    }
3.2 獲取所有的在線(xiàn)用戶(hù)
 @ApiOperation("獲取所有在線(xiàn)用戶(hù)")
    @RequestMapping(value = "onlineUser", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<Set<CIMUserInfo>> onlineUser() throws Exception {
        BaseResponse<Set<CIMUserInfo>> res = new BaseResponse();

        Set<CIMUserInfo> cimUserInfos = userInfoCacheService.onlineUser();
        res.setDataBody(cimUserInfos) ;
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }

public Set<CIMUserInfo> onlineUser() {
        Set<CIMUserInfo> set = null ;
        Set<String> members = redisTemplate.opsForSet().members(LOGIN_STATUS_PREFIX);
        for (String member : members) {
            if (set == null){
                set = new HashSet<>(64) ;
            }
            //通過(guò)usrid獲取到UserInfo
            CIMUserInfo cimUserInfo = loadUserInfoByUserId(Long.valueOf(member)) ;
            set.add(cimUserInfo) ;
        }

        return set;
    }

通過(guò)LOGIN_STATUS_PREFIX記錄所有的登陸用戶(hù),因此獲取到這個(gè)Set集合就行。

3.3 登陸并獲取到可用的一個(gè)服務(wù)節(jié)點(diǎn)
@ApiOperation("登錄并獲取服務(wù)器")
    @RequestMapping(value = "login", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<CIMServerResVO> login(@RequestBody LoginReqVO loginReqVO) throws Exception {
        BaseResponse<CIMServerResVO> res = new BaseResponse();

        //登錄校驗(yàn),如果登陸成功,則保存登陸狀態(tài)
        StatusEnum status = accountService.login(loginReqVO);
        if (status == StatusEnum.SUCCESS) {

            String server = routeHandle.routeServer(serverCache.getAll(),String.valueOf(loginReqVO.getUserId()));
            String[] serverInfo = server.split(":");
            //下面講到一致性hash算法
            CIMServerResVO vo = new CIMServerResVO(serverInfo[0], Integer.parseInt(serverInfo[1]),Integer.parseInt(serverInfo[2]));

            //保存路由信息,即把(userid,server)對(duì)應(yīng)起來(lái)
            accountService.saveRouteInfo(loginReqVO,server);

            res.setDataBody(vo);

        }
        res.setCode(status.getCode());
        res.setMessage(status.getMessage());

        return res;
    }
  • 這里選擇服務(wù)器的形式有3中,如下圖:
    image.png

    其中,LoopHandle就是對(duì)輪詢(xún)的形式獲取到服務(wù)節(jié)點(diǎn),RandomHandle就是隨機(jī)獲取到服務(wù)節(jié)點(diǎn),ConsistentHashHandle就是通過(guò)一致性hash獲取到服務(wù)節(jié)點(diǎn)。關(guān)于一致性hash,有兩種實(shí)現(xiàn)形式,如下:
public class TreeMapConsistentHash extends AbstractConsistentHash {
    //通過(guò)treemap來(lái)實(shí)現(xiàn)
    private TreeMap<Long,String> treeMap = new TreeMap<Long, String>() ;

    /**
     * 虛擬節(jié)點(diǎn)數(shù)量
     */
    private static final int VIRTUAL_NODE_SIZE = 2 ;
    //加入虛擬節(jié)點(diǎn)
    @Override
    public void add(long key, String value) {
        for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
            Long hash = super.hash("vir" + key + i);
            treeMap.put(hash,value);
        }
        treeMap.put(key, value);
    }

    @Override
    public String getFirstNodeValue(String value) {
        long hash = super.hash(value);
        System.out.println("value=" + value + " hash = " + hash);
        //返回大于等于value的視圖SortedMap
        SortedMap<Long, String> last = treeMap.tailMap(hash);
        if (!last.isEmpty()) {
            //返回第一個(gè)key大于value的對(duì)應(yīng)map里面保存的server
            return last.get(last.firstKey());
        }
        //如果沒(méi)有,則返回第一個(gè)
        return treeMap.firstEntry().getValue();
    }
}

//用Node數(shù)組實(shí)現(xiàn)的
public class SortArrayMapConsistentHash extends AbstractConsistentHash {

    private SortArrayMap sortArrayMap = new SortArrayMap();

    /**
     * 虛擬節(jié)點(diǎn)數(shù)量
     */
    private static final int VIRTUAL_NODE_SIZE = 2 ;

    @Override
    public void add(long key, String value) {
        for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
            Long hash = super.hash("vir" + key + i);
            sortArrayMap.add(hash,value);
        }
        sortArrayMap.add(key, value);
    }
    //Arrays的sort
    @Override
    public void sort() {
        sortArrayMap.sort();
    }
    //順時(shí)針找第一個(gè)比給定key大的server
    @Override
    public String getFirstNodeValue(String value) {
        long hash = super.hash(value);
        System.out.println("value=" + value + " hash = " + hash);
        return sortArrayMap.firstNodeValue(hash);
    }
}
  • 上面是兩種一種一致性hash的實(shí)現(xiàn),都是加入虛擬節(jié)點(diǎn),然后把尋找第一個(gè)比給定value大的節(jié)點(diǎn),返回該虛擬節(jié)點(diǎn)對(duì)應(yīng)的value就行。
3.4 用戶(hù)下線(xiàn)
@ApiOperation("客戶(hù)端下線(xiàn)")
    @RequestMapping(value = "offLine", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<NULLBody> offLine(@RequestBody ChatReqVO groupReqVO) throws Exception {
        BaseResponse<NULLBody> res = new BaseResponse();

        CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());

        LOGGER.info("下線(xiàn)用戶(hù)[{}]", cimUserInfo.toString());
        accountService.offLine(groupReqVO.getUserId());

        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }

 @Override
    public void offLine(Long userId) throws Exception {
        //刪除路由
        redisTemplate.delete(ROUTE_PREFIX + userId) ;

        //刪除登錄狀態(tài)
        userInfoCacheService.removeLoginStatus(userId);
    }
  • 下線(xiàn)就是刪除,刪除登錄狀態(tài)就行。
3.5 群聊
 public BaseResponse<NULLBody> groupRoute(@RequestBody ChatReqVO groupReqVO) throws Exception {
        BaseResponse<NULLBody> res = new BaseResponse();
        LOGGER.info("msg=[{}]", groupReqVO.toString());
        //獲取所有的推送列表
        Map<Long, CIMServerResVO> serverResVOMap = accountService.loadRouteRelated();
        for (Map.Entry<Long, CIMServerResVO> cimServerResVOEntry : serverResVOMap.entrySet()) {
            Long userId = cimServerResVOEntry.getKey();
            CIMServerResVO value = cimServerResVOEntry.getValue();
            if (userId.equals(groupReqVO.getUserId())){
                //過(guò)濾掉自己
                CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(groupReqVO.getUserId());
                LOGGER.warn("過(guò)濾掉了發(fā)送者 userId={}",cimUserInfo.toString());
                continue;
            }
            //推送消息
            String url = "http://" + value.getIp() + ":" + value.getHttpPort() + "/sendMsg" ;
            ChatReqVO chatVO = new ChatReqVO(userId,groupReqVO.getMsg()) ;

            accountService.pushMsg(url,groupReqVO.getUserId(),chatVO);

        }
        res.setCode(StatusEnum.SUCCESS.getCode());
        res.setMessage(StatusEnum.SUCCESS.getMessage());
        return res;
    }
 
//掃描以ROUTE_PREFIX開(kāi)始的字符串,這個(gè)字符串對(duì)應(yīng)著每個(gè)用戶(hù)使用的server
 public Map<Long, CIMServerResVO> loadRouteRelated() {

        Map<Long, CIMServerResVO> routes = new HashMap<>(64);


        RedisConnection connection = redisTemplate.getConnectionFactory().getConnection();
        ScanOptions options = ScanOptions.scanOptions()
                .match(ROUTE_PREFIX + "*")
                .build();
        Cursor<byte[]> scan = connection.scan(options);

        while (scan.hasNext()) {
            byte[] next = scan.next();
            String key = new String(next, StandardCharsets.UTF_8);
            LOGGER.info("key={}", key);
            parseServerInfo(routes, key);

        }
        try {
            scan.close();
        } catch (IOException e) {
            LOGGER.error("IOException",e);
        }

        return routes;
    }
  • 上面就是找到找到每個(gè)節(jié)點(diǎn)的對(duì)應(yīng)的服務(wù)節(jié)點(diǎn)
 public void pushMsg(String url, long sendUserId, ChatReqVO groupReqVO) throws Exception {
        CIMUserInfo cimUserInfo = userInfoCacheService.loadUserInfoByUserId(sendUserId);

        JSONObject jsonObject = new JSONObject();
        jsonObject.put("msg", cimUserInfo.getUserName() + ":【" + groupReqVO.getMsg() + "】");
        jsonObject.put("userId", groupReqVO.getUserId());
        RequestBody requestBody = RequestBody.create(mediaType, jsonObject.toString());

        Request request = new Request.Builder()
                .url(url)
                .post(requestBody)
                .build();

        Response response = okHttpClient.newCall(request).execute();
        try {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }
        }finally {
            response.body().close();
        }
    }
  • 發(fā)送到具體的Server,讓sever負(fù)責(zé)廣播它收到的群聊消息。
3.6 私聊
 @ApiOperation("私聊 API")
    @RequestMapping(value = "p2pRoute", method = RequestMethod.POST)
    @ResponseBody()
    public BaseResponse<NULLBody> p2pRoute(@RequestBody P2PReqVO p2pRequest) throws Exception {
        BaseResponse<NULLBody> res = new BaseResponse();

        try {
            //獲取接收消息用戶(hù)的路由信息
            CIMServerResVO cimServerResVO = accountService.loadRouteRelatedByUserId(p2pRequest.getReceiveUserId());
            //推送消息
            String url = "http://" + cimServerResVO.getIp() + ":" + cimServerResVO.getHttpPort() + "/sendMsg" ;

            //p2pRequest.getReceiveUserId()==>消息接收者的 userID
            ChatReqVO chatVO = new ChatReqVO(p2pRequest.getReceiveUserId(),p2pRequest.getMsg()) ;
            accountService.pushMsg(url,p2pRequest.getUserId(),chatVO);

            res.setCode(StatusEnum.SUCCESS.getCode());
            res.setMessage(StatusEnum.SUCCESS.getMessage());

        }catch (CIMException e){
            res.setCode(e.getErrorCode());
            res.setMessage(e.getErrorMessage());
        }
        return res;
    }
  • 與群聊很相似,只不過(guò)這里是一個(gè)用戶(hù)。都是定位具體的sever,然后再根據(jù)登陸在server的消息,發(fā)送消息到具體的channel就可以。
總結(jié)

至此,router分析也完成了,router的實(shí)現(xiàn)主要是獲取到server,同過(guò)http調(diào)用,從而發(fā)送消息。這個(gè)過(guò)程中,也涉及到負(fù)載均衡,在用戶(hù)注冊(cè)的時(shí)候,盡可能均衡分布。這里用了輪詢(xún),隨機(jī),一致性hash這三種負(fù)載均衡算法。而且后面也方便拓展。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 網(wǎng)關(guān)(Gateway) 簡(jiǎn)單定義一:網(wǎng)絡(luò)連接到另一個(gè)網(wǎng)絡(luò)的“關(guān)口”。 理論 如果網(wǎng)絡(luò)A中的主機(jī)發(fā)現(xiàn)數(shù)據(jù)包的目的主機(jī)...
    Enomothem閱讀 6,091評(píng)論 0 2
  • 實(shí)在開(kāi)心到不行,人生頭次擁有腹肌,暫且叫她們小A,小B好了。 順產(chǎn)八斤多后,不堪入目的肚皮,一看見(jiàn)就無(wú)比嫌棄自己。...
    Vinc姐閱讀 283評(píng)論 0 0
  • 《話(huà)說(shuō)服務(wù)緣起》:千古“活結(jié)”為哪般?——百談服務(wù)經(jīng)之來(lái)龍去脈1 古詩(shī)云:“人事有代謝,往來(lái)成古今”.如果循著歷史...
    嚴(yán)滿(mǎn)意聊服務(wù)事閱讀 503評(píng)論 0 0
  • 彤云郁郁愁容笑,驟雨驚風(fēng)任曳搖。一抹珠簾天扯地,天開(kāi)日出映虹橋。
    古道西風(fēng)廋馬閱讀 165評(píng)論 0 0
  • 大家好,我是質(zhì)檢崗的新同事。4月份之前我也跟大家一樣,在電催一線(xiàn)工作,平均每天打至少300個(gè)電話(huà)。為什么我寫(xiě)這篇文...
    容大小姐閱讀 7,130評(píng)論 7 1

友情鏈接更多精彩內(nèi)容