本專題所寫所感所得,來自轉(zhuǎn)轉(zhuǎn)首席架構(gòu)師和字節(jié)架構(gòu)團(tuán)隊(duì),此致,敬禮。。
一、冪等設(shè)計(jì)
1.1 定義
冪等需要考慮請(qǐng)求層面和業(yè)務(wù)層面的冪等。
- 請(qǐng)求層面
保證請(qǐng)求重復(fù)執(zhí)行和執(zhí)行一次結(jié)果相同;即f...f(f(x)) = f(x)
- 業(yè)務(wù)層面
如同一用戶不重復(fù)下單;商品不超買
1.2 目標(biāo)
- 請(qǐng)求重試不出問題
- 避免結(jié)果災(zāi)難性(重復(fù)轉(zhuǎn)賬、多交易等)
1.3 冪等范圍
冪等范圍主要是針對(duì)請(qǐng)求對(duì)數(shù)據(jù)造成改變。以下從兩個(gè)維度判斷冪等范圍。
- 讀/寫請(qǐng)求層面:寫請(qǐng)求
-
架構(gòu)層層面:數(shù)據(jù)訪問層
二、分布式鎖設(shè)計(jì)
業(yè)務(wù)層面的冪等存在并發(fā)消費(fèi)的可能性,需要轉(zhuǎn)化為串行消費(fèi)。本質(zhì)上就是分布式鎖的問題。
2.1 定義與目的
分布式鎖是在分布式環(huán)境下,鎖定全局唯一資源,使得請(qǐng)求處理串行化,實(shí)現(xiàn)類似于互斥鎖的效果。
分布式鎖的目的是:
- 防止重復(fù)下單,解決業(yè)務(wù)層冪等問題
- 解決MQ消息消費(fèi)冪等性問題,如發(fā)送消息重復(fù)、消息消費(fèi)端去重等
- 狀態(tài)的修改行為需要串行處理,避免出現(xiàn)數(shù)據(jù)錯(cuò)亂
2.2 高可用分布式鎖設(shè)計(jì)
2.2.1 目標(biāo)
- 強(qiáng)一致性
- 服務(wù)高可用、系統(tǒng)穩(wěn)健
- 鎖自動(dòng)續(xù)約及其自動(dòng)釋放
- 代碼高度抽象業(yè)務(wù)接入極簡(jiǎn)
- 可視化管理后臺(tái),監(jiān)控及管理
2.2.2 特點(diǎn)
- 互斥性:和我們本地鎖一樣互斥性是最基本,但是分布式鎖需要保證在不同節(jié)點(diǎn)的不同線程的互斥。
- 可重入性:同一個(gè)節(jié)點(diǎn)上的同一個(gè)線程如果獲取了鎖之后那么也可以再次獲取這個(gè)鎖。
- 鎖超時(shí):和本地鎖一樣支持鎖超時(shí),防止死鎖。
- 高效,高可用:加鎖和解鎖需要高效,同時(shí)也需要保證高可用防止分布式鎖失效,可以增加降級(jí)。
- 支持阻塞和非阻塞:和ReentrantLock一樣支持lock和trylock以及tryLock(long timeOut)。
- 支持公平鎖和非公平鎖(可選):公平鎖的意思是按照請(qǐng)求加鎖的順序獲得鎖,非公平鎖就相反是無序的。這個(gè)一般來說實(shí)現(xiàn)的比較少。
2.2.3 方案對(duì)比
| mysql | redis | zookeeper | etcd | |
|---|---|---|---|---|
| 一致性算法 | 無 | 無 | paxos | raft |
| CAP | cp | ap | cp | cp |
| 高可用 | 主從 | 主從 | N+1可用(奇數(shù)) | N+1可用(奇數(shù)) |
| 接口類型 | sql | 客戶端 | 客戶端 | http/grpc |
| 實(shí)現(xiàn) | select * from update | setnx + lua | crateephemeral | restful api |
- redis是ap模型,無法保證數(shù)據(jù)一致性
- zk對(duì)鎖實(shí)現(xiàn)使用創(chuàng)建臨時(shí)節(jié)點(diǎn)和watch機(jī)制,執(zhí)行效率、拓展能力、社區(qū)活躍度都不如etcd
2.3 Mysql分布式鎖
首先來說一下Mysql分布式鎖的實(shí)現(xiàn)原理,相對(duì)來說這個(gè)比較容易理解,畢竟數(shù)據(jù)庫和我們開發(fā)人員在平時(shí)的開發(fā)中息息相關(guān)。對(duì)于分布式鎖我們可以創(chuàng)建一個(gè)鎖表:

前面我們所說的lock(),trylock(long timeout),trylock()這幾個(gè)方法可以用下面的偽代碼實(shí)現(xiàn)。
2.3.1 lock()
lock一般是阻塞式的獲取鎖,意思就是不獲取到鎖誓不罷休,那么我們可以寫一個(gè)死循環(huán)來執(zhí)行其操作:

mysqlLock.lcok內(nèi)部是一個(gè)sql,為了達(dá)到可重入鎖的效果那么我們應(yīng)該先進(jìn)行查詢,如果有值,那么需要比較node_info是否一致,這里的node_info可以用機(jī)器IP和線程名字來表示,如果一致那么就加可重入鎖count的值,如果不一致那么就返回false。如果沒有值那么直接插入一條數(shù)據(jù)。偽代碼如下:

