dubbo負(fù)載均衡之leastActive

今天來看dubbo負(fù)載均衡策略中的最后一種,leastActive。leastActive在權(quán)重基礎(chǔ)上,新增活躍度(active)維度限制,步驟如下:

1、初始化。初始化最低活躍值(-1)、最低活躍節(jié)點(diǎn)個(gè)數(shù)(可能有多個(gè),默認(rèn)0)、最低活躍度節(jié)點(diǎn)下標(biāo)數(shù)組、
權(quán)重?cái)?shù)組、總權(quán)重、首個(gè)最低活躍度節(jié)點(diǎn)權(quán)重。
2、遍歷。遍歷invoker,尋找所有活躍度最低的invoker;獲取active值并計(jì)算權(quán)重;記錄最低活躍節(jié)點(diǎn)、節(jié)點(diǎn)個(gè)數(shù)、totalweight等
3、若leastCount= 1,直接返回第一個(gè);否則采用類似random的方式,基于totalweight生成隨機(jī)值,然后基于隨機(jī)值,選一個(gè)invoker
4、以上條件均不滿足,則隨機(jī)選取(leastIndexes中對應(yīng)的下標(biāo))一個(gè)返回。

整個(gè)過程看起來是比較簡單的,下面來看代碼實(shí)現(xiàn)。

@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //初始化最低活躍值、最低活躍節(jié)點(diǎn)數(shù)、最低活躍節(jié)點(diǎn)下標(biāo)數(shù)組、權(quán)重?cái)?shù)組、總權(quán)重,首個(gè)最低活躍度節(jié)點(diǎn)權(quán)重
        int length = invokers.size();
        int leastActive = -1;
        int leastCount = 0;
        int[] leastIndexes = new int[length];
        int[] weights = new int[length];
        int totalWeight = 0;
        int firstWeight = 0;
        boolean sameWeight = true;


        // 遍歷,過濾所有活躍度最低的invoker
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            //這里獲取invoker的active值
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            int afterWarmup = getWeight(invoker, invocation);
            weights[i] = afterWarmup;
            if (leastActive == -1 || active < leastActive) {
                leastActive = active;
                leastCount = 1;
                leastIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
            } else if (active == leastActive) {
                leastIndexes[leastCount++] = i;
                totalWeight += afterWarmup;
                if (sameWeight && i > 0
                        && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // 結(jié)果集只有一個(gè),直接返回
        if (leastCount == 1) {
            return invokers.get(leastIndexes[0]);
        }
        //結(jié)果集不止一個(gè),且所有節(jié)點(diǎn)權(quán)重不完全相同,totalweight>0
        if (!sameWeight && totalWeight > 0) {
            // 這里采用類似random負(fù)載均衡的方式,隨機(jī)選取一個(gè)返回
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        //否則,隨機(jī)選一個(gè)直接返回
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }

到這里,leastActive負(fù)載均衡策略已經(jīng)介紹完畢。其中一個(gè)細(xì)節(jié)需要單獨(dú)拎出來看,注意下面這行代碼

int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();

active值是按照URL、methodName從RpcStatus中獲取的,那么,active的值是怎么計(jì)算出來的呢?先來看下取值邏輯:

public static RpcStatus getStatus(URL url, String methodName) {
    String uri = url.toIdentityString();
    ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
    if (map == null) {
        METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
        map = METHOD_STATISTICS.get(uri);
    }
    RpcStatus status = map.get(methodName);
    if (status == null) {
        map.putIfAbsent(methodName, new RpcStatus());
        status = map.get(methodName);
    }
    return status;
}

邏輯比較簡單,直接根據(jù)URL的identityString從Map緩存里拿,那么值是什么時(shí)候,怎么放進(jìn)去的呢?關(guān)注下面的方法:

public static boolean beginCount(URL url, String methodName, int max) {
    max = (max <= 0) ? Integer.MAX_VALUE : max;
    RpcStatus appStatus = getStatus(url);
    RpcStatus methodStatus = getStatus(url, methodName);
    if (methodStatus.active.incrementAndGet() > max) {
        methodStatus.active.decrementAndGet();
        return false;
    } else {
        appStatus.active.incrementAndGet();
        return true;
    }
}

可以看到,這里是active值統(tǒng)計(jì)的統(tǒng)一入口;這是一個(gè)公用方法,同時(shí)支持method緯度和app維度的active值統(tǒng)計(jì);當(dāng)active值超過max上限時(shí),會觸發(fā)decrement。找到active入口之后,再來看看,什么時(shí)候會對active值進(jìn)行統(tǒng)計(jì),來看下面的代碼:

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY)
public class ActiveLimitFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        //開始統(tǒng)計(jì)
        if (!RpcStatus.beginCount(url, methodName, max)) {
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            //加鎖,防止其他調(diào)用者修改count
            synchronized (count) {
                while (!RpcStatus.beginCount(url, methodName, max)) {
                    try {
                        count.wait(remain);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    long elapsed = System.currentTimeMillis() - start;
                    remain = timeout - elapsed;
                    if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                + invoker.getInterface().getName() + ", method: "
                                + invocation.getMethodName() + ", elapsed: " + elapsed
                                + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
                                + ". max concurrent invoke limit: " + max);
                    }
                }
            }
        }

        boolean isSuccess = true;
        long begin = System.currentTimeMillis();
        try {
            return invoker.invoke(invocation);
        } catch (RuntimeException t) {
            isSuccess = false;
            throw t;
        } finally {
            //結(jié)束統(tǒng)計(jì)
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
            if (max > 0) {
                synchronized (count) {
                    count.notifyAll();
                }
            }
        }
    }
}

每次consumer進(jìn)行RPC調(diào)用時(shí),ActiveLimitFilter都會統(tǒng)計(jì)被調(diào)用應(yīng)用、方法的active,是leastActive負(fù)載均衡中active值的源頭。了解這里的邏輯之后,我們就能比較清晰理解leastActive負(fù)載均衡策略的核心原理。

注:參考 dubbo源碼版本 2.7.1,歡迎指正。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 技術(shù)點(diǎn) 面試中Dubbo負(fù)載均衡常問的幾點(diǎn) 常見負(fù)載均衡算法簡介 Dubbo 官方文檔介紹 Dubbo 負(fù)載均衡的...
    小刀愛編程閱讀 1,161評論 0 1
  • 開篇 這篇文章的目的是為了講解清楚dubbo負(fù)載均衡的底層實(shí)現(xiàn)機(jī)制,根據(jù)瀏覽的網(wǎng)上資料來看似乎隨著dubbo的版本...
    晴天哥_王志閱讀 644評論 0 5
  • 概述 比較經(jīng)典的5種負(fù)載均衡算法:隨機(jī)法、輪詢法、最少連接數(shù)法、最快響應(yīng)法、Hash化散列法(包括IP-Hash和...
    黃靠譜閱讀 3,087評論 0 33
  • 概述 本篇文章主要是想講解清楚dubbo的負(fù)載均衡策略,其中文章的內(nèi)容部分共享自網(wǎng)絡(luò),部分來自自己的整理,目的...
    晴天哥_王志閱讀 4,464評論 0 2
  • 今天感恩節(jié)哎,感謝一直在我身邊的親朋好友。感恩相遇!感恩不離不棄。 中午開了第一次的黨會,身份的轉(zhuǎn)變要...
    余生動聽閱讀 10,920評論 0 11

友情鏈接更多精彩內(nèi)容