系列
- Sentinel流程介紹
- Sentinel資源節(jié)點(diǎn)樹(shù)構(gòu)成
- Sentinel滑動(dòng)窗口介紹
- Sentinel流量控制
- Sentinel的職責(zé)鏈slot介紹
- Sentinel熔斷降級(jí)
- Sentinel Dashboard和應(yīng)用通信
- Sentinel 控制臺(tái)
開(kāi)篇
- 流量控制(flow control),其原理是監(jiān)控應(yīng)用流量的 QPS 或并發(fā)線程數(shù)等指標(biāo),當(dāng)達(dá)到指定的閾值時(shí)對(duì)流量進(jìn)行控制,以避免被瞬時(shí)的流量高峰沖垮,從而保障應(yīng)用的高可用性。
- FlowSlot 會(huì)根據(jù)預(yù)設(shè)的規(guī)則,結(jié)合前面 NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot 統(tǒng)計(jì)出來(lái)的實(shí)時(shí)信息進(jìn)行流量控制。
- 流量控制的詳細(xì)介紹可以參考Sentinel流量控制的介紹。
流控規(guī)則介紹

- 流控規(guī)則在管理后臺(tái)的配置參數(shù)如上圖所示。

- 流控規(guī)則核心屬性如上圖所示。
流控規(guī)則配置
private static void initFlowRule(int interfaceFlowLimit, boolean method) {
FlowRule flowRule = new FlowRule(INTERFACE_RES_KEY)
.setCount(interfaceFlowLimit)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
List<FlowRule> list = new ArrayList<>();
if (method) {
FlowRule flowRule1 = new FlowRule(RES_KEY)
.setCount(5)
.setGrade(RuleConstant.FLOW_GRADE_QPS);
list.add(flowRule1);
}
list.add(flowRule);
FlowRuleManager.loadRules(list);
}
- 通過(guò)配置流控規(guī)則并且通過(guò)FlowRuleManager生效限流規(guī)則。
流控規(guī)則定義
public interface Rule {
String getResource();
}
public abstract class AbstractRule implements Rule {
// 資源名
private String resource;
// 流控對(duì)應(yīng)的調(diào)用來(lái)源
private String limitApp;
}
public class FlowRule extends AbstractRule {
// 流控類型:0=線程,1=QPS FLOW_GRADE_THREAD = 0 FLOW_GRADE_QPS = 1;
private int grade = RuleConstant.FLOW_GRADE_QPS;
// 流控閾值,配置的是qps類型則代表qps的值;配置的是線程數(shù)類型則代表線程數(shù)
private double count;
// 流控限流策略
private int strategy = RuleConstant.STRATEGY_DIRECT;
// 關(guān)聯(lián)流控的資源
private String refResource;
// 流控效果控制 0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
// 對(duì)應(yīng)流控效果為Warm Up情況下,出現(xiàn)的預(yù)熱時(shí)長(zhǎng)
private int warmUpPeriodSec = 10;
// 對(duì)應(yīng)流控效果為排隊(duì)等待情況下,出現(xiàn)的超時(shí)時(shí)間
private int maxQueueingTimeMs = 500;
// 對(duì)應(yīng)新增流控規(guī)則頁(yè)面的是否集群
private boolean clusterMode;
// 集群流控的相關(guān)配置
private ClusterFlowConfig clusterConfig;
// 流量整形的實(shí)現(xiàn),不同流控效果有不同算法
private TrafficShapingController controller;
}
一條限流規(guī)則主要由下面幾個(gè)因素組成,我們可以組合這些元素來(lái)實(shí)現(xiàn)不同的限流效果:
- resource:資源名,即限流規(guī)則的作用對(duì)象
- count: 限流閾值
- grade: 限流閾值類型(QPS 或并發(fā)線程數(shù))
- limitApp: 流控針對(duì)的調(diào)用來(lái)源,若為 default 則不區(qū)分調(diào)用來(lái)源
- strategy: 調(diào)用關(guān)系限流策略
- controlBehavior: 流量控制效果(直接拒絕、Warm Up、勻速排隊(duì))
流控流程
public class FlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
private final FlowRuleChecker checker;
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 校驗(yàn)是否限流
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
}
public class FlowRuleChecker {
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
// 獲取匹配的規(guī)則
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 檢查規(guī)則能否通過(guò)
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
String limitApp = rule.getLimitApp();
// 集群模式下的規(guī)則檢測(cè)
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 單機(jī)模式下的規(guī)則檢測(cè)
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 選擇流量統(tǒng)計(jì)的節(jié)點(diǎn)進(jìn)行限流計(jì)算
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
// rule.getRater返回TrafficShapingController對(duì)象,
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
- 流控的核心邏輯在FlowSlot進(jìn)行處理,通過(guò)FlowRuleChecker進(jìn)行限流規(guī)則生效判斷
- FlowRuleChecker的checkFlow會(huì)遍歷FlowRule進(jìn)行canPassCheck判斷
- canPassCheck單機(jī)模式執(zhí)行passLocalCheck,集群模式執(zhí)行passClusterCheck
- passLocalCheck根據(jù)流控效果策略和獲取的統(tǒng)計(jì)節(jié)點(diǎn)判斷是否限流
- selectNodeByRequesterAndStrategy返回ClusterBuilderSlot階段生成的ClusterNode
- TrafficShapingController在默認(rèn)模式下返回流控效果策略DefaultController。
- DefaultController負(fù)責(zé)實(shí)現(xiàn)流量控制。
流控效果策略
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 獲取當(dāng)前已使用的token
int curCount = avgUsedTokens(node);
// 當(dāng)前已使用token + 獲取的token 大于token數(shù)量的場(chǎng)景
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
}
- 默認(rèn)流控策略比較已使用token和此次消耗token是否大于token數(shù)量判斷限流
流控策略
com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController
com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpRateLimiterController
com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController
public final class FlowRuleUtil {
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
// 預(yù)熱/冷啟動(dòng)
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
// 勻速排隊(duì)
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
// 冷啟動(dòng)+勻速排隊(duì)
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
// 直接拒絕
return new DefaultController(rule.getCount(), rule.getGrade());
}
}
- 流控效果策略根據(jù)不同的規(guī)則返回不同的流控策略。

