今天來看dubbo負(fù)載均衡策略中的最后一種,leastActive。leastActive在權(quán)重基礎(chǔ)上,新增活躍度(active)維度限制,步驟如下:
1、初始化。初始化最低活躍值(-1)、最低活躍節(jié)點(diǎn)個(gè)數(shù)(可能有多個(gè),默認(rèn)0)、最低活躍度節(jié)點(diǎn)下標(biāo)數(shù)組、
權(quán)重?cái)?shù)組、總權(quán)重、首個(gè)最低活躍度節(jié)點(diǎn)權(quán)重。
2、遍歷。遍歷invoker,尋找所有活躍度最低的invoker;獲取active值并計(jì)算權(quán)重;記錄最低活躍節(jié)點(diǎn)、節(jié)點(diǎn)個(gè)數(shù)、totalweight等
3、若leastCount= 1,直接返回第一個(gè);否則采用類似random的方式,基于totalweight生成隨機(jī)值,然后基于隨機(jī)值,選一個(gè)invoker
4、以上條件均不滿足,則隨機(jī)選取(leastIndexes中對應(yīng)的下標(biāo))一個(gè)返回。
整個(gè)過程看起來是比較簡單的,下面來看代碼實(shí)現(xiàn)。
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
//初始化最低活躍值、最低活躍節(jié)點(diǎn)數(shù)、最低活躍節(jié)點(diǎn)下標(biāo)數(shù)組、權(quán)重?cái)?shù)組、總權(quán)重,首個(gè)最低活躍度節(jié)點(diǎn)權(quán)重
int length = invokers.size();
int leastActive = -1;
int leastCount = 0;
int[] leastIndexes = new int[length];
int[] weights = new int[length];
int totalWeight = 0;
int firstWeight = 0;
boolean sameWeight = true;
// 遍歷,過濾所有活躍度最低的invoker
for (int i = 0; i < length; i++) {
Invoker<T> invoker = invokers.get(i);
//這里獲取invoker的active值
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
int afterWarmup = getWeight(invoker, invocation);
weights[i] = afterWarmup;
if (leastActive == -1 || active < leastActive) {
leastActive = active;
leastCount = 1;
leastIndexes[0] = i;
totalWeight = afterWarmup;
firstWeight = afterWarmup;
sameWeight = true;
} else if (active == leastActive) {
leastIndexes[leastCount++] = i;
totalWeight += afterWarmup;
if (sameWeight && i > 0
&& afterWarmup != firstWeight) {
sameWeight = false;
}
}
}
// 結(jié)果集只有一個(gè),直接返回
if (leastCount == 1) {
return invokers.get(leastIndexes[0]);
}
//結(jié)果集不止一個(gè),且所有節(jié)點(diǎn)權(quán)重不完全相同,totalweight>0
if (!sameWeight && totalWeight > 0) {
// 這里采用類似random負(fù)載均衡的方式,隨機(jī)選取一個(gè)返回
int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
for (int i = 0; i < leastCount; i++) {
int leastIndex = leastIndexes[i];
offsetWeight -= weights[leastIndex];
if (offsetWeight < 0) {
return invokers.get(leastIndex);
}
}
}
//否則,隨機(jī)選一個(gè)直接返回
return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
}
到這里,leastActive負(fù)載均衡策略已經(jīng)介紹完畢。其中一個(gè)細(xì)節(jié)需要單獨(dú)拎出來看,注意下面這行代碼
int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
active值是按照URL、methodName從RpcStatus中獲取的,那么,active的值是怎么計(jì)算出來的呢?先來看下取值邏輯:
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
if (map == null) {
METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
RpcStatus status = map.get(methodName);
if (status == null) {
map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
}
return status;
}
邏輯比較簡單,直接根據(jù)URL的identityString從Map緩存里拿,那么值是什么時(shí)候,怎么放進(jìn)去的呢?關(guān)注下面的方法:
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.incrementAndGet() > max) {
methodStatus.active.decrementAndGet();
return false;
} else {
appStatus.active.incrementAndGet();
return true;
}
}
可以看到,這里是active值統(tǒng)計(jì)的統(tǒng)一入口;這是一個(gè)公用方法,同時(shí)支持method緯度和app維度的active值統(tǒng)計(jì);當(dāng)active值超過max上限時(shí),會觸發(fā)decrement。找到active入口之后,再來看看,什么時(shí)候會對active值進(jìn)行統(tǒng)計(jì),來看下面的代碼:
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
//開始統(tǒng)計(jì)
if (!RpcStatus.beginCount(url, methodName, max)) {
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
//加鎖,防止其他調(diào)用者修改count
synchronized (count) {
while (!RpcStatus.beginCount(url, methodName, max)) {
try {
count.wait(remain);
} catch (InterruptedException e) {
// ignore
}
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
+ ". max concurrent invoke limit: " + max);
}
}
}
}
boolean isSuccess = true;
long begin = System.currentTimeMillis();
try {
return invoker.invoke(invocation);
} catch (RuntimeException t) {
isSuccess = false;
throw t;
} finally {
//結(jié)束統(tǒng)計(jì)
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
if (max > 0) {
synchronized (count) {
count.notifyAll();
}
}
}
}
}
每次consumer進(jìn)行RPC調(diào)用時(shí),ActiveLimitFilter都會統(tǒng)計(jì)被調(diào)用應(yīng)用、方法的active,是leastActive負(fù)載均衡中active值的源頭。了解這里的邏輯之后,我們就能比較清晰理解leastActive負(fù)載均衡策略的核心原理。
注:參考 dubbo源碼版本 2.7.1,歡迎指正。