【Zookeeper系列】Curator入門和分布式鎖源碼剖析

本篇文章會(huì)介紹以下幾點(diǎn):

  • 節(jié)點(diǎn)的基本操作
  • 統(tǒng)一錯(cuò)誤監(jiān)聽器
  • 不可重入共享鎖 & 可重入共享鎖【例子 & 核心源碼剖析】
  • 可重入讀寫共享鎖
  • 信號(hào)量鎖
  • 多對(duì)象共享鎖

簡介

Curator基于原生的Zookeeper Api封裝提供了更加豐富的功能,如:Leader選舉、分布式鎖等。

實(shí)際開發(fā)中,很少場景會(huì)直接使用Zookeeper原生Api開發(fā)的,這是因?yàn)镃urator比原生的Api更加易用、功能更強(qiáng)大,另外對(duì)于監(jiān)聽回調(diào)機(jī)制也做了封裝(Zookeeper原生 Api 回調(diào)監(jiān)聽一次后,后續(xù)就不會(huì)再回調(diào),需要重新設(shè)置回調(diào)機(jī)制,實(shí)現(xiàn)上比較麻煩)

有意思的是,Curator的英文解釋是博物館館長/動(dòng)物園園長,和 Zookeeper 這個(gè)動(dòng)物園管理員身份相近,但職責(zé)和功能更強(qiáng)大,這個(gè)命名十分符合這種關(guān)系,不得不說,歪果仁真會(huì)玩 :)

Curator的官方地址是:https://curator.apache.org/index.html

Curator Maven 依賴

先引入相關(guān)的依賴,接下來開始熟悉相關(guān)的Api操作吧~

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <!-- 屏蔽日志輸出,便于本地學(xué)習(xí) -->
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.1</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.5.6</version>
        </dependency>

節(jié)點(diǎn)的基本操作

    /**
     * @Author: MuggleLee
     * @Date: 2022/2/9 下午12:30
     */
    public class CuratorUtils {

        private static CuratorFramework client = null;

        public static CuratorFramework getClient() {
            if (client != null) {
                return client;
            }
            client = CuratorFrameworkFactory.builder()
                    .connectString("zookeeper地址")
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    //連接超時(shí)時(shí)間,默認(rèn)15秒
                    .connectionTimeoutMs(15 * 1000)
                    //會(huì)話超時(shí)時(shí)間,默認(rèn)60秒
                    .sessionTimeoutMs(60 * 1000)
                    .namespace("curator-study")
                    .build();
            client.start();
            return client;
        }

        /**
         * 新建節(jié)點(diǎn)
         */
        public static void create(CuratorFramework client, CreateMode createMode, String path, String content) throws Exception {
            client.create().withMode(createMode).forPath(path, content.getBytes());
        }

        /**
         * 查看節(jié)點(diǎn)
         */
        public static String getData(CuratorFramework client, String path) throws Exception {
            byte[] bytes = client.getData().forPath(path);
            return new String(bytes);
        }

        /**
         * 更新節(jié)點(diǎn)
         */
        public static void setData(CuratorFramework client, String path, String content) throws Exception {
            client.setData().forPath(path, content.getBytes());
        }

        /**
         * 刪除節(jié)點(diǎn)
         */
        public static void delete(CuratorFramework client, String path) throws Exception {
            client.delete().forPath(path);
        }

        public static void main(String[] args) throws Exception {
            String path = "/curator-demo";
            CuratorFramework client = CuratorUtils.getClient();
            create(client, CreateMode.PERSISTENT, path, "curator-test");
            String data = getData(client, path);
            System.out.println(data);
            setData(client, path, "new content");
            String newData = getData(client, path);
            System.out.println(newData);
            delete(client, path);
        }
    }

通過上面的例子,就能夠發(fā)現(xiàn),實(shí)現(xiàn)節(jié)點(diǎn)的crud是如此簡單。不過這只是最基礎(chǔ)簡單的需求,如果項(xiàng)目上有其他需求,比如新建TTL節(jié)點(diǎn)等等,看下Curator的Api很快就能上手~

分布式鎖

為了保證分布式環(huán)境下的原子性,這時(shí)候就輪到分布式鎖上場啦!

業(yè)界中,分布式鎖最常用的無非就是 Curator 和 Redisson ,從性能上來說,正常情況下 Redisson 會(huì)優(yōu)于 Curator,因?yàn)?Curator 需要?jiǎng)?chuàng)建臨時(shí)節(jié)點(diǎn),客戶端運(yùn)氣不好的話,只是請求到 Follower 節(jié)點(diǎn),那么 Follower 節(jié)點(diǎn)還要請求轉(zhuǎn)發(fā)到 Leader 節(jié)點(diǎn)并執(zhí)行事務(wù),肯定比 Redisson 這種純內(nèi)存操作的慢;但從可靠性和有效性來說,Curator 會(huì)比 Redisson 好,因?yàn)閾?jù)說 Redisson 底層使用 RedLock 算法保證原子性,但在極端情況下卻無法保證,另外加鎖節(jié)點(diǎn)宕機(jī)后,可能最長等待30秒后才能釋放鎖,而通過 Curator 分布式加鎖創(chuàng)建的節(jié)點(diǎn)都是臨時(shí)節(jié)點(diǎn),當(dāng)客戶端下線后,臨時(shí)節(jié)點(diǎn)也會(huì)消失,時(shí)效性好。

