Sentinel滑動窗口介紹

系列

開篇

  • sentinel 處理流程是基于slot鏈(ProcessorSlotChain)來完成的,如限流熔斷等,其中重要的一個slot就是StatisticSlot,它是做各種數(shù)據(jù)統(tǒng)計的,而限流熔斷的數(shù)據(jù)判斷來源就是StatisticSlot。
  • StatisticSlot的各種數(shù)據(jù)統(tǒng)計都是基于滑動窗口來完成的,因此本文就重點分析StatisticSlot的滑動窗口統(tǒng)計機制。
  • StatisticSlot的滑動窗口需要了解統(tǒng)計指標的數(shù)據(jù)結(jié)構(gòu)、滑動窗口的窗口定位,指標保存等概念。


StatisticNode

public class StatisticNode implements Node {
    // 對每秒指標統(tǒng)計
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);
    // 每分鐘指標統(tǒng)計
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
    private LongAdder curThreadNum = new LongAdder();
    private long lastFetchTime = -1;


    @Override
    public void addPassRequest(int count) {
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }
}
  • 采集指標的統(tǒng)計節(jié)點,負責統(tǒng)計相關(guān)的采集指標。
  • StatisticNode包含rollingCounterInSecond和rollingCounterInMinute。
  • rollingCounterInSecond是對每秒指標的統(tǒng)計。
  • rollingCounterInMinute是對每分鐘指標的統(tǒng)計。
  • rollingCounterInSecond和rollingCounterInMinute是ArrayMetric,負責保存統(tǒng)計指標。


統(tǒng)計指標

  • 統(tǒng)計指標使用ArrayMetric進行承載。
  • ArrayMetric內(nèi)部是滑動窗口LeapArray對象。
  • LeapArray的每個元素為WindowWrap。
  • WindowWrap內(nèi)部包含MetricBucket。


ArrayMetric

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
}
  • ArrayMetric作為保存指標的數(shù)組,通過滑動窗口LeapArray保存MetricBucket。
  • MetricBucket代表統(tǒng)計指標,LeapArray代表滑動窗口,滑動窗口的每個窗口是MetricBucket對象。


LeapArray

public class BucketLeapArray extends LeapArray<MetricBucket> {
    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }
}

public abstract class LeapArray<T> {
    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    private final ReentrantLock updateLock = new ReentrantLock();

    public LeapArray(int sampleCount, int intervalInMs) {
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}
  • LeapArray作為滑動窗口,BucketLeapArray作為其一種具體的實現(xiàn)。
  • LeapArray通過AtomicReferenceArray<WindowWrap<T>> array來實現(xiàn)滑動窗口。
  • 滑動窗口的統(tǒng)計指標MetricBucket通過WindowWrap進行包裝。


WindowWrap

public class WindowWrap<T> {

    private final long windowLengthInMs; // 時間窗口的長度
    private long windowStart; // 時間窗口開始時間
    private T value; // MetricBucket對象,保存各個指標數(shù)據(jù)

    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }
}
  • WindowWrap作為滑動窗口的每個元素的承載,內(nèi)部保存MetricBucket。


MetricBucket

public class MetricBucket {

    private final LongAdder[] counters;
    private volatile long minRt;

    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];
        for (MetricEvent event : events) {
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();
    }
}

public enum MetricEvent {
    PASS, // 正常通過
    BLOCK, // 阻塞
    EXCEPTION, // 異常
    SUCCESS, // 成功
    RT, // RT統(tǒng)計
    OCCUPIED_PASS // 搶占通過
}
  • MetricBucket內(nèi)部保存各個統(tǒng)計指標MetricEvent的LongAdder數(shù)組。
  • MetricEvent的枚舉值代表各個采集指標。


滑動窗口定位

public abstract class LeapArray<T> {

    protected int windowLengthInMs; // 時間窗口的長度
    protected int sampleCount; // 時間窗口的個數(shù)
    protected int intervalInMs;
    private double intervalInSecond;
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    public WindowWrap<T> currentWindow() {
        return currentWindow(TimeUtil.currentTimeMillis());
    }

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 根據(jù)當前時間和時間窗口的長度進行計算獲取窗口下標
        int idx = calculateTimeIdx(timeMillis);
        // 獲取指定下標的時間窗口的開始時間
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                // 1.為空表示當前時間窗口為初始化過,創(chuàng)建WindowWrap并cas設(shè)置到array中
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                // 2.獲取的時間窗口正好對應(yīng)當前時間,直接返回
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                // 3.獲取的時間窗口為老的,進行窗口reset操作復(fù)用
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 4.時間回撥了,正常情況下不會走到這里
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }
}


public class BucketLeapArray extends LeapArray<MetricBucket> {

    public BucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
    }

    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // 重置窗口的開始時間和對應(yīng)的統(tǒng)計值
        w.resetTo(startTime);
        w.value().reset();
        return w;
    }
}
  • 1.為空表示當前時間窗口為初始化過,創(chuàng)建WindowWrap并cas設(shè)置到array中
  • 2.獲取的時間窗口正好對應(yīng)當前時間,直接返回
  • 3.獲取的時間窗口為老的,進行窗口reset操作復(fù)用。reset操作負責重置時間窗口的開始時間和窗口統(tǒng)計值。
  • 4.時間回撥了正常情況下不會走到這里


指標保存

public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }

    @Override
    public void addPass(int count) {
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        wrap.value().addPass(count);
    }
}

public class MetricBucket {
    private final LongAdder[] counters;

    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }

    public MetricBucket add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
        return this;
    }
}
  • currentWindow返回當前時間對應(yīng)的滑動窗口。
  • addPass通過add指定類型的MetricEvent指標到LongAdder當中。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容