dubbo是一個分布式服務(wù)框架,能避免單點故障和支持服務(wù)的橫向擴容。一個服務(wù)通常會部署多個實例,同時一個服務(wù)能注冊到多個注冊中心。如何從多個服務(wù) Provider 組成的集群中挑選出一個進行調(diào)用,就涉及到一個負載均衡的策略。
1、dubbo負載均衡實現(xiàn)說明
dubbo服務(wù)調(diào)用流程圖:

從以上調(diào)用流程圖可知,dubbo的負載均衡主要在客戶端實現(xiàn),并通過封裝Cluster、Directory、LoadBalance相關(guān)接口實現(xiàn)。
1.1、Cluster、Directory、Router、LoadBalance關(guān)系

各組件關(guān)系說明:
- 這里的Invoker是Provider的一個可調(diào)用Service的抽象,Invoker封裝了Provider地址及Service接口信息。
- Directory代表多個Invoker,可以把它看成List,但與List不同的是,它的值可能是動態(tài)變化的,比如注冊中心推送變更。
- Cluster將Directory中的多個Invoker偽裝成一個Invoker,對上層透明,偽裝過程包含了容錯邏輯,調(diào)用失敗后,重試另一個。
- Router負責(zé)從多個Invoker中按路由規(guī)則選出子集,比如讀寫分離,應(yīng)用隔離等。
- LoadBalance負責(zé)從多個Invoker中選出具體的一個用于本次調(diào)用,選的過程包含了負載均衡算法,調(diào)用失敗后,需要重選。
1.2、客戶端負載均衡源碼分析
1.2.1、ReferenceConfig中負載均衡的封裝
客戶端在進行代理處理時,在如下地方對負載均衡相關(guān)進行封裝:
包路徑:dubbo-config->dubbo-config-api
類名:ReferenceConfig
方法名:createProxy()
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use RegistryAwareCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, RegistryAwareCluster.NAME);
// The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url, must be direct invoke.
invoker = cluster.join(new StaticDirectory(invokers));
}
}
處理流程:
- 若只有一個注冊中心,則直接調(diào)用RegistryProtocol.refer()進行服務(wù)引用處理,將其封裝成invoker,RegistryProtocol.refer()內(nèi)部對負載均衡進行了封裝;
- 若為多個注冊中心,先分別對各注冊中心進行服務(wù)引用處理,然后再應(yīng)用Cluster.join()及StaticDirectory將多個注冊中心的invoker再封裝成一個invoker來達到對外透明;
1.2.2、RegistryProtocol中負載均衡的封裝
RegistryProtocol中對單個注冊中心進行了負載均衡的封裝:
包路徑:dubbo-registry->dubbo-registry-api
類名:RegistryProtocol
方法名:doRefer()
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
registry.register(getRegisteredConsumerUrl(subscribeUrl, url));
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
主要處理流程:
- 實例化RegistryDirectory類型的Directory,RegistryDirectory有如下功能,訂閱感興趣的服務(wù)提供者信息,當(dāng)提供者或路由變更時,動態(tài)變更本地的提供者列表及路由過濾鏈;并根據(jù)Router、LoadBalance動態(tài)選擇調(diào)用的服務(wù)提供者;
- 將消費端注冊到注冊中心;
- 構(gòu)建Directory中的路由過濾鏈;
- 在Directory中對注冊中心中對應(yīng)的提供者相關(guān)的配置、提供者、路由規(guī)則進行訂閱;
- 用Cluster對Directory進行封裝;
1.2.3、RegistryDirectory中負載均衡處理的封裝
RegistryDirectory主要封裝了訂閱、信息變更通知處理、獲取服務(wù)提供者信息等;
包路徑:dubbo-registry->dubbo-registry-api
類名:RegistryDirectory
(1)、訂閱感興趣的信息
源碼如下:
public void subscribe(URL url) {
setConsumerUrl(url);
consumerConfigurationListener.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
本處主要對消費端的訂閱進行了處理,消費端向注冊中心訂閱三個信息:配置信息、服務(wù)提供者、路由信息;當(dāng)這三個信息有任何變更,本地就會接到通知,并進行處理;
(2)、配置信息、服務(wù)提供者、路由信息變更通知處理
源碼如下:
public synchronized void notify(List<URL> urls) {
List<URL> categoryUrls = urls.stream()
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.toList());
/**
* TODO Try to refactor the processing of these three type of urls using Collectors.groupBy()?
*/
this.configurators = Configurator.toConfigurators(classifyUrls(categoryUrls, UrlUtils::isConfigurator))
.orElse(configurators);
toRouters(classifyUrls(categoryUrls, UrlUtils::isRoute)).ifPresent(this::addRouters);
// providers
refreshOverrideAndInvoker(classifyUrls(categoryUrls, UrlUtils::isProvider));
}
當(dāng)有信息變更時,本方法就會被調(diào)用,會根據(jù)變更的配置或路由信息或服務(wù)提供者進行相應(yīng)處理;若路由信息變更,則重新構(gòu)建路由過濾鏈;若服務(wù)提供者變更,則重構(gòu)刷新本地緩存的服務(wù)提供者列表;
(3)、獲取服務(wù)提供者列表
源碼如下:
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 1. No service provider 2. Service providers are disabled
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
", please check status of providers(disabled, not registered or in blacklist).");
}
if (multiGroup) {
return this.invokers == null ? Collections.emptyList() : this.invokers;
}
List<Invoker<T>> invokers = null;
try {
// Get invokers from cache, only runtime routers will be executed.
invokers = routerChain.route(getConsumerUrl(), invocation);
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
// FIXME Is there any need of failing back to Constants.ANY_VALUE or the first available method invokers when invokers is null?
/*Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
invokers = localMethodInvokerMap.get(methodName);
if (invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}*/
return invokers == null ? Collections.emptyList() : invokers;
}
當(dāng)負載均衡獲取可用的服務(wù)提供者列表時會調(diào)用此方法,此方法主要根據(jù)注冊中心提供的服務(wù)提供者列表,并利用路由規(guī)則對提供者列表進行過濾。
1.2.4、FailoverCluster中負載均衡處理的封裝
包路徑:dubbo-cluster
dubbo中默認的Cluster實現(xiàn)為FailoverCluster,其主要是通過join()方法將Directory進行封裝的,而實際的處理是通過FailoverClusterInvoker實現(xiàn)的,客戶端調(diào)用服務(wù)時就是通過此invoker.invoke()進行實際調(diào)用處理的;
源碼如下:
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
FailoverClusterInvoker.invoke()實現(xiàn):
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
List<Invoker<T>> invokers = list(invocation);
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
主要處理流程:
- 獲取rpc上下文信息,將這些信息添加到調(diào)用附加參數(shù)信息中;
- 調(diào)用list(),實際調(diào)用的是Directory.list()獲取可用的服務(wù)提供者列表,此列表是通過Directory中的路由器過濾后的列表;
- 調(diào)用initLoadBalance(),獲取配置的LoadBalance,若為配置,則使用默認的LoadBalance實現(xiàn)RandomLoadBalance;
- 調(diào)用doInvode()進行實際的服務(wù)提供者選取及服務(wù)調(diào)用等;
FailoverClusterInvoker.doInvoke()實現(xiàn):
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
主要處理流程為:調(diào)用select()選取服務(wù)提供者并調(diào)用;select()中主要調(diào)用LoadBalance.selct()進行選擇;
2、Directory實現(xiàn)分析
Directory代表多個Invoker,可以把它看成List,但與List不同的是,它的值可能是動態(tài)變化的,比如注冊中心推送變更。Cluster將Directory中的多個Invoker偽裝成一個Invoker,對上層透明,偽裝過程包含了容錯邏輯,調(diào)用失敗后,重試另一個。
Directory接口類繼承圖:

