Zookeeper 實(shí)現(xiàn)分布式節(jié)點(diǎn)下的配置文件統(tǒng)一管理和分布式鎖

一、ZooKeeper 簡(jiǎn)介

ZooKeeper 是一個(gè)集中式服務(wù),用于維護(hù)配置信息,命名,提供分布式同步和提供組服務(wù)。
ZooKeeper 的主要應(yīng)用:1、節(jié)點(diǎn)選舉;2、配置文件統(tǒng)一管理;3、分布式鎖;4、發(fā)布與訂閱(Dubbo);5、集群管理,集群中保證數(shù)據(jù)的強(qiáng)一致性,下面我們主要講配置文件統(tǒng)一管理和分布式鎖。

Zookeeper文件系統(tǒng)

Zookeeper的每個(gè)子目錄項(xiàng)如 NameService 都被稱作為 znode,和文件系統(tǒng)一樣,我們能夠自由的增加、刪除 znode,在一個(gè) znode 下增加、刪除子 znode,唯一的不同在于znode是可以存儲(chǔ)數(shù)據(jù)的。

有四種類型的 znode:

1、PERSISTENT-持久化目錄節(jié)點(diǎn)
客戶端與 zookeeper 斷開連接后,該節(jié)點(diǎn)依舊存在。
2、PERSISTENT_SEQUENTIAL-持久化順序編號(hào)目錄節(jié)點(diǎn)
客戶端與 zookeeper 斷開連接后,該節(jié)點(diǎn)依舊存在,只是 zookeeper 給該節(jié)點(diǎn)名稱進(jìn)行順序編號(hào)。
3、EPHEMERAL-臨時(shí)目錄節(jié)點(diǎn)
客戶端與 zookeeper 斷開連接后,該節(jié)點(diǎn)被刪除。
4、EPHEMERAL_SEQUENTIAL-臨時(shí)順序編號(hào)目錄節(jié)點(diǎn)
客戶端與 zookeeper 斷開連接后,該節(jié)點(diǎn)被刪除,只是 zookeeper 給該節(jié)點(diǎn)名稱進(jìn)行順序編號(hào)。

二、配置文件統(tǒng)一管理

1、實(shí)現(xiàn)思路

假如我們需要修改三(或者更多)臺(tái)服務(wù)器上 redis.conf 的配置信息,如果一臺(tái)一臺(tái)的去修改,則會(huì)加大出錯(cuò)概率,而且也不實(shí)際。這時(shí)候,我們需要引入Zookeeper(下面簡(jiǎn)稱zk),我們需要知道,zk 中有個(gè) watcher 事件,包括 :
EventType:NodeCreated //節(jié)點(diǎn)創(chuàng)建
EventType:NodeDataChanged //節(jié)點(diǎn)的數(shù)據(jù)變更
EventType:NodeChildrentChanged //子節(jié)點(diǎn)下的數(shù)據(jù)變更
EventType:NodeDeleted // 節(jié)點(diǎn)刪除
當(dāng)我們監(jiān)聽了上面的事件時(shí),事件觸發(fā)就會(huì)被告知。以統(tǒng)一更新 redis.conf 配置文件為例,我們可以實(shí)現(xiàn)監(jiān)聽某一個(gè)節(jié)點(diǎn)的數(shù)據(jù)更新事件,當(dāng)DBA更改了該節(jié)點(diǎn)的值(一般為 json 串,方便程序解析,例:{"type":"update","url":"ftp:192.168.2.10/config/redis.xml"}),此時(shí)我們可以根據(jù) type 的值“update”可知,是需要更新 redis.conf 配置文件,然后根據(jù) url 的值,獲取最新的 redis.conf 文件所在的服務(wù)器地址。此時(shí),我們可以下載最新配置文件,然后刪除原來的 redis.conf 配置文件,最后將最新的配置文件添加到項(xiàng)目中,從而通過重啟程序就可以讀取到最新的配置了。

2、代碼實(shí)現(xiàn)

這里我們模擬了三個(gè)客戶端 Client1、Client2、Client3,代碼都是一樣的。當(dāng) zk 節(jié)點(diǎn)數(shù)據(jù)發(fā)送變化,就會(huì)觸發(fā)數(shù)據(jù)更新的事件,從而告知其客戶端(必須監(jiān)聽了該事件)。

