Nacos(版本2.0.1)注冊原理源碼解析

通過查閱官網(wǎng)可知,服務注冊實際上就是向Nacos服務端發(fā)起一個http請求。



對應的controller(InstanceController)如下:

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + UtilsAndCommons.NACOS_NAMING_INSTANCE_CONTEXT)
public class InstanceController {
。。。
}

1. 客戶端服務注冊流程

  1. nacos-discovery-spring-boot-starter 啟動服務通過自動裝配功能裝配nacos客戶端。
  2. Nacos自動配置服務實現(xiàn)Spring的應用監(jiān)聽器用來注冊nacos服務。
  3. 監(jiān)聽到spring的WebServerInitializedEvent事件后把springboot服務注冊到nacos注冊中心。
  4. 調用nacos-client jar包中的com.alibaba.nacos.client.naming.net.NamingProxy#registerService完成服務注冊。

以上為spring Boot自動裝配原理以及spring容器啟動時監(jiān)聽器的原理,不做過多解釋。

registerService對應代碼如下:

 @Override
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        final Map<String, String> params = new HashMap<>(32);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, groupedServiceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put(IP_PARAM, instance.getIp());
        params.put(PORT_PARAM, String.valueOf(instance.getPort()));
        params.put(WEIGHT_PARAM, String.valueOf(instance.getWeight()));
        params.put(REGISTER_ENABLE_PARAM, String.valueOf(instance.isEnabled()));
        params.put(HEALTHY_PARAM, String.valueOf(instance.isHealthy()));
        params.put(EPHEMERAL_PARAM, String.valueOf(instance.isEphemeral()));
        params.put(META_PARAM, JacksonUtils.toJson(instance.getMetadata()));
        
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); //發(fā)送http請求向服務端進行注冊。
    }

2. 服務端注冊邏輯

調用鏈路如下:

com.alibaba.nacos.naming.controllers.InstanceController#register
 ->com.alibaba.nacos.naming.core.ServiceManager#registerInstance
  ->com.alibaba.nacos.naming.core.ServiceManager#addInstance
   ->com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
    ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
     ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
      ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DataStore#put
      ->com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask
    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        
        final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = HttpRequestInstanceBuilder.newBuilder()
                .setDefaultInstanceEphemeral(switchDomain.isDefaultInstanceEphemeral()).setRequest(request).build();
        //注冊邏輯
        getInstanceOperator().registerInstance(namespaceId, serviceName, instance);
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), "",
                false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
                instance.getIp(), instance.getPort()));
        return "ok";
    }

接著往下走進入到InstanceOperatorServiceImpl類中


 @Override
 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
     com.alibaba.nacos.naming.core.Instance coreInstance = parseInstance(instance);
    serviceManager.registerInstance(namespaceId, serviceName, coreInstance);
   }

進入到ServiceManager類中

 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        
        NamingUtils.checkInstanceIsLegal(instance);
        
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        
        Service service = getService(namespaceId, serviceName);
        
        checkServiceIsNull(service, namespaceId, serviceName);
        //添加到注冊表里
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }

在繼續(xù)跟進addInstance方法

 public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
            throws NacosException {
        
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        
        Service service = getService(namespaceId, serviceName);
        
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //加入到注冊表中的邏輯
            consistencyService.put(key, instances);
        }
    }

依據(jù)調用鏈路走到這里

    @Override
    public void put(String key, Record value) throws NacosException {
        //添加注冊表
        onPut(key, value);

        if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
            return;
        }
      //集群架構下進行數(shù)據(jù)同步的邏輯,此分支可以先不看
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                DistroConfig.getInstance().getSyncDelayMillis());
    }
 public void onPut(String key, Record value) {
        
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
        //這里就是將instance保存在一個map中
            dataStore.put(key, datum);
        }
        
        if (!listeners.containsKey(key)) {
            return;
        }
        //添加客戶端信息到阻塞隊列
        notifier.addTask(key, DataOperation.CHANGE);
    }
public class Notifier implements Runnable {
        
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        //阻塞隊列
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
        
        /**
         * Add new notify task to queue.
         *
         * @param datumKey data key
         * @param action   action for data
         */
        public void addTask(String datumKey, DataOperation action) {
            
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            tasks.offer(Pair.with(datumKey, action));
        }
        
        public int getTaskSize() {
            return tasks.size();
        }
        
        //既然是一個線程類,那么就首先看run方法,DistroConsistencyServiceImpl初始化的時候會將Notifier 
        //提交到只有一個線程的線程池中去處理
        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            
            for (; ; ) {
                try {
                    Pair<String, DataOperation> pair = tasks.take();
                    // 拿出阻塞隊列中的客戶端信息進行處理 
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }
        
        private void handle(Pair<String, DataOperation> pair) {
            try {
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                
                services.remove(datumKey);
                
                int count = 0;
                
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                //遍歷所有的實例
                for (RecordListener listener : listeners.get(datumKey)) {
                    
                    count++;
                    
                    try {
                        //如果實例信息發(fā)生了改變  
                        if (action == DataOperation.CHANGE) {
                        //在onPut方法中已經(jīng)將instance放入到一個dataStore的map中,if條件滿足則取出來對ip地址進行修改
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO
                            .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                                    datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }

Nacos采用阻塞隊列加Notifier 的形式,完成異步注冊架構,這樣做的好處在于:提高注冊的并發(fā),對于客戶端來說就是阻塞狀態(tài),啟動速度變慢,對于正常的功能沒有任何影響,而且大多數(shù)項目中的服務數(shù)量也不可能存在將阻塞隊列裝滿的情況。

后續(xù)調用鏈路

  ->com.alibaba.nacos.naming.core.Service#onChange
   ->com.alibaba.nacos.naming.core.Service#updateIPs
    ->com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
        //如果為ephemeral 則復制出一份副本
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        //復制操作
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        //基于oldIpMap 即復制出來的 進行注冊操作,并不是復制出整個注冊表,而是只復制了實例的set集合
        List<Instance> updatedIps = updatedIps(ips, oldIpMap.values());
       。。。

        //最終將 toUpdateInstances 賦值給ephemeralInstances 或者 persistentInstances
        toUpdateInstances = new HashSet<>(ips);
        
        if (ephemeral) {
        // Set<Instance> ephemeralInstances = new HashSet<>(); 真正存放instance的地方
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }

此處復制出一個map的作用就是為了提高并發(fā)讀寫能力,利用cow的思想免除了了加鎖的開銷,也可以避免消費端從注冊中心中讀取到臟數(shù)據(jù)。又因為初始化的時候只會初始化一次,所以也只有一個線程來處理隊列中的任務,所以也不會出現(xiàn)覆蓋問題。

3. Nacos注冊表結構


舉例說明


?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容