Guava RateLimiter代碼與限流邏輯梳理

前言

最近需要在網(wǎng)關(guān)層做一個限流的需求,由于需要對一個機房內(nèi)的集群做統(tǒng)一的限流管理,所以可能需要用到redis,而且spring cloud本身的Hystrix也不行了(因為Hystrix只是針對單機)。因此需要考慮自行實現(xiàn)限流邏輯,所以針對目前比較主流的限流 令牌桶算法及其實現(xiàn)(Guava RateLimiter)做了一個調(diào)研。

令牌桶(Token Bucket)

令牌桶算法是網(wǎng)絡(luò)流量整形(Traffic Shaping)和速率控制(Rate Limiting)中比較常用的一種算法,還有另一種漏桶算法,這里就不多展開了。一般來說,令牌桶算法可以控制網(wǎng)絡(luò)中單位時間內(nèi)的請求數(shù)目,并在一定程度上允許突發(fā)數(shù)據(jù)的產(chǎn)生。

令牌桶的原理是有一個桶來裝令牌,系統(tǒng)會以恒定的(1/QPS)時間間隔往桶里面放入令牌,如果桶已經(jīng)滿了就不能再加了,當(dāng)一個請求進來的時候,需要到桶里面獲取自己請求所需的相應(yīng)數(shù)目的令牌,如果獲取成功,則進行請求操作;若不能,則拒絕服務(wù),或者等待直到有足夠的令牌為止。


令牌桶原理

RateLimiter概述

Google的Guava庫中提供了一個基于令牌桶算法的限流工具類RateLimiter。在該類的子類SmoothRateLimiter中有一大段關(guān)于如何設(shè)計RateLimiter的描述,這里大致翻譯一下:

如何設(shè)計一個限流器,并且為什么這么設(shè)計?

限流器最主要的功能是保證一個穩(wěn)定的速率,這里穩(wěn)定速率指的是通常情況下的最高速率。這個機制通常通過控制流入的請求來保證,比如對于一個請求,當(dāng)達到最高速率時,我們需要計算它需要受限制等待的時間,并讓它等待直到有權(quán)訪問為止。

保證QPS為穩(wěn)定速率的最簡單的方式是保存上一個授權(quán)請求的時間戳,然后保證在接下來的1/QPS 秒內(nèi)沒有其他請求進入。舉例來說,對于QPS=5的需求,如果我們能保證沒有請求能夠在上個請求之后的200ms內(nèi)獲得授權(quán),那我們就實現(xiàn)了限流。如果有個請求在上個授權(quán)請求之后的100ms到來,那么我們需要做的就是讓它再等待100ms。以此類推,對于并發(fā)的15個請求,總共會花掉3秒鐘。

值得注意的一點是,這樣的機制只保存了非常少的關(guān)于過去的記憶,因為它只需要記錄最近一次請求。那么如果在一個請求獲得授權(quán)之前的很長時間都沒有請求時會發(fā)生什么呢?限流器會立馬忘記關(guān)于過去的寂寞(低利用率),而只記錄新的請求的時間戳,然后下個請求也只能在這個請求之后的1/QPS時間間隔之后才能獲得授權(quán),這顯然與我們期望的QPS不太匹配,并最終會導(dǎo)致低利用率或者請求溢出。

過去的低利用率意味著大量的資源空閑,因此,限流器應(yīng)該加速利用這些空閑資源。比較典型的場景就是在網(wǎng)絡(luò)帶寬上,低利用率意味著有多的緩存空間可以立即使用。

但是在另一方面,一段時間的低利用率也意味著“服務(wù)對于新來的請求沒有準(zhǔn)備好”。這有點隱晦,舉個列子,服務(wù)的緩存可能陳舊,請求更有可能觸發(fā)耗時操作。

為了處理這種兩難的情況,我們需要添加另外一個衡量維度,通過storedPermits來描述過去的低利用率。當(dāng)這個變量為0的時候,代表沒有低利用率存在,隨著低利用率持續(xù)增加時,storedPermits能夠到達maxStoredPermits。因此,當(dāng)獲取permits令牌發(fā)生時,拿到的令牌通常會來自兩個部分:

  • stored permits (過去留存的令牌,當(dāng)有低利用率存在就能使用)
  • fresh permits(stored permits之后還有剩余的permits,我們認(rèn)為他需要新鮮fresh的permit令牌來保證)

我們通過下面這個例子來說明:

