Zookeeper客戶端Curator使用詳解(一)

轉(zhuǎn):http://throwable.coding.me/2018/12/16/zookeeper-curator-usage

前提

因為最近項目需要使用Zookeeper這個中間件,提前了解一下它的客戶端Curator的使用。

簡介

Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節(jié)開發(fā)工作,包括連接重連、反復(fù)注冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。

引子和趣聞:
Zookeeper名字的由來是比較有趣的,下面的片段摘抄自《從PAXOS到ZOOKEEPER分布式一致性原理與實踐》一書:
Zookeeper最早起源于雅虎的研究院的一個研究小組。在當(dāng)時,研究人員發(fā)現(xiàn),在雅虎內(nèi)部很多大型的系統(tǒng)需要依賴一個類似的系統(tǒng)進行分布式協(xié)調(diào),但是這些系統(tǒng)往往存在分布式單點問題。所以雅虎的開發(fā)人員就試圖開發(fā)一個通用的無單點問題的分布式協(xié)調(diào)框架。在立項初期,考慮到很多項目都是用動物的名字來命名的(例如著名的Pig項目),雅虎的工程師希望給這個項目也取一個動物的名字。時任研究院的首席科學(xué)家Raghu Ramakrishnan開玩笑說:再這樣下去,我們這兒就變成動物園了。此話一出,大家紛紛表示就叫動物園管理員吧——因為各個以動物命名的分布式組件放在一起,雅虎的整個分布式系統(tǒng)看上去就像一個大型的動物園了,而Zookeeper正好用來進行分布式環(huán)境的協(xié)調(diào)——于是,Zookeeper的名字由此誕生了。

Curator無疑是Zookeeper客戶端中的瑞士軍刀,它譯作”館長”或者’’管理者’’,不知道是不是開發(fā)小組有意而為之,筆者猜測有可能這樣命名的原因是說明Curator就是Zookeeper的館長(腦洞有點大:Curator就是動物園的園長)。
Curator包含了幾個包:

  • curator-framework:對zookeeper的底層api的一些封裝。
  • curator-client:提供一些客戶端的操作,例如重試策略等。
  • curator-recipes:封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計數(shù)器、分布式Barrier等。

Maven依賴(使用curator的版本:2.12.0,對應(yīng)Zookeeper的版本為:3.4.x,如果跨版本會有兼容性問題,很有可能導(dǎo)致節(jié)點操作失敗):

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>2.12.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.12.0</version>
</dependency>

Curator的基本Api

創(chuàng)建會話

1.使用靜態(tài)工程方法創(chuàng)建客戶端

一個例子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
 connectionInfo,
 5000,
 3000,
 retryPolicy);

newClient靜態(tài)工廠方法包含四個主要參數(shù):

參數(shù)名 說明
connectionString 服務(wù)器列表,格式host1:port1,host2:port2,…
retryPolicy 重試策略,內(nèi)建有四種重試策略,也可以自行實現(xiàn)RetryPolicy接口
sessionTimeoutMs 會話超時時間,單位毫秒,默認(rèn)60000ms
connectionTimeoutMs 連接創(chuàng)建超時時間,單位毫秒,默認(rèn)60000ms

2.使用Fluent風(fēng)格的Api創(chuàng)建會話

核心參數(shù)變?yōu)榱魇皆O(shè)置,一個列子如下:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
 .connectString(connectionInfo)
 .sessionTimeoutMs(5000)
 .connectionTimeoutMs(5000)
 .retryPolicy(retryPolicy)
 .build();

3.創(chuàng)建包含隔離命名空間的會話

為了實現(xiàn)不同的Zookeeper業(yè)務(wù)之間的隔離,需要為每個業(yè)務(wù)分配一個獨立的命名空間(NameSpace),即指定一個Zookeeper的根路徑(官方術(shù)語:為Zookeeper添加“Chroot”特性)。例如(下面的例子)當(dāng)客戶端指定了獨立命名空間為“/base”,那么該客戶端對Zookeeper上的數(shù)據(jù)節(jié)點的操作都是基于該目錄進行的。通過設(shè)置Chroot可以將客戶端應(yīng)用與Zookeeper服務(wù)端的一課子樹相對應(yīng),在多個應(yīng)用共用一個Zookeeper集群的場景下,這對于實現(xiàn)不同應(yīng)用之間的相互隔離十分有意義。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
 CuratorFramework client =
 CuratorFrameworkFactory.builder()
 .connectString(connectionInfo)
 .sessionTimeoutMs(5000)
 .connectionTimeoutMs(5000)
 .retryPolicy(retryPolicy)
 .namespace("base")
 .build();

啟動客戶端

當(dāng)創(chuàng)建會話成功,得到client的實例然后可以直接調(diào)用其start( )方法:

client.start();

數(shù)據(jù)節(jié)點操作

創(chuàng)建數(shù)據(jù)節(jié)點