上面只是簡單的比較 Curator 和 Redisson,想對(duì)分布式鎖有更深入的讀者,可以參考這篇 分布式鎖看這篇就夠了

接下來,先從官方文檔看下 Curator 分布式鎖是怎么玩的

分布式鎖介紹.png

由官方文檔描述可知,Curator分布式鎖有5種類型:

這幾種類型的鎖都有一個(gè)共同點(diǎn):Sharded。官方的一段描述:Fully distributed locks that are globally synchronous(全局同步的分布式鎖),這也是意味著全部的客戶端都可見,可以知道鎖當(dāng)前是否有被占用。

那么這幾種類型的鎖,有什么區(qū)別,在什么場景下使用合適呢?

其實(shí)從鎖名稱就可以猜到,Sharded Reentrant Lock 和 Sharded Lock 區(qū)別在與是否可重入;當(dāng)存在讀多寫少的場景,使用 Shared Reentrant Read Write Lock 會(huì)更加合適;當(dāng)需要限制獲取某個(gè)資源的數(shù)量,可以使用 Shared Semaphore 設(shè)置信號(hào)量;如果需要同時(shí)對(duì)多個(gè)對(duì)象同時(shí)加鎖,比如發(fā)紅包的時(shí)候,除了要對(duì)紅包這個(gè)對(duì)象加鎖,還要給賬號(hào)錢包這個(gè)對(duì)象加鎖,這樣才能保證在搶紅包的同時(shí),賬號(hào)錢包的錢也不會(huì)因?yàn)槠渌l(fā)線程修改金額導(dǎo)致數(shù)據(jù)不一致了,這時(shí)候就可以使用 Multi Sharded Lock。

ps.不太了解可重入鎖概念的同學(xué),可以簡單理解為同一個(gè)客戶端可重復(fù)的加鎖;如果對(duì)信號(hào)量的使用不太了解的話,可以簡單理解類似令牌的東西,不能超過設(shè)定的數(shù)量線程獲取到鎖,作用是保護(hù)共享資源。

可重入鎖和信號(hào)量鎖可分別看下我過往寫過的文章:
多線程之ReentrantLock源碼剖析
多線程之并發(fā)類CountDownLatch、CyclicBarrier和Semaphor的使用

統(tǒng)一錯(cuò)誤監(jiān)聽器

查看官方 Curator 分布式鎖的時(shí)候,發(fā)現(xiàn)每個(gè)類型的分布式鎖介紹,都有一處相同的說明

Error Handling
It is strongly recommended that you add a ConnectionStateListener and watch for SUSPENDED and LOST state changes. If a SUSPENDED state is reported you cannot be certain that you still hold the lock unless you subsequently receive a RECONNECTED state. If a LOST state is reported it is certain that you no longer hold the lock.

大概的意思就是,開發(fā)者可以實(shí)現(xiàn)一個(gè)統(tǒng)一錯(cuò)誤處理的監(jiān)聽器,監(jiān)聽器用于監(jiān)聽 SUSPENDEDLOST狀態(tài),當(dāng)發(fā)生 SUSPENDED 或 LOST 會(huì)告知我們相應(yīng)的信息。

監(jiān)聽器的實(shí)現(xiàn)并不難,實(shí)現(xiàn) ConnectionStateListener接口就行,示例代碼如下:

public class CuratorConnectionStateListener implements ConnectionStateListener {
    // 節(jié)點(diǎn)路徑
    private String zkPath;
    // 節(jié)點(diǎn)內(nèi)容
    private String content;

    public CuratorConnectionStateListener(String zkPath, String content) {
        this.zkPath = zkPath;
        this.content = content;
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.LOST) {
            System.out.println("====== 連接丟失 ======");
            while(true){
                try {
                    System.err.println("====== 嘗試重新連接 ======");
                    if(client.getZookeeperClient().blockUntilConnectedOrTimedOut()){
                        client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(zkPath, content.getBytes("UTF-8"));
                        break;
                    }
                } catch (InterruptedException e) {
                    break;
                } catch (Exception e){
                    // 可以根據(jù)業(yè)務(wù),在這里設(shè)置告警、故障恢復(fù)等設(shè)置
                    e.printStackTrace();
                }
            }
        } else if (newState == ConnectionState.CONNECTED) {
            System.out.println("====== 新建連接 ======");
        } else if (newState == ConnectionState.RECONNECTED) {
            System.out.println("====== 重新連接 ======");
        }
    }
}