需要注意的是這一段代碼需要加事務(wù),必須要保證這一系列操作的原子性。
2.3.2 tryLock()和tryLock(long timeout)
tryLock()是非阻塞獲取鎖,如果獲取不到那么就會(huì)馬上返回,代碼可以如下:

tryLock(long timeout)實(shí)現(xiàn)如下:

mysqlLock.lock和上面一樣,但是要注意的是select ... for update這個(gè)是阻塞的獲取行鎖,如果同一個(gè)資源并發(fā)量較大還是有可能會(huì)退化成阻塞的獲取鎖。
2.3.3 unlock()
unlock的話如果這里的count為1那么可以刪除,如果大于1那么需要減去1。

2.3.4 鎖超時(shí)
我們有可能會(huì)遇到我們的機(jī)器節(jié)點(diǎn)掛了,那么這個(gè)鎖就不會(huì)得到釋放,我們可以啟動(dòng)一個(gè)定時(shí)任務(wù),通過計(jì)算一般我們處理任務(wù)的一般的時(shí)間,比如是5ms,那么我們可以稍微擴(kuò)大一點(diǎn),當(dāng)這個(gè)鎖超過20ms沒有被釋放我們就可以認(rèn)定是節(jié)點(diǎn)掛了然后將其直接釋放。
2.3.5 Mysql小結(jié)
- 適用場(chǎng)景: Mysql分布式鎖一般適用于資源不存在數(shù)據(jù)庫,如果數(shù)據(jù)庫存在比如訂單,那么可以直接對(duì)這條數(shù)據(jù)加行鎖,不需要我們上面多的繁瑣的步驟,比如一個(gè)訂單,那么我們可以用select * from order_table where id = 'xxx' for update進(jìn)行加行鎖,那么其他的事務(wù)就不能對(duì)其進(jìn)行修改。
- 優(yōu)點(diǎn):理解起來簡(jiǎn)單,不需要維護(hù)額外的第三方中間件(比如Redis,Zk)。
- 缺點(diǎn):雖然容易理解但是實(shí)現(xiàn)起來較為繁瑣,需要自己考慮鎖超時(shí),加事務(wù)等等。性能局限于數(shù)據(jù)庫,一般對(duì)比緩存來說性能較低。對(duì)于高并發(fā)的場(chǎng)景并不是很適合。
2.3.6 樂觀鎖
前面我們介紹的都是悲觀鎖,這里想額外提一下樂觀鎖,在我們實(shí)際項(xiàng)目中也是經(jīng)常實(shí)現(xiàn)樂觀鎖,因?yàn)槲覀兗有墟i的性能消耗比較大,通常我們會(huì)對(duì)于一些競(jìng)爭(zhēng)不是那么激烈,但是其又需要保證我們并發(fā)的順序執(zhí)行使用樂觀鎖進(jìn)行處理,我們可以對(duì)我們的表加一個(gè)版本號(hào)字段,那么我們查詢出來一個(gè)版本號(hào)之后,update或者delete的時(shí)候需要依賴我們查詢出來的版本號(hào),判斷當(dāng)前數(shù)據(jù)庫和查詢出來的版本號(hào)是否相等,如果相等那么就可以執(zhí)行,如果不等那么就不能執(zhí)行。這樣的一個(gè)策略很像我們的CAS(Compare And Swap),比較并交換是一個(gè)原子操作。這樣我們就能避免加select * for update行鎖的開銷。
2.4 基于redis分布式鎖
redis是單線程的,所以能保證線程串行處理,但因?yàn)閞edis分布式鎖是ap模型,不是cp模型,無法實(shí)現(xiàn)強(qiáng)一致性。但因?qū)崿F(xiàn)簡(jiǎn)單,接入成本低,如果對(duì)數(shù)據(jù)一致性要求不那么高,可以選擇此方式。
2.4.1 Redis分布式鎖簡(jiǎn)單實(shí)現(xiàn)
熟悉Redis的同學(xué)那么肯定對(duì)setNx(set if not exist)方法不陌生,如果不存在則更新,其可以很好的用來實(shí)現(xiàn)我們的分布式鎖。對(duì)于某個(gè)資源加鎖我們只需要
setNx resourceName value
這里有個(gè)問題,加鎖了之后如果機(jī)器宕機(jī)那么這個(gè)鎖就不會(huì)得到釋放所以會(huì)加入過期時(shí)間,加入過期時(shí)間需要和setNx同一個(gè)原子操作,在Redis2.8之前我們需要使用Lua腳本達(dá)到我們的目的,但是redis2.8之后redis支持nx和ex操作是同一原子操作。
set resourceName value ex 5 nx
2.4.2 Redission
Javaer都知道Jedis,Jedis是Redis的Java實(shí)現(xiàn)的客戶端,其API提供了比較全面的Redis命令的支持。Redission也是Redis的客戶端,相比于Jedis功能簡(jiǎn)單。Jedis簡(jiǎn)單使用阻塞的I/O和redis交互,Redission通過Netty支持非阻塞I/O。Jedis最新版本2.9.0是2016年的快3年了沒有更新,而Redission最新版本是2018.10月更新。
Redission封裝了鎖的實(shí)現(xiàn),其繼承了java.util.concurrent.locks.Lock的接口,讓我們像操作我們的本地Lock一樣去操作Redission的Lock,下面介紹一下其如何實(shí)現(xiàn)分布式鎖。

