dubbo的路由規(guī)則,主要包括 Router、RouterFactory 兩個核心接口和RouterChain一個核心類;其中RouterChain是入口,RouterChain會通過RouterFactory生成Router放入緩存list,然后遍歷執(zhí)行route;而RouterChain主要用于Directory.list方法,根據(jù)Invocation會話列出當(dāng)前符合條件(路由、負(fù)載均衡、可用)的Invoker;dubbo提供幾種內(nèi)置Router,主要分為兩類:靜態(tài)Router(ConditionRouter、ScriptRouter)和動態(tài)Router(實現(xiàn)ConfigurationListener接口,比如TagRouter、AppRouter、ServiceRouter),主要區(qū)別在于支持路由規(guī)則動態(tài)更新。下面從RouterChain開始,逐個分析。
RouterChain內(nèi)部邏輯非常簡單,核心關(guān)注私有構(gòu)造方法以及route方法:
private RouterChain(URL url) {
//加載RouterFactory SPI擴(kuò)展
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);
// 通過RouterFactory生成對應(yīng)Router
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
// 初始化內(nèi)置Router
initWithRouters(routers);
}
public void initWithRouters(List<Router> builtinRouters) {
this.builtinRouters = builtinRouters;
this.routers = new CopyOnWriteArrayList<>(builtinRouters);
// 排序規(guī)則,先根據(jù)priority排然后是url.fullString
this.sort();
}
// route,遍歷調(diào)用所有routers的route,返回過濾后的Invoker
public List<Invoker<T>> route(URL url, Invocation invocation) {
List<Invoker<T>> finalInvokers = invokers;
for (Router router : routers) {
finalInvokers = router.route(finalInvokers, url, invocation);
}
return finalInvokers;
}
RouterFactory支持SPI,dubbo內(nèi)置實現(xiàn) AppRouterFactory、ServiceRouterFactory、TagRouterFactory、CacheableRouterFactory、ConditionRouterFactory、ScriptRouterFactory等,分別對應(yīng)AppRouter、ServiceRouter、TagRouter、ConditionRouter、ScripRouter;RouterFactory邏輯非常簡單,就不上代碼了。
了解完RouterChain和RouterFactory,下面來關(guān)注各種Router。先來看ConditionRouter(非常重要,動態(tài)Router依賴ConditionRouter實現(xiàn)),顧名思義,條件路由,內(nèi)置路由規(guī)則格式正則以及兩個條件map(whenCondition、thenCondition,這兩個條件map設(shè)計的非常有意思)。先從構(gòu)造方法開始:
public ConditionRouter(URL url) {
this.url = url;
// 從url獲取制定參數(shù)
this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
this.force = url.getParameter(Constants.FORCE_KEY, false);
this.enabled = url.getParameter(Constants.ENABLED_KEY, true);
// 根據(jù)url配置的路由規(guī)則,執(zhí)行核心初始化邏輯
init(url.getParameterAndDecoded(Constants.RULE_KEY));
}
public void init(String rule) {
try {
if (rule == null || rule.trim().length() == 0) {
throw new IllegalArgumentException("Illegal route rule!");
}
// 這里可以猜想下rule的格式
rule = rule.replace("consumer.", "").replace("provider.", "");
int i = rule.indexOf("=>");
// 截取whenRule,用于解析whenCondition
String whenRule = i < 0 ? null : rule.substring(0, i).trim();
// 同樣的,截取thenRule,用于解析thenCondition
String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
// 解析whenContion和thenCondition
Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
// NOTE: It should be determined on the business level whether the `When condition` can be empty or not.
this.whenCondition = when;
this.thenCondition = then;
} catch (ParseException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
//順便看一下MatchPair的設(shè)計
protected static final class MatchPair {
// 匹配結(jié)果集
final Set<String> matches = new HashSet<String>();
// 不匹配結(jié)果集
final Set<String> mismatches = new HashSet<String>();
// 判斷是否匹配
private boolean isMatch(String value, URL param) {
if (!matches.isEmpty() && mismatches.isEmpty()) {
for (String match : matches) {
if (UrlUtils.isMatchGlobPattern(match, value, param)) {
return true;
}
}
return false;
}
if (!mismatches.isEmpty() && matches.isEmpty()) {
for (String mismatch : mismatches) {
if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
return false;
}
}
return true;
}
if (!matches.isEmpty() && !mismatches.isEmpty()) {
// 即在matches又在mismatches,認(rèn)為是mismatches
//when both mismatches and matches contain the same value, then using mismatches first
for (String mismatch : mismatches) {
if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {
return false;
}
}
for (String match : matches) {
if (UrlUtils.isMatchGlobPattern(match, value, param)) {
return true;
}
}
return false;
}
return false;
}
}
//然后再來看規(guī)則解析,rule格式類似
// host = 2.2.2.2,1.1.1.1,3.3.3.3" 或者 "host !=4.4.4.4 & host = 2.2.2.2,1.1.1.1,3.3.3.3"
// 以后者為例,解析結(jié)果是 key = host,value = matchpair [matches:{"2.2.2.2","1.1.1.1","3.3.3.3"},misMatches:{"4.4.4.4"]
private static Map<String, MatchPair> parseRule(String rule) throws ParseException {
Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
if (StringUtils.isBlank(rule)) {
return condition;
}
// Key-Value pair, stores both match and mismatch conditions
MatchPair pair = null;
// Multiple values
Set<String> values = null;
//正則解析rule
final Matcher matcher = ROUTE_PATTERN.matcher(rule);
while (matcher.find()) { // Try to match one by one
// separator = ""
String separator = matcher.group(1);
// content = "host"
String content = matcher.group(2);
// Start part of the condition expression.
if (StringUtils.isEmpty(separator)) {
pair = new MatchPair();
condition.put(content, pair);
}
// The KV part of the condition expression
else if ("&".equals(separator)) {
if (condition.get(content) == null) {
pair = new MatchPair();
condition.put(content, pair);
} else {
pair = condition.get(content);
}
}
// The Value in the KV part.
else if ("=".equals(separator)) {
if (pair == null) {
//拋異常,省略
}
values = pair.matches;
values.add(content);
}
// The Value in the KV part.
else if ("!=".equals(separator)) {
if (pair == null) {
// 拋異常,省略
}
values = pair.mismatches;
values.add(content);
}
// The Value in the KV part, if Value have more than one items.多值用,分割
else if (",".equals(separator)) { // Should be separated by ','
if (values == null || values.isEmpty()) {
// 拋異常,省略
}
values.add(content);
} else {
// 拋異常,省略
}
}
return condition;
上面就是ConditionRouter的初始化過程,route方法的邏輯比較簡單,先做條件校驗,不滿足則直接返回原invokers;滿足則將匹配then條件的放入結(jié)果集返回,重點關(guān)注mathCondition方法。
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
throws RpcException {
// 不可用,則直接返回原invokers
if (!enabled) {
return invokers;
}
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
try {
// 不滿足when條件,直接返回所有invokers,不做條件過濾
if (!matchWhen(url, invocation)) {
return invokers;
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
if (thenCondition == null) {
logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
return result;
}
// 匹配then規(guī)則的放入結(jié)果集
for (Invoker<T> invoker : invokers) {
if (matchThen(invoker.getUrl(), url)) {
result.add(invoker);
}
}
if (!result.isEmpty()) {
return result;
} else if (force) {
logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
return result;
}
} catch (Throwable t) {
logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
}
return invokers;
}
private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) {
Map<String, String> sample = url.toMap();
boolean result = false;
for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) {
String key = matchPair.getKey();
String sampleValue;
//get real invoked method name from invocation
// 匹配的維度包括method、address、host,以及URL中的自定義參數(shù)
if (invocation != null && (Constants.METHOD_KEY.equals(key) || Constants.METHODS_KEY.equals(key))) {
sampleValue = invocation.getMethodName();
} else if (Constants.ADDRESS_KEY.equals(key)) {
sampleValue = url.getAddress();
} else if (Constants.HOST_KEY.equals(key)) {
sampleValue = url.getHost();
} else {
sampleValue = sample.get(key);
if (sampleValue == null) {
sampleValue = sample.get(Constants.DEFAULT_KEY_PREFIX + key);
}
}
// 匹配成功 true,否則false;
if (sampleValue != null) {
if (!matchPair.getValue().isMatch(sampleValue, param)) {
return false;
} else {
result = true;
}
} else {
//not pass the condition
if (!matchPair.getValue().matches.isEmpty()) {
return false;
} else {
result = true;
}
}
}
return result;
}
了解完靜態(tài)Router,下面來看依賴ConditionRouter的其他動態(tài)Router(TagRouter、AppRouter、ServiceRouter);其中AppRouter與ServiceRouter一樣(唯一區(qū)別在于ruleKey,分別取自URL的application和serviceKey),繼承自ListenableRouter,核心邏輯也在ListenableRouter,所以直接看ListenableRouter。LisenableRouter內(nèi)置ConditionRule和ConditionRouter列表,核心的route方法也是借助ConditionRouter實現(xiàn),下面是初始化邏輯的核心代碼
// 構(gòu)造方法,核心關(guān)注init
public ListenableRouter(DynamicConfiguration configuration, URL url, String ruleKey) {
super(configuration, url);
this.force = false;
this.init(ruleKey);
}
private synchronized void init(String ruleKey) {
if (StringUtils.isEmpty(ruleKey)) {
return;
}
// routerKey = ${applicationName}.condition-router
String routerKey = ruleKey + RULE_SUFFIX;
// 監(jiān)聽配置變更
configuration.addListener(routerKey, this);
String rule = configuration.getConfig(routerKey);
if (rule != null) {
// 處理規(guī)則變更事件,ConfigurationListener接口process方法的實現(xiàn)
this.process(new ConfigChangeEvent(routerKey, rule));
}
}
// 路由規(guī)則動態(tài)變更事件處理,初始化會調(diào)用
public synchronized void process(ConfigChangeEvent event) {
if (logger.isInfoEnabled()) {
logger.info("Notification of condition rule, change type is: " + event.getChangeType() +
", raw rule is:\n " + event.getValue());
}
// 刪除事件,則置空routerRule和conditonRouters
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
routerRule = null;
conditionRouters = Collections.emptyList();
} else {
try {
// 路由規(guī)則解析(涉及yml解析),根據(jù)規(guī)則生成conditonRouter,邏輯比較簡單,就不單獨列了
routerRule = ConditionRuleParser.parse(event.getValue());
generateConditions(routerRule);
} catch (Exception e) {
//異常處理,忽略
}
}
}
route邏輯也比較簡單
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) {
return invokers;
}
// We will check enabled status inside each router.
// 結(jié)果需要滿足所有router規(guī)則
for (Router router : conditionRouters) {
invokers = router.route(invokers, url, invocation);
}
return invokers;
}
最后來看TagRouter,TagRouter內(nèi)置TagRouterRule,核心route邏輯完全自己實現(xiàn);另外TagRouter也是唯一一個實現(xiàn)Router接口notify方法的router。初始化過程比較簡單,直接來看核心邏輯route和notify。
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return invokers;
}
// tagRouterRule 無效,則使用靜態(tài)tag進(jìn)行路由
if (tagRouterRule == null || !tagRouterRule.isValid() || !tagRouterRule.isEnabled()) {
return filterUsingStaticTag(invokers, url, invocation);
}
List<Invoker<T>> result = invokers;
// 優(yōu)先取invocation中的tag
String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
invocation.getAttachment(TAG_KEY);
// if we are requesting for a Provider with a specific tag
// 有指定tag,則優(yōu)先使用tagRouterRule中的address過濾;
// 過濾結(jié)果非空且要求強(qiáng)制使用tagRule,則直接返回;否則根據(jù)tag再次過濾
if (StringUtils.isNotEmpty(tag)) {
List<String> addresses = tagRouterRule.getTagnameToAddresses().get(tag);
// filter by dynamic tag group first
// 根據(jù)地址過濾,addresses.contains(invoker.getUrl.getAddress)
if (CollectionUtils.isNotEmpty(addresses)) {
result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses));
// if result is not null OR it's null but force=true, return result directly
if (CollectionUtils.isNotEmpty(result) || tagRouterRule.isForce()) {
return result;
}
} else {
// dynamic tag group doesn't have any item about the requested app OR it's null after filtered by
// dynamic tag group but force=false. check static tag
// 否則根據(jù)tag進(jìn)行過濾
result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY)));
}
// If there's no tagged providers that can match the current tagged request. force.tag is set by default to false, which means it will invoke any providers without a tag unless it's explicitly disallowed.
if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) {
return result;
}
// FAILOVER: return all Providers without any tags.
// 失效轉(zhuǎn)移,此時會取沒有match到address的invoker
else {
List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(),
tagRouterRule.getAddresses()));
return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY)));
}
} else {
// List<String> addresses = tagRouterRule.filter(providerApp);
// return all addresses in dynamic tag group.
// 未指定tag,優(yōu)先找address不匹配的invoker,若地址全部在tag,則直接返回空
List<String> addresses = tagRouterRule.getAddresses();
if (CollectionUtils.isNotEmpty(addresses)) {
result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses));
// 1. all addresses are in dynamic tag group, return empty list.
if (CollectionUtils.isEmpty(result)) {
return result;
}
// 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the static tag group.
}
return filterInvoker(result, invoker -> {
String localTag = invoker.getUrl().getParameter(TAG_KEY);
return StringUtils.isEmpty(localTag) || !tagRouterRule.getTagNames().contains(localTag);
});
}
}
// 使用靜態(tài)tag進(jìn)行route路由過濾
private <T> List<Invoker<T>> filterUsingStaticTag(List<Invoker<T>> invokers, URL url, Invocation invocation) {
List<Invoker<T>> result = invokers;
// Dynamic param,從invocation取tag
String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) :
invocation.getAttachment(TAG_KEY);
// Tag request
if (!StringUtils.isEmpty(tag)) {
// 配了tag,優(yōu)先通過tag過濾
result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY)));
// 未匹配到,則判斷是否強(qiáng)制使用tag,不強(qiáng)制則取tag值為空的invoker的
if (CollectionUtils.isEmpty(result) && !isForceUseTag(invocation)) {
result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
} else {
// 取tag值為空的invoker
result = filterInvoker(invokers, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY)));
}
return result;
}
動態(tài)tag與靜態(tài)tag的區(qū)別在于動態(tài)tag使用tagRouterRule,通過監(jiān)聽動態(tài)配置,tagRouterRule會發(fā)生變更,代碼如下:
public synchronized void process(ConfigChangeEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("Notification of tag rule, change type is: " + event.getChangeType() + ", raw rule is:\n " +
event.getValue());
}
try {
if (event.getChangeType().equals(ConfigChangeType.DELETED)) {
this.tagRouterRule = null;
} else {
this.tagRouterRule = TagRuleParser.parse(event.getValue());
}
} catch (Exception e) {
// 異常處理,忽略
}
}
最后來看notify,邏輯也非常簡單,根據(jù)invoker的url獲取providerApplication;若與當(dāng)前application不一致,則會重新加載tagRouterRule
public <T> void notify(List<Invoker<T>> invokers) {
if (CollectionUtils.isEmpty(invokers)) {
return;
}
Invoker<T> invoker = invokers.get(0);
URL url = invoker.getUrl();
// remote.application
String providerApplication = url.getParameter(Constants.REMOTE_APPLICATION_KEY);
if (StringUtils.isEmpty(providerApplication)) {
logger.error("TagRouter must getConfig from or subscribe to a specific application, but the application " +
"in this TagRouter is not specified.");
return;
}
//涉及configuration內(nèi)部lisenter
synchronized (this) {
if (!providerApplication.equals(application)) {
if (!StringUtils.isEmpty(application)) {
configuration.removeListener(application + RULE_SUFFIX, this);
}
String key = providerApplication + RULE_SUFFIX;
configuration.addListener(key, this);
application = providerApplication;
String rawRule = configuration.getConfig(key);
if (rawRule != null) {
this.process(new ConfigChangeEvent(key, rawRule));
}
}
}
}
關(guān)于Router的解析就到這里,
注:dubbo源碼版本2.7.1,歡迎指正。