我們有一個限流器每秒產(chǎn)生一個令牌,即保證QPS=1。如果有一秒限流器沒有請求進來,那么我們對storedPermits加1。假定在過去的10秒內(nèi)都沒有請求進來,那storedPermits就會增加至10(假定maxStoredPermits>10)。這時一個請求到來并申請獲取3個令牌,我們可以直接從storedPermits中提出來3個令牌支持,并且storedPermits減少到7。在此之后又來了一個請求申請獲取10個令牌,這時候我們從storedPermits中直接提取完剩下來的7個令牌,余下的3個令牌我們需要等待限流器放入3個fresh pertmits才能完成這個請求的授權(quán)訪問。

我們知道我們的QPS=1,所以我們需要等待3秒才能拿到新的3個fresh permits。那我們拿到7個stored permits需要多少時間呢?根據(jù)上面的討論,這個問題沒有一個標(biāo)準(zhǔn)答案。如果我們想加速處理來快速填滿過去低利用率帶來的損失,那我們肯定希望我們拿到stored permits的速度快于fresh permits,因為低利用率代表了更多空閑資源可以利用。如果我們主要的關(guān)注點在防溢出上,那stored permits的提取速度應(yīng)該要比fresh permits要慢。因此我們需要一個函數(shù)來衡量stored permits和受控制等待時間之間的關(guān)聯(lián)。這個函數(shù)就是storedPermitsToWaitTime。(后面的描述會比較復(fù)雜,這里不進行深入展開了。對于我們一般使用的SmoothBursty,這個函數(shù)恒定返回0,即立即獲取storedPermits)。

最后,讓我們考慮一個場景,有個QPS=1的限流器,當(dāng)限流器空閑時來了一個請求需要獲取100個令牌,這時候我們應(yīng)該直接等待100秒再開始處理?這樣的情況多半會使得結(jié)果毫無意義。一種更好的策略是對這個請求放行,就像獲取1個令牌一樣,然后推遲后續(xù)的請求。換句話說,我們允許立即完成對這個請求的授權(quán),然后后續(xù)的請求進來就至少得等100s的時間。這保證了請求完成的及時。

這個策略產(chǎn)生了非常重要的結(jié)果,就是限流器不會記錄最近的請求時間,而是記錄下一個請求可用的期望時間。這也保證了我們有能力判斷在一個timeout時間段內(nèi)一個請求是否能夠獲取到令牌。另一方面,根據(jù)這個期望時間,我們可以很好地判斷一個限流器的未使用時間,一旦這個期望時間在當(dāng)前時間之前,那么當(dāng)前時間與期望時間的差值就是限流器未使用的時長,而這個時長也可以轉(zhuǎn)換到stored permits上(根據(jù)前文所述storedPermits隨著空閑時間增長)

RateLimiter代碼解析

不知道前面給大家所翻譯的大家是否看著頭疼,先給大家說聲抱歉,從小語文就不太好。接下來我們從代碼著手,希望能給大家一個比較清晰的視角。

首先我們看下幾個比較關(guān)鍵的變量:

 /**
   * The currently stored permits. 目前保存下來的令牌數(shù)目
   */
  double storedPermits;

  /**
   * The maximum number of stored permits.最大的令牌保存量,即桶大小
   */
  double maxPermits;

  /**
   * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
   * per second has a stable interval of 200ms.
   兩個請求之間的間隙,也就是添加一個令牌到桶中的時間間隔。
   */
  double stableIntervalMicros;

/**
   * The time when the next request (no matter its size) will be granted. After granting a
   * request, this is pushed further in the future. Large requests push this further than small
   * requests.
   下一個請求能夠被授權(quán)的期望時間,當(dāng)一個請求被授權(quán)之后(通過acquire可以預(yù)定),這個時間會被繼續(xù)往后推,大令牌量的請求會比少量的請求推的更遠。
   */
  private long nextFreeTicketMicros = 0L; // could be either in the past or future 有可能在過去或者將來

這幾個變量跟我們之前提到的幾個概念息息相關(guān),相信大家還記得。

接下來是幾個常用的變量:

/**
   * The underlying timer; used both to measure elapsed time and sleep as necessary. A separate
   * object to facilitate testing.
   底層計時器,通過它來進行時間的計算和線程sleep
   */
  private final SleepingStopwatch stopwatch;

  // Can't be initialized in the constructor because mocks don't call the constructor. 非直接用的互斥鎖
  private volatile Object mutexDoNotUseDirectly;
  /**
  雙重判定的互斥鎖構(gòu)建過程,線程安全
  **/
  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

我們從最簡單的acquire入手:

