本篇文章會(huì)介紹以下幾點(diǎn):
- 節(jié)點(diǎn)的基本操作
- 統(tǒng)一錯(cuò)誤監(jiān)聽器
- 不可重入共享鎖 & 可重入共享鎖【例子 & 核心源碼剖析】
- 可重入讀寫共享鎖
- 信號(hào)量鎖
- 多對(duì)象共享鎖
簡介
Curator基于原生的Zookeeper Api封裝提供了更加豐富的功能,如:Leader選舉、分布式鎖等。
實(shí)際開發(fā)中,很少場景會(huì)直接使用Zookeeper原生Api開發(fā)的,這是因?yàn)镃urator比原生的Api更加易用、功能更強(qiáng)大,另外對(duì)于監(jiān)聽回調(diào)機(jī)制也做了封裝(Zookeeper原生 Api 回調(diào)監(jiān)聽一次后,后續(xù)就不會(huì)再回調(diào),需要重新設(shè)置回調(diào)機(jī)制,實(shí)現(xiàn)上比較麻煩)
有意思的是,Curator的英文解釋是博物館館長/動(dòng)物園園長,和 Zookeeper 這個(gè)動(dòng)物園管理員身份相近,但職責(zé)和功能更強(qiáng)大,這個(gè)命名十分符合這種關(guān)系,不得不說,歪果仁真會(huì)玩 :)
Curator的官方地址是:https://curator.apache.org/index.html
Curator Maven 依賴
先引入相關(guān)的依賴,接下來開始熟悉相關(guān)的Api操作吧~
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<!-- 屏蔽日志輸出,便于本地學(xué)習(xí) -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
</dependency>
節(jié)點(diǎn)的基本操作
/**
* @Author: MuggleLee
* @Date: 2022/2/9 下午12:30
*/
public class CuratorUtils {
private static CuratorFramework client = null;
public static CuratorFramework getClient() {
if (client != null) {
return client;
}
client = CuratorFrameworkFactory.builder()
.connectString("zookeeper地址")
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
//連接超時(shí)時(shí)間,默認(rèn)15秒
.connectionTimeoutMs(15 * 1000)
//會(huì)話超時(shí)時(shí)間,默認(rèn)60秒
.sessionTimeoutMs(60 * 1000)
.namespace("curator-study")
.build();
client.start();
return client;
}
/**
* 新建節(jié)點(diǎn)
*/
public static void create(CuratorFramework client, CreateMode createMode, String path, String content) throws Exception {
client.create().withMode(createMode).forPath(path, content.getBytes());
}
/**
* 查看節(jié)點(diǎn)
*/
public static String getData(CuratorFramework client, String path) throws Exception {
byte[] bytes = client.getData().forPath(path);
return new String(bytes);
}
/**
* 更新節(jié)點(diǎn)
*/
public static void setData(CuratorFramework client, String path, String content) throws Exception {
client.setData().forPath(path, content.getBytes());
}
/**
* 刪除節(jié)點(diǎn)
*/
public static void delete(CuratorFramework client, String path) throws Exception {
client.delete().forPath(path);
}
public static void main(String[] args) throws Exception {
String path = "/curator-demo";
CuratorFramework client = CuratorUtils.getClient();
create(client, CreateMode.PERSISTENT, path, "curator-test");
String data = getData(client, path);
System.out.println(data);
setData(client, path, "new content");
String newData = getData(client, path);
System.out.println(newData);
delete(client, path);
}
}
通過上面的例子,就能夠發(fā)現(xiàn),實(shí)現(xiàn)節(jié)點(diǎn)的crud是如此簡單。不過這只是最基礎(chǔ)簡單的需求,如果項(xiàng)目上有其他需求,比如新建TTL節(jié)點(diǎn)等等,看下Curator的Api很快就能上手~
分布式鎖
為了保證分布式環(huán)境下的原子性,這時(shí)候就輪到分布式鎖上場啦!
業(yè)界中,分布式鎖最常用的無非就是 Curator 和 Redisson ,從性能上來說,正常情況下 Redisson 會(huì)優(yōu)于 Curator,因?yàn)?Curator 需要?jiǎng)?chuàng)建臨時(shí)節(jié)點(diǎn),客戶端運(yùn)氣不好的話,只是請求到 Follower 節(jié)點(diǎn),那么 Follower 節(jié)點(diǎn)還要請求轉(zhuǎn)發(fā)到 Leader 節(jié)點(diǎn)并執(zhí)行事務(wù),肯定比 Redisson 這種純內(nèi)存操作的慢;但從可靠性和有效性來說,Curator 會(huì)比 Redisson 好,因?yàn)閾?jù)說 Redisson 底層使用 RedLock 算法保證原子性,但在極端情況下卻無法保證,另外加鎖節(jié)點(diǎn)宕機(jī)后,可能最長等待30秒后才能釋放鎖,而通過 Curator 分布式加鎖創(chuàng)建的節(jié)點(diǎn)都是臨時(shí)節(jié)點(diǎn),當(dāng)客戶端下線后,臨時(shí)節(jié)點(diǎn)也會(huì)消失,時(shí)效性好。
上面只是簡單的比較 Curator 和 Redisson,想對(duì)分布式鎖有更深入的讀者,可以參考這篇 分布式鎖看這篇就夠了
接下來,先從官方文檔看下 Curator 分布式鎖是怎么玩的

