用 Go + Redis 實(shí)現(xiàn)分布式鎖

  為什么需要分布式鎖

  用戶下單

  鎖住 uid,防止重復(fù)下單。

  庫存扣減

  鎖住庫存,防止超賣。

  余額扣減

  鎖住賬戶,防止并發(fā)操作。 分布式系統(tǒng)中共享同一個(gè)資源時(shí)往往需要分布式鎖來保證變更資源一致性。

  分布式鎖需要具備特性

  排他性

  鎖的基本特性,并且只能被第一個(gè)持有者持有。

  防死鎖

  高并發(fā)場(chǎng)景下臨界資源一旦發(fā)生死鎖非常難以排查,通??梢酝ㄟ^設(shè)置超時(shí)時(shí)間到期自動(dòng)釋放鎖來規(guī)避。

  可重入

  鎖持有者支持可重入,防止鎖持有者再次重入時(shí)鎖被超時(shí)釋放。

  高性能高可用

  鎖是代碼運(yùn)行的關(guān)鍵前置節(jié)點(diǎn),一旦不可用則業(yè)務(wù)直接就報(bào)故障了。高并發(fā)場(chǎng)景下,高性能高可用是基本要求。

  實(shí)現(xiàn) Redis 鎖應(yīng)先掌握哪些知識(shí)點(diǎn)

  set 命令

  > SET key value [EX seconds] [PX milliseconds] [NX|XX]

  EX second :設(shè)置鍵的過期時(shí)間為 second 秒。 SET key value EX second 效果等同于 SETEX key second value 。

  PX millisecond :設(shè)置鍵的過期時(shí)間為 millisecond 毫秒。 SET key value PX millisecond 效果等同于 PSETEX key millisecond value 。

  NX :只在鍵不存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。 SET key value NX 效果等同于 SETNX key value 。

  XX :只在鍵已經(jīng)存在時(shí),才對(duì)鍵進(jìn)行設(shè)置操作。

  Redis.lua 腳本

  使用 redis lua 腳本能將一系列命令操作封裝成 pipline 實(shí)現(xiàn)整體操作的原子性。

  go-zero 分布式鎖 RedisLock 源碼分析

  core/stores/redis/redislock.go

  加鎖流程

  -- KEYS[1]: 鎖key

  -- ARGV[1]: 鎖value,隨機(jī)字符串

  -- ARGV[2]: 過期時(shí)間

  -- 判斷鎖key持有的value是否等于傳入的value

  -- 如果相等說明是再次獲取鎖并更新獲取時(shí)間,防止重入時(shí)過期

  -- 這里說明是“可重入鎖”

  if redis.call("GET", KEYS[1]) == ARGV[1] then

  -- 設(shè)置

  redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])

  return "OK"

  else

  -- 鎖key.value不等于傳入的value則說明是第一次獲取鎖

  -- SET key value NX PX timeout : 當(dāng)key不存在時(shí)才設(shè)置key的值

  -- 設(shè)置成功會(huì)自動(dòng)返回“OK”,設(shè)置失敗返回“NULL Bulk Reply”

  -- 為什么這里要加“NX”呢,因?yàn)樾枰乐拱褎e人的鎖給覆蓋了

  return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])

  end

  解鎖流程

  -- 釋放鎖

  -- 不可以釋放別人的鎖

  if redis.call("GET", KEYS[1]) == ARGV[1] then

  -- 執(zhí)行成功返回“1”

  return redis.call("DEL", KEYS[1])

  else

  return 0

  end

  源碼解析

  package redis

  import (

  "math/rand"

  "strconv"

  "sync/atomic"

  "time"

  red "github.com/go-redis/redis"

  "github.com/tal-tech/go-zero/core/logx"

  )

  const (

  letters? ? = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

  lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then

  redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])

  return "OK"

  else

  return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])

  end`

  delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then

  return redis.call("DEL", KEYS[1])

  else

  return 0

  end`

  randomLen = 16

  // 默認(rèn)超時(shí)時(shí)間,防止死鎖

  tolerance? ? ? = 500 // milliseconds

  millisPerSecond = 1000

  )

  // A RedisLock is a redis lock.

  type RedisLock struct {

  // redis客戶端

  store *Redis

  // 超時(shí)時(shí)間

  seconds uint32

  // 鎖key

  key string

  // 鎖value,防止鎖被別人獲取到

  id string

  }

  func init() {

  rand.Seed(time.Now().UnixNano())

  }

  // NewRedisLock returns a RedisLock.

  func NewRedisLock(store *Redis, key string) *RedisLock {

  return &RedisLock{

  store: store,

  key:? key,

  // 獲取鎖時(shí),鎖的值通過隨機(jī)字符串生成

  // 實(shí)際上go-zero提供更加高效的隨機(jī)字符串生成方式

  // 見core/stringx/random.go:Randn

  id:? ? randomStr(randomLen),

  }

  }

  // Acquire acquires the lock.

  // 加鎖

  func (rl *RedisLock) Acquire() (bool, error) {

  // 獲取過期時(shí)間

  seconds := atomic.LoadUint32(&rl.seconds)

  // 默認(rèn)鎖過期時(shí)間為500ms,防止死鎖

  resp, err := rl.store.Eval(lockCommand, []string{rl.key}, []string{

  rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance),

  })

  if err == red.Nil {

  return false, nil

  } else if err != nil {

  logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error())

  return false, err

  } else if resp == nil {

  return false, nil

  }

  reply, ok := resp.(string)

  if ok && reply == "OK" {

  return true, nil

  }

  logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp)

  return false, nil

  }

  // Release releases the lock.

  // 釋放鎖

  func (rl *RedisLock) Release() (bool, error) {

  resp, err := rl.store.Eval(delCommand, []string{rl.key}, []string{rl.id})

  if err != nil {

  return false, err

  }

  reply, ok := resp.(int64)

  if !ok {

  return false, nil

  }

  return reply == 1, nil

  }

  // SetExpire sets the expire.

  // 需要注意的是需要在Acquire()之前調(diào)用

  // 不然默認(rèn)為500ms自動(dòng)釋放

  func (rl *RedisLock) SetExpire(seconds int) {

  atomic.StoreUint32(&rl.seconds, uint32(seconds))

  }

  func randomStr(n int) string {

  b := make([]byte, n)

  for i := range b {

  b[i] = letters[rand.Intn(len(letters))]

  }

  return string(b)

  }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容