RegistryDirectory:
RegistryDirectory實現(xiàn)了NotifyListener接口,因此他本身也是一個監(jiān)聽器,可以在服務(wù)變更時接受通知,消費方要調(diào)用遠程服務(wù),會向注冊中心訂閱這個服務(wù)的所有的服務(wù)提供方,訂閱的時候會調(diào)用notify方法,進行invoker實例的重新生成,也就是服務(wù)的重新引用。在服務(wù)提供方有變動時,也會調(diào)用notify方法,有關(guān)notify方法在Dubbo中訂閱和通知解析那篇文章中已經(jīng)解釋,不做重復(fù)。subscribe方法也不做重復(fù)解釋。
StaticDirectory:
靜態(tài)目錄服務(wù),當(dāng)有多個注冊中心時會使用此實現(xiàn)。
3、Cluster實現(xiàn)分析
Dubbo中的Cluster可以將多個服務(wù)提供方偽裝成一個提供方,具體也就是將Directory中的多個Invoker偽裝成一個Invoker,在偽裝的過程中包含了容錯的處理,負載均衡的處理和路由的處理。
Cluster主要實現(xiàn)的類繼承圖:

AbstractClusterInvoker主要實現(xiàn)類繼承圖:

集群的容錯模式:
failover(默認):
- 失敗自動切換,當(dāng)出現(xiàn)失敗,重試其他服務(wù)器。
- 通常用于讀操作,但重試會帶來更長延遲。
- 可通過retries=x來設(shè)置重試次數(shù)(不含第一次)。
failfast:
- 快速失敗,只發(fā)起一次調(diào)用,失敗理解報錯。
- 通常用于非冪等性的寫操作,比如新增記錄。
failsafe:
- 失敗安全,出現(xiàn)異常時,直接忽略;
- 通常用于寫入審計日志等操作。
failback:
- 失敗自動回復(fù),后臺記錄失敗請求,定時重發(fā)。
- 通常用于消息通知操作。
forking:
- 并行調(diào)用多個服務(wù)器,只要一個成功即返回。
- 通常用于實時性要求較高的讀操作,但需要浪費更多服務(wù)資源。
- 可通過forks=x來設(shè)置最大并行數(shù)。
broadcast:
- 廣播調(diào)用所有提供者,逐個調(diào)用,任意一臺報錯則報錯。
- 通常用于通知所有提供者更新緩存或日志等本地資源信息。
4、LoadBalance實現(xiàn)分析

