Sentinel流量控制

系列

開(kāi)篇

  • 流量控制(flow control),其原理是監(jiān)控應(yīng)用流量的 QPS 或并發(fā)線程數(shù)等指標(biāo),當(dāng)達(dá)到指定的閾值時(shí)對(duì)流量進(jìn)行控制,以避免被瞬時(shí)的流量高峰沖垮,從而保障應(yīng)用的高可用性。
  • FlowSlot 會(huì)根據(jù)預(yù)設(shè)的規(guī)則,結(jié)合前面 NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot 統(tǒng)計(jì)出來(lái)的實(shí)時(shí)信息進(jìn)行流量控制。
  • 流量控制的詳細(xì)介紹可以參考Sentinel流量控制的介紹。

流控規(guī)則介紹

  • 流控規(guī)則在管理后臺(tái)的配置參數(shù)如上圖所示。
  • 流控規(guī)則核心屬性如上圖所示。

流控規(guī)則配置

    private static void initFlowRule(int interfaceFlowLimit, boolean method) {
        FlowRule flowRule = new FlowRule(INTERFACE_RES_KEY)
                .setCount(interfaceFlowLimit)
                .setGrade(RuleConstant.FLOW_GRADE_QPS);
        List<FlowRule> list = new ArrayList<>();
        if (method) {
            FlowRule flowRule1 = new FlowRule(RES_KEY)
                    .setCount(5)
                    .setGrade(RuleConstant.FLOW_GRADE_QPS);
            list.add(flowRule1);
        }
        list.add(flowRule);
        FlowRuleManager.loadRules(list);
    }
  • 通過(guò)配置流控規(guī)則并且通過(guò)FlowRuleManager生效限流規(guī)則。

流控規(guī)則定義

public interface Rule {
    String getResource();
}

public abstract class AbstractRule implements Rule {
    // 資源名
    private String resource;
    // 流控對(duì)應(yīng)的調(diào)用來(lái)源
    private String limitApp;
}

public class FlowRule extends AbstractRule {

    // 流控類型:0=線程,1=QPS FLOW_GRADE_THREAD = 0 FLOW_GRADE_QPS = 1;
    private int grade = RuleConstant.FLOW_GRADE_QPS;
    // 流控閾值,配置的是qps類型則代表qps的值;配置的是線程數(shù)類型則代表線程數(shù)
    private double count;
     // 流控限流策略
    private int strategy = RuleConstant.STRATEGY_DIRECT;
    // 關(guān)聯(lián)流控的資源
    private String refResource;
    // 流控效果控制 0. default(reject directly), 1. warm up,  2. rate limiter, 3. warm up + rate limiter
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
    // 對(duì)應(yīng)流控效果為Warm Up情況下,出現(xiàn)的預(yù)熱時(shí)長(zhǎng)
    private int warmUpPeriodSec = 10;
    // 對(duì)應(yīng)流控效果為排隊(duì)等待情況下,出現(xiàn)的超時(shí)時(shí)間
    private int maxQueueingTimeMs = 500;
    // 對(duì)應(yīng)新增流控規(guī)則頁(yè)面的是否集群
    private boolean clusterMode;
    // 集群流控的相關(guān)配置
    private ClusterFlowConfig clusterConfig;
    // 流量整形的實(shí)現(xiàn),不同流控效果有不同算法
    private TrafficShapingController controller;
}

一條限流規(guī)則主要由下面幾個(gè)因素組成,我們可以組合這些元素來(lái)實(shí)現(xiàn)不同的限流效果:

  • resource:資源名,即限流規(guī)則的作用對(duì)象
  • count: 限流閾值
  • grade: 限流閾值類型(QPS 或并發(fā)線程數(shù))
  • limitApp: 流控針對(duì)的調(diào)用來(lái)源,若為 default 則不區(qū)分調(diào)用來(lái)源
  • strategy: 調(diào)用關(guān)系限流策略
  • controlBehavior: 流量控制效果(直接拒絕、Warm Up、勻速排隊(duì))

流控流程

public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    private final FlowRuleChecker checker;

    void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
        throws BlockException {
        // 校驗(yàn)是否限流
        checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
    }
}