Redission不僅提供了Java自帶的一些方法(lock,tryLock),還提供了異步加鎖,對(duì)于異步編程更加方便。 由于內(nèi)部源碼較多,就不貼源碼了,這里用文字?jǐn)⑹鰜矸治鏊侨绾渭渔i的,這里分析一下tryLock方法:
-
嘗試加鎖:首先會(huì)嘗試進(jìn)行加鎖,由于保證操作是原子性,那么就只能使用lua腳本,相關(guān)的lua腳本如下:
可以看見他并沒有使用我們的sexNx來進(jìn)行操作,而是使用的hash結(jié)構(gòu),我們的每一個(gè)需要鎖定的資源都可以看做是一個(gè)HashMap,鎖定資源的節(jié)點(diǎn)信息是Key,鎖定次數(shù)是value。通過這種方式可以很好的實(shí)現(xiàn)可重入的效果,只需要對(duì)value進(jìn)行加1操作,就能進(jìn)行可重入鎖。當(dāng)然這里也可以用之前我們說的本地計(jì)數(shù)進(jìn)行優(yōu)化。
- 如果嘗試加鎖失敗,判斷是否超時(shí),如果超時(shí)則返回false。
- 如果加鎖失敗之后,沒有超時(shí),那么需要在名字為redisson_lock__channel+lockName的channel上進(jìn)行訂閱,用于訂閱解鎖消息,然后一直阻塞直到超時(shí),或者有解鎖消息。
- 重試步驟以上三步,直到最后獲取到鎖,或者某一步獲取鎖超時(shí)。
對(duì)于我們的unlock方法比較簡(jiǎn)單也是通過lua腳本進(jìn)行解鎖,如果是可重入鎖,只是減1。如果是非加鎖線程解鎖,那么解鎖失敗。

Redission還有公平鎖的實(shí)現(xiàn),對(duì)于公平鎖其利用了list結(jié)構(gòu)和hashset結(jié)構(gòu)分別用來保存我們排隊(duì)的節(jié)點(diǎn),和我們節(jié)點(diǎn)的過期時(shí)間,用這兩個(gè)數(shù)據(jù)結(jié)構(gòu)幫助我們實(shí)現(xiàn)公平鎖,這里就不展開介紹了,有興趣可以參考源碼。
2.4.3 RedLock
我們想象一個(gè)這樣的場(chǎng)景當(dāng)機(jī)器A申請(qǐng)到一把鎖之后,如果Redis主宕機(jī)了,這個(gè)時(shí)候從機(jī)并沒有同步到這一把鎖,那么機(jī)器B再次申請(qǐng)的時(shí)候就會(huì)再次申請(qǐng)到這把鎖,為了解決這個(gè)問題Redis作者提出了RedLock紅鎖的算法,在Redission中也對(duì)RedLock進(jìn)行了實(shí)現(xiàn)。

通過上面的代碼,我們需要實(shí)現(xiàn)多個(gè)Redis集群,然后進(jìn)行紅鎖的加鎖,解鎖。具體的步驟如下:
- 首先生成多個(gè)Redis集群的Rlock,并將其構(gòu)造成RedLock。
- 依次循環(huán)對(duì)三個(gè)集群進(jìn)行加鎖,加鎖的過程和5.2里面一致。
- 如果循環(huán)加鎖的過程中加鎖失敗,那么需要判斷加鎖失敗的次數(shù)是否超出了最大值,這里的最大值是根據(jù)集群的個(gè)數(shù),比如三個(gè)那么只允許失敗一個(gè),五個(gè)的話只允許失敗兩個(gè),要保證多數(shù)成功。
- 加鎖的過程中需要判斷是否加鎖超時(shí),有可能我們?cè)O(shè)置加鎖只能用3ms,第一個(gè)集群加鎖已經(jīng)消耗了3ms了。那么也算加鎖失敗。
- 3,4步里面加鎖失敗的話,那么就會(huì)進(jìn)行解鎖操作,解鎖會(huì)對(duì)所有的集群在請(qǐng)求一次解鎖。
可以看見RedLock基本原理是利用多個(gè)Redis集群,用多數(shù)的集群加鎖成功,減少Redis某個(gè)集群出故障,造成分布式鎖出現(xiàn)問題的概率。
2.4.4 Redis小結(jié)
- 優(yōu)點(diǎn):對(duì)于Redis實(shí)現(xiàn)簡(jiǎn)單,性能對(duì)比ZK和Mysql較好。如果不需要特別復(fù)雜的要求,那么自己就可以利用setNx進(jìn)行實(shí)現(xiàn),如果自己需要復(fù)雜的需求的話那么可以利用或者借鑒Redission。對(duì)于一些要求比較嚴(yán)格的場(chǎng)景來說的話可以使用RedLock。
- 缺點(diǎn):需要維護(hù)Redis集群,如果要實(shí)現(xiàn)RedLock那么需要維護(hù)更多的集群。
2.5 基于ZK分布式鎖
ZooKeeper也是我們常見的實(shí)現(xiàn)分布式鎖方法,相比于數(shù)據(jù)庫如果沒了解過ZooKeeper可能上手比較難一些。ZooKeeper是以Paxos算法為基礎(chǔ)分布式應(yīng)用程序協(xié)調(diào)服務(wù)。Zk的數(shù)據(jù)節(jié)點(diǎn)和文件目錄類似,所以我們可以用此特性實(shí)現(xiàn)分布式鎖。我們以某個(gè)資源為目錄,然后這個(gè)目錄下面的節(jié)點(diǎn)就是我們需要獲取鎖的客戶端,未獲取到鎖的客戶端注冊(cè)需要注冊(cè)Watcher到上一個(gè)客戶端,可以用下圖表示。