package com.imooc.curator.operator;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.retry.RetryNTimes;
import org.springframework.util.StringUtils;

import java.util.concurrent.CountDownLatch;

/**
 *  使用 zk 的 watch 事件,實(shí)現(xiàn)配置文件的統(tǒng)一配置
 * @author K. L. Mao
 * @create 2018/9/8
 */
public class Client1 {
    public CuratorFramework client;
    public static final String zkServerPath = "192.168.174.10:2181,192.168.174.11:2181,192.168.174.12:2181";

    public static final String CONFIG_NODE_PATH = "/super/imooc";
    public static final String SUB_PATH = "/redis-config";
    public static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 實(shí)例化 zk 客戶端
     */
    public Client1(){
        /**
         * curator 連接 zk 的策略:RetryNTimes
         * n:重試次數(shù)
         * sleepMsBetweenRetries:每次重試間隔的時(shí)間
         */
        RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
        client = CuratorFrameworkFactory.builder()
                .connectString(zkServerPath)
                .sessionTimeoutMs(10000).retryPolicy(retryPolicy)
                .namespace("workspace").build();
        // 啟動(dòng)客戶端連接
        client.start();
    }

    /**
     * 關(guān)閉 zk 客戶端
     */
    public void closeZKClient() {
        if (client != null){
            client.close();
        }
    }

    public static void main(String[] args) throws Exception {
        // 實(shí)例化
        Client1 operator = new Client1();
        System.out.println("client1 啟動(dòng)成功");
        // 創(chuàng)建節(jié)點(diǎn)
//        byte[] data = "super".getBytes();
//        operator.client.create().creatingParentsIfNeeded()
//                .withMode(CreateMode.PERSISTENT)    // 持久化節(jié)點(diǎn)
//                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)   // 全權(quán)限ACL
//                .forPath(nodePath, data);

        // watcher 事件 當(dāng)使用usingWatcher的時(shí)候,監(jiān)聽只會(huì)觸發(fā)一次,監(jiān)聽完畢后就銷毀
//        operator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
        // NodeCache:監(jiān)聽 CONFIG_NODE_PATH 下的子包數(shù)據(jù)節(jié)點(diǎn)的變更,會(huì)觸發(fā)事件
        final PathChildrenCache nodeCache = new PathChildrenCache(operator.client, CONFIG_NODE_PATH, true);
        // 初始化的時(shí)候獲取 node 的值并且緩存
        nodeCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        // 新增監(jiān)聽器
        nodeCache.getListenable().addListener((client, event) -> {
            // 監(jiān)聽節(jié)點(diǎn)更新
            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
                String configNodePath = event.getData().getPath();
                if (configNodePath.equals(CONFIG_NODE_PATH + SUB_PATH)){
                    System.out.println("監(jiān)聽到配置發(fā)生變化,節(jié)點(diǎn)路徑為:" + configNodePath);
                    // 讀取節(jié)點(diǎn)數(shù)據(jù)
                    String data = new String(event.getData().getData(), "UTF-8");
                    System.out.println("節(jié)點(diǎn)" + CONFIG_NODE_PATH + "的數(shù)據(jù)為" + data);
                    if (!StringUtils.isEmpty(data)){
                        JSONObject jsonObject = JSON.parseObject(data);
                        String type = jsonObject.getString("type");
                        String url = jsonObject.getString("url");
                        if ("add".equals(type)){
                            System.out.println("監(jiān)聽到新增的配置,準(zhǔn)備下載...");
                            // ... 連接ftp服務(wù)器,根據(jù)url找到對(duì)應(yīng)的配置
                            Thread.sleep(500);
                            System.out.println("開始下載新的配置文件,下載路徑為<" + url + ">");
                            // ... 下載配置到你指定的目錄
                            Thread.sleep(1000);
                            System.out.println("下載成功,已經(jīng)添加到項(xiàng)目中");
                        }else if ("update".equals(type)){
                            System.out.println("監(jiān)聽到更新的配置,準(zhǔn)備下載...");
                            // ... 連接ftp服務(wù)器,根據(jù)url找到對(duì)應(yīng)的配置
                            Thread.sleep(500);
                            System.out.println("開始下載新的配置文件,下載路徑為<" + url + ">");
                            // ... 下載配置到你指定的目錄
                            Thread.sleep(1000);
                            System.out.println("下載成功");
                            System.out.println("刪除項(xiàng)目中原配置文件...");
                            Thread.sleep(100);
                            // ... 刪除原文件
                            System.out.println("拷貝配置文件到項(xiàng)目目錄...");
                            // ... 拷貝文件到項(xiàng)目
                        }else if ("delete".equals(type)){
                            System.out.println("監(jiān)聽到刪除的配置");
                            System.out.println("刪除項(xiàng)目中原配置文件...");
                        }
                    }
                }
            }
        });
        countDownLatch.await();
        operator.closeZKClient();
    }
}