不可重入共享鎖 & 可重入共享鎖

不可重入共享鎖核心類:InterProcessSemaphoreMutex

可重入共享鎖核心類:InterProcessMutex

/**
     * 驗(yàn)證目的:
     * 1.同一個(gè)線程是否可重復(fù)獲取鎖
     * 2.不同線程獲取鎖是否互斥
     */
    public static void main(String[] args) throws Exception {
        String lockPath = "/sharded-lock";
        CuratorFramework client = CuratorUtils.getClient();
        // 添加連接狀態(tài)的監(jiān)聽器
        CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "不可重入的共享鎖");
        client.getConnectionStateListenable().addListener(connectionStateListener);

        // A場景:使用不可重入共享鎖
        InterProcessSemaphoreMutex thread1Locks = new InterProcessSemaphoreMutex(client, lockPath);
        InterProcessSemaphoreMutex thread2Locks = new InterProcessSemaphoreMutex(client, lockPath);
        // B場景:使用可重入共享鎖
//        InterProcessMutex thread1Locks = new InterProcessMutex(client, lockPath);
//        InterProcessMutex thread2Locks = new InterProcessMutex(client, lockPath);

        new Thread(() -> {
            try {
                if (thread1Locks.acquire(100, TimeUnit.MILLISECONDS)) {
                    System.out.println(Thread.currentThread().getName() + ":第一次加鎖成功");
                    if (thread1Locks.acquire(100, TimeUnit.MILLISECONDS)) {
                        System.out.println(Thread.currentThread().getName() + ":重復(fù)加鎖成功");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ":重復(fù)加鎖失敗");
                    }
                    Thread.sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "執(zhí)行業(yè)務(wù)邏輯,花費(fèi)2秒");
                } else {
                    System.out.println(Thread.currentThread().getName() + ":第一次加鎖失敗");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    thread1Locks.release();
                    System.out.println(Thread.currentThread().getName() + ":釋放鎖成功!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "線程1").start();
        // 休眠的目的是讓線程1先執(zhí)行
        Thread.sleep(500);
        new Thread(() -> {
            try {
                if (thread2Locks.acquire(100, TimeUnit.MILLISECONDS)) {
                    System.out.println(Thread.currentThread().getName() + ":加鎖成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + ":加鎖失敗");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (thread2Locks.isAcquiredInThisProcess()) {
                        thread2Locks.release();
                        System.out.println(Thread.currentThread().getName() + ":釋放鎖成功!");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ":釋放鎖失??!原因是當(dāng)前鎖對(duì)象并沒有占用鎖");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "線程2").start();
    }

輸出結(jié)果:

線程1:第一次加鎖成功
線程1:重復(fù)加鎖失敗
線程2:加鎖失敗
線程2:釋放鎖失?。≡蚴钱?dāng)前鎖對(duì)象并沒有占用鎖
線程1執(zhí)行業(yè)務(wù)邏輯,花費(fèi)2秒
線程1:釋放鎖成功!

從輸出結(jié)果可以看出,同一個(gè)線程不允許重復(fù)加鎖,不同的線程會(huì)存在競爭關(guān)系,相互互斥。

接下來,關(guān)注一下細(xì)節(jié)部分。使用共享鎖的核心類是InterProcessSemaphoreMutex,從命名上來看,我第一次看官方文檔是一臉懵逼的,為什么有 Semaphore 這一個(gè)詞?難不成底層是用 JUC 的 Semaphore 并發(fā)鎖保證鎖互斥的?

結(jié)論先行:實(shí)際上并不是使用 JUC 的 Semaphore,不過底層實(shí)現(xiàn)是相似的思想,通過類似信號(hào)量的概念,限制加鎖的次數(shù)。

咱們先看下,當(dāng)執(zhí)行 new InterProcessSemaphoreMutex(client, path) 的時(shí)候,做了什么事情。為了驗(yàn)證上面的猜想,省略部分源碼。

    public class InterProcessSemaphoreMutex implements InterProcessLock {
        // 省略其他源碼
        public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
            // 省略其他源碼
            this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
        }
    }

再重點(diǎn)看下InterProcessSemaphoreV2對(duì)象創(chuàng)建的流程

    public class InterProcessSemaphoreV2 {
        public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
            this(client, path, maxLeases, null);
        }

        private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
            // 注意這兩行代碼,鎖對(duì)象是通過 InterProcessMutex 類創(chuàng)建的,而 maxLeases 是通過初始化時(shí)傳 1 進(jìn)來
            lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
            this.maxLeases = (count != null) ? count.getCount() : maxLeases;
            // 省略其他源碼
        }
    }

從 InterProcessSemaphoreV2 對(duì)象初始化過程可知,InterProcessSemaphoreV2 實(shí)際上組合使用了 InterProcessMutex 可重入共享鎖,通過 maxLeases參數(shù)限制只能成功加鎖一次,使用起來就類似信號(hào)量,限制加鎖次數(shù)。所以從共享鎖的命名來看,使用 Semaphore 就感覺合理了...

底層的加鎖邏輯,實(shí)際上都是調(diào)用 InterProcessMutex 這個(gè)類的方法,這里先賣個(gè)關(guān)子,先不看加鎖的邏輯,而是看下 release() 方法是怎樣釋放鎖的。

    public void release() throws Exception {
        // 省略其他源碼
        Lease lease = this.lease;
        this.lease = null;
        lease.close();
    }

lease 的英文解釋是租約、租契,咱們可以理解為加鎖的租契,加鎖成功。釋放鎖的源碼中,將lease設(shè)置為null,就相當(dāng)于丟棄了這個(gè)租契,那么再次獲取鎖的時(shí)候發(fā)現(xiàn) lease 并沒有租契,就相當(dāng)于沒有線程占用,就可以嘗試競爭加鎖了。

至于 InterProcessMutex 內(nèi)的加鎖邏輯,先看完 InterProcessMutex 的例子再具體看下源碼

只需要將上面例子中的A場景代碼注釋調(diào),然后打開B場景的代碼,其實(shí)就是使用可重入共享鎖,先看下執(zhí)行后的輸出結(jié)果為:

線程1:第一次加鎖成功
線程1:重復(fù)加鎖成功
線程2:加鎖失敗
線程2:釋放鎖失??!原因是當(dāng)前鎖對(duì)象并沒有占用鎖
線程1執(zhí)行業(yè)務(wù)邏輯,花費(fèi)2秒
線程1:釋放鎖成功!

結(jié)論先行:使用 InterProcessMutex 可以讓同一個(gè)線程重復(fù)加鎖,但不同線程獲取鎖是互斥。這個(gè)也是符合前面 InterProcessMutex 類的描述!

剖析可重入鎖底層加鎖和釋放鎖源碼

ps. 本文為了篇幅和著重核心代碼,所以會(huì)省略部分源碼。強(qiáng)烈建議讀者可以根據(jù)上述例子debug源碼,這樣才能從被動(dòng)吸收變成主動(dòng)吸收知識(shí),這樣對(duì)于底層邏輯的印象和理解才會(huì)更深入~

在 InterProcessMutex 內(nèi)會(huì)為每一個(gè)第一次加鎖成功的線程封裝為一個(gè) LockData 對(duì)象,并將當(dāng)前 LockData 對(duì)象加入 ConcurrentMap 中

    // 并發(fā)集合,key是加鎖成功的線程對(duì)象,vlaue則是lockData對(duì)象
    private final ConcurrentMap<Thread, InterProcessMutex.LockData> threadData = Maps.newConcurrentMap();

    /**
     * 加鎖的對(duì)象。owningThread代表加鎖的線程對(duì)象、lockPath代表在ZK中加鎖的路徑、lockCount代表加鎖的次數(shù)(為可重入鋪墊)
     */
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);


        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }

接下來再看一下,調(diào)用 acquire 方法實(shí)際上做了什么

    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        Thread currentThread = Thread.currentThread();
        // 從集合中獲取當(dāng)前線程的lockData對(duì)象
        InterProcessMutex.LockData lockData = threadData.get(currentThread);
        // 如果集合已存在當(dāng)前線程的lockData對(duì)象,證明當(dāng)前線程已加鎖成功且還沒釋放鎖
        if (lockData != null) {
            // re-entering,可重入,直接返回true代表重復(fù)加鎖成功
            lockData.lockCount.incrementAndGet();
            return true;
        }

        // 走到這里,就證明當(dāng)前線程還沒成功加鎖。那么就嘗試加鎖,如果加鎖失敗返回null
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        // 加鎖成功,返回的路徑不是null;先初始化lockData對(duì)象,然后插入到threadData集合中
        if (lockPath != null) {
            InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }
        // 走到這里,就證明lockPath為null,加鎖失敗返回false
        return false;
    }

結(jié)合注釋,可以知道可重入鎖實(shí)際上是判斷當(dāng)前線程是否存在 LockData 對(duì)象,存在的話就自增 LockData 對(duì)象內(nèi)的 lockCount 參數(shù);不存在就走 attemptLock方法嘗試加鎖,加鎖成功就返回 true 告知客戶端加鎖成功?,F(xiàn)在,咱們逐漸接近真相了,加鎖最終邏輯就是在attemptLock方法內(nèi)!

    // 省略部分源碼,請結(jié)合源碼綜合查看
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        String ourPath = null;
        boolean hasTheLock = false;
        // 給當(dāng)前嘗試加鎖的客戶端創(chuàng)建臨時(shí)的順序節(jié)點(diǎn),并返回節(jié)點(diǎn)的路徑
        ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
        // 嘗試加鎖
        hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }

    // 省略部分源碼,請結(jié)合源碼綜合查看
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            List<String> children = getSortedChildren();
            // 當(dāng)前順序節(jié)點(diǎn)的名稱
            String sequenceNodeName = ourPath.substring(basePath.length() + 1);
            // 嘗試獲取鎖
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {
                haveTheLock = true;
            } else {
                // 從順序節(jié)點(diǎn)中獲取上一個(gè)節(jié)點(diǎn)
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized (this) {
                    // 給上一個(gè)節(jié)點(diǎn)添加監(jiān)聽器
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    // 調(diào)用wait方法等待
                }
            }
        }
        return haveTheLock;
    }