由官方文檔描述可知,Curator分布式鎖有5種類型:
- Shared Reentrant Lock:共享可重入鎖
- Shared Lock :共享鎖
- Shared Reentrant Read Write Lock:共享可重入讀寫鎖
- Shared Semaphore:共享信號(hào)量
- Multi Shared Lock:多對(duì)象共享鎖
這幾種類型的鎖都有一個(gè)共同點(diǎn):Sharded。官方的一段描述:Fully distributed locks that are globally synchronous(全局同步的分布式鎖),這也是意味著全部的客戶端都可見,可以知道鎖當(dāng)前是否有被占用。
那么這幾種類型的鎖,有什么區(qū)別,在什么場景下使用合適呢?
其實(shí)從鎖名稱就可以猜到,Sharded Reentrant Lock 和 Sharded Lock 區(qū)別在與是否可重入;當(dāng)存在讀多寫少的場景,使用 Shared Reentrant Read Write Lock 會(huì)更加合適;當(dāng)需要限制獲取某個(gè)資源的數(shù)量,可以使用 Shared Semaphore 設(shè)置信號(hào)量;如果需要同時(shí)對(duì)多個(gè)對(duì)象同時(shí)加鎖,比如發(fā)紅包的時(shí)候,除了要對(duì)紅包這個(gè)對(duì)象加鎖,還要給賬號(hào)錢包這個(gè)對(duì)象加鎖,這樣才能保證在搶紅包的同時(shí),賬號(hào)錢包的錢也不會(huì)因?yàn)槠渌l(fā)線程修改金額導(dǎo)致數(shù)據(jù)不一致了,這時(shí)候就可以使用 Multi Sharded Lock。
ps.不太了解可重入鎖概念的同學(xué),可以簡單理解為同一個(gè)客戶端可重復(fù)的加鎖;如果對(duì)信號(hào)量的使用不太了解的話,可以簡單理解類似令牌的東西,不能超過設(shè)定的數(shù)量線程獲取到鎖,作用是保護(hù)共享資源。
可重入鎖和信號(hào)量鎖可分別看下我過往寫過的文章:
多線程之ReentrantLock源碼剖析
多線程之并發(fā)類CountDownLatch、CyclicBarrier和Semaphor的使用
統(tǒng)一錯(cuò)誤監(jiān)聽器
查看官方 Curator 分布式鎖的時(shí)候,發(fā)現(xiàn)每個(gè)類型的分布式鎖介紹,都有一處相同的說明
Error Handling
It is strongly recommended that you add aConnectionStateListenerand watch for SUSPENDED and LOST state changes. If a SUSPENDED state is reported you cannot be certain that you still hold the lock unless you subsequently receive a RECONNECTED state. If a LOST state is reported it is certain that you no longer hold the lock.
大概的意思就是,開發(fā)者可以實(shí)現(xiàn)一個(gè)統(tǒng)一錯(cuò)誤處理的監(jiān)聽器,監(jiān)聽器用于監(jiān)聽 SUSPENDED 和 LOST狀態(tài),當(dāng)發(fā)生 SUSPENDED 或 LOST 會(huì)告知我們相應(yīng)的信息。
監(jiān)聽器的實(shí)現(xiàn)并不難,實(shí)現(xiàn) ConnectionStateListener接口就行,示例代碼如下:
public class CuratorConnectionStateListener implements ConnectionStateListener {
// 節(jié)點(diǎn)路徑
private String zkPath;
// 節(jié)點(diǎn)內(nèi)容
private String content;
public CuratorConnectionStateListener(String zkPath, String content) {
this.zkPath = zkPath;
this.content = content;
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) {
System.out.println("====== 連接丟失 ======");
while(true){
try {
System.err.println("====== 嘗試重新連接 ======");
if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkPath, content.getBytes("UTF-8"));
break;
}
} catch (InterruptedException e) {
break;
} catch (Exception e){
// 可以根據(jù)業(yè)務(wù),在這里設(shè)置告警、故障恢復(fù)等設(shè)置
e.printStackTrace();
}
}
} else if (newState == ConnectionState.CONNECTED) {
System.out.println("====== 新建連接 ======");
} else if (newState == ConnectionState.RECONNECTED) {
System.out.println("====== 重新連接 ======");
}
}
}
不可重入共享鎖 & 可重入共享鎖
不可重入共享鎖核心類:InterProcessSemaphoreMutex
可重入共享鎖核心類:InterProcessMutex
/**
* 驗(yàn)證目的:
* 1.同一個(gè)線程是否可重復(fù)獲取鎖
* 2.不同線程獲取鎖是否互斥
*/
public static void main(String[] args) throws Exception {
String lockPath = "/sharded-lock";
CuratorFramework client = CuratorUtils.getClient();
// 添加連接狀態(tài)的監(jiān)聽器
CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "不可重入的共享鎖");
client.getConnectionStateListenable().addListener(connectionStateListener);
// A場景:使用不可重入共享鎖
InterProcessSemaphoreMutex thread1Locks = new InterProcessSemaphoreMutex(client, lockPath);
InterProcessSemaphoreMutex thread2Locks = new InterProcessSemaphoreMutex(client, lockPath);
// B場景:使用可重入共享鎖
// InterProcessMutex thread1Locks = new InterProcessMutex(client, lockPath);
// InterProcessMutex thread2Locks = new InterProcessMutex(client, lockPath);
new Thread(() -> {
try {
if (thread1Locks.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + ":第一次加鎖成功");
if (thread1Locks.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + ":重復(fù)加鎖成功");
} else {
System.out.println(Thread.currentThread().getName() + ":重復(fù)加鎖失敗");
}
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName() + "執(zhí)行業(yè)務(wù)邏輯,花費(fèi)2秒");
} else {
System.out.println(Thread.currentThread().getName() + ":第一次加鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
thread1Locks.release();
System.out.println(Thread.currentThread().getName() + ":釋放鎖成功!");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程1").start();
// 休眠的目的是讓線程1先執(zhí)行
Thread.sleep(500);
new Thread(() -> {
try {
if (thread2Locks.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + ":加鎖成功");
} else {
System.out.println(Thread.currentThread().getName() + ":加鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (thread2Locks.isAcquiredInThisProcess()) {
thread2Locks.release();
System.out.println(Thread.currentThread().getName() + ":釋放鎖成功!");
} else {
System.out.println(Thread.currentThread().getName() + ":釋放鎖失??!原因是當(dāng)前鎖對(duì)象并沒有占用鎖");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程2").start();
}
輸出結(jié)果:
線程1:第一次加鎖成功
線程1:重復(fù)加鎖失敗
線程2:加鎖失敗
線程2:釋放鎖失?。≡蚴钱?dāng)前鎖對(duì)象并沒有占用鎖
線程1執(zhí)行業(yè)務(wù)邏輯,花費(fèi)2秒
線程1:釋放鎖成功!
從輸出結(jié)果可以看出,同一個(gè)線程不允許重復(fù)加鎖,不同的線程會(huì)存在競爭關(guān)系,相互互斥。
接下來,關(guān)注一下細(xì)節(jié)部分。使用共享鎖的核心類是InterProcessSemaphoreMutex,從命名上來看,我第一次看官方文檔是一臉懵逼的,為什么有 Semaphore 這一個(gè)詞?難不成底層是用 JUC 的 Semaphore 并發(fā)鎖保證鎖互斥的?
結(jié)論先行:實(shí)際上并不是使用 JUC 的 Semaphore,不過底層實(shí)現(xiàn)是相似的思想,通過類似信號(hào)量的概念,限制加鎖的次數(shù)。
咱們先看下,當(dāng)執(zhí)行 new InterProcessSemaphoreMutex(client, path) 的時(shí)候,做了什么事情。為了驗(yàn)證上面的猜想,省略部分源碼。
public class InterProcessSemaphoreMutex implements InterProcessLock {
// 省略其他源碼
public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
// 省略其他源碼
this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
}
}
再重點(diǎn)看下InterProcessSemaphoreV2對(duì)象創(chuàng)建的流程
public class InterProcessSemaphoreV2 {
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
this(client, path, maxLeases, null);
}
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
// 注意這兩行代碼,鎖對(duì)象是通過 InterProcessMutex 類創(chuàng)建的,而 maxLeases 是通過初始化時(shí)傳 1 進(jìn)來
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
// 省略其他源碼
}
}
從 InterProcessSemaphoreV2 對(duì)象初始化過程可知,InterProcessSemaphoreV2 實(shí)際上組合使用了 InterProcessMutex 可重入共享鎖,通過 maxLeases參數(shù)限制只能成功加鎖一次,使用起來就類似信號(hào)量,限制加鎖次數(shù)。所以從共享鎖的命名來看,使用 Semaphore 就感覺合理了...
底層的加鎖邏輯,實(shí)際上都是調(diào)用 InterProcessMutex 這個(gè)類的方法,這里先賣個(gè)關(guān)子,先不看加鎖的邏輯,而是看下 release() 方法是怎樣釋放鎖的。
public void release() throws Exception {
// 省略其他源碼
Lease lease = this.lease;
this.lease = null;
lease.close();
}
lease 的英文解釋是租約、租契,咱們可以理解為加鎖的租契,加鎖成功。釋放鎖的源碼中,將lease設(shè)置為null,就相當(dāng)于丟棄了這個(gè)租契,那么再次獲取鎖的時(shí)候發(fā)現(xiàn) lease 并沒有租契,就相當(dāng)于沒有線程占用,就可以嘗試競爭加鎖了。
至于 InterProcessMutex 內(nèi)的加鎖邏輯,先看完 InterProcessMutex 的例子再具體看下源碼
只需要將上面例子中的A場景代碼注釋調(diào),然后打開B場景的代碼,其實(shí)就是使用可重入共享鎖,先看下執(zhí)行后的輸出結(jié)果為:
線程1:第一次加鎖成功
線程1:重復(fù)加鎖成功
線程2:加鎖失敗
線程2:釋放鎖失??!原因是當(dāng)前鎖對(duì)象并沒有占用鎖
線程1執(zhí)行業(yè)務(wù)邏輯,花費(fèi)2秒
線程1:釋放鎖成功!
結(jié)論先行:使用 InterProcessMutex 可以讓同一個(gè)線程重復(fù)加鎖,但不同線程獲取鎖是互斥。這個(gè)也是符合前面 InterProcessMutex 類的描述!
剖析可重入鎖底層加鎖和釋放鎖源碼
ps. 本文為了篇幅和著重核心代碼,所以會(huì)省略部分源碼。強(qiáng)烈建議讀者可以根據(jù)上述例子debug源碼,這樣才能從被動(dòng)吸收變成主動(dòng)吸收知識(shí),這樣對(duì)于底層邏輯的印象和理解才會(huì)更深入~
在 InterProcessMutex 內(nèi)會(huì)為每一個(gè)第一次加鎖成功的線程封裝為一個(gè) LockData 對(duì)象,并將當(dāng)前 LockData 對(duì)象加入 ConcurrentMap 中
// 并發(fā)集合,key是加鎖成功的線程對(duì)象,vlaue則是lockData對(duì)象
private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData = Maps.newConcurrentMap();
/**
* 加鎖的對(duì)象。owningThread代表加鎖的線程對(duì)象、lockPath代表在ZK中加鎖的路徑、lockCount代表加鎖的次數(shù)(為可重入鋪墊)
*/
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
接下來再看一下,調(diào)用 acquire 方法實(shí)際上做了什么
public boolean acquire(long time, TimeUnit unit) throws Exception {
return internalLock(time, unit);
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
Thread currentThread = Thread.currentThread();
// 從集合中獲取當(dāng)前線程的lockData對(duì)象
InterProcessMutex.LockData lockData = threadData.get(currentThread);
// 如果集合已存在當(dāng)前線程的lockData對(duì)象,證明當(dāng)前線程已加鎖成功且還沒釋放鎖
if (lockData != null) {
// re-entering,可重入,直接返回true代表重復(fù)加鎖成功
lockData.lockCount.incrementAndGet();
return true;
}
// 走到這里,就證明當(dāng)前線程還沒成功加鎖。那么就嘗試加鎖,如果加鎖失敗返回null
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
// 加鎖成功,返回的路徑不是null;先初始化lockData對(duì)象,然后插入到threadData集合中
if (lockPath != null) {
InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
threadData.put(currentThread, newLockData);
return true;
}
// 走到這里,就證明lockPath為null,加鎖失敗返回false
return false;
}
結(jié)合注釋,可以知道可重入鎖實(shí)際上是判斷當(dāng)前線程是否存在 LockData 對(duì)象,存在的話就自增 LockData 對(duì)象內(nèi)的 lockCount 參數(shù);不存在就走 attemptLock方法嘗試加鎖,加鎖成功就返回 true 告知客戶端加鎖成功?,F(xiàn)在,咱們逐漸接近真相了,加鎖最終邏輯就是在attemptLock方法內(nèi)!
// 省略部分源碼,請結(jié)合源碼綜合查看
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
String ourPath = null;
boolean hasTheLock = false;
// 給當(dāng)前嘗試加鎖的客戶端創(chuàng)建臨時(shí)的順序節(jié)點(diǎn),并返回節(jié)點(diǎn)的路徑
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
// 嘗試加鎖
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
if (hasTheLock) {
return ourPath;
}
return null;
}
// 省略部分源碼,請結(jié)合源碼綜合查看
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
List<String> children = getSortedChildren();
// 當(dāng)前順序節(jié)點(diǎn)的名稱
String sequenceNodeName = ourPath.substring(basePath.length() + 1);
// 嘗試獲取鎖
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {
haveTheLock = true;
} else {
// 從順序節(jié)點(diǎn)中獲取上一個(gè)節(jié)點(diǎn)
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized (this) {
// 給上一個(gè)節(jié)點(diǎn)添加監(jiān)聽器
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
// 調(diào)用wait方法等待
}
}
}
return haveTheLock;
}
再進(jìn)一步看下 driver.getsTheLock的流程(StandardLockInternalsDriver 類)
// 省略部分源碼
public class StandardLockInternalsDriver implements LockInternalsDriver {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
// 當(dāng)前節(jié)點(diǎn)在這個(gè)子目錄下的順序
int ourIndex = children.indexOf(sequenceNodeName);
// maxLeases是初始化 InterProcessMutex 的時(shí)候傳的,值固定為1。
// 什么情況下 getsTheLock 為 true?
// 只有當(dāng) ourIndex 為0才可能是true,也就是在當(dāng)前目錄下,臨時(shí)順序節(jié)點(diǎn)的索引是0才算加鎖成功
// 換言之,只有排在目錄的第一位才能加鎖成功;當(dāng)釋放鎖的時(shí)候會(huì)刪除第一位臨時(shí)順序節(jié)點(diǎn),這樣的話才可以讓下一位成功加鎖
boolean getsTheLock = ourIndex < maxLeases;
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
}
到此,加鎖的主流程源碼都已剖析了。最后再看下釋放鎖的主流程:
// 省略部分源碼
public void release() throws Exception {
Thread currentThread = Thread.currentThread();
InterProcessMutex.LockData lockData = threadData.get(currentThread);
try {
// 釋放鎖
internals.releaseLock(lockData.lockPath);
} finally {
threadData.remove(currentThread);
}
}
// 省略部分源碼
final void releaseLock(String lockPath) throws Exception {
// 刪掉路徑上的臨時(shí)節(jié)點(diǎn)
deleteOurPath(lockPath);
}
釋放鎖的流程相對(duì)加鎖就簡單多了,重點(diǎn)就是刪除臨時(shí)節(jié)點(diǎn),當(dāng)臨時(shí)節(jié)點(diǎn)刪除后就會(huì)觸發(fā)回調(diào),通知下一個(gè)節(jié)點(diǎn)加鎖。
通過前面一大輪的核心源碼剖析,再回頭想想整個(gè)加鎖和釋放鎖核心邏輯,小結(jié)一下:
創(chuàng)建 InterProcessMutex對(duì)象時(shí),固定 maxLeases 為 1,表明只有一個(gè)線程加鎖成功
為每一個(gè)嘗試加鎖的線程創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
如果當(dāng)前順序節(jié)點(diǎn)處于該目錄下的第一位,則加鎖成功;否則就調(diào)用wait方法等待,且給上一個(gè)臨時(shí)順序節(jié)點(diǎn)添加監(jiān)聽器(上一個(gè)節(jié)點(diǎn)釋放后回調(diào))
當(dāng)釋放鎖的時(shí)候會(huì)刪除臨時(shí)順序節(jié)點(diǎn),并回調(diào)通知下一個(gè)臨時(shí)順序節(jié)點(diǎn),這樣的話,下一個(gè)臨時(shí)順序節(jié)點(diǎn)就能感知自己可以嘗試加鎖
上面剖析了核心流程,實(shí)際上很少直接調(diào)用 acquire 方法請求加鎖,因?yàn)榫€程請求 acquire 方法就會(huì)阻塞線程,在并發(fā)量大的情況下就會(huì)導(dǎo)致服務(wù)器內(nèi)大量線程被占用,最終會(huì)耗盡資源導(dǎo)致服務(wù)異常。一般的業(yè)務(wù)場景都會(huì)設(shè)置請求加鎖的過期時(shí)間,這樣在短時(shí)間內(nèi)無法加鎖成功就會(huì)走加鎖失敗的邏輯,就不會(huì)在一直等待。感興趣的讀者可以細(xì)看acquire(long time, TimeUnit unit)的流程,核心流程和上面說的一樣,只不過是加上一些時(shí)間判斷,如果超時(shí)就會(huì)刪除臨時(shí)順序節(jié)點(diǎn)而已。
可重入讀寫共享鎖
/**
* 讀寫鎖簡單例子
*/
public static void main(String[] args) throws Exception {
String lockPath = "/read-write-sharded-lock";
CuratorFramework client = CuratorUtils.getClient();
// 添加連接狀態(tài)的監(jiān)聽器
CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "讀寫鎖");
client.getConnectionStateListenable().addListener(connectionStateListener);
// 可重入讀寫共享鎖
InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, lockPath);
InterProcessMutex readLock = readWriteLock.readLock();
InterProcessMutex writeLock = readWriteLock.writeLock();
new Thread(() -> {
try {
readLock.acquire();
System.out.println(Thread.currentThread().getName() + "獲取讀鎖成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (readLock.isOwnedByCurrentThread()) {
readLock.release();
System.out.println(Thread.currentThread().getName() + "釋放讀鎖成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程1").start();
new Thread(() -> {
try {
writeLock.acquire();
System.out.println(Thread.currentThread().getName() + "獲取寫鎖成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (writeLock.isOwnedByCurrentThread()) {
writeLock.release();
System.out.println(Thread.currentThread().getName() + "釋放寫鎖成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程2").start();
}
輸出結(jié)果:
線程1獲取讀鎖成功
線程1釋放讀鎖成功
線程2獲取寫鎖成功
線程2釋放寫鎖成功
上面只是讀寫鎖的簡單例子,但讀寫鎖真實(shí)的使用場景會(huì)稍微復(fù)雜點(diǎn),比如下面四個(gè)問題,需要從源碼中找到答案!
不同線程獲取讀鎖是否互斥?答:不互斥
不同線程獲取寫鎖是否互斥?答:互斥
同一線程,可不可以先獲取讀鎖、再獲取寫鎖?答:不可以。如果同一線程先加讀鎖成功,后再請求加寫鎖,會(huì)阻塞加寫鎖請求
同一線程,可不可以先獲取寫鎖、再獲取讀鎖?答:可以。如果同一線程先加寫鎖成功,后再請求讀鎖,并不會(huì)阻塞讀鎖的加鎖請求
如果對(duì)上述結(jié)論有疑問的讀者,可以帶著疑問去閱讀源碼。接下來結(jié)合下面的部分源碼剖析以上四種結(jié)論在源碼內(nèi)怎么實(shí)現(xiàn)的!
可重入讀寫共享鎖底層加鎖和釋放鎖源碼
InterProcessReadWriteLock 內(nèi)創(chuàng)建了一個(gè)繼承 InterProcessMutex 的 InternalInterProcessMutex 內(nèi)部類,然后分別創(chuàng)建了readMutex和writeMutex兩個(gè)對(duì)象。各位有沒有發(fā)現(xiàn)什么?讀寫鎖加鎖邏輯還是通過 InterProcessMutex 呀!所以只要理解上面 InterProcessMute 的流程,理解讀寫鎖的核心邏輯就很簡單啦~
private final InterProcessMutex readMutex;
private final InterProcessMutex writeMutex;
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
// 初始化寫鎖
writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(
client,
basePath,
WRITE_LOCK_NAME,// 設(shè)置節(jié)點(diǎn)名字
lockData,
1, // 寫鎖的 maxLeases 設(shè)置為1
new InterProcessReadWriteLock.SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
// 初始化讀鎖
readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(
client,
basePath,
READ_LOCK_NAME,// 設(shè)置節(jié)點(diǎn)名字
lockData,
Integer.MAX_VALUE, // 讀鎖的 maxLeases 設(shè)置為無限大
new InterProcessReadWriteLock.SortingLockInternalsDriver() {
// 當(dāng)執(zhí)行g(shù)etsTheLock加鎖的時(shí)候?qū)嶋H執(zhí)行到這里
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
// 如果當(dāng)前線程擁有寫鎖,則當(dāng)前線程請求讀鎖也成功。留意下面初始化 PredicateResults 對(duì)象的 getsTheLock 參數(shù)為true,就代表加讀鎖成功
if (writeMutex.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}
// 省略部分源碼
}
// 注意:這里繼承 InterProcessMutex
private static class InternalInterProcessMutex extends InterProcessMutex {
// 省略源碼
}
以下說明的前提是,先成功獲取鎖,但都沒有執(zhí)行到釋放鎖的期間,有其他線程并發(fā)的請求加鎖操作。如果先獲取鎖再釋放鎖,其他線程肯定可以加鎖成功呀!
不同線程獲取讀鎖不互斥。實(shí)際上是因?yàn)樽x鎖的 maxLeases 設(shè)置為 Integer.MAX_VALUE,當(dāng)請求獲取加讀鎖的時(shí)候,執(zhí)行 StandardLockInternalsDriver 類的 getsTheLock 方法,根據(jù) boolean getsTheLock = ourIndex < maxLeases 判斷是否加鎖成功,ourIndex 指的是當(dāng)前臨時(shí)順序節(jié)點(diǎn)在目錄下的索引位置,肯定是小于 maxLeases,所以返回 true,代表加讀鎖成功。
不同線程獲取寫鎖互斥。細(xì)心的讀者肯定留意到創(chuàng)建寫鎖的時(shí)候,maxLeases 設(shè)置為 1,同樣執(zhí)行到 boolean getsTheLock = ourIndex < maxLeases 這一行的時(shí)候,除非當(dāng)前臨時(shí)順序節(jié)點(diǎn)處于該目錄下的第一位,否則都會(huì)返回 false 代表加鎖失敗。這也說明了只有一個(gè)且最先獲取寫鎖的線程能成功加鎖,其他的加寫鎖都會(huì)阻塞。
同一線程,不可以先獲取讀鎖、再獲取寫鎖。在時(shí)間順序上來說,先創(chuàng)建讀鎖的臨時(shí)順序節(jié)點(diǎn),再到寫鎖的臨時(shí)順序節(jié)點(diǎn)。當(dāng)執(zhí)行加寫鎖的操作時(shí)候,執(zhí)行 boolean getsTheLock = ourIndex < maxLeases 發(fā)現(xiàn) ourIndex 為 1,而寫鎖的 maxLeases 也是 1,所以返回 false 加寫鎖失敗。
同一線程,可以先獲取寫鎖、再獲取讀鎖。在時(shí)間順序上來說,先創(chuàng)建寫鎖的臨時(shí)順序節(jié)點(diǎn),再創(chuàng)建讀鎖的臨時(shí)順序節(jié)點(diǎn)。加寫鎖成功后,再到執(zhí)行加讀鎖的操作,因?yàn)樽x鎖的 maxLeases 的值無限大嘛,所以 boolean getsTheLock = ourIndex < maxLeases 會(huì)返回 true 加讀鎖成功。
信號(hào)量鎖
/**
* maxLease代表可成功加鎖的數(shù)量
*/
public static void main(String[] args) {
String lockPath = "/semaphore-sharded-lock";
int maxLeases = 1;
CuratorFramework client = CuratorUtils.getClient();
// 添加連接狀態(tài)的監(jiān)聽器
CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "讀寫鎖");
client.getConnectionStateListenable().addListener(connectionStateListener);
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, maxLeases);
new Thread(() -> {
Lease lease = null;
try {
lease = semaphore.acquire(100, TimeUnit.MILLISECONDS);
if (null != lease) {
System.out.println(Thread.currentThread().getName() + "獲取共享鎖成功");
} else {
System.out.println(Thread.currentThread().getName() + "獲取共享鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != lease) {
semaphore.returnLease(lease);
System.out.println(Thread.currentThread().getName() + "釋放共享鎖成功");
}
}
}, "線程1").start();
new Thread(() -> {
Lease lease = null;
try {
lease = semaphore.acquire(100, TimeUnit.MILLISECONDS);
if (null != lease) {
System.out.println(Thread.currentThread().getName() + "獲取共享鎖成功");
} else {
System.out.println(Thread.currentThread().getName() + "獲取共享鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != lease) {
semaphore.returnLease(lease);
System.out.println(Thread.currentThread().getName() + "釋放共享鎖成功");
}
}
}, "線程2").start();
}
輸出結(jié)果:
線程1獲取共享鎖失敗
線程2獲取共享鎖成功
線程2釋放共享鎖成功
看一下初始化 InterProcessSemaphoreV2 的源碼,我們還是可以發(fā)現(xiàn)底層依然使用的是 InterProcessMutex 類加鎖!
// 省略部分源碼
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
}
剩下加鎖釋放鎖的源碼就不用多說了,只要理解上面 maxLeases 在 InterProcessMutex 的妙用,就可以知道在 InterProcessSemaphoreV2 內(nèi)是通過 maxLeases 控制獲取鎖的數(shù)量。不過信號(hào)量鎖在獲取鎖和釋放鎖的方式上和其它共享鎖稍微有點(diǎn)不一樣,釋放鎖的時(shí)候是調(diào)用 returnLease 方法,參數(shù)是從加鎖成功后返回的 Lease 對(duì)象,不過其釋放鎖的底層原理和上面的 release 方法大同小異,就不再贅述了。
多對(duì)象共享鎖
public static void main(String[] args) throws Exception {
String lockPathA = "/multi-sharded-lock-A";
String lockPathB = "/multi-sharded-lock-B";
String lockPathC = "/multi-sharded-lock-C";
CuratorFramework client = CuratorUtils.getClient();
// 單對(duì)象共享鎖
InterProcessMutex processMutex = new InterProcessMutex(client, lockPathC);
// 多對(duì)象共享鎖
InterProcessMultiLock multiLock = new InterProcessMultiLock(client, Arrays.asList(lockPathA, lockPathB));
new Thread(() -> {
try {
multiLock.acquire();
System.out.println(Thread.currentThread().getName() + "獲取鎖成功");
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
multiLock.release();
System.out.println(Thread.currentThread().getName() + "釋放鎖成功");
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程1").start();
// 為了讓線程1優(yōu)先執(zhí)行
Thread.sleep(1000);
new Thread(() -> {
try {
if (processMutex.acquire(100, TimeUnit.MILLISECONDS)) {
System.out.println(Thread.currentThread().getName() + "獲取鎖成功");
} else {
System.out.println(Thread.currentThread().getName() + "獲取鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (processMutex.isOwnedByCurrentThread()) {
processMutex.release();
System.out.println(Thread.currentThread().getName() + "釋放鎖成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "線程2").start();
}
輸出結(jié)果:
線程1獲取鎖成功
線程2獲取鎖成功
線程2釋放鎖成功
線程1釋放鎖成功
只要將 InterProcessMutex 加鎖的對(duì)象改為 lockPathA 或 lockPathB 就可以發(fā)現(xiàn),線程2加鎖失敗。這也能印證使用InterProcessMultiLock多對(duì)象共享鎖能夠限制對(duì)象只能有一個(gè)線程占有。接下來簡單看下底層的原理,其實(shí)底層依舊用到的是 InterProcessMutex 共享鎖,加鎖和釋放鎖邏輯上面都剖析過,就不再贅述。咱們只需要關(guān)注初始化 InterProcessMultiLock 的時(shí)候做了什么事情。
// 初始化 InterProcessMultiLock
public InterProcessMultiLock(CuratorFramework client, List<String> paths) {
this(makeLocks(client, paths));
}
// 分別對(duì)多個(gè)對(duì)象加上 InterProcessMutex 共享鎖
private static List<InterProcessLock> makeLocks(CuratorFramework client, List<String> paths) {
ImmutableList.Builder<InterProcessLock> builder = ImmutableList.builder();
for (String path : paths) {
InterProcessLock lock = new InterProcessMutex(client, path);
builder.add(lock);
}
return builder.build();
}
// locks用于記錄加鎖的對(duì)象,之后的加鎖和釋放鎖都會(huì)根據(jù)這個(gè)對(duì)象循環(huán)加鎖、釋放鎖
public InterProcessMultiLock(List<InterProcessLock> locks) {
this.locks = ImmutableList.copyOf(locks);
}
簡單來說,就是將多個(gè)對(duì)象分別用 InterProcessMutex 加鎖,鎖住資源;然后在加鎖和釋放鎖的時(shí)候,循環(huán)這些對(duì)象加鎖或釋放鎖就可以了!
總結(jié):
這篇文章重點(diǎn)介紹了 Curator 的幾種分布式鎖使用和核心源碼。不難發(fā)現(xiàn),其實(shí) Curator 只是在 ZK 的基礎(chǔ)上擴(kuò)展了更多功能,其底層其實(shí)就是通過臨時(shí)順序節(jié)點(diǎn)來控制加鎖的順序和鎖的釋放。靈活運(yùn)用臨時(shí)順序節(jié)點(diǎn)的特性,加上 Curator 內(nèi)設(shè)置 maxLeases 等熟悉,演變出可重入鎖、讀寫鎖、信號(hào)量鎖等類型的分布式鎖。另外,本文重點(diǎn)介紹了 InterProcessMutex的核心源碼,只要掌握這個(gè)類,其它的分布式鎖也能快速了解其底層原理。在實(shí)際工作中,可以根據(jù)業(yè)務(wù)自定義類來封裝 Curator 等核心類,用于擴(kuò)展適合自身業(yè)務(wù)的場景。
如果覺得文章不錯(cuò)的話,麻煩點(diǎn)個(gè)贊哈,你的鼓勵(lì)就是我的動(dòng)力!對(duì)于文章有哪里不清楚或者有誤的地方,歡迎在評(píng)論區(qū)留言~
參考資料:
Curator 官網(wǎng):https://curator.apache.org/curator-recipes/index.html