Zookeeper的節(jié)點創(chuàng)建模式

  • PERSISTENT:持久化
  • PERSISTENT_SEQUENTIAL:持久化并且?guī)蛄刑?/li>
  • EPHEMERAL:臨時
  • EPHEMERAL_SEQUENTIAL:臨時并且?guī)蛄刑?/li>

創(chuàng)建一個節(jié)點,初始內(nèi)容為空

client.create().forPath("path");

注意:如果沒有設(shè)置節(jié)點屬性,節(jié)點創(chuàng)建模式默認(rèn)為持久化節(jié)點,內(nèi)容默認(rèn)為空

創(chuàng)建一個節(jié)點,附帶初始化內(nèi)容

client.create().forPath("path","init".getBytes());

創(chuàng)建一個節(jié)點,指定創(chuàng)建模式(臨時節(jié)點),內(nèi)容為空

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

創(chuàng)建一個節(jié)點,指定創(chuàng)建模式(臨時節(jié)點),附帶初始化內(nèi)容

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes())

創(chuàng)建一個節(jié)點,指定創(chuàng)建模式(臨時節(jié)點),附帶初始化內(nèi)容,并且自動遞歸創(chuàng)建父節(jié)點

client.create()
 .creatingParentContainersIfNeeded()
 .withMode(CreateMode.EPHEMERAL)
 .forPath("path","init".getBytes());

這個creatingParentContainersIfNeeded()接口非常有用,因為一般情況開發(fā)人員在創(chuàng)建一個子節(jié)點必須判斷它的父節(jié)點是否存在,如果不存在直接創(chuàng)建會拋出NoNodeException,使用creatingParentContainersIfNeeded()之后Curator能夠自動遞歸創(chuàng)建所有所需的父節(jié)點。

刪除數(shù)據(jù)節(jié)點

刪除一個節(jié)點

client.delete().forPath("path");

注意,此方法只能刪除葉子節(jié)點,否則會拋出異常。

刪除一個節(jié)點,并且遞歸刪除其所有的子節(jié)點

client.delete().deletingChildrenIfNeeded().forPath("path");

刪除一個節(jié)點,強制指定版本進行刪除

client.delete().withVersion(10086).forPath("path");

刪除一個節(jié)點,強制保證刪除

client.delete().guaranteed().forPath("path");

guaranteed()接口是一個保障措施,只要客戶端會話有效,那么Curator會在后臺持續(xù)進行刪除操作,直到刪除節(jié)點成功。

注意:上面的多個流式接口是可以自由組合的,例如:

client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

讀取數(shù)據(jù)節(jié)點數(shù)據(jù)

讀取一個節(jié)點的數(shù)據(jù)內(nèi)容

client.getData().forPath("path");

注意,此方法返的返回值是byte[ ];

讀取一個節(jié)點的數(shù)據(jù)內(nèi)容,同時獲取到該節(jié)點的stat

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

更新數(shù)據(jù)節(jié)點數(shù)據(jù)

更新一個節(jié)點的數(shù)據(jù)內(nèi)容

client.setData().forPath("path","data".getBytes());

注意:該接口會返回一個Stat實例

更新一個節(jié)點的數(shù)據(jù)內(nèi)容,強制指定版本進行更新

client.setData().withVersion(10086).forPath("path","data".getBytes());

檢查節(jié)點是否存在

client.checkExists().forPath("path");

注意:該方法返回一個Stat實例,用于檢查ZNode是否存在的操作. 可以調(diào)用額外的方法(監(jiān)控或者后臺處理)并在最后調(diào)用forPath()指定要操作的ZNode

獲取某個節(jié)點的所有子節(jié)點路徑

client.getChildren().forPath("path");

注意:該方法的返回值為List<string style="box-sizing: border-box;">,獲得ZNode的子節(jié)點Path列表。 可以調(diào)用額外的方法(監(jiān)控、后臺處理或者獲取狀態(tài)watch, background or get stat) 并在最后調(diào)用forPath()指定要操作的父ZNode</string>

事務(wù)

CuratorFramework的實例包含inTransaction( )接口方法,調(diào)用此方法開啟一個ZooKeeper事務(wù). 可以復(fù)合create, setData, check, and/or delete 等操作然后調(diào)用commit()作為一個原子操作提交。一個例子如下:

client.inTransaction().check().forPath("path")
 .and()
 .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
 .and()
 .setData().withVersion(10086).forPath("path","data2".getBytes())
 .and()
 .commit();

異步接口

上面提到的創(chuàng)建、刪除、更新、讀取等方法都是同步的,Curator提供異步接口,引入了BackgroundCallback接口用于處理異步接口調(diào)用之后服務(wù)端返回的結(jié)果信息。BackgroundCallback接口中一個重要的回調(diào)值為CuratorEvent,里面包含事件類型、響應(yīng)嗎和節(jié)點的詳細信息。

CuratorEventType

事件類型 對應(yīng)CuratorFramework實例的方法
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