三、分布式鎖

1、場(chǎng)景

高并場(chǎng)景下,對(duì)共享資源的訪問,都需要加鎖,分布式的環(huán)境下,就需要加分布式鎖。如下代碼,模擬一個(gè)分布式的并發(fā)操作。

  • Service
package com.imooc.curator.service;

import org.springframework.stereotype.Service;

/**
 * @author K. L. Mao
 * @create 2018/9/9
 */
@Service
public class PayService {

    private static int COUNT = 100;

    /**
     * 高并發(fā)下的 count-1
     * @return
     */
    public int countLock(){
        if (COUNT <= 99){
            return -1;
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        COUNT = COUNT - 1;
        return COUNT;
    }
}
  • Controller
package com.imooc.curator.web;

import com.imooc.curator.service.PayService;
import com.imooc.curator.utils.ZKCurator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author K. L. Mao
 * @create 2018/9/9
 */
@RestController
public class PayController {

    @Autowired
    private PayService payService;

    /**
     * 模擬客戶端1
     * @return
     */
    @GetMapping("/lock")
    public int lock(){
       return payService.countLock();
    }

    /**
     * 模擬客戶端2
     * @return
     */
    @GetMapping("/lock2")
    public int lock2(){
        return payService.countLock();
    }

}

當(dāng)我們?cè)谌雰?nèi)分別訪問 localhost:8080/lock 和 localhost:8080/lock2 時(shí),就會(huì)產(chǎn)生2個(gè)線程對(duì) COUNT 進(jìn)行了操作,使其 變?yōu)?8了。這樣是不允許的,那么如何保證這兩個(gè)線程是依次執(zhí)行的呢?這時(shí)候,我們需要 zk 來實(shí)現(xiàn)分布式鎖了。

2、實(shí)現(xiàn)思路

首先,Zookeeper 的每一個(gè)節(jié)點(diǎn),都是一個(gè)天然的順序發(fā)號(hào)器。

在每一個(gè)節(jié)點(diǎn)下面創(chuàng)建子節(jié)點(diǎn)時(shí),只要選擇的創(chuàng)建類型是有序(EPHEMERAL_SEQUENTIAL 臨時(shí)有序或者PERSISTENT_SEQUENTIAL 永久有序)類型,那么,新的子節(jié)點(diǎn)后面,會(huì)加上一個(gè)次序編號(hào)。這個(gè)次序編號(hào),是上一個(gè)生成的次序編號(hào)加一。

比如,創(chuàng)建一個(gè)用于發(fā)號(hào)的節(jié)點(diǎn)“/test/lock”,然后以他為父親節(jié)點(diǎn),可以在這個(gè)父節(jié)點(diǎn)下面創(chuàng)建相同前綴的子節(jié)點(diǎn),假定相同的前綴為“/test/lock/seq-”,在創(chuàng)建子節(jié)點(diǎn)時(shí),同時(shí)指明是有序類型。如果是第一個(gè)創(chuàng)建的子節(jié)點(diǎn),那么生成的子節(jié)點(diǎn)為“/test/lock/seq-0000000000”,下一個(gè)節(jié)點(diǎn)則為“/test/lock/seq-0000000001”,依次類推,等等。



其次,Zookeeper節(jié)點(diǎn)的遞增性,可以規(guī)定節(jié)點(diǎn)編號(hào)最小的那個(gè)獲得鎖。