random:
- 隨機,按權(quán)重設(shè)置隨機概率。
- 在一個截面上碰撞的概率高,但調(diào)用量越大分布越均勻,而且按概率使用權(quán)重后也比較均勻,有利于動態(tài)調(diào)整提供者權(quán)重。
roundrobin:
- 輪詢,按公約后的權(quán)重設(shè)置輪詢比率。
- 存在慢的提供者累計請求問題,比如:第二臺機器很慢,但沒掛,當(dāng)請求調(diào)用到第二臺是就卡在那,久而久之,所有請求都卡在掉第二臺上。
leastactive:
- 最少活躍調(diào)用數(shù),相同活躍數(shù)的隨機,活躍數(shù)指調(diào)用前后計數(shù)差。
- 使慢的提供者收到更少請求,因為越慢的提供者的調(diào)用前后計數(shù)差會越大。
consistenthash:
- 一致性Hash,相同參數(shù)的請求總是發(fā)到同一提供者。
- 當(dāng)某一臺提供者掛掉時,原本發(fā)往該提供者的請求,基于虛擬節(jié)點,平攤到其他提供者,不會引起劇烈變動。
- 算法參見:https://en.wikipedia.org/wiki/Consistent_hashiing
- 缺省只對第一個參考Hash,如果要修改,請配置<dubbo:parameter key="hash.arguments" value="0,1" />
- 缺省用160份虛擬節(jié)點,如果要修改,請配置<dubbo:parameter key="hash.nodes" value="320" />
5、Router實現(xiàn)分析
dubbo的路由干的事,就是一個請求過來,dubbo依據(jù)配置的路由規(guī)則,計算出哪些提供者可以提供這次的請求服務(wù)。所以,它的優(yōu)先級是在集群容錯策略和負載均衡策略之前的。即先有路由規(guī)則遴選出符合條件的服務(wù)提供者然后,再在這些服務(wù)提供者之中應(yīng)用負載均衡,集群容錯策略。
Router接口繼承圖:

ScriptRouter:
腳本路由規(guī)則 支持 JDK 腳本引擎的所有腳本,比如:javascript, jruby, groovy 等,通過 type=javascript 參數(shù)設(shè)置腳本類型,缺省為 javascript。
ConditionRouter:
條件路由主要就是根據(jù)dubbo管理控制臺配置的路由規(guī)則來過濾相關(guān)的invoker,當(dāng)我們對路由規(guī)則點擊啟用的時候,就會觸發(fā)RegistryDirectory類的notify方法,其會重構(gòu)本地路由調(diào)用鏈,而當(dāng)從Directory中獲取服務(wù)提供者的list時,會利用此路由規(guī)則將提供者列表進行過濾;