從今天開(kāi)始,會(huì)不定期更新dubbo源碼相關(guān)文章。
今天所要描述的場(chǎng)景是當(dāng)注冊(cè)中心provider數(shù)據(jù)發(fā)生變更時(shí),consumer端如何感知并同步更新。閱讀以下文章需要對(duì)dubbo的基本實(shí)現(xiàn)機(jī)制有一定了解。

先引用官方的dubbo架構(gòu)圖,接下來(lái)所要描述的場(chǎng)景就是圖中的第三步,notify。
當(dāng)前團(tuán)隊(duì)使用訂閱機(jī)制是輪詢機(jī)制,每隔一個(gè)周期去請(qǐng)求注冊(cè)中心,對(duì)比注冊(cè)中心的版本號(hào)和本地版本號(hào),當(dāng)版本號(hào)發(fā)生變化時(shí),更新本地訂閱信息。

大致流程圖如上所示。下面開(kāi)始看源碼。
//org.apache.dubbo.registry.support.AbstractRegistry#notify()
/**
* Notify changes from the Provider side.
*
* @param url consumer side url
* @param listener listener
* @param urls provider latest urls
*/
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
//省略部分代碼
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);//更新本地的訂閱信息
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);//保存最新provider信息到本地文件
}
}
上述方法是實(shí)現(xiàn)自定義注冊(cè)中心的入口。也是更新本地訂閱信息的入口。
參數(shù)介紹:
- url是consumer端url。
- listener是實(shí)現(xiàn)更新的接口
- urls是provider端url。
接著看listener.notify的實(shí)現(xiàn)。
public interface NotifyListener {
/**
* Triggered when a service change notification is received.
* <p>
* Notify needs to support the contract: <br>
* 1. Always notifications on the service interface and the dimension of the data type. that is, won't notify part of the same type data belonging to one service. Users do not need to compare the results of the previous notification.<br>
* 2. The first notification at a subscription must be a full notification of all types of data of a service.<br>
* 3. At the time of change, different types of data are allowed to be notified separately, e.g.: providers, consumers, routers, overrides. It allows only one of these types to be notified, but the data of this type must be full, not incremental.<br>
* 4. If a data type is empty, need to notify a empty protocol with category parameter identification of url data.<br>
* 5. The order of notifications to be guaranteed by the notifications(That is, the implementation of the registry). Such as: single thread push, queue serialization, and version comparison.<br>
*
* @param urls The list of registered information , is always not empty. The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
*/
void notify(List<URL> urls);
}
大致譯文如下:
當(dāng)服務(wù)發(fā)生變化時(shí),觸發(fā)該方法。
Notify實(shí)現(xiàn)需要支持以下契約:
- 使用服務(wù)接口和數(shù)據(jù)類(lèi)型規(guī)格來(lái)通知。就是說(shuō),不要僅僅通知一個(gè)服務(wù)的部分?jǐn)?shù)據(jù)(需要完整數(shù)據(jù))。用戶不需要將當(dāng)前通知與前置通知做比較。
- 訂閱的第一個(gè)通知必須是一個(gè)服務(wù)所有數(shù)據(jù)類(lèi)型的全量通知。
- 變化發(fā)生時(shí),不同數(shù)據(jù)類(lèi)型允許分開(kāi)通知,比如provider,consumers,routers,overides。允許只有一個(gè)類(lèi)型被通知,但是數(shù)據(jù)類(lèi)型必須是完整的,不能是增量的。
- 如果數(shù)據(jù)類(lèi)型為空,需要返回有category參數(shù)的空協(xié)議(這一點(diǎn)很重要,如果不顯示返回空協(xié)議,本地的配置不會(huì)被覆蓋)。
- 通知的順序必須有保證(registry的實(shí)現(xiàn))。比如單線程推送、隊(duì)列串行化、版本比較。
//org.apache.dubbo.registry.integration.RegistryDirectory#notify
@Override
public synchronized void notify(List<URL> urls) {
//省略部分代碼
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
refreshOverrideAndInvoker(providerURLs);
}
org.apache.dubbo.registry.integration.RegistryDirectory#notify是NotifyListener.notify的一個(gè)實(shí)現(xiàn)。包含以下幾步:
- 獲取url中的configurators信息,configurators是外部化配置信息,包含服務(wù)者動(dòng)態(tài)配置 URL 元數(shù)據(jù)信息。
- 獲取url中的routers信息, routers是路由配置信息,包含消費(fèi)者路由策略 URL 元數(shù)據(jù)信息。
- 獲取url中的providers信息,providers是服務(wù)提供者注冊(cè)信息,包含多個(gè)服務(wù)者 URL 元數(shù)據(jù)信息。
接下來(lái)看下refreshOverrideAndInvoker的實(shí)現(xiàn)。
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();//做配置信息的覆蓋合并
refreshInvoker(urls);//刷新invokers信息。
}
配置信息的覆蓋比較簡(jiǎn)單,接下里看如何刷新invokers。
private void refreshInvoker(List<URL> invokerUrls) {
//省略部分代碼
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// url轉(zhuǎn)換為 Invoker map
//省略部分代碼
try {
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 銷(xiāo)毀舊的Invoker,啟動(dòng)新的Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
刷新invokers分為兩步。
- url轉(zhuǎn)換為invokers實(shí)例。
- 銷(xiāo)毀舊的Invoker,啟動(dòng)新的Invoker。
接下來(lái)看下url轉(zhuǎn)換的實(shí)現(xiàn):
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
//省略部分代碼
URL url = mergeUrl(providerUrl);// 合并url參數(shù),包括是否啟用等配置信息。
//合并的順序是override > -D >Consumer > Provider
for (URL providerUrl : urls) {
// key是url全稱
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // 本地的invoker實(shí)例
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { //本地查找invoker不存在
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) { //先判斷disabled key是否存在
enabled = !url.getParameter(DISABLED_KEY, false);
} else { //再判斷enabled key是否存在
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) { //如果enabled=true,創(chuàng)建新的invoker實(shí)例
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // 新的invoker實(shí)例放入map中
newUrlInvokerMap.put(key, invoker);
}
} else { //invoker在本地存在,不需要重新構(gòu)造,直接放入map中。
newUrlInvokerMap.put(key, invoker);
}
}
return newUrlInvokerMap;
}
url轉(zhuǎn)換Invoker完成后,再看下如果啟動(dòng)新的Invokers。
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
//省略部分代碼
// 找出oldUrlInvokerMap中有,newUrlInvokerMap中沒(méi)有的實(shí)例
List<String> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
}
deleted.add(entry.getKey());
}
}
}
//調(diào)用detroy方法銷(xiāo)毀實(shí)例
if (deleted != null) {
for (String url : deleted) {
if (url != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
try {
invoker.destroy();
if (logger.isDebugEnabled()) {
logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
}
} catch (Exception e) {
logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
}
}
}
}
}
}
OK,整個(gè)訂閱更新的過(guò)程已經(jīng)走完。