源碼分析---和dubbo相比 迪士尼源碼搭建下載 SOFARPC是如何實現(xiàn)負載均衡的?

迪士尼源碼搭建下載【hubawl.com】


官方目前建議使用的負載均衡包括以下幾種:

random(隨機算法)

localPref(本地優(yōu)先算法)

roundRobin(輪詢算法)

consistentHash(一致性hash算法)

所以我們接下來分析以下以上四種負載均衡的源碼是怎樣的。

隨機算法

我們先看一下SOFARPC的源碼實現(xiàn):

@OverridepublicProviderInfodoSelect(SofaRequest invocation, List<ProviderInfo> providerInfos){? ? ProviderInfo providerInfo =null;intsize = providerInfos.size();// 總個數(shù)inttotalWeight =0;// 總權(quán)重booleanisWeightSame =true;// 權(quán)重是否都一樣for(inti =0; i < size; i++) {intweight = getWeight(providerInfos.get(i));? ? ? ? totalWeight += weight;// 累計總權(quán)重if(isWeightSame && i >0&& weight != getWeight(providerInfos.get(i -1))) {? ? ? ? ? ? isWeightSame =false;// 計算所有權(quán)重是否一樣}? ? }if(totalWeight >0&& !isWeightSame) {// 如果權(quán)重不相同且權(quán)重大于0則按總權(quán)重數(shù)隨機intoffset = random.nextInt(totalWeight);// 并確定隨機值落在哪個片斷上for(inti =0; i < size; i++) {? ? ? ? ? ? offset -= getWeight(providerInfos.get(i));if(offset <0) {? ? ? ? ? ? ? ? providerInfo = providerInfos.get(i);break;? ? ? ? ? ? }? ? ? ? }? ? }else{// 如果權(quán)重相同或權(quán)重為0則均等隨機providerInfo = providerInfos.get(random.nextInt(size));? ? }returnproviderInfo;}

上面主要做了幾件事:

獲取所有的provider

遍歷provier,如果當(dāng)前的provider的權(quán)重和上一個provider的權(quán)重不一樣,那么就做個標(biāo)記

如果權(quán)重不相同那么就隨機取一個0到總權(quán)重之間的值,遍歷provider去減隨機數(shù),如果減到小于0,那么就返回那個provider

如果沒有權(quán)重相同,那么用隨機函數(shù)取一個provider

我們再來看看dubbo是怎么實現(xiàn)的:

@OverrideprotectedInvokerdoSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){intlength = invokers.size();// Number of invokersbooleansameWeight =true;// Every invoker has the same weight?intfirstWeight = getWeight(invokers.get(0), invocation);inttotalWeight = firstWeight;// The sum of weightsfor(inti =1; i < length; i++) {intweight = getWeight(invokers.get(i), invocation);? ? ? ? totalWeight += weight;// Sumif(sameWeight && weight != firstWeight) {? ? ? ? ? ? sameWeight =false;? ? ? ? }? ? }if(totalWeight >0&& !sameWeight) {// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.intoffset = ThreadLocalRandom.current().nextInt(totalWeight);// Return a invoker based on the random value.for(inti =0; i < length; i++) {? ? ? ? ? ? offset -= getWeight(invokers.get(i), invocation);if(offset <0) {returninvokers.get(i);? ? ? ? ? ? }? ? ? ? }? ? }// If all invokers have the same weight value or totalWeight=0, return evenly.returninvokers.get(ThreadLocalRandom.current().nextInt(length));}

獲取invoker的數(shù)量

獲取第一個invoker的權(quán)重,并復(fù)制給firstWeight

循環(huán)invoker集合,把它們的權(quán)重全部相加,并復(fù)制給totalWeight,如果權(quán)重不相等,那么sameWeight為false

如果invoker集合的權(quán)重并不是全部相等的,那么獲取一個隨機數(shù)在1到totalWeight之間,賦值給offset屬性

循環(huán)遍歷invoker集合,獲取權(quán)重并與offset相減,當(dāng)offset減到小于零,那么就返回這個inovker

如果權(quán)重相等,那么直接在invoker集合里面取一個隨機數(shù)返回

從上面我們可以看到,基本上SOFARPC和dubbo的負載均衡實現(xiàn)是一致的。

本地優(yōu)先算法

在負載均衡時使用保持本機優(yōu)先。這個相信大家也比較好理解。在所有的可選地址中,找到本機發(fā)布的地址,然后進行調(diào)用。

@OverridepublicProviderInfodoSelect(SofaRequest invocation, List<ProviderInfo> providerInfos){? ? String localhost = SystemInfo.getLocalHost();if(StringUtils.isEmpty(localhost)) {returnsuper.doSelect(invocation, providerInfos);? ? }? ? List localProviderInfo =newArrayList();for(ProviderInfo providerInfo : providerInfos) {// 解析IP,看是否和本地一致if(localhost.equals(providerInfo.getHost())) {? ? ? ? ? ? localProviderInfo.add(providerInfo);? ? ? ? }? ? }if(CommonUtils.isNotEmpty(localProviderInfo)) {// 命中本機的服務(wù)端returnsuper.doSelect(invocation, localProviderInfo);? ? }else{// 沒有命中本機上的服務(wù)端returnsuper.doSelect(invocation, providerInfos);? ? }}

查看本機的host,如果為空,那么直接調(diào)用父類隨機算法

遍歷所有的provider,如果服務(wù)提供方的host和服務(wù)調(diào)用方的host一致,那么保存到集合里

如果存在服務(wù)提供方的host和服務(wù)調(diào)用方的host一致,那么就在這些集合中選取

如果不一致,那么就在所有provider中選取

輪詢算法

我們首先來看看SOFARPC的輪訓(xùn)是怎么實現(xiàn)的:

privatefinalConcurrentMap sequences =newConcurrentHashMap();@OverridepublicProviderInfodoSelect(SofaRequest request, List<ProviderInfo> providerInfos){? ? String key = getServiceKey(request);// 每個方法級自己輪詢,互不影響intlength = providerInfos.size();// 總個數(shù)PositiveAtomicCounter sequence = sequences.get(key);if(sequence ==null) {? ? ? ? sequences.putIfAbsent(key,newPositiveAtomicCounter());? ? ? ? sequence = sequences.get(key);? ? }returnproviderInfos.get(sequence.getAndIncrement() % length);}privateStringgetServiceKey(SofaRequest request){? ? StringBuilder builder =newStringBuilder();? ? builder.append(request.getTargetAppName()).append("#")? ? ? ? .append(request.getMethodName());returnbuilder.toString();}

從上面的代碼我們可以看出,SOFARPC的輪詢做的很直接簡單。就是new了一個map,然后把每個服務(wù)的服務(wù)名拼上方法名存到map里面,然后每次value值自增1對provider取模。

我們再看dubbo的實現(xiàn)方式:

protectedInvokerdoSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){? ? String key = invokers.get(0).getUrl().getServiceKey() +"."+ invocation.getMethodName();? ? ConcurrentMap map = methodWeightMap.get(key);if(map ==null) {? ? ? ? methodWeightMap.putIfAbsent(key,newConcurrentHashMap());? ? ? ? map = methodWeightMap.get(key);? ? }inttotalWeight =0;longmaxCurrent = Long.MIN_VALUE;longnow = System.currentTimeMillis();? ? Invoker selectedInvoker =null;? ? WeightedRoundRobin selectedWRR =null;for(Invoker invoker : invokers) {? ? ? ? String identifyString = invoker.getUrl().toIdentityString();? ? ? ? WeightedRoundRobin weightedRoundRobin = map.get(identifyString);intweight = getWeight(invoker, invocation);if(weight <0) {? ? ? ? ? ? weight =0;? ? ? ? }if(weightedRoundRobin ==null) {? ? ? ? ? ? weightedRoundRobin =newWeightedRoundRobin();? ? ? ? ? ? weightedRoundRobin.setWeight(weight);? ? ? ? ? ? map.putIfAbsent(identifyString, weightedRoundRobin);? ? ? ? ? ? weightedRoundRobin = map.get(identifyString);? ? ? ? }if(weight != weightedRoundRobin.getWeight()) {//weight changedweightedRoundRobin.setWeight(weight);? ? ? ? }longcur = weightedRoundRobin.increaseCurrent();? ? ? ? weightedRoundRobin.setLastUpdate(now);if(cur > maxCurrent) {? ? ? ? ? ? maxCurrent = cur;? ? ? ? ? ? selectedInvoker = invoker;? ? ? ? ? ? selectedWRR = weightedRoundRobin;? ? ? ? }? ? ? ? totalWeight += weight;? ? }if(!updateLock.get() && invokers.size() != map.size()) {if(updateLock.compareAndSet(false,true)) {try{// copy -> modify -> update referenceConcurrentMap newMap =newConcurrentHashMap();? ? ? ? ? ? ? ? newMap.putAll(map);? ? ? ? ? ? ? ? Iterator> it = newMap.entrySet().iterator();while(it.hasNext()) {? ? ? ? ? ? ? ? ? ? Entry item = it.next();if(now - item.getValue().getLastUpdate() > RECYCLE_PERIOD) {? ? ? ? ? ? ? ? ? ? ? ? it.remove();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? methodWeightMap.put(key, newMap);? ? ? ? ? ? }finally{? ? ? ? ? ? ? ? updateLock.set(false);? ? ? ? ? ? }? ? ? ? }? ? }if(selectedInvoker !=null) {? ? ? ? selectedWRR.sel(totalWeight);returnselectedInvoker;? ? }// should not happen herereturninvokers.get(0);}

dubbo的輪詢的實現(xiàn)里面還加入了權(quán)重在里面,sofarpc的權(quán)重輪詢是放到另外一個類當(dāng)中去做的,因為性能太差了而被棄用了。

我們舉個例子來簡單看一下dubbo的加權(quán)輪詢是怎么做的:

假定有3臺dubboprovider:10.0.0.1:20884, weight=2

10.0.0.1:20886, weight=3

10.0.0.1:20888, weight=4

totalWeight=9;

那么第一次調(diào)用的時候:

10.0.0.1:20884, weight=2? ? selectedWRR ->current =210.0.0.1:20886, weight=3? ? selectedWRR ->current =310.0.0.1:20888, weight=4? ? selectedWRR ->current =4selectedInvoker->10.0.0.1:20888調(diào)用 selectedWRR.sel(totalWeight);10.0.0.1:20888, weight=4? ? selectedWRR ->current = -5返回10.0.0.1:20888這個實例那么第二次調(diào)用的時候:10.0.0.1:20884, weight=2? ? selectedWRR ->current =410.0.0.1:20886, weight=3? ? selectedWRR ->current =610.0.0.1:20888, weight=4? ? selectedWRR ->current = -1selectedInvoker->10.0.0.1:20886調(diào)用 selectedWRR.sel(totalWeight);10.0.0.1:20886 , weight=4? selectedWRR ->current = -3返回10.0.0.1:20886這個實例那么第三次調(diào)用的時候:10.0.0.1:20884, weight=2? ? selectedWRR ->current =610.0.0.1:20886, weight=3? ? selectedWRR ->current =010.0.0.1:20888, weight=4? ? selectedWRR ->current =3selectedInvoker->10.0.0.1:20884調(diào)用 selectedWRR.sel(totalWeight);10.0.0.1:20884, weight=2? ? selectedWRR ->current = -3返回10.0.0.1:20884這個實例

一致性hash算法

在SOFARPC中有兩種方式實現(xiàn)一致性hash算法,一種是帶權(quán)重的一種是不帶權(quán)重的,我對比了一下,兩邊的代碼基本上是一樣的,所以我直接分析帶權(quán)重的代碼就好了。

下面我們來分析一下代碼:

privatefinalConcurrentHashMap selectorCache =newConcurrentHashMap();@OverridepublicProviderInfodoSelect(SofaRequest request, List<ProviderInfo> providerInfos){? ? String interfaceId = request.getInterfaceName();? ? String method = request.getMethodName();? ? String key = interfaceId +"#"+ method;// 判斷是否同樣的服務(wù)列表inthashcode = providerInfos.hashCode();? ? Selector selector = selectorCache.get(key);// 原來沒有if(selector ==null||// 或者服務(wù)列表已經(jīng)變化selector.getHashCode() != hashcode) {? ? ? ? selector =newSelector(interfaceId, method, providerInfos, hashcode);? ? ? ? selectorCache.put(key, selector);? ? }returnselector.select(request);}

上面的doSelect方法就是獲取到相同服務(wù)的Selector,如果沒有就新建一個。Selector是WeightConsistentHashLoadBalancer里面的內(nèi)部類,我們接下來看看這個內(nèi)部類的實現(xiàn)。

publicSelector(String interfaceId, String method, List actualNodes,inthashcode){this.interfaceId = interfaceId;this.method = method;this.hashcode = hashcode;// 創(chuàng)建虛擬節(jié)點環(huán) (provider創(chuàng)建虛擬節(jié)點數(shù) =? 真實節(jié)點權(quán)重 * 32)this.virtualNodes =newTreeMap();// 設(shè)置越大越慢,精度越高intnum =32;for(ProviderInfo providerInfo : actualNodes) {for(inti =0; i < num * providerInfo.getWeight() /4; i++) {byte[] digest = HashUtils.messageDigest(providerInfo.getHost() + providerInfo.getPort() + i);for(inth =0; h <4; h++) {longm = HashUtils.hash(digest, h);? ? ? ? ? ? ? ? virtualNodes.put(m, providerInfo);? ? ? ? ? ? }? ? ? ? }? ? }}

Selector內(nèi)部類中就是構(gòu)建了一個TreeMap實例,然后遍歷所有的provider,每個provider虛擬的節(jié)點數(shù)是(真實節(jié)點權(quán)重 * 32)個。

虛擬好節(jié)點后,我們直接調(diào)用Selector#select方法在hash環(huán)中得到相應(yīng)的provider。

publicProviderInfoselect(SofaRequest request){? ? String key = buildKeyOfHash(request.getMethodArgs());byte[] digest = HashUtils.messageDigest(key);returnselectForKey(HashUtils.hash(digest,0));}/** * 獲取第一參數(shù)作為hash的key * *@paramargs the args *@returnthe string */privateStringbuildKeyOfHash(Object[] args){if(CommonUtils.isEmpty(args)) {returnStringUtils.EMPTY;? ? }else{returnStringUtils.toString(args[0]);? ? }}/** * Select for key. * *@paramhash the hash *@returnthe provider */privateProviderInfoselectForKey(longhash){? ? Map.Entry entry = virtualNodes.ceilingEntry(hash);if(entry ==null) {? ? ? ? entry = virtualNodes.firstEntry();? ? }returnentry.getValue();}

這上面主要是獲取第一參數(shù)作為hash的key,然后對它進行hash。所以我感覺這里可能有一個問題就是如果一個某個服務(wù)里面很多個參數(shù)一樣的服務(wù),那么是不是都會打到那同一臺機器上呢?

dubbo的實現(xiàn)方式也和SOFARPC類似,這里不再贅述。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容