- 直接拒絕(RuleConstant.CONTROL_BEHAVIOR_DEFAULT)方式是默認(rèn)的流量控制方式,當(dāng)QPS超過(guò)任意規(guī)則的閾值后,新的請(qǐng)求就會(huì)被立即拒絕,拒絕方式為拋出FlowException。
- Warm Up(RuleConstant.CONTROL_BEHAVIOR_WARM_UP)方式,即預(yù)熱/冷啟動(dòng)方式。當(dāng)系統(tǒng)長(zhǎng)期處于低水位的情況下,當(dāng)流量突然增加時(shí),直接把系統(tǒng)拉升到高水位可能瞬間把系統(tǒng)壓垮。通過(guò)"冷啟動(dòng)",讓通過(guò)的流量緩慢增加,在一定時(shí)間內(nèi)逐漸增加到閾值上限,給冷系統(tǒng)一個(gè)預(yù)熱的時(shí)間,避免冷系統(tǒng)被壓垮。
- 勻速排隊(duì)(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER)方式會(huì)嚴(yán)格控制請(qǐng)求通過(guò)的間隔時(shí)間,也即是讓請(qǐng)求以均勻的速度通過(guò),對(duì)應(yīng)的是漏桶算法。
流控節(jié)點(diǎn)選擇
public class FlowRuleChecker {
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
// The limit app should not be empty.
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
if (limitApp.equals(origin) && filterOrigin(origin)) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
// 實(shí)際訪問(wèn)的分支
if (strategy == RuleConstant.STRATEGY_DIRECT) {
// Return the cluster node.
return node.getClusterNode();
}
return selectReferenceNode(rule, context, node);
} else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
&& FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
return selectReferenceNode(rule, context, node);
}
return null;
}
}
- 根據(jù)請(qǐng)求和策略來(lái)來(lái)返回?cái)?shù)據(jù)統(tǒng)計(jì)節(jié)點(diǎn)用以流控限制。
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {
private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 負(fù)責(zé)創(chuàng)建DefaultNode,職責(zé)鏈以資源維度,slot以職責(zé)鏈維度
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
public class ClusterBuilderSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
// 以資源作為key保存的全局的集群節(jié)點(diǎn)
private static volatile Map<ResourceWrapper, ClusterNode> clusterNodeMap = new HashMap<>();
private static final Object lock = new Object();
private volatile ClusterNode clusterNode = null;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 將DefaultNode設(shè)置進(jìn)集群節(jié)點(diǎn)
node.setClusterNode(clusterNode);
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
}
- NodeSelectorSlot生成的DefaultNode在ClusterBuilderSlot被設(shè)置ClusterNode。
- ClusterNode記錄訪問(wèn)的統(tǒng)計(jì)數(shù)據(jù)會(huì)被用在流控當(dāng)中。