再進(jìn)一步看下 driver.getsTheLock的流程(StandardLockInternalsDriver 類)

    // 省略部分源碼
    public class StandardLockInternalsDriver implements LockInternalsDriver {
        @Override
        public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
            // 當(dāng)前節(jié)點(diǎn)在這個(gè)子目錄下的順序
            int ourIndex = children.indexOf(sequenceNodeName);
            // maxLeases是初始化 InterProcessMutex 的時(shí)候傳的,值固定為1。
            // 什么情況下 getsTheLock 為 true?
            // 只有當(dāng) ourIndex 為0才可能是true,也就是在當(dāng)前目錄下,臨時(shí)順序節(jié)點(diǎn)的索引是0才算加鎖成功
            // 換言之,只有排在目錄的第一位才能加鎖成功;當(dāng)釋放鎖的時(shí)候會(huì)刪除第一位臨時(shí)順序節(jié)點(diǎn),這樣的話才可以讓下一位成功加鎖
            boolean getsTheLock = ourIndex < maxLeases;
            String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
            return new PredicateResults(pathToWatch, getsTheLock);
        }
    }

到此,加鎖的主流程源碼都已剖析了。最后再看下釋放鎖的主流程:

    // 省略部分源碼
    public void release() throws Exception {
        Thread currentThread = Thread.currentThread();
        InterProcessMutex.LockData lockData = threadData.get(currentThread);
        try {
            // 釋放鎖
            internals.releaseLock(lockData.lockPath);
        } finally {
            threadData.remove(currentThread);
        }
    }

    // 省略部分源碼
    final void releaseLock(String lockPath) throws Exception {
        // 刪掉路徑上的臨時(shí)節(jié)點(diǎn)
        deleteOurPath(lockPath);
    }

釋放鎖的流程相對(duì)加鎖就簡單多了,重點(diǎn)就是刪除臨時(shí)節(jié)點(diǎn),當(dāng)臨時(shí)節(jié)點(diǎn)刪除后就會(huì)觸發(fā)回調(diào),通知下一個(gè)節(jié)點(diǎn)加鎖。

通過前面一大輪的核心源碼剖析,再回頭想想整個(gè)加鎖和釋放鎖核心邏輯,小結(jié)一下:

  1. 創(chuàng)建 InterProcessMutex對(duì)象時(shí),固定 maxLeases 為 1,表明只有一個(gè)線程加鎖成功

  2. 為每一個(gè)嘗試加鎖的線程創(chuàng)建臨時(shí)順序節(jié)點(diǎn)

  3. 如果當(dāng)前順序節(jié)點(diǎn)處于該目錄下的第一位,則加鎖成功;否則就調(diào)用wait方法等待,且給上一個(gè)臨時(shí)順序節(jié)點(diǎn)添加監(jiān)聽器(上一個(gè)節(jié)點(diǎn)釋放后回調(diào))

  4. 當(dāng)釋放鎖的時(shí)候會(huì)刪除臨時(shí)順序節(jié)點(diǎn),并回調(diào)通知下一個(gè)臨時(shí)順序節(jié)點(diǎn),這樣的話,下一個(gè)臨時(shí)順序節(jié)點(diǎn)就能感知自己可以嘗試加鎖

上面剖析了核心流程,實(shí)際上很少直接調(diào)用 acquire 方法請求加鎖,因?yàn)榫€程請求 acquire 方法就會(huì)阻塞線程,在并發(fā)量大的情況下就會(huì)導(dǎo)致服務(wù)器內(nèi)大量線程被占用,最終會(huì)耗盡資源導(dǎo)致服務(wù)異常。一般的業(yè)務(wù)場景都會(huì)設(shè)置請求加鎖的過期時(shí)間,這樣在短時(shí)間內(nèi)無法加鎖成功就會(huì)走加鎖失敗的邏輯,就不會(huì)在一直等待。感興趣的讀者可以細(xì)看acquire(long time, TimeUnit unit)的流程,核心流程和上面說的一樣,只不過是加上一些時(shí)間判斷,如果超時(shí)就會(huì)刪除臨時(shí)順序節(jié)點(diǎn)而已。

可重入讀寫共享鎖