一個(gè)zookeeper分布式鎖,首先需要?jiǎng)?chuàng)建一個(gè)父節(jié)點(diǎn),盡量是持久節(jié)點(diǎn)(PERSISTENT類型),然后每個(gè)要獲得鎖的線程都會(huì)在這個(gè)節(jié)點(diǎn)下創(chuàng)建個(gè)臨時(shí)順序節(jié)點(diǎn),由于序號(hào)的遞增性,可以規(guī)定排號(hào)最小的那個(gè)獲得鎖。所以,每個(gè)線程在嘗試占用鎖之前,首先判斷自己是排號(hào)是不是當(dāng)前最小,如果是,則獲取鎖。

第三,Zookeeper的節(jié)點(diǎn)監(jiān)聽機(jī)制,可以保障占有鎖的方式有序而且高效。

每個(gè)線程搶占鎖之前,先搶號(hào)創(chuàng)建自己的ZNode。同樣,釋放鎖的時(shí)候,就需要?jiǎng)h除搶號(hào)的Znode。搶號(hào)成功后,如果不是排號(hào)最小的節(jié)點(diǎn),就處于等待通知的狀態(tài)。等誰(shuí)的通知呢?不需要其他人,只需要等前一個(gè)Znode 的通知就可以了。當(dāng)前一個(gè)Znode 刪除的時(shí)候,就是輪到了自己占有鎖的時(shí)候。第一個(gè)通知第二個(gè)、第二個(gè)通知第三個(gè),擊鼓傳花似的依次向后。

Zookeeper的節(jié)點(diǎn)監(jiān)聽機(jī)制,可以說能夠非常完美的,實(shí)現(xiàn)這種擊鼓傳花似的信息傳遞。具體的方法是,每一個(gè)等通知的Znode節(jié)點(diǎn),只需要監(jiān)聽或者 watch 監(jiān)視排號(hào)在自己前面那個(gè),而且緊挨在自己前面的那個(gè)節(jié)點(diǎn)。 只要上一個(gè)節(jié)點(diǎn)被刪除了,就進(jìn)行再一次判斷,看看自己是不是序號(hào)最小的那個(gè)節(jié)點(diǎn),如果是,則獲得鎖。

3、代碼實(shí)現(xiàn)

首先定義了一個(gè)鎖的接口,很簡(jiǎn)單,一個(gè)加鎖方法,一個(gè)解鎖方法。

public interface Lock {

    boolean lock() throws Exception;

    boolean unlock();
}
  • lock 加鎖方法
    @Override
    public boolean lock() {

       try {
            boolean locked = false;

            // 嘗試加鎖
            locked = tryLock();

            if (locked) {
                return true;
            }
            while (!locked) {

                // 加鎖失敗,則等待
                await();


                if (checkLocked()) {
                    locked=true;
                }
            }
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            unlock();
        }

        return false;
    }

lock方法的具體邏輯是,首先嘗試著去加鎖,如果加鎖失敗就去等待,然后再重復(fù)。嘗試加鎖的tryLock方法是關(guān)鍵。做了兩件重要的事情:

(1)創(chuàng)建臨時(shí)順序節(jié)點(diǎn),并且保存自己的節(jié)點(diǎn)路徑。

(2)判斷是否是第一個(gè),如果是第一個(gè),則加鎖成功。如果不是,就找到前一個(gè)Znode節(jié)點(diǎn),并且保存其路徑到 prior_path。

  • tryLock 方法:
private boolean tryLock() throws Exception {
        //創(chuàng)建臨時(shí)Znode
        List<String> waiters = getWaiters();
        locked_path = ZKclient.instance
                .createEphemeralSeqNode(LOCK_PREFIX);
        if (null == locked_path) {
            throw new Exception("zk error");
        }
        locked_short_path = getShorPath(locked_path);

        //獲取等待的子節(jié)點(diǎn)列表,判斷自己是否第一個(gè)
        if (checkLocked()) {
            return true;
        }

        // 判斷自己排第幾個(gè)
        int index = Collections.binarySearch(waiters, locked_short_path);
        if (index < 0) { // 網(wǎng)絡(luò)抖動(dòng),獲取到的子節(jié)點(diǎn)列表里可能已經(jīng)沒有自己了
            throw new Exception("節(jié)點(diǎn)沒有找到: " + locked_short_path);
        }

        //如果自己沒有獲得鎖,則要監(jiān)聽前一個(gè)節(jié)點(diǎn)
        prior_path = ZK_PATH + "/" + waiters.get(index - 1);

        return false;
    }