/lock是我們用于加鎖的目錄,/resource_name是我們鎖定的資源,其下面的節(jié)點(diǎn)按照我們加鎖的順序排列。
2.5.1 Curator
Curator封裝了Zookeeper底層的Api,使我們更加容易方便的對(duì)Zookeeper進(jìn)行操作,并且它封裝了分布式鎖的功能,這樣我們就不需要再自己實(shí)現(xiàn)了。
Curator實(shí)現(xiàn)了可重入鎖(InterProcessMutex),也實(shí)現(xiàn)了不可重入鎖(InterProcessSemaphoreMutex)。在可重入鎖中還實(shí)現(xiàn)了讀寫鎖。
2.5.2 InterProcessMutex
InterProcessMutex是Curator實(shí)現(xiàn)的可重入鎖,我們可以通過下面的一段代碼實(shí)現(xiàn)我們的可重入鎖:

我們利用acuire進(jìn)行加鎖,release進(jìn)行解鎖。
加鎖的流程具體如下:
- 首先進(jìn)行可重入的判定:這里的可重入鎖記錄在ConcurrentMap<Thread, LockData> threadData這個(gè)Map里面,如果threadData.get(currentThread)是有值的那么就證明是可重入鎖,然后記錄就會(huì)加1。我們之前的Mysql其實(shí)也可以通過這種方法去優(yōu)化,可以不需要count字段的值,將這個(gè)維護(hù)在本地可以提高性能。
- 然后在我們的資源目錄下創(chuàng)建一個(gè)節(jié)點(diǎn):比如這里創(chuàng)建一個(gè)/0000000002這個(gè)節(jié)點(diǎn),這個(gè)節(jié)點(diǎn)需要設(shè)置為EPHEMERAL_SEQUENTIAL也就是臨時(shí)節(jié)點(diǎn)并且有序。
- 獲取當(dāng)前目錄下所有子節(jié)點(diǎn),判斷自己的節(jié)點(diǎn)是否位于子節(jié)點(diǎn)第一個(gè)。
- 如果是第一個(gè),則獲取到鎖,那么可以返回。
- 如果不是第一個(gè),則證明前面已經(jīng)有人獲取到鎖了,那么需要獲取自己節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)。/0000000002的前一個(gè)節(jié)點(diǎn)是/0000000001,我們獲取到這個(gè)節(jié)點(diǎn)之后,再上面注冊(cè)Watcher(這里的watcher其實(shí)調(diào)用的是object.notifyAll(),用來解除阻塞)。
- object.wait(timeout)或object.wait():進(jìn)行阻塞等待這里和我們第5步的watcher相對(duì)應(yīng)。
解鎖的具體流程:
- 首先進(jìn)行可重入鎖的判定:如果有可重入鎖只需要次數(shù)減1即可,減1之后加鎖次數(shù)為0的話繼續(xù)下面步驟,不為0直接返回。
- 刪除當(dāng)前節(jié)點(diǎn)。
- 刪除threadDataMap里面的可重入鎖的數(shù)據(jù)。
2.5.3 讀寫鎖
Curator提供了讀寫鎖,其實(shí)現(xiàn)類是InterProcessReadWriteLock,這里的每個(gè)節(jié)點(diǎn)都會(huì)加上前綴:
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
根據(jù)不同的前綴區(qū)分是讀鎖還是寫鎖,對(duì)于讀鎖,如果發(fā)現(xiàn)前面有寫鎖,那么需要將watcher注冊(cè)到和自己最近的寫鎖。寫鎖的邏輯和我們之前2.5.2分析的依然保持不變。
2.5.4鎖超時(shí)
Zookeeper不需要配置鎖超時(shí),由于我們?cè)O(shè)置節(jié)點(diǎn)是臨時(shí)節(jié)點(diǎn),我們的每個(gè)機(jī)器維護(hù)著一個(gè)ZK的session,通過這個(gè)session,ZK可以判斷機(jī)器是否宕機(jī)。如果我們的機(jī)器掛掉的話,那么這個(gè)臨時(shí)節(jié)點(diǎn)對(duì)應(yīng)的就會(huì)被刪除,所以我們不需要關(guān)心鎖超時(shí)。
2.5.5 ZK小結(jié)
- 優(yōu)點(diǎn):ZK可以不需要關(guān)心鎖超時(shí)時(shí)間,實(shí)現(xiàn)起來有現(xiàn)成的第三方包,比較方便,并且支持讀寫鎖,ZK獲取鎖會(huì)按照加鎖的順序,所以其是公平鎖。對(duì)于高可用利用ZK集群進(jìn)行保證。
- 缺點(diǎn):ZK需要額外維護(hù),增加維護(hù)成本,性能和Mysql相差不大,依然比較差。并且需要開發(fā)人員了解ZK是什么。
2.6 基于etcd分布式鎖
2.6.1 機(jī)制
etcd 支持以下功能,正是依賴這些功能來實(shí)現(xiàn)分布式鎖的:
- Lease 機(jī)制:即租約機(jī)制(TTL,Time To Live),Etcd 可以為存儲(chǔ)的 KV 對(duì)設(shè)置租約,當(dāng)租約到期,KV 將失效刪除;同時(shí)也支持續(xù)約,即 KeepAlive。
- Revision 機(jī)制:每個(gè) key 帶有一個(gè) Revision 屬性值,etcd 每進(jìn)行一次事務(wù)對(duì)應(yīng)的全局 Revision 值都會(huì)加一,因此每個(gè) key 對(duì)應(yīng)的 Revision 屬性值都是全局唯一的。通過比較 Revision 的大小就可以知道進(jìn)行寫操作的順序。
- 在實(shí)現(xiàn)分布式鎖時(shí),多個(gè)程序同時(shí)搶鎖,根據(jù) Revision 值大小依次獲得鎖,可以避免 “羊群效應(yīng)” (也稱 “驚群效應(yīng)”),實(shí)現(xiàn)公平鎖。
- Prefix 機(jī)制:即前綴機(jī)制,也稱目錄機(jī)制??梢愿鶕?jù)前綴(目錄)獲取該目錄下所有的 key 及對(duì)應(yīng)的屬性(包括 key, value 以及 revision 等)。
- Watch 機(jī)制:即監(jiān)聽機(jī)制,Watch 機(jī)制支持 Watch 某個(gè)固定的 key,也支持 Watch 一個(gè)目錄(前綴機(jī)制),當(dāng)被 Watch 的 key 或目錄發(fā)生變化,客戶端將收到通知。
2.6.2 過程
實(shí)現(xiàn)過程:
- 步驟 1: 準(zhǔn)備
客戶端連接 Etcd,以 /lock/mylock 為前綴創(chuàng)建全局唯一的 key,假設(shè)第一個(gè)客戶端對(duì)應(yīng)的 key="/lock/mylock/UUID1",第二個(gè)為 key="/lock/mylock/UUID2";客戶端分別為自己的 key 創(chuàng)建租約 - Lease,租約的長度根據(jù)業(yè)務(wù)耗時(shí)確定,假設(shè)為 15s;
- 步驟 2: 創(chuàng)建定時(shí)任務(wù)作為租約的“心跳”
當(dāng)一個(gè)客戶端持有鎖期間,其它客戶端只能等待,為了避免等待期間租約失效,客戶端需創(chuàng)建一個(gè)定時(shí)任務(wù)作為“心跳”進(jìn)行續(xù)約。此外,如果持有鎖期間客戶端崩潰,心跳停止,key 將因租約到期而被刪除,從而鎖釋放,避免死鎖。
- 步驟 3: 客戶端將自己全局唯一的 key 寫入 Etcd
進(jìn)行 put 操作,將步驟 1 中創(chuàng)建的 key 綁定租約寫入 Etcd,根據(jù) Etcd 的 Revision 機(jī)制,假設(shè)兩個(gè)客戶端 put 操作返回的 Revision 分別為 1、2,客戶端需記錄 Revision 用以接下來判斷自己是否獲得鎖。
- 步驟 4: 客戶端判斷是否獲得鎖
客戶端以前綴 /lock/mylock 讀取 keyValue 列表(keyValue 中帶有 key 對(duì)應(yīng)的 Revision),判斷自己 key 的 Revision 是否為當(dāng)前列表中最小的,如果是則認(rèn)為獲得鎖;否則監(jiān)聽列表中前一個(gè) Revision 比自己小的 key 的刪除事件,一旦監(jiān)聽到刪除事件或者因租約失效而刪除的事件,則自己獲得鎖。
- 步驟 5: 執(zhí)行業(yè)務(wù)
獲得鎖后,操作共享資源,執(zhí)行業(yè)務(wù)代碼。
- 步驟 6: 釋放鎖
完成業(yè)務(wù)流程后,刪除對(duì)應(yīng)的key釋放鎖。
2.6.3 實(shí)現(xiàn)
自帶的 etcdctl 可以模擬鎖的使用:
// 第一個(gè)終端
$ ./etcdctl lock mutex1
mutex1/326963a02758b52d
// 第二終端
$ ./etcdctl lock mutex1
// 當(dāng)?shù)谝粋€(gè)終端結(jié)束了,第二個(gè)終端會(huì)顯示
mutex1/326963a02758b531
在etcd的clientv3包中,實(shí)現(xiàn)了分布式鎖。使用起來和mutex是類似的,為了了解其中的工作機(jī)制,這里簡(jiǎn)要的做一下總結(jié)。
etcd分布式鎖的實(shí)現(xiàn)在go.etcd.io/etcd/clientv3/concurrency包中,主要提供了以下幾個(gè)方法:
* func NewMutex(s *Session, pfx string) *Mutex, 用來新建一個(gè)mutex
* func (m *Mutex) Lock(ctx context.Context) error,它會(huì)阻塞直到拿到了鎖,并且支持通過context來取消獲取鎖。
* func (m *Mutex) Unlock(ctx context.Context) error,解鎖
因此在使用etcd提供的分布式鎖式非常簡(jiǎn)單,通常就是實(shí)例化一個(gè)mutex,然后嘗試搶占鎖,之后進(jìn)行業(yè)務(wù)處理,最后解鎖即可。
demo:
package main
import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"os"
"os/signal"
"time"
)
func main() {
c := make(chan os.Signal)
signal.Notify(c)
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
lockKey := "/lock"
go func () {
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
m := concurrency.NewMutex(session, lockKey)
if err := m.Lock(context.TODO()); err != nil {
log.Fatal("go1 get mutex failed " + err.Error())
}
fmt.Printf("go1 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(10) * time.Second)
m.Unlock(context.TODO())
fmt.Printf("go1 release lock\n")
}()
go func() {
time.Sleep(time.Duration(2) * time.Second)
session, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
m := concurrency.NewMutex(session, lockKey)
if err := m.Lock(context.TODO()); err != nil {
log.Fatal("go2 get mutex failed " + err.Error())
}
fmt.Printf("go2 get mutex sucess\n")
fmt.Println(m)
time.Sleep(time.Duration(2) * time.Second)
m.Unlock(context.TODO())
fmt.Printf("go2 release lock\n")
}()
<-c
}
2.6.4 原理
Lock()函數(shù)的實(shí)現(xiàn)很簡(jiǎn)單:
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
首先通過一個(gè)事務(wù)來嘗試加鎖,這個(gè)事務(wù)主要包含了4個(gè)操作: cmp、put、get、getOwner。需要注意的是,key是由pfx和Lease()組成的。
- cmp: 比較加鎖的key的修訂版本是否是0。如果是0就代表這個(gè)鎖不存在。
- put: 向加鎖的key中存儲(chǔ)一個(gè)空值,這個(gè)操作就是一個(gè)加鎖的操作,但是這把鎖是有超時(shí)時(shí)間的,超時(shí)的時(shí)間是session的默認(rèn)時(shí)長。超時(shí)是為了防止鎖沒有被正常釋放導(dǎo)致死鎖。
- get: get就是通過key來查詢
- getOwner: 注意這里是用m.pfx來查詢的,并且?guī)Я瞬樵儏?shù)WithFirstCreate()。使用pfx來查詢是因?yàn)槠渌膕ession也會(huì)用同樣的pfx來嘗試加鎖,并且因?yàn)槊總€(gè)LeaseID都不同,所以第一次肯定會(huì)put成功。但是只有最早使用這個(gè)pfx的session才是持有鎖的,所以這個(gè)getOwner的含義就是這樣的。
接下來才是通過判斷來檢查是否持有鎖
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
m.myRev是當(dāng)前的版本號(hào),resp.Succeeded是cmp為true時(shí)值為true,否則是false。這里的判斷表明當(dāng)同一個(gè)session非第一次嘗試加鎖,當(dāng)前的版本號(hào)應(yīng)該取這個(gè)key的最新的版本號(hào)。
下面是取得鎖的持有者的key。如果當(dāng)前沒有人持有這把鎖,那么默認(rèn)當(dāng)前會(huì)話獲得了鎖。或者鎖持有者的版本號(hào)和當(dāng)前的版本號(hào)一致, 那么當(dāng)前的會(huì)話就是鎖的持有者。
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
上面這段代碼就很好理解了,因?yàn)樽叩竭@里說明沒有獲取到鎖,那么這里等待鎖的刪除。
waitDeletes方法的實(shí)現(xiàn)也很簡(jiǎn)單,但是需要注意的是,這里的getOpts只會(huì)獲取比當(dāng)前會(huì)話版本號(hào)更低的key,然后去監(jiān)控最新的key的刪除。等這個(gè)key刪除了,自己也就拿到鎖了。
這種分布式鎖的實(shí)現(xiàn)和我一開始的預(yù)想是不同的。它不存在鎖的競(jìng)爭(zhēng),不存在重復(fù)的嘗試加鎖的操作。而是通過使用統(tǒng)一的前綴pfx來put,然后根據(jù)各自的版本號(hào)來排隊(duì)獲取鎖。效率非常的高。避免了驚群效應(yīng)
如圖所示,共有4個(gè)session來加鎖,那么根據(jù)revision來排隊(duì),獲取鎖的順序?yàn)閟ession2 -> session3 -> session1 -> session4。
這里面需要注意一個(gè)驚群效應(yīng),每一個(gè)client在鎖住/lock這個(gè)path的時(shí)候,實(shí)際都已經(jīng)插入了自己的數(shù)據(jù),類似/lock/LEASE_ID,并且返回了各自的index(就是raft算法里面的日志索引),而只有最小的才算是拿到了鎖,其他的client需要watch等待。例如client1拿到了鎖,client2和client3在等待,而client2拿到的index比client3的更小,那么對(duì)于client1刪除鎖之后,client3其實(shí)并不關(guān)心,并不需要去watch。所以綜上,等待的節(jié)點(diǎn)只需要watch比自己index小并且差距最小的節(jié)點(diǎn)刪除事件即可。
2.6.5 基于 ETCD的選主
2.6.5.1 機(jī)制
etcd有多種使用場(chǎng)景,Master選舉是其中一種。說起Master選舉,過去常常使用zookeeper,通過創(chuàng)建EPHEMERAL_SEQUENTIAL節(jié)點(diǎn)(臨時(shí)有序節(jié)點(diǎn)),我們選擇序號(hào)最小的節(jié)點(diǎn)作為Master,邏輯直觀,實(shí)現(xiàn)簡(jiǎn)單是其優(yōu)勢(shì),但是要實(shí)現(xiàn)一個(gè)高健壯性的選舉并不簡(jiǎn)單,同時(shí)zookeeper繁雜的擴(kuò)縮容機(jī)制也是沉重的負(fù)擔(dān)。
master 選舉根本上也是搶鎖,與zookeeper直觀選舉邏輯相比,etcd的選舉則需要在我們熟悉它的一系列基本概念后,調(diào)動(dòng)我們充分的想象力:
MVCC,key存在版本屬性,沒被創(chuàng)建時(shí)版本號(hào)為0;
CAS操作,結(jié)合MVCC,可以實(shí)現(xiàn)競(jìng)選邏輯,if(version == 0) set(key,value),通過原子操作,確保只有一臺(tái)機(jī)器能set成功;
Lease租約,可以對(duì)key綁定一個(gè)租約,租約到期時(shí)沒預(yù)約,這個(gè)key就會(huì)被回收;
Watch監(jiān)聽,監(jiān)聽key的變化事件,如果key被刪除,則重新發(fā)起競(jìng)選。
至此,etcd選舉的邏輯大體清晰了,但這一系列操作與zookeeper相比復(fù)雜很多,有沒有已經(jīng)封裝好的庫可以直接拿來用?etcd clientv3 concurrency中有對(duì)選舉及分布式鎖的封裝。后面進(jìn)一步發(fā)現(xiàn),etcdctl v3里已經(jīng)有master選舉的實(shí)現(xiàn)了,下面針對(duì)這部分代碼進(jìn)行簡(jiǎn)單注釋,在最后參考這部分代碼實(shí)現(xiàn)自己的選舉邏輯。
2.6.5.2 etcd選主的實(shí)現(xiàn)
官方示例:https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/example_election_test.go
如crontab 示例:
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency"
"log"
"time"
)
const prefix = "/election-demo"
const prop = "local"
func main() {
endpoints := []string{"szth-cce-devops00.szth.baidu.com:8379"}
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
campaign(cli, prefix, prop)
}
func campaign(c *clientv3.Client, election string, prop string) {
for {
// 租約到期時(shí)間:5s
s, err := concurrency.NewSession(c, concurrency.WithTTL(5))
if err != nil {
fmt.Println(err)
continue
}
e := concurrency.NewElection(s, election)
ctx := context.TODO()
log.Println("開始競(jìng)選")
err = e.Campaign(ctx, prop)
if err != nil {
log.Println("競(jìng)選 leader失敗,繼續(xù)")
switch {
case err == context.Canceled:
return
default:
continue
}
}
log.Println("獲得leader")
if err := doCrontab(); err != nil {
log.Println("調(diào)用主方法失敗,辭去leader,重新競(jìng)選")
_ = e.Resign(ctx)
continue
}
return
}
}
func doCrontab() error {
for {
fmt.Println("doCrontab")
time.Sleep(time.Second * 4)
//return fmt.Errorf("sss")
}
}
2.6.5.3 etcd選主的原理
/*
* 發(fā)起競(jìng)選
* 未當(dāng)選leader前,會(huì)一直阻塞在Campaign調(diào)用
* 當(dāng)選leader后,等待SIGINT、SIGTERM或session過期而退出
* https://github.com/etcd-io/etcd/blob/master/etcdctl/ctlv3/command/elect_command.go
*/
func campaign(c *clientv3.Client, election string, prop string) error {
//NewSession函數(shù)中創(chuàng)建了一個(gè)lease,默認(rèn)是60s TTL,并會(huì)調(diào)用KeepAlive,永久為這個(gè)lease自動(dòng)續(xù)約(2/3生命周期的時(shí)候執(zhí)行續(xù)約操作)
s, err := concurrency.NewSession(c)
if err != nil {
return err
}
e := concurrency.NewElection(s, election)
ctx, cancel := context.WithCancel(context.TODO())
donec := make(chan struct{})
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigc
cancel()
close(donec)
}()
//競(jìng)選邏輯,將展開分析
if err = e.Campaign(ctx, prop); err != nil {
return err
}
// print key since elected
resp, err := c.Get(ctx, e.Key())
if err != nil {
return err
}
display.Get(*resp)
select {
case <-donec:
case <-s.Done():
return errors.New("elect: session expired")
}
return e.Resign(context.TODO())
}
/*
* 類似于zookeeper的臨時(shí)有序節(jié)點(diǎn),etcd的選舉也是在相應(yīng)的prefix path下面創(chuàng)建key,該key綁定了lease并根據(jù)lease id進(jìn)行命名,
* key創(chuàng)建后就有revision號(hào),這樣使得在prefix path下的key也都是按revision有序
* https://github.com/etcd-io/etcd/blob/master/clientv3/concurrency/election.go
*/
func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()
//真正創(chuàng)建的key名為:prefix + lease id
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
//Txn:transaction,依靠Txn進(jìn)行創(chuàng)建key的CAS操作,當(dāng)key不存在時(shí)才會(huì)成功創(chuàng)建
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}
e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
//如果key已存在,則創(chuàng)建失??;
//當(dāng)key的value與當(dāng)前value不等時(shí),如果自己為leader,則不用重新執(zhí)行選舉直接設(shè)置value;
//否則報(bào)錯(cuò)。
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
e.leaderRev = kv.CreateRevision
if string(kv.Value) != val {
if err = e.Proclaim(ctx, val); err != nil {
e.Resign(ctx)
return err
}
}
}
//一直阻塞,直到確認(rèn)自己的create revision為當(dāng)前path中最小,從而確認(rèn)自己當(dāng)選為leader
_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
if err != nil {
// clean up in case of context cancel
select {
case <-ctx.Done():
e.Resign(client.Ctx())
default:
e.leaderSession = nil
}
return err
}
e.hdr = resp.Header
return nil
}
2.7 分布式鎖的安全問題
下面我們來討論一下分布式鎖的安全問題:
-
長時(shí)間的GC pause:熟悉Java的同學(xué)肯定對(duì)GC不陌生,在GC的時(shí)候會(huì)發(fā)生STW(stop-the-world),例如CMS垃圾回收器,他會(huì)有兩個(gè)階段進(jìn)行STW防止引用繼續(xù)進(jìn)行變化。那么有可能會(huì)出現(xiàn)下面圖(引用至Martin反駁Redlock的文章)中這個(gè)情況:
client1獲取了鎖并且設(shè)置了鎖的超時(shí)時(shí)間,但是client1之后出現(xiàn)了STW,這個(gè)STW時(shí)間比較長,導(dǎo)致分布式鎖進(jìn)行了釋放,client2獲取到了鎖,這個(gè)時(shí)候client1恢復(fù)了鎖,那么就會(huì)出現(xiàn)client1,2同時(shí)獲取到鎖,這個(gè)時(shí)候分布式鎖不安全問題就出現(xiàn)了。這個(gè)其實(shí)不僅僅局限于RedLock,對(duì)于我們的ZK,Mysql一樣的有同樣的問題。
- 時(shí)鐘發(fā)生跳躍:對(duì)于Redis服務(wù)器如果其時(shí)間發(fā)生了向跳躍,那么肯定會(huì)影響我們鎖的過期時(shí)間,那么我們的鎖過期時(shí)間就不是我們預(yù)期的了,也會(huì)出現(xiàn)client1和client2獲取到同一把鎖,那么也會(huì)出現(xiàn)不安全,這個(gè)對(duì)于Mysql也會(huì)出現(xiàn)。但是ZK由于沒有設(shè)置過期時(shí)間,那么發(fā)生跳躍也不會(huì)受影響。
- 長時(shí)間的網(wǎng)絡(luò)I/O:這個(gè)問題和我們的GC的STW很像,也就是我們這個(gè)獲取了鎖之后我們進(jìn)行網(wǎng)絡(luò)調(diào)用,其調(diào)用時(shí)間由可能比我們鎖的過期時(shí)間都還長,那么也會(huì)出現(xiàn)不安全的問題,這個(gè)Mysql也會(huì)有,ZK也不會(huì)出現(xiàn)這個(gè)問題。
對(duì)于這三個(gè)問題,在網(wǎng)上包括Redis作者在內(nèi)發(fā)起了很多討論。
2.7.1 GC的STW
對(duì)于這個(gè)問題可以看見基本所有的都會(huì)出現(xiàn)問題,對(duì)于ZK這種他會(huì)生成一個(gè)自增的序列,那么我們真正進(jìn)行對(duì)資源操作的時(shí)候,需要判斷當(dāng)前序列是否是最新,有點(diǎn)類似于我們樂觀鎖。當(dāng)然這個(gè)解法Redis作者進(jìn)行了反駁,你既然都能生成一個(gè)自增的序列了那么你完全不需要加鎖了,也就是可以按照類似于Mysql樂觀鎖的解法去做。
我自己認(rèn)為這種解法增加了復(fù)雜性,當(dāng)我們對(duì)資源操作的時(shí)候需要增加判斷序列號(hào)是否是最新,無論用什么判斷方法都會(huì)增加復(fù)雜度。
2.7.2 時(shí)鐘發(fā)生跳躍
RedLock不安全很大的原因也是因?yàn)闀r(shí)鐘的跳躍,因?yàn)殒i過期強(qiáng)依賴于時(shí)間,但是ZK不需要依賴時(shí)間,依賴每個(gè)節(jié)點(diǎn)的Session。Redis作者也給出了解答:對(duì)于時(shí)間跳躍分為人為調(diào)整和NTP自動(dòng)調(diào)整。
- 人為調(diào)整:人為調(diào)整影響的那么完全可以人為不調(diào)整,這個(gè)是處于可控的。
- NTP自動(dòng)調(diào)整:這個(gè)可以通過一定的優(yōu)化,把跳躍時(shí)間控制的可控范圍內(nèi),雖然會(huì)跳躍,但是是完全可以接受的。
2.7.3長時(shí)間的網(wǎng)絡(luò)I/O
對(duì)于這個(gè)問題的優(yōu)化可以控制網(wǎng)絡(luò)調(diào)用的超時(shí)時(shí)間,把所有網(wǎng)絡(luò)調(diào)用的超時(shí)時(shí)間相加,那么我們鎖過期時(shí)間其實(shí)應(yīng)該大于這個(gè)時(shí)間,當(dāng)然也可以通過優(yōu)化網(wǎng)絡(luò)調(diào)用比如串行改成并行,異步化等??梢詤⒖嘉恼? 并行化-你的高并發(fā)大殺器,異步化-你的高并發(fā)大殺器