/**
     * 讀寫鎖簡單例子
     */
    public static void main(String[] args) throws Exception {
        String lockPath = "/read-write-sharded-lock";
        CuratorFramework client = CuratorUtils.getClient();
        // 添加連接狀態(tài)的監(jiān)聽器
        CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "讀寫鎖");
        client.getConnectionStateListenable().addListener(connectionStateListener);
        // 可重入讀寫共享鎖
        InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, lockPath);
        InterProcessMutex readLock = readWriteLock.readLock();
        InterProcessMutex writeLock = readWriteLock.writeLock();
        new Thread(() -> {
            try {
                readLock.acquire();
                System.out.println(Thread.currentThread().getName() + "獲取讀鎖成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (readLock.isOwnedByCurrentThread()) {
                        readLock.release();
                        System.out.println(Thread.currentThread().getName() + "釋放讀鎖成功");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "線程1").start();
        new Thread(() -> {
            try {
                writeLock.acquire();
                System.out.println(Thread.currentThread().getName() + "獲取寫鎖成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (writeLock.isOwnedByCurrentThread()) {
                        writeLock.release();
                        System.out.println(Thread.currentThread().getName() + "釋放寫鎖成功");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "線程2").start();
    }

輸出結(jié)果:

線程1獲取讀鎖成功
線程1釋放讀鎖成功
線程2獲取寫鎖成功
線程2釋放寫鎖成功

上面只是讀寫鎖的簡單例子,但讀寫鎖真實(shí)的使用場景會(huì)稍微復(fù)雜點(diǎn),比如下面四個(gè)問題,需要從源碼中找到答案!

  1. 不同線程獲取讀鎖是否互斥?答:不互斥

  2. 不同線程獲取寫鎖是否互斥?答:互斥

  3. 同一線程,可不可以先獲取讀鎖、再獲取寫鎖?答:不可以。如果同一線程先加讀鎖成功,后再請求加寫鎖,會(huì)阻塞加寫鎖請求

  4. 同一線程,可不可以先獲取寫鎖、再獲取讀鎖?答:可以。如果同一線程先加寫鎖成功,后再請求讀鎖,并不會(huì)阻塞讀鎖的加鎖請求

如果對(duì)上述結(jié)論有疑問的讀者,可以帶著疑問去閱讀源碼。接下來結(jié)合下面的部分源碼剖析以上四種結(jié)論在源碼內(nèi)怎么實(shí)現(xiàn)的!

可重入讀寫共享鎖底層加鎖和釋放鎖源碼

InterProcessReadWriteLock 內(nèi)創(chuàng)建了一個(gè)繼承 InterProcessMutex 的 InternalInterProcessMutex 內(nèi)部類,然后分別創(chuàng)建了readMutexwriteMutex兩個(gè)對(duì)象。各位有沒有發(fā)現(xiàn)什么?讀寫鎖加鎖邏輯還是通過 InterProcessMutex 呀!所以只要理解上面 InterProcessMute 的流程,理解讀寫鎖的核心邏輯就很簡單啦~

private final InterProcessMutex readMutex;
    private final InterProcessMutex writeMutex;

    public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
        lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
        // 初始化寫鎖
        writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(
                client,
                basePath,
                WRITE_LOCK_NAME,// 設(shè)置節(jié)點(diǎn)名字
                lockData,
                1, // 寫鎖的 maxLeases 設(shè)置為1
                new InterProcessReadWriteLock.SortingLockInternalsDriver() {
                    @Override
                    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                        return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
                    }
                }
        );
        // 初始化讀鎖
        readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(
                client,
                basePath,
                READ_LOCK_NAME,// 設(shè)置節(jié)點(diǎn)名字
                lockData,
                Integer.MAX_VALUE, // 讀鎖的 maxLeases 設(shè)置為無限大
                new InterProcessReadWriteLock.SortingLockInternalsDriver() {
                    // 當(dāng)執(zhí)行g(shù)etsTheLock加鎖的時(shí)候?qū)嶋H執(zhí)行到這里
                    @Override
                    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                        return readLockPredicate(children, sequenceNodeName);
                    }
                }
        );
    }

    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
        // 如果當(dāng)前線程擁有寫鎖,則當(dāng)前線程請求讀鎖也成功。留意下面初始化 PredicateResults 對(duì)象的 getsTheLock 參數(shù)為true,就代表加讀鎖成功
        if (writeMutex.isOwnedByCurrentThread()) {
            return new PredicateResults(null, true);
        }
        // 省略部分源碼
    }

    // 注意:這里繼承 InterProcessMutex
    private static class InternalInterProcessMutex extends InterProcessMutex {
        // 省略源碼
    }

以下說明的前提是,先成功獲取鎖,但都沒有執(zhí)行到釋放鎖的期間,有其他線程并發(fā)的請求加鎖操作。如果先獲取鎖再釋放鎖,其他線程肯定可以加鎖成功呀!

  1. 不同線程獲取讀鎖不互斥。實(shí)際上是因?yàn)樽x鎖的 maxLeases 設(shè)置為 Integer.MAX_VALUE,當(dāng)請求獲取加讀鎖的時(shí)候,執(zhí)行 StandardLockInternalsDriver 類的 getsTheLock 方法,根據(jù) boolean getsTheLock = ourIndex < maxLeases 判斷是否加鎖成功,ourIndex 指的是當(dāng)前臨時(shí)順序節(jié)點(diǎn)在目錄下的索引位置,肯定是小于 maxLeases,所以返回 true,代表加讀鎖成功。

  2. 不同線程獲取寫鎖互斥。細(xì)心的讀者肯定留意到創(chuàng)建寫鎖的時(shí)候,maxLeases 設(shè)置為 1,同樣執(zhí)行到 boolean getsTheLock = ourIndex < maxLeases 這一行的時(shí)候,除非當(dāng)前臨時(shí)順序節(jié)點(diǎn)處于該目錄下的第一位,否則都會(huì)返回 false 代表加鎖失敗。這也說明了只有一個(gè)且最先獲取寫鎖的線程能成功加鎖,其他的加寫鎖都會(huì)阻塞。

  3. 同一線程,不可以先獲取讀鎖、再獲取寫鎖。在時(shí)間順序上來說,先創(chuàng)建讀鎖的臨時(shí)順序節(jié)點(diǎn),再到寫鎖的臨時(shí)順序節(jié)點(diǎn)。當(dāng)執(zhí)行加寫鎖的操作時(shí)候,執(zhí)行 boolean getsTheLock = ourIndex < maxLeases 發(fā)現(xiàn) ourIndex 為 1,而寫鎖的 maxLeases 也是 1,所以返回 false 加寫鎖失敗。

  4. 同一線程,可以先獲取寫鎖、再獲取讀鎖。在時(shí)間順序上來說,先創(chuàng)建寫鎖的臨時(shí)順序節(jié)點(diǎn),再創(chuàng)建讀鎖的臨時(shí)順序節(jié)點(diǎn)。加寫鎖成功后,再到執(zhí)行加讀鎖的操作,因?yàn)樽x鎖的 maxLeases 的值無限大嘛,所以 boolean getsTheLock = ourIndex < maxLeases 會(huì)返回 true 加讀鎖成功。