創(chuàng)建臨時(shí)順序節(jié)點(diǎn)后,其完整路徑存放在locked_path成員變量中。另外還截取了一個(gè)后綴路徑,放在 locked_short_path成員變量中。 這個(gè)后綴路徑,是一個(gè)短路徑,只有完整路徑的最后一層。在和取到的遠(yuǎn)程子節(jié)點(diǎn)列表中的其他路徑進(jìn)行比較時(shí),需要用到短路徑。因?yàn)樽庸?jié)點(diǎn)列表的路徑,都是短路徑,只有最后一層。

然后,調(diào)用checkLocked方法,判斷是否是鎖定成功。如果是則返回。如果自己沒有獲得鎖,則要監(jiān)聽前一個(gè)節(jié)點(diǎn)。找出前一個(gè)節(jié)點(diǎn)的路徑,保存在prior_path成員中,供后面的await等待方法,去監(jiān)聽使用。

在進(jìn)入await等待方法的介紹前,先說下checkLocked鎖定判斷方法。

checkLocked方法中,判斷是否可以持有鎖。判斷規(guī)則很簡(jiǎn)單:當(dāng)前創(chuàng)建的節(jié)點(diǎn),是否在上一步獲取到的子節(jié)點(diǎn)列表的第一個(gè)位置:

如果是,說明可以持有鎖,返回true,表示加鎖成功;

如果不是,說明有其他線程早已先持有了鎖,返回false。

  • checkLocked 方法
private boolean checkLocked() {
        //獲取等待的子節(jié)點(diǎn)列表

        List<String> waiters = getWaiters();
        //節(jié)點(diǎn)按照編號(hào),升序排列
        Collections.sort(waiters);

        // 如果是第一個(gè),代表自己已經(jīng)獲得了鎖
        if (locked_short_path.equals(waiters.get(0))) {
            log.info("成功的獲取分布式鎖,節(jié)點(diǎn)為{}", locked_short_path);
            return true;
        }
        return false;
    }

等待方法await,表示在爭(zhēng)奪鎖失敗以后的等待邏輯。那么此處該線程應(yīng)該做什么呢?

  • await 方法
private void await() throws Exception {

        if (null == prior_path) {
            throw new Exception("prior_path error");
        }

        final CountDownLatch latch = new CountDownLatch(1);


        //訂閱比自己次小順序節(jié)點(diǎn)的刪除事件
        Watcher w = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                System.out.println("監(jiān)聽到的變化 watchedEvent = " + watchedEvent);
                log.info("[WatchedEvent]節(jié)點(diǎn)刪除");

                latch.countDown();
            }
        };

        client.getData().usingWatcher(w).forPath(prior_path);
      
        latch.await(WAIT_TIME, TimeUnit.SECONDS);
    }

首先添加一個(gè)watcher監(jiān)聽,而監(jiān)聽的地址正是上面一步返回的prior_path成員。這里,僅僅會(huì)監(jiān)聽自己前一個(gè)節(jié)點(diǎn)的變動(dòng),而不是父節(jié)點(diǎn)下所有節(jié)點(diǎn)的變動(dòng)。然后,調(diào)用latch.await,進(jìn)入等待狀態(tài),等到latch.countDown()被喚醒。

一旦prior_path節(jié)點(diǎn)發(fā)生了變動(dòng),那么就將線程從等待狀態(tài)喚醒,重新一輪的鎖的爭(zhēng)奪。

至此,關(guān)于加鎖的算法基本完成。但是,上面還沒有實(shí)現(xiàn)鎖的可重入。

  • 可重入加鎖方式
#修改前面的lock方法,在前面加上可重入的判斷邏輯。代碼如下:

  public boolean lock() {
     synchronized (this) {
        if (lockCount.get() == 0) {
            thread = Thread.currentThread();
            lockCount.incrementAndGet();
        } else {
            if (!thread.equals(Thread.currentThread())) {
                return false;
            }
            lockCount.incrementAndGet();
            return true;
        }
    }
    
   //...
   }