public class FlowRuleChecker {

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {

        // 獲取匹配的規(guī)則
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                // 檢查規(guī)則能否通過(guò)
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

    public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, 
                               int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp();
        // 集群模式下的規(guī)則檢測(cè)
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        // 單機(jī)模式下的規(guī)則檢測(cè)
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        // 選擇流量統(tǒng)計(jì)的節(jié)點(diǎn)進(jìn)行限流計(jì)算
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        // rule.getRater返回TrafficShapingController對(duì)象,
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
  • 流控的核心邏輯在FlowSlot進(jìn)行處理,通過(guò)FlowRuleChecker進(jìn)行限流規(guī)則生效判斷
  • FlowRuleChecker的checkFlow會(huì)遍歷FlowRule進(jìn)行canPassCheck判斷
  • canPassCheck單機(jī)模式執(zhí)行passLocalCheck,集群模式執(zhí)行passClusterCheck
  • passLocalCheck根據(jù)流控效果策略和獲取的統(tǒng)計(jì)節(jié)點(diǎn)判斷是否限流
  • selectNodeByRequesterAndStrategy返回ClusterBuilderSlot階段生成的ClusterNode
  • TrafficShapingController在默認(rèn)模式下返回流控效果策略DefaultController。
  • DefaultController負(fù)責(zé)實(shí)現(xiàn)流量控制。

流控效果策略

public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;
    private double count;
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 獲取當(dāng)前已使用的token
        int curCount = avgUsedTokens(node);
        // 當(dāng)前已使用token + 獲取的token 大于token數(shù)量的場(chǎng)景
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
    }
}
  • 默認(rèn)流控策略比較已使用token和此次消耗token是否大于token數(shù)量判斷限流

流控策略

com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController

public final class FlowRuleUtil {
    private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
        if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
            switch (rule.getControlBehavior()) {
                // 預(yù)熱/冷啟動(dòng)
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                    return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            ColdFactorProperty.coldFactor);
                // 勻速排隊(duì)
                case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                    return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
                // 冷啟動(dòng)+勻速排隊(duì)
                case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                    return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                            rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);

                case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
                default:
                    // Default mode or unknown mode: default traffic shaping controller (fast-reject).
            }
        }
        // 直接拒絕
        return new DefaultController(rule.getCount(), rule.getGrade());
    }
}
  • 流控效果策略根據(jù)不同的規(guī)則返回不同的流控策略。
  • 直接拒絕(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默認(rèn)的流量控制方式,當(dāng)QPS超過(guò)任意規(guī)則的閾值后,新的請(qǐng)求就會(huì)被立即拒絕,拒絕方式為拋出FlowException。
  • Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即預(yù)熱/冷啟動(dòng)方式。當(dāng)系統(tǒng)長(zhǎng)期處于低水位的情況下,當(dāng)流量突然增加時(shí),直接把系統(tǒng)拉升到高水位可能瞬間把系統(tǒng)壓垮。通過(guò)"冷啟動(dòng)",讓通過(guò)的流量緩慢增加,在一定時(shí)間內(nèi)逐漸增加到閾值上限,給冷系統(tǒng)一個(gè)預(yù)熱的時(shí)間,避免冷系統(tǒng)被壓垮。
  • 勻速排隊(duì)(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式會(huì)嚴(yán)格控制請(qǐng)求通過(guò)的間隔時(shí)間,也即是讓請(qǐng)求以均勻的速度通過(guò),對(duì)應(yīng)的是漏桶算法。

流控節(jié)點(diǎn)選擇

public class FlowRuleChecker {

    static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
        // The limit app should not be empty.
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();

        if (limitApp.equals(origin) && filterOrigin(origin)) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
            // 實(shí)際訪問(wèn)的分支
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }

            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
            && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }

            return selectReferenceNode(rule, context, node);
        }

        return null;
    }
}
  • 根據(jù)請(qǐng)求和策略來(lái)來(lái)返回?cái)?shù)據(jù)統(tǒng)計(jì)節(jié)點(diǎn)用以流控限制。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        // 負(fù)責(zé)創(chuàng)建DefaultNode,職責(zé)鏈以資源維度,slot以職責(zé)鏈維度
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}

public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    // 以資源作為key保存的全局的集群節(jié)點(diǎn)
    private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
    private static final Object lock = new Object();
    private volatile ClusterNode clusterNode = null;

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    // Create the cluster node.
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        // 將DefaultNode設(shè)置進(jìn)集群節(jié)點(diǎn)
        node.setClusterNode(clusterNode);

        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
}
  • NodeSelectorSlot生成的DefaultNode在ClusterBuilderSlot被設(shè)置ClusterNode。
  • ClusterNode記錄訪問(wèn)的統(tǒng)計(jì)數(shù)據(jù)會(huì)被用在流控當(dāng)中。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容