信號(hào)量鎖

/**
     * maxLease代表可成功加鎖的數(shù)量
     */
    public static void main(String[] args) {
        String lockPath = "/semaphore-sharded-lock";
        int maxLeases = 1;
        CuratorFramework client = CuratorUtils.getClient();
        // 添加連接狀態(tài)的監(jiān)聽器
        CuratorConnectionStateListener connectionStateListener = new CuratorConnectionStateListener(lockPath, "讀寫鎖");
        client.getConnectionStateListenable().addListener(connectionStateListener);

        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, maxLeases);

        new Thread(() -> {
            Lease lease = null;
            try {
                lease = semaphore.acquire(100, TimeUnit.MILLISECONDS);
                if (null != lease) {
                    System.out.println(Thread.currentThread().getName() + "獲取共享鎖成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + "獲取共享鎖失敗");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != lease) {
                    semaphore.returnLease(lease);
                    System.out.println(Thread.currentThread().getName() + "釋放共享鎖成功");
                }
            }
        }, "線程1").start();

        new Thread(() -> {
            Lease lease = null;
            try {
                lease = semaphore.acquire(100, TimeUnit.MILLISECONDS);
                if (null != lease) {
                    System.out.println(Thread.currentThread().getName() + "獲取共享鎖成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + "獲取共享鎖失敗");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != lease) {
                    semaphore.returnLease(lease);
                    System.out.println(Thread.currentThread().getName() + "釋放共享鎖成功");
                }
            }
        }, "線程2").start();
    }

輸出結(jié)果:

線程1獲取共享鎖失敗
線程2獲取共享鎖成功
線程2釋放共享鎖成功

看一下初始化 InterProcessSemaphoreV2 的源碼,我們還是可以發(fā)現(xiàn)底層依然使用的是 InterProcessMutex 類加鎖!

    // 省略部分源碼
    private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
        lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
        this.maxLeases = (count != null) ? count.getCount() : maxLeases;
    }

剩下加鎖釋放鎖的源碼就不用多說了,只要理解上面 maxLeases 在 InterProcessMutex 的妙用,就可以知道在 InterProcessSemaphoreV2 內(nèi)是通過 maxLeases 控制獲取鎖的數(shù)量。不過信號(hào)量鎖在獲取鎖和釋放鎖的方式上和其它共享鎖稍微有點(diǎn)不一樣,釋放鎖的時(shí)候是調(diào)用 returnLease 方法,參數(shù)是從加鎖成功后返回的 Lease 對(duì)象,不過其釋放鎖的底層原理和上面的 release 方法大同小異,就不再贅述了。

