系列
開篇
- sentinel 處理流程是基于slot鏈(ProcessorSlotChain)來完成的,如限流熔斷等,其中重要的一個slot就是StatisticSlot,它是做各種數(shù)據(jù)統(tǒng)計的,而限流熔斷的數(shù)據(jù)判斷來源就是StatisticSlot。
- StatisticSlot的各種數(shù)據(jù)統(tǒng)計都是基于滑動窗口來完成的,因此本文就重點分析StatisticSlot的滑動窗口統(tǒng)計機制。
- StatisticSlot的滑動窗口需要了解統(tǒng)計指標的數(shù)據(jù)結(jié)構(gòu)、滑動窗口的窗口定位,指標保存等概念。
StatisticNode
public class StatisticNode implements Node {
// 對每秒指標統(tǒng)計
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
// 每分鐘指標統(tǒng)計
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
private LongAdder curThreadNum = new LongAdder();
private long lastFetchTime = -1;
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
}
- 采集指標的統(tǒng)計節(jié)點,負責統(tǒng)計相關(guān)的采集指標。
- StatisticNode包含rollingCounterInSecond和rollingCounterInMinute。
- rollingCounterInSecond是對每秒指標的統(tǒng)計。
- rollingCounterInMinute是對每分鐘指標的統(tǒng)計。
- rollingCounterInSecond和rollingCounterInMinute是ArrayMetric,負責保存統(tǒng)計指標。
統(tǒng)計指標
- 統(tǒng)計指標使用ArrayMetric進行承載。
- ArrayMetric內(nèi)部是滑動窗口LeapArray對象。
- LeapArray的每個元素為WindowWrap。
- WindowWrap內(nèi)部包含MetricBucket。
ArrayMetric
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
}
- ArrayMetric作為保存指標的數(shù)組,通過滑動窗口LeapArray保存MetricBucket。
- MetricBucket代表統(tǒng)計指標,LeapArray代表滑動窗口,滑動窗口的每個窗口是MetricBucket對象。
LeapArray
public class BucketLeapArray extends LeapArray<MetricBucket> {
public BucketLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
}
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
private final ReentrantLock updateLock = new ReentrantLock();
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
- LeapArray作為滑動窗口,BucketLeapArray作為其一種具體的實現(xiàn)。
- LeapArray通過AtomicReferenceArray<WindowWrap<T>> array來實現(xiàn)滑動窗口。
- 滑動窗口的統(tǒng)計指標MetricBucket通過WindowWrap進行包裝。
WindowWrap
public class WindowWrap<T> {
private final long windowLengthInMs; // 時間窗口的長度
private long windowStart; // 時間窗口開始時間
private T value; // MetricBucket對象,保存各個指標數(shù)據(jù)
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
}
- WindowWrap作為滑動窗口的每個元素的承載,內(nèi)部保存MetricBucket。
MetricBucket
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
}
public enum MetricEvent {
PASS, // 正常通過
BLOCK, // 阻塞
EXCEPTION, // 異常
SUCCESS, // 成功
RT, // RT統(tǒng)計
OCCUPIED_PASS // 搶占通過
}
- MetricBucket內(nèi)部保存各個統(tǒng)計指標MetricEvent的LongAdder數(shù)組。
- MetricEvent的枚舉值代表各個采集指標。
滑動窗口定位
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 時間窗口的長度
protected int sampleCount; // 時間窗口的個數(shù)
protected int intervalInMs;
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 根據(jù)當前時間和時間窗口的長度進行計算獲取窗口下標
int idx = calculateTimeIdx(timeMillis);
// 獲取指定下標的時間窗口的開始時間
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 1.為空表示當前時間窗口為初始化過,創(chuàng)建WindowWrap并cas設(shè)置到array中
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 2.獲取的時間窗口正好對應(yīng)當前時間,直接返回
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
// 3.獲取的時間窗口為老的,進行窗口reset操作復(fù)用
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 4.時間回撥了,正常情況下不會走到這里
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
}
public class BucketLeapArray extends LeapArray<MetricBucket> {
public BucketLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// 重置窗口的開始時間和對應(yīng)的統(tǒng)計值
w.resetTo(startTime);
w.value().reset();
return w;
}
}
- 1.為空表示當前時間窗口為初始化過,創(chuàng)建WindowWrap并cas設(shè)置到array中
- 2.獲取的時間窗口正好對應(yīng)當前時間,直接返回
- 3.獲取的時間窗口為老的,進行窗口reset操作復(fù)用。reset操作負責重置時間窗口的開始時間和窗口統(tǒng)計值。
- 4.時間回撥了正常情況下不會走到這里
指標保存
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
@Override
public void addPass(int count) {
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
}
public class MetricBucket {
private final LongAdder[] counters;
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
public MetricBucket add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
return this;
}
}
- currentWindow返回當前時間對應(yīng)的滑動窗口。
- addPass通過add指定類型的MetricEvent指標到LongAdder當中。