Dubbo Cluster: Routing (Router)

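Dubbo's routing rules revolve around two core interfaces, Router and RouterFactory, and one core class, RouterChain. RouterChain is the entry point: it uses each RouterFactory to create a Router, caches the routers in a list, and then iterates over them, calling route on each. RouterChain is mainly used by Directory.list, which lists the Invokers that currently qualify (routing, load balancing, availability) for a given Invocation. Dubbo ships several built-in routers, which fall into two groups: static routers (ConditionRouter, ScriptRouter) and dynamic routers (those that implement the ConfigurationListener interface, such as TagRouter, AppRouter, and ServiceRouter). The main difference is that dynamic routers support updating routing rules at runtime. The sections below start with RouterChain and go through each piece in turn.
RouterChain's internal logic is very simple; the parts worth reading are the private constructor and the route method: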

private RouterChain(URL url) {
    // Load the RouterFactory SPI extensions
    List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
            .getActivateExtension(url, (String[]) null);
    // Create the corresponding Router from each RouterFactory
    List<Router> routers = extensionFactories.stream()
            .map(factory -> factory.getRouter(url))
            .collect(Collectors.toList());
    // Initialize the built-in routers
    initWithRouters(routers);
}

public void initWithRouters(List<Router> builtinRouters) {
   this.builtinRouters = builtinRouters;
   this.routers = new CopyOnWriteArrayList<>(builtinRouters);
   // Sort order: by priority first, then by url.toFullString
   this.sort();
}

// route: call route on every router in turn and return the filtered Invokers
public List<Invoker<T>> route(URL url, Invocation invocation) {
    List<Invoker<T>> finalInvokers = invokers;
    for (Router router : routers) {
        finalInvokers = router.route(finalInvokers, url, invocation);
    }
    return finalInvokers;
}
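
To make the chain idea concrete, here is a minimal, self-contained sketch. It does not use Dubbo's classes: SimpleRouter is a hypothetical stand-in for Router, invokers are plain address strings, and sorting by ascending priority is an assumption of the sketch. It only shows that each router narrows the list handed over by the previous one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

class RouterChainSketch {
    // Hypothetical stand-in for Router: a priority plus a keep/drop test on a provider address.
    static final class SimpleRouter {
        final int priority;
        final Predicate<String> keep;

        SimpleRouter(int priority, Predicate<String> keep) {
            this.priority = priority;
            this.keep = keep;
        }

        List<String> route(List<String> invokers) {
            List<String> result = new ArrayList<>();
            for (String invoker : invokers) {
                if (keep.test(invoker)) {
                    result.add(invoker);
                }
            }
            return result;
        }
    }

    private final List<SimpleRouter> routers;

    RouterChainSketch(List<SimpleRouter> builtinRouters) {
        this.routers = new ArrayList<>(builtinRouters);
        // Ascending priority is an assumption of this sketch.
        this.routers.sort(Comparator.comparingInt((SimpleRouter r) -> r.priority));
    }

    // Same shape as RouterChain.route: every router filters the previous result.
    List<String> route(List<String> invokers) {
        List<String> finalInvokers = invokers;
        for (SimpleRouter router : routers) {
            finalInvokers = router.route(finalInvokers);
        }
        return finalInvokers;
    }

    static List<String> demo() {
        RouterChainSketch chain = new RouterChainSketch(List.of(
                new SimpleRouter(2, address -> address.endsWith(":20880")),
                new SimpleRouter(1, address -> address.startsWith("10.0.0."))));
        return chain.route(List.of("10.0.0.1:20881", "10.0.0.2:20880", "192.168.1.5:20880"));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [10.0.0.2:20880]
    }
}
```

Because every router only removes candidates, the final list is the intersection of what each router accepts.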

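RouterFactory is an SPI extension point. Dubbo's built-in implementations include AppRouterFactory, ServiceRouterFactory, TagRouterFactory, CacheableRouterFactory, ConditionRouterFactory, and ScriptRouterFactory, which produce AppRouter, ServiceRouter, TagRouter, ConditionRouter, and ScriptRouter. The RouterFactory logic is very simple, so the code is omitted here.
With RouterChain and RouterFactory covered, the next step is the routers themselves. Start with ConditionRouter, which is especially important because dynamic routers are built on it. As the name suggests, it routes by condition. It holds a regular expression for the rule format and two condition maps, whenCondition and thenCondition (the design of these two maps is quite interesting). Begin with the constructor: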

public ConditionRouter(URL url) {
    this.url = url;
    // Read the specified parameters from the URL
    this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
    this.force = url.getParameter(Constants.FORCE_KEY, false);
    this.enabled = url.getParameter(Constants.ENABLED_KEY, true);
    // Run the core initialization with the routing rule configured on the URL
    init(url.getParameterAndDecoded(Constants.RULE_KEY));
}

public void init(String rule) {
        try {
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            // The replacements below hint at the rule format
            rule = rule.replace("consumer.", "").replace("provider.", "");
            int i = rule.indexOf("=>");
            // Extract whenRule, which is parsed into whenCondition
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            // Likewise, extract thenRule, which is parsed into thenCondition
            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
            // Parse whenCondition and thenCondition
            Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
            // NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
}
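
The string handling in init is easy to try in isolation. The sketch below copies only the splitting step (strip the consumer. and provider. prefixes, then cut at "=>"). The class name, method name, and sample rules are made up for illustration.

```java
import java.util.Arrays;

class RuleSplitSketch {
    // Returns {whenRule, thenRule}; whenRule is null when the rule contains no "=>".
    static String[] split(String rule) {
        String cleaned = rule.replace("consumer.", "").replace("provider.", "");
        int i = cleaned.indexOf("=>");
        String whenRule = i < 0 ? null : cleaned.substring(0, i).trim();
        String thenRule = i < 0 ? cleaned.trim() : cleaned.substring(i + 2).trim();
        return new String[] {whenRule, thenRule};
    }

    public static void main(String[] args) {
        // Both sides present.
        System.out.println(Arrays.toString(
                split("consumer.host = 10.20.153.10 => provider.host = 10.20.153.11")));
        // Empty left-hand side: whenRule is blank, so init uses an empty whenCondition map.
        System.out.println(Arrays.toString(split("=> host != 172.22.3.91")));
        // Empty right-hand side: thenRule is blank, so init sets thenCondition to null.
        System.out.println(Arrays.toString(split("host = 10.20.153.10 =>")));
    }
}
```

In init, a blank or "true" left-hand side produces an empty whenCondition map, and a blank or "false" right-hand side produces a null thenCondition. The route method shown later treats a null thenCondition as a blacklist.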

// A quick look at the design of MatchPair
protected static final class MatchPair {
    // Values that count as a match (collected from '=')
    final Set<String> matches = new HashSet<String>();
    // Values that count as a mismatch (collected from '!=')
    final Set<String> mismatches = new HashSet<String>();
    // Decide whether the given value matches
    private boolean isMatch(String value, URL param) {
        if (!matches.isEmpty() && mismatches.isEmpty()) {
            for (String match : matches) {
               if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                     return true;
               }
            }
            return false;
        }

        if (!mismatches.isEmpty() && matches.isEmpty()) {
            for (String mismatch : mismatches) {
                if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                    return false;
                }
            }
            return true;
        }

        if (!matches.isEmpty() && !mismatches.isEmpty()) {
            // When a value is in both matches and mismatches, mismatches takes precedence
            for (String mismatch : mismatches) {
                 if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
                       return false;
                 }
            }
            for (String match : matches) {
                if (UrlUtils.isMatchGlobPattern(match, value, param)) {
                     return true;
                }
            }
            return false;
        }
        return false;
    }
}
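
The three branches of isMatch collapse into one rule: a mismatch hit always loses, otherwise a non-empty matches set must contain a hit. The sketch below is a condensed equivalent under simplifying assumptions. globMatches is a hypothetical, reduced version of UrlUtils.isMatchGlobPattern that supports only a single '*' wildcard and ignores the URL parameter argument.

```java
import java.util.Set;

class MatchPairSketch {
    // Simplified glob: "*" matches anything; otherwise at most one '*' is honoured.
    static boolean globMatches(String pattern, String value) {
        if ("*".equals(pattern)) {
            return true;
        }
        int star = pattern.lastIndexOf('*');
        if (star < 0) {
            return pattern.equals(value);
        }
        String prefix = pattern.substring(0, star);
        String suffix = pattern.substring(star + 1);
        return value.length() >= prefix.length() + suffix.length()
                && value.startsWith(prefix)
                && value.endsWith(suffix);
    }

    // Condensed form of MatchPair.isMatch: mismatches are checked first.
    static boolean isMatch(Set<String> matches, Set<String> mismatches, String value) {
        for (String mismatch : mismatches) {
            if (globMatches(mismatch, value)) {
                return false;
            }
        }
        if (matches.isEmpty()) {
            // Only mismatches were given: anything not excluded passes.
            // With both sets empty this returns false, like the original.
            return !mismatches.isEmpty();
        }
        for (String match : matches) {
            if (globMatches(match, value)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> matches = Set.of("10.0.0.*");
        Set<String> mismatches = Set.of("10.0.0.4");
        System.out.println(isMatch(matches, mismatches, "10.0.0.5"));        // true
        System.out.println(isMatch(matches, mismatches, "10.0.0.4"));        // false
        System.out.println(isMatch(Set.of(), mismatches, "192.168.1.1"));    // true
    }
}
```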

// Next comes rule parsing. A rule looks like
// "host = 2.2.2.2,1.1.1.1,3.3.3.3" or "host != 4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3".
// Taking the latter as an example, the result is key = host, value = MatchPair [matches: {"2.2.2.2", "1.1.1.1", "3.3.3.3"}, mismatches: {"4.4.4.4"}]
private static Map<String, MatchPair> parseRule(String rule) throws ParseException {
        Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
        if (StringUtils.isBlank(rule)) {
            return condition;
        }
        // Key-Value pair, stores both match and mismatch conditions
        MatchPair pair = null;
        // Multiple values
        Set<String> values = null;
        // Parse the rule with a regular expression
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        while (matcher.find()) { // Try to match one by one
            // separator = "" 
            String separator = matcher.group(1);
            // content = "host"
            String content = matcher.group(2);
            // Start part of the condition expression.
            if (StringUtils.isEmpty(separator)) {
                pair = new MatchPair();
                condition.put(content, pair);
            }
            // The KV part of the condition expression
            else if ("&".equals(separator)) {
                if (condition.get(content) == null) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                } else {
                    pair = condition.get(content);
                }
            }
            // The Value in the KV part.
            else if ("=".equals(separator)) {
                if (pair == null) {
                    // throws an exception (omitted)
                }

                values = pair.matches;
                values.add(content);
            }
            // The Value in the KV part.
            else if ("!=".equals(separator)) {
                if (pair == null) {
                    // throws an exception (omitted)
                }
                values = pair.mismatches;
                values.add(content);
            }
            // The Value in the KV part when it has more than one item; multiple values are separated by ','
            else if (",".equals(separator)) { // Should be separated by ','
                if (values == null || values.isEmpty()) {
                    // throws an exception (omitted)
                }
                values.add(content);
            } else {
                // throws an exception (omitted)
            }
        }
        return condition;
}
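
To see what parseRule's loop actually receives, it helps to print the (separator, content) pairs that the regular expression yields. ROUTE_PATTERN itself is not shown in this article. The pattern below is what I believe the Dubbo source uses, so treat it as an assumption. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class RuleTokenSketch {
    // Assumed to mirror ROUTE_PATTERN: group 1 is the separator, group 2 the content.
    static final Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");

    // Returns one "[separator] content" entry per match, in order.
    static List<String> tokens(String rule) {
        List<String> result = new ArrayList<>();
        Matcher matcher = ROUTE_PATTERN.matcher(rule);
        while (matcher.find()) {
            result.add("[" + matcher.group(1) + "] " + matcher.group(2));
        }
        return result;
    }

    public static void main(String[] args) {
        for (String token : tokens("host != 4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3")) {
            System.out.println(token);
        }
        // Prints:
        // [] host
        // [!=] 4.4.4.4
        // [&] host
        // [=] 2.2.2.2
        // [,] 1.1.1.1
        // [,] 3.3.3.3
    }
}
```

Reading the tokens against the branches above: an empty separator or "&" selects (or creates) the MatchPair for a key, "=" and "!=" pick the matches or mismatches set, and "," appends to whichever set was picked last.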

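That covers ConditionRouter's initialization. The route method is fairly simple: it first checks the preconditions and returns the original invokers unchanged if they are not met; otherwise it returns the invokers that match the then condition. Pay particular attention to the matchCondition method.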

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
        throws RpcException {
    // Router disabled: return the original invokers unchanged
    if (!enabled) {
        return invokers;
    }

    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }
    try {
        // The when condition is not met: return all invokers without filtering
        if (!matchWhen(url, invocation)) {
            return invokers;
        }
        List<Invoker<T>> result = new ArrayList<Invoker<T>>();
        if (thenCondition == null) {
            logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
            return result;
        }
        // Collect the invokers that match the then rule
        for (Invoker<T> invoker : invokers) {
            if (matchThen(invoker.getUrl(), url)) {
                result.add(invoker);
            }
        }
        if (!result.isEmpty()) {
            return result;
        } else if (force) {
            logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
            return result;
        }
    } catch (Throwable t) {
        logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
    }
    return invokers;
}
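
The interesting part of route is what happens when the then filter leaves nothing: with force the empty list is returned, and without it the router falls back to the unfiltered invokers. The sketch below models only that decision. The when check is reduced to a boolean, a null thenFilter stands for a null thenCondition, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ConditionRouteSketch {
    // thenFilter == null models a rule whose right-hand side is empty or "false".
    static List<String> route(List<String> invokers, boolean whenMatches,
                              Predicate<String> thenFilter, boolean force) {
        if (!whenMatches) {
            return invokers;              // rule does not apply to this consumer
        }
        List<String> result = new ArrayList<>();
        if (thenFilter == null) {
            return result;                // consumer is blacklisted
        }
        for (String invoker : invokers) {
            if (thenFilter.test(invoker)) {
                result.add(invoker);
            }
        }
        if (!result.isEmpty() || force) {
            return result;                // may be empty when force is set
        }
        return invokers;                  // not forced: fall back to all invokers
    }

    public static void main(String[] args) {
        List<String> all = List.of("10.0.0.1", "10.0.0.2");
        System.out.println(route(all, true, "10.0.0.2"::equals, false)); // [10.0.0.2]
        System.out.println(route(all, true, host -> false, false));      // [10.0.0.1, 10.0.0.2]
        System.out.println(route(all, true, host -> false, true));       // []
        System.out.println(route(all, true, null, false));               // []
    }
}
```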

private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
        Map<String, String> sample = url.toMap();
        boolean result = false;
        for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
            String key = matchPair.getKey();
            String sampleValue;
            //get real invoked method name from invocation
            // Keys that can be matched: method, address, host, and custom parameters on the URL
            if (invocation != null && (Constants.METHOD_KEY.equals(key) || Constants.METHODS_KEY.equals(key))) {
                sampleValue = invocation.getMethodName();
            } else if (Constants.ADDRESS_KEY.equals(key)) {
                sampleValue = url.getAddress();
            } else if (Constants.HOST_KEY.equals(key)) {
                sampleValue = url.getHost();
            } else {
                sampleValue = sample.get(key);
                if (sampleValue == null) {
                    sampleValue = sample.get(Constants.DEFAULT_KEY_PREFIX + key);
                }
            }
            // true on a successful match, otherwise false
            if (sampleValue != null) {
                if (!matchPair.getValue().isMatch(sampleValue, param)) {
                    return false;
                } else {
                    result = true;
                }
            } else {
                //not pass the condition
                if (!matchPair.getValue().matches.isEmpty()) {
                    return false;
                } else {
                    result = true;
                }
            }
        }
        return result;
    }

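With the static routers covered, next come the dynamic routers (TagRouter, AppRouter, ServiceRouter), of which AppRouter and ServiceRouter are built on ConditionRouter. AppRouter and ServiceRouter are identical except for the ruleKey, which is taken from the URL's application and serviceKey respectively. Both extend ListenableRouter, which holds the core logic, so we go straight to ListenableRouter. ListenableRouter holds a parsed condition rule (routerRule) and a list of ConditionRouters (conditionRouters), and its route method is also implemented on top of ConditionRouter. The core initialization code is shown below: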

// Constructor; the key part is init
public ListenableRouter(DynamicConfiguration configuration, URL url, String ruleKey) {
    super(configuration, url);
    this.force = false;
    this.init(ruleKey);
}

private synchronized void init(String ruleKey) {
   if (StringUtils.isEmpty(ruleKey)) {
       return;
   }
   // routerKey = ${applicationName}.condition-router
   String routerKey = ruleKey + RULE_SUFFIX;
   // Listen for configuration changes
   configuration.addListener(routerKey, this);
   String rule = configuration.getConfig(routerKey);
   if (rule != null) {
       // Handle the rule change event; this is the implementation of ConfigurationListener.process
       this.process(new ConfigChangeEvent(routerKey, rule));
   }
}

// Handles dynamic routing rule changes; also called during initialization
public synchronized void process(ConfigChangeEvent event) {
        if (logger.isInfoEnabled()) {
            logger.info("Notification of condition rule, change type is: " + event.getChangeType() +
                    ", raw rule is:\n " + event.getValue());
        }
        // On a delete event, clear routerRule and conditionRouters
        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            routerRule = null;
            conditionRouters = Collections.emptyList();
        } else {
            try {
                // Parse the routing rule (YAML) and generate the conditionRouters from it; the logic is simple, so it is not listed separately
                routerRule = ConditionRuleParser.parse(event.getValue());
                generateConditions(routerRule);
            } catch (Exception e) {
                // exception handling omitted
            }
        }
}
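
The pattern in init and process (register a listener, then pull the current value once so an already published rule is not missed) can be sketched without Dubbo. Everything here is hypothetical: ConfigCenter is a tiny in-memory stand-in for DynamicConfiguration, a null value models a DELETED event, and each line of the value is treated as one condition instead of parsing YAML.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ListenableRuleSketch {
    interface Listener {
        void process(String key, String value);
    }

    // Hypothetical in-memory config center; a null value models a delete event.
    static final class ConfigCenter {
        private final Map<String, String> values = new HashMap<>();
        private final Map<String, List<Listener>> listeners = new HashMap<>();

        void addListener(String key, Listener listener) {
            listeners.computeIfAbsent(key, k -> new ArrayList<>()).add(listener);
        }

        String getConfig(String key) {
            return values.get(key);
        }

        void publish(String key, String value) {
            if (value == null) {
                values.remove(key);
            } else {
                values.put(key, value);
            }
            for (Listener listener : listeners.getOrDefault(key, List.of())) {
                listener.process(key, value);
            }
        }
    }

    static final class RuleHolder implements Listener {
        private List<String> conditions = List.of();

        RuleHolder(ConfigCenter center, String ruleKey) {
            String routerKey = ruleKey + ".condition-router";
            center.addListener(routerKey, this);       // subscribe first
            String rule = center.getConfig(routerKey); // then load the current rule, if any
            if (rule != null) {
                process(routerKey, rule);
            }
        }

        @Override
        public synchronized void process(String key, String value) {
            // Simplification: one condition per line instead of a YAML document.
            conditions = value == null ? List.of() : List.of(value.split("\n"));
        }

        synchronized List<String> conditions() {
            return conditions;
        }
    }

    public static void main(String[] args) {
        ConfigCenter center = new ConfigCenter();
        center.publish("demo-app.condition-router", "host = 10.0.0.1 => host = 10.0.0.2");
        RuleHolder holder = new RuleHolder(center, "demo-app");
        System.out.println(holder.conditions().size()); // 1
        center.publish("demo-app.condition-router", null);
        System.out.println(holder.conditions().size()); // 0
    }
}
```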

The route logic is also fairly simple:

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) {
        return invokers;
    }
    // We will check enabled status inside each router.
    // The result must satisfy the rules of every router
    for (Router router : conditionRouters) {
        invokers = router.route(invokers, url, invocation);
    }
    return invokers;
}

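Finally, TagRouter. It holds a TagRouterRule and implements its core route logic entirely on its own. TagRouter is also the only router that implements the notify method of the Router interface. Initialization is simple, so we go straight to the core logic: route and notify.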

public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    if (CollectionUtils.isEmpty(invokers)) {
        return invokers;
    }

    // If tagRouterRule is invalid, route by static tag
    if (tagRouterRule == null || !tagRouterRule.isValid() || !tagRouterRule.isEnabled()) {
        return filterUsingStaticTag(invokers, url, invocation);
    }

    List<Invoker<T>> result = invokers;
    // Prefer the tag carried by the invocation
    String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
            invocation.getAttachment(TAG_KEY);

    // if we are requesting for a Provider with a specific tag
    // A tag is specified: first filter by the addresses in tagRouterRule;
    // if the result is non-empty, or the rule is forced, return it directly; otherwise filter by tag
    if (StringUtils.isNotEmpty(tag)) {
        List<String> addresses = tagRouterRule.getTagnameToAddresses().get(tag);
        // filter by dynamic tag group first
        // Filter by address, i.e. addresses.contains(invoker.getUrl().getAddress())
        if (CollectionUtils.isNotEmpty(addresses)) {
            result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
            // if result is not null OR it's null but force=true, return result directly
            if (CollectionUtils.isNotEmpty(result) || tagRouterRule.isForce()) {
                return result;
            }
        } else {
            // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
            // dynamic tag group but force=false. check static tag
            // Otherwise filter by the static tag
            result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
        }
        // If there's no tagged providers that can match the current tagged request. force.tag is set by default to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
        if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
            return result;
        }
        // FAILOVER: return all Providers without any tags.
        // Failover: take the invokers whose address is not matched by the rule
        else {
            List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
                    tagRouterRule.getAddresses()));
            return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
        }
    } else {
        // List<String> addresses = tagRouterRule.filter(providerApp);
        // return all addresses in dynamic tag group.
        // No tag specified: first keep the invokers whose address is not in the rule; if every address belongs to a tag, return the empty list
        List<String> addresses = tagRouterRule.getAddresses();
        if (CollectionUtils.isNotEmpty(addresses)) {
            result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
            // 1. all addresses are in dynamic tag group, return empty list.
            if (CollectionUtils.isEmpty(result)) {
                return result;
            }
            // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
        }
        return filterInvoker(result, invoker -> {
            String localTag = invoker.getUrl().getParameter(TAG_KEY);
            return StringUtils.isEmpty(localTag) || !tagRouterRule.getTagNames().contains(localTag);
        });
    }
}

// Route by static tag
private <T> List<Invoker<T>> filterUsingStaticTag(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        List<Invoker<T>> result = invokers;
        // Dynamic param: read the tag from the invocation, falling back to the URL
        String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
                invocation.getAttachment(TAG_KEY);
        // Tag request
        if (!StringUtils.isEmpty(tag)) {
            // A tag is set: filter by tag first
            result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY)));
            // Nothing matched: unless the tag is forced, fall back to the invokers without a tag
            if (CollectionUtils.isEmpty(result) && !isForceUseTag(invocation)) {
                result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
            }
        } else {
            // Take the invokers without a tag
            result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
        }
        return result;
}
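
The static-tag fallback is compact enough to restate as a small, runnable sketch. Providers are modelled as an address-to-tag map (an empty string means no tag), and forceTag stands in for isForceUseTag(invocation). These names and the sample data are assumptions for illustration, not Dubbo's API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StaticTagSketch {
    // providers maps address -> tag; an empty string means "no tag".
    static List<String> filter(Map<String, String> providers, String requestTag, boolean forceTag) {
        if (requestTag != null && !requestTag.isEmpty()) {
            List<String> tagged = select(providers, requestTag);
            if (!tagged.isEmpty() || forceTag) {
                return tagged;           // may be empty when the tag is forced
            }
        }
        return select(providers, "");    // untagged request, or fallback to untagged providers
    }

    private static List<String> select(Map<String, String> providers, String tag) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : providers.entrySet()) {
            if (tag.equals(entry.getValue())) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    static Map<String, String> sampleProviders() {
        Map<String, String> providers = new LinkedHashMap<>();
        providers.put("10.0.0.1:20880", "gray");
        providers.put("10.0.0.2:20880", "");
        providers.put("10.0.0.3:20880", "");
        return providers;
    }

    public static void main(String[] args) {
        Map<String, String> providers = sampleProviders();
        System.out.println(filter(providers, "gray", false)); // [10.0.0.1:20880]
        System.out.println(filter(providers, "blue", false)); // [10.0.0.2:20880, 10.0.0.3:20880]
        System.out.println(filter(providers, "blue", true));  // []
        System.out.println(filter(providers, null, false));   // [10.0.0.2:20880, 10.0.0.3:20880]
    }
}
```

Note that an untagged request never reaches tagged providers in this branch, which is what keeps tagged (for example gray-release) instances isolated from normal traffic.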

The difference between dynamic and static tags is that dynamic tags use tagRouterRule, which changes as the router listens to the dynamic configuration. The code is as follows:

public synchronized void process(ConfigChangeEvent event) {
    if (logger.isDebugEnabled()) {
        logger.debug("Notification of tag rule, change type is: " + event.getChangeType() + ", raw rule is:\n " +
                event.getValue());
    }

    try {
        if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
            this.tagRouterRule = null;
        } else {
            this.tagRouterRule = TagRuleParser.parse(event.getValue());
        }
    } catch (Exception e) {
        // exception handling omitted
    }
}

Finally, notify. The logic is again very simple: it reads providerApplication from the invoker's URL and, if it differs from the current application, reloads tagRouterRule.

public <T> void notify(List<Invoker<T>> invokers) {
    if (CollectionUtils.isEmpty(invokers)) {
        return;
    }

    Invoker<T> invoker = invokers.get(0);
    URL url = invoker.getUrl();
    // remote.application
    String providerApplication = url.getParameter(Constants.REMOTE_APPLICATION_KEY);

    if (StringUtils.isEmpty(providerApplication)) {
        logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
                "in this TagRouter is not specified.");
        return;
    }

    // This touches the listeners registered on configuration
    synchronized (this) {
        if (!providerApplication.equals(application)) {
            if (!StringUtils.isEmpty(application)) {
                configuration.removeListener(application + RULE_SUFFIX, this);
            }
            String key = providerApplication + RULE_SUFFIX;
            configuration.addListener(key, this);
            application = providerApplication;
            String rawRule = configuration.getConfig(key);
            if (rawRule != null) {
                this.process(new ConfigChangeEvent(key, rawRule));
            }
        }
    }
}

That concludes the analysis of Router.

Note: this is based on Dubbo source version 2.7.1. Corrections are welcome.