多對(duì)象共享鎖

public static void main(String[] args) throws Exception {
        String lockPathA = "/multi-sharded-lock-A";
        String lockPathB = "/multi-sharded-lock-B";
        String lockPathC = "/multi-sharded-lock-C";
        CuratorFramework client = CuratorUtils.getClient();

        // 單對(duì)象共享鎖
        InterProcessMutex processMutex = new InterProcessMutex(client, lockPathC);
        // 多對(duì)象共享鎖
        InterProcessMultiLock multiLock = new InterProcessMultiLock(client, Arrays.asList(lockPathA, lockPathB));

        new Thread(() -> {
            try {
                multiLock.acquire();
                System.out.println(Thread.currentThread().getName() + "獲取鎖成功");
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    multiLock.release();
                    System.out.println(Thread.currentThread().getName() + "釋放鎖成功");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "線程1").start();
        // 為了讓線程1優(yōu)先執(zhí)行
        Thread.sleep(1000);
        new Thread(() -> {
            try {
                if (processMutex.acquire(100, TimeUnit.MILLISECONDS)) {
                    System.out.println(Thread.currentThread().getName() + "獲取鎖成功");
                } else {
                    System.out.println(Thread.currentThread().getName() + "獲取鎖失敗");
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (processMutex.isOwnedByCurrentThread()) {
                        processMutex.release();
                        System.out.println(Thread.currentThread().getName() + "釋放鎖成功");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "線程2").start();
    }

輸出結(jié)果:

線程1獲取鎖成功
線程2獲取鎖成功
線程2釋放鎖成功
線程1釋放鎖成功

只要將 InterProcessMutex 加鎖的對(duì)象改為 lockPathA 或 lockPathB 就可以發(fā)現(xiàn),線程2加鎖失敗。這也能印證使用InterProcessMultiLock多對(duì)象共享鎖能夠限制對(duì)象只能有一個(gè)線程占有。接下來簡單看下底層的原理,其實(shí)底層依舊用到的是 InterProcessMutex 共享鎖,加鎖和釋放鎖邏輯上面都剖析過,就不再贅述。咱們只需要關(guān)注初始化 InterProcessMultiLock 的時(shí)候做了什么事情。

    // 初始化 InterProcessMultiLock 
    public InterProcessMultiLock(CuratorFramework client, List<String> paths) {
        this(makeLocks(client, paths));
    }

    // 分別對(duì)多個(gè)對(duì)象加上 InterProcessMutex 共享鎖
    private static List<InterProcessLock> makeLocks(CuratorFramework client, List<String> paths) {
        ImmutableList.Builder<InterProcessLock> builder = ImmutableList.builder();
        for (String path : paths) {
            InterProcessLock lock = new InterProcessMutex(client, path);
            builder.add(lock);
        }
        return builder.build();
    }

    // locks用于記錄加鎖的對(duì)象,之后的加鎖和釋放鎖都會(huì)根據(jù)這個(gè)對(duì)象循環(huán)加鎖、釋放鎖
    public InterProcessMultiLock(List<InterProcessLock> locks) {
        this.locks = ImmutableList.copyOf(locks);
    }

簡單來說,就是將多個(gè)對(duì)象分別用 InterProcessMutex 加鎖,鎖住資源;然后在加鎖和釋放鎖的時(shí)候,循環(huán)這些對(duì)象加鎖或釋放鎖就可以了!

總結(jié):

這篇文章重點(diǎn)介紹了 Curator 的幾種分布式鎖使用和核心源碼。不難發(fā)現(xiàn),其實(shí) Curator 只是在 ZK 的基礎(chǔ)上擴(kuò)展了更多功能,其底層其實(shí)就是通過臨時(shí)順序節(jié)點(diǎn)來控制加鎖的順序和鎖的釋放。靈活運(yùn)用臨時(shí)順序節(jié)點(diǎn)的特性,加上 Curator 內(nèi)設(shè)置 maxLeases 等熟悉,演變出可重入鎖、讀寫鎖、信號(hào)量鎖等類型的分布式鎖。另外,本文重點(diǎn)介紹了 InterProcessMutex的核心源碼,只要掌握這個(gè)類,其它的分布式鎖也能快速了解其底層原理。在實(shí)際工作中,可以根據(jù)業(yè)務(wù)自定義類來封裝 Curator 等核心類,用于擴(kuò)展適合自身業(yè)務(wù)的場景。

如果覺得文章不錯(cuò)的話,麻煩點(diǎn)個(gè)贊哈,你的鼓勵(lì)就是我的動(dòng)力!對(duì)于文章有哪里不清楚或者有誤的地方,歡迎在評(píng)論區(qū)留言~

參考資料:
Curator 官網(wǎng):https://curator.apache.org/curator-recipes/index.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容