/**
   * Acquires a single permit from this {@code RateLimiter}, blocking until the
   * request can be granted. Tells the amount of time slept, if any.
   *
   * <p>This method is equivalent to {@code acquire(1)}.
   *
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  public double acquire() {
    return acquire(1);
  }

  /**
   * Acquires the given number of permits from this {@code RateLimiter}, blocking until the
   * request can be granted. Tells the amount of time slept, if any.
   *
   * @param permits the number of permits to acquire
   * @return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 16.0 (present in 13.0 with {@code void} return type})
   */
  public double acquire(int permits) {
    long microsToWait = reserve(permits);
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
  }

這兩段代碼比較簡單,第一個就是把acquire方法調(diào)用委托到acquire(1),第二個稍微復(fù)雜一點,首先調(diào)用reserve()方法得到獲取permits個令牌需要的等待時間,然后通過stopwatch直接無中斷地sleep這么長的時間,最后返回等待的時間毫秒數(shù)。那我們再深入reserve方法:

  /**
   * Reserves the given number of permits from this {@code RateLimiter} for future use, returning
   * the number of microseconds until the reservation can be consumed.
   *
   * @return time in microseconds to wait until the resource can be acquired, never negative
   */
  final long reserve(int permits) {
    checkPermits(permits);
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }

首先做一些參數(shù)檢驗,然后獲取互斥鎖,接著調(diào)用reserveAndGetWaitTime,傳入需要獲取的令牌數(shù)和當(dāng)前的毫秒數(shù)。(插句題外話,不得不服google的代碼質(zhì)量,從注釋到命名,一目了然)

   /**
   * Reserves next ticket and returns the wait time that the caller must wait for.
   *
   * @return the required wait time, never negative
   */
  final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    return max(momentAvailable - nowMicros, 0);
  }

這一段代碼通過調(diào)用reserveEarliestAvailable來得到該請求能夠獲取令牌授權(quán)的毫秒時刻,然后通過運算返回得到需要等待的毫秒數(shù),我們繼續(xù)看reserveEarliestAvailable方法:

   /**
   * Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time.
   * 這個函數(shù)功能是在每次請求調(diào)用產(chǎn)生時更新限流器的令牌數(shù)
   */
  void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    // 如果下次能授權(quán)的毫秒數(shù)在現(xiàn)在的毫秒計數(shù)之前
    // 說明這個限流器已經(jīng)有一段時間沒有使用了
    // 需要計算這段時間產(chǎn)生的stored permits
    // 否則說明這段時間限流器一直有請求進來,則不需要更新
    if (nowMicros > nextFreeTicketMicros) {
      //stored permits 最多為maxPermits,
      //大小根據(jù)這段空閑時間長度(nowMicros - nextFreeTicketMicros)確定
      storedPermits = min(maxPermits,
          storedPermits
            + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
      //更新nextFreeTiecket為now
      nextFreeTicketMicros = nowMicros;
    }
  }


  /*
    直接返回放入令牌間隔,即 1 / QPS * 1000(毫秒)
  */
  @Override
  double coolDownIntervalMicros() {
      return stableIntervalMicros;
  }


 /*
  在當(dāng)前場景下,對于storedPermits,我們的策略是立即獲取,因此沒有wait time,返回0
*/
  @Override
  long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
  }

  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 更新令牌桶
    resync(nowMicros);
    //保存nextFreeTicketMicros
    long returnValue = nextFreeTicketMicros;
    //獲取這次能夠使用的storedPermits
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    //計算需要等待獲取的fresh permits
    double freshPermits = requiredPermits - storedPermitsToSpend;
    //總的等待時間等于storedPermits的等待時間加上fresh permit的等待時間
    //fresh的等待時間就是放入令牌的間隔*fresh permits數(shù)目
    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);
  
    // 增加nextFreeTicketMicros, 這里支持預(yù)定
    try {
      this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
    } catch (ArithmeticException e) {
      this.nextFreeTicketMicros = Long.MAX_VALUE;
    }
    //更新stored permits
    this.storedPermits -= storedPermitsToSpend;
    //因為支持預(yù)定,所以返回的是這些計算之前nextFreeTicketMicros作為需要wait的時間
    //而不是計算后的
    return returnValue;
  }