響應(yīng)碼(#getResultCode())

響應(yīng)碼 意義
0 OK,即調(diào)用成功
-4 ConnectionLoss,即客戶端與服務(wù)端斷開連接
-110 NodeExists,即節(jié)點已經(jīng)存在
-112 SessionExpired,即會話過期

一個異步創(chuàng)建節(jié)點的例子如下:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
 .creatingParentsIfNeeded()
 .withMode(CreateMode.EPHEMERAL)
 .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
 },executor)
 .forPath("path");

注意:如果#inBackground()方法不指定executor,那么會默認(rèn)使用Curator的EventThread去進行異步處理。

Curator食譜(高級特性)

提醒:首先你必須添加curator-recipes依賴,下文僅僅對recipes一些特性的使用進行解釋和舉例,不打算進行源碼級別的探討

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>2.12.0</version>
</dependency>

重要提醒:強烈推薦使用ConnectionStateListener監(jiān)控連接的狀態(tài),當(dāng)連接狀態(tài)為LOST,curator-recipes下的所有Api將會失效或者過期,盡管后面所有的例子都沒有使用到ConnectionStateListener。

Path Cache

Path Cache用來監(jiān)控一個ZNode的子節(jié)點. 當(dāng)一個子節(jié)點增加, 更新,刪除時, Path Cache會改變它的狀態(tài), 會包含最新的子節(jié)點, 子節(jié)點的數(shù)據(jù)和狀態(tài),而狀態(tài)的更變將通過PathChildrenCacheListener通知。

實際使用時會涉及到四個類:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

通過下面的構(gòu)造函數(shù)創(chuàng)建Path Cache:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

想使用cache,必須調(diào)用它的start方法,使用完后調(diào)用close方法。 可以設(shè)置StartMode來實現(xiàn)啟動的模式,

StartMode有下面幾種:

  1. NORMAL:正常初始化。
  2. BUILD_INITIAL_CACHE:在調(diào)用start()之前會調(diào)用rebuild()
  3. POST_INITIALIZED_EVENT: 當(dāng)Cache初始化數(shù)據(jù)后發(fā)送一個PathChildrenCacheEvent.Type#INITIALIZED事件

public void addListener(PathChildrenCacheListener listener)可以增加listener監(jiān)聽緩存的變化。

getCurrentData()方法返回一個List<ChildData>對象,可以遍歷所有的子節(jié)點。

設(shè)置/更新、移除其實是使用client (CuratorFramework)來操作, 不通過PathChildrenCache操作:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件類型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("節(jié)點數(shù)據(jù):" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:如果new PathChildrenCache(client, PATH, true)中的參數(shù)cacheData值設(shè)置為false,則示例中的event.getData().getData()、data.getData()將返回null,cache將不會緩存節(jié)點數(shù)據(jù)。

注意:示例中的Thread.sleep(10)可以注釋掉,但是注釋后事件監(jiān)聽的觸發(fā)次數(shù)會不全,這可能與PathCache的實現(xiàn)原理有關(guān),不能太過頻繁的觸發(fā)事件!

Node Cache

Node Cache與Path Cache類似,Node Cache只是監(jiān)聽某一個特定的節(jié)點。它涉及到下面的三個類:

  • NodeCache - Node Cache實現(xiàn)類
  • NodeCacheListener - 節(jié)點監(jiān)聽器
  • ChildData - 節(jié)點數(shù)據(jù)

注意:使用cache,依然要調(diào)用它的start()方法,使用完后調(diào)用close()方法。

getCurrentData()將得到節(jié)點當(dāng)前的狀態(tài),通過它的狀態(tài)可以得到當(dāng)前的值。

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("節(jié)點數(shù)據(jù):" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("節(jié)點被刪除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:示例中的Thread.sleep(10)可以注釋,但是注釋后事件監(jiān)聽的觸發(fā)次數(shù)會不全,這可能與NodeCache的實現(xiàn)原理有關(guān),不能太過頻繁的觸發(fā)事件!

注意:NodeCache只能監(jiān)聽一個節(jié)點的狀態(tài)變化。

Tree Cache

Tree Cache可以監(jiān)控整個樹上的所有節(jié)點,類似于PathCache和NodeCache的組合,主要涉及到下面四個類:

  • TreeCache - Tree Cache實現(xiàn)類
  • TreeCacheListener - 監(jiān)聽器類
  • TreeCacheEvent - 觸發(fā)的事件類
  • ChildData - 節(jié)點數(shù)據(jù)
public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件類型:" + event.getType() +
                        " | 路徑:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

注意:在此示例中沒有使用Thread.sleep(10),但是事件觸發(fā)次數(shù)也是正常的。

注意:TreeCache在初始化(調(diào)用start()方法)的時候會回調(diào)TreeCacheListener實例一個事TreeCacheEvent,而回調(diào)的TreeCacheEvent對象的Type為INITIALIZED,ChildData為null,此時event.getData().getPath()很有可能導(dǎo)致空指針異常,這里應(yīng)該主動處理并避免這種情況。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容