為了變成可重入,在代碼中增加了一個(gè)加鎖的計(jì)數(shù)器lockCount,計(jì)算重復(fù)加鎖的次數(shù)。如果是同一個(gè)線程加鎖,只需要增加次數(shù),直接返回,表示加鎖成功。

  • unlock 方法

釋放鎖主要有兩個(gè)工作:

(1)減少重入鎖的計(jì)數(shù),如果不是0,直接返回,表示成功的釋放了一次;

(2)如果計(jì)數(shù)器為0,移除Watchers監(jiān)聽器,并且刪除創(chuàng)建的 Znode 臨時(shí)節(jié)點(diǎn);

代碼如下:

    @Override
    public boolean unlock() {

        if (!thread.equals(Thread.currentThread())) {
            return false;
        }

        int newLockCount = lockCount.decrementAndGet();

        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + locked_path);
        }

        if (newLockCount != 0) {
            return true;
        }
        try {
            if (ZKclient.instance.isNodeExist(locked_path)) {
                client.delete().forPath(locked_path);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }

這里,為了盡量保證線程安全,可重入計(jì)數(shù)器的類型,不是 int 類型,而是Java并發(fā)包中的原子類型——AtomicInteger。

4、分布式鎖的應(yīng)用場(chǎng)景

前面的實(shí)現(xiàn),主要的價(jià)值是展示一下分布式鎖的基礎(chǔ)開發(fā)和原理。實(shí)際的開發(fā)中,如果需要使用到分布式鎖,并不需要自己造輪子,可以直接使用curator客戶端中的各種官方實(shí)現(xiàn)的分布式鎖,比如其中的InterProcessMutex可重入鎖。

  • InterProcessMutex 可重入鎖的使用實(shí)例如下:
@Test
public void testzkMutex() throws InterruptedException {

    CuratorFramework client=ZKclient.instance.getClient();
    final InterProcessMutex zkMutex =
            new InterProcessMutex(client,"/mutex");  ;
    for (int i = 0; i < 10; i++) {
        FutureTaskScheduler.add(() -> {

            try {
                zkMutex.acquire();

                for (int j = 0; j < 10; j++) {

                    count++;
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("count = " + count);
                zkMutex.release();

            } catch (Exception e) {
                e.printStackTrace();
            }

        });
    }

    Thread.sleep(Integer.MAX_VALUE);
}

最后,總結(jié)一下 Zookeeper 分布式鎖。

利用在同級(jí)目錄下,不能創(chuàng)建相同的節(jié)點(diǎn)特性,可以利用多線程去創(chuàng)建一個(gè)節(jié)點(diǎn),但是只能有一個(gè)線程可以創(chuàng)建成功,所以該線程得到鎖。釋放鎖:釋放鎖時(shí),通過刪除該節(jié)點(diǎn),來觸發(fā)剛才沒有獲取到的線程的監(jiān)聽,讓他們?cè)俅蝸砀?jìng)爭(zhēng)獲取。這種方案可以達(dá)到效果,但是會(huì)有一個(gè)問題產(chǎn)生,就是如果在并發(fā)比較大的情況下,一個(gè)臨時(shí)節(jié)點(diǎn)的消失,會(huì)造成很多線程同時(shí)會(huì)試圖創(chuàng)建臨時(shí)節(jié)點(diǎn),這種方式會(huì)影響 zk 的穩(wěn)定性,這個(gè)效應(yīng)稱為羊群效應(yīng)。

Zookeeper分布式鎖,能有效的解決分布式問題,不可重入問題,實(shí)現(xiàn)起來較為簡(jiǎn)單。

但是,Zookeeper 實(shí)現(xiàn)的分布式鎖其實(shí)存在一個(gè)缺點(diǎn),那就是性能并不太高。因?yàn)槊看卧趧?chuàng)建鎖和釋放鎖的過程中,都要?jiǎng)討B(tài)創(chuàng)建、銷毀臨時(shí)節(jié)點(diǎn)來實(shí)現(xiàn)鎖功能。ZK中創(chuàng)建和刪除節(jié)點(diǎn)只能通過 Leader 服務(wù)器來執(zhí)行,然后 Leader 服務(wù)器還需要將數(shù)據(jù)同步到所有的 Follower 機(jī)器上。所以,在高性能,高并發(fā)的場(chǎng)景下,不建議使用Zk的分布式鎖,建議使用 Redis。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容