這段代碼的講解我大部分已經(jīng)寫在代碼注釋里面了,需要說明的是,我最開始一直在想按照令牌桶算法的描述,應(yīng)該有一個定時插入令牌的過程,但是我看了下確實沒有多的線程同步機制來做這個事兒,原來Guava中采用了觸發(fā)式的更新令牌桶機制。原理就是在每次請求到來的時候去完成令牌桶中令牌插入工作和其他屬性如nextFreeTicketMicros的更新工作,這樣減少了線程使用, 節(jié)約了資源,并且也簡化了操作。這個功能在resync函數(shù)代碼中完成。需要值得注意的是,因為Guava的實現(xiàn)支持令牌預(yù)定功能,即當(dāng)限流器當(dāng)前處于空閑狀態(tài)時,一個大量令牌請求進來的時候,可以提前預(yù)授權(quán)給他足夠的令牌讓它能夠立即執(zhí)行,并推遲后續(xù)請求的等待時間(如之前所述),因此才會出現(xiàn)nowMicros < nextFreeTicketMicro的情況,而這種情況就說明當(dāng)前仍處于對于之前一個請求的預(yù)授權(quán)階段,不需要更新storedPermits,否則就還是nowMicros >= nextFreeTicketMicro的情況。

看完了acquire的流程,我們再來看tryAcquire的代碼:

 /**
   * Acquires a permit from this {@code RateLimiter} if it can be obtained
   * without exceeding the specified {@code timeout}, or returns {@code false}
   * immediately (without waiting) if the permit would not have been granted
   * before the timeout expired.
   *
   * <p>This method is equivalent to {@code tryAcquire(1, timeout, unit)}.
   *
   * @param timeout the maximum time to wait for the permit. Negative values are treated as zero.
   * @param unit the time unit of the timeout argument
   * @return {@code true} if the permit was acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   */
  public boolean tryAcquire(long timeout, TimeUnit unit) {
    return tryAcquire(1, timeout, unit);
  }

  /**
   * Acquires permits from this {@link RateLimiter} if it can be acquired immediately without delay.
   *
   * <p>
   * This method is equivalent to {@code tryAcquire(permits, 0, anyUnit)}.
   *
   * @param permits the number of permits to acquire
   * @return {@code true} if the permits were acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   * @since 14.0
   */
  public boolean tryAcquire(int permits) {
    return tryAcquire(permits, 0, MICROSECONDS);
  }

  /**
   * Acquires a permit from this {@link RateLimiter} if it can be acquired immediately without
   * delay.
   *
   * <p>
   * This method is equivalent to {@code tryAcquire(1)}.
   *
   * @return {@code true} if the permit was acquired, {@code false} otherwise
   * @since 14.0
   */
  public boolean tryAcquire() {
    return tryAcquire(1, 0, MICROSECONDS);
  }

這還是很簡單。。層層委托。我們來看最后這個tryAcquire

/**
   * Acquires the given number of permits from this {@code RateLimiter} if it can be obtained
   * without exceeding the specified {@code timeout}, or returns {@code false}
   * immediately (without waiting) if the permits would not have been granted
   * before the timeout expired.
   *
   * @param permits the number of permits to acquire
   * @param timeout the maximum time to wait for the permits. Negative values are treated as zero.
   * @param unit the time unit of the timeout argument
   * @return {@code true} if the permits were acquired, {@code false} otherwise
   * @throws IllegalArgumentException if the requested number of permits is negative or zero
   */
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
    long timeoutMicros = max(unit.toMicros(timeout), 0);
    checkPermits(permits);
    long microsToWait;
    //獲取互斥鎖
    synchronized (mutex()) {
      //獲取當(dāng)前時間
      long nowMicros = stopwatch.readMicros();
      //判斷是否能夠在timeout時間內(nèi)能夠獲取
      if (!canAcquire(nowMicros, timeoutMicros)) {
        return false;
      } else {
       //如果判斷能夠獲取,則調(diào)用reserveAndGetWaitLength獲取等待時間
       //其實就是走了一遍acquire
        microsToWait = reserveAndGetWaitLength(permits, nowMicros);
      }
    }
    // sleep直到能獲取令牌
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return true;
  }

通過上述分析,我們知道主要邏輯在canAcquire方法內(nèi):

private boolean canAcquire(long nowMicros, long timeoutMicros) {
    return queryEarliestAvailable(nowMicros) - timeoutMicros <= nowMicros;
}

通過調(diào)用queryEarliestAvailable得到最近的令牌可用時間,然后看這個時間與now的差值是否小于timeout,如果小于則表示這個timeout內(nèi)可以獲取到令牌,返回true,否則返回false

  @Override
  final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
  }

在SmoothBursty實現(xiàn)中,queryEarliestAvailable的實現(xiàn)直接返回nextFreeTicketMicros,這個也很清晰,nextFreeTicketMicros本來的意義就是最近的令牌可用時間。

小結(jié)

以上就是針對Guava RateLimiter的代碼和限流邏輯的一個整體梳理,主要是針對SmoothBursty的實現(xiàn)來做的一個分析。希望大家能夠喜歡,后續(xù)可能需要考慮針對多機做一個類似的機制。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容