[Hazelcast系列 四] 分布式map

Hazelcast的IMap 擴(kuò)展了java.util.concurrent.ConcurrentMapjava.util.Map·兩個(gè)接口,是Java map的分布式實(shí)現(xiàn)。

1. 獲取Map并存儲(chǔ)數(shù)據(jù)


調(diào)用Hazelcast實(shí)例的getMap 方法可以獲取一個(gè)分布式map,并可以通過(guò)put 方法存儲(chǔ)數(shù)據(jù)。Hazelcast堆map數(shù)據(jù)和備份數(shù)據(jù)進(jìn)行分區(qū),并將數(shù)據(jù)平均分配給集群所有的節(jié)點(diǎn)。每個(gè)節(jié)點(diǎn)存儲(chǔ)的數(shù)量大約為Map.size()*2/n,其中n為集群節(jié)點(diǎn)數(shù)量。

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<Integer, String> fruits = instance.getMap("fruits");
fruits.put(1, "apple");
fruits.put(2, "banana");
fruits.put(3, "pear");

Hazelcast默認(rèn)有271個(gè)分區(qū),但是fruits 這個(gè)map只有三條數(shù)據(jù),因此大多數(shù)的分區(qū)中沒(méi)有保存數(shù)據(jù)。

2. 創(chuàng)建一個(gè)節(jié)點(diǎn)備份Map數(shù)據(jù)


當(dāng)集群中節(jié)點(diǎn)數(shù)量大于1時(shí),map中的數(shù)據(jù)會(huì)被自動(dòng)分配給集群所有節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)存儲(chǔ)的數(shù)據(jù)量約為data count/n + backups。

3. Map備份


Hazelcast會(huì)把map中的數(shù)據(jù)分布在集群的多個(gè)節(jié)點(diǎn)上,每個(gè)節(jié)點(diǎn)只存儲(chǔ)一部分?jǐn)?shù)據(jù)。分布式map的默認(rèn)備份數(shù)為一。如果有一個(gè)節(jié)點(diǎn)故障,可以使用集群中的備份數(shù)據(jù)進(jìn)行恢復(fù)。Hazelcast中的map有兩種類型的備份:同步和異步。

同步備份

通過(guò)設(shè)置備份數(shù),Hazelcast提供了更高的數(shù)據(jù)安全性,集群一個(gè)節(jié)點(diǎn)的數(shù)據(jù)會(huì)被拷貝到其他節(jié)點(diǎn)。創(chuàng)建同步備份只需要設(shè)置backup-count

<hazelcast>
    ...
    <map name="fruits">
        <backup-count>1</backup-count>
    </map>
    ...
</hazelcast>

如果backup-count 的值為1,map的備份數(shù)據(jù)會(huì)全部存儲(chǔ)在另外一個(gè)節(jié)點(diǎn)上。如果backup-count 的值為2,集群中的兩個(gè)節(jié)點(diǎn)會(huì)存儲(chǔ)map的備份數(shù)據(jù)。如果不希望對(duì)數(shù)據(jù)進(jìn)行備份,可以設(shè)置backup-count 的值為0。如果性能的重要性超過(guò)數(shù)據(jù)的可靠性,backup-count 可設(shè)置的最大值為6。

Hazelcast同時(shí)支持同步和異步備份,默認(rèn)為同步備份,并使用backup-count 參數(shù)配置。在同步備份場(chǎng)景下,備份操作會(huì)阻塞其他操作直到備份數(shù)據(jù)已經(jīng)同步到集群節(jié)點(diǎn)并收到確認(rèn),因此備份會(huì)在put 操作完成前更新,從而提供了集群數(shù)據(jù)更高的可靠性。同步備份操作可能帶來(lái)額外的阻塞成本并產(chǎn)生時(shí)延問(wèn)題。

異步備份

與同步備份呢不同,異步備份不會(huì)阻塞對(duì)map的操作,異步備份不需要確認(rèn),而且備份動(dòng)作在特定的時(shí)間點(diǎn)執(zhí)行??梢酝ㄟ^(guò)參數(shù)asyn-backup-count 設(shè)置異步備份,下面是一個(gè)簡(jiǎn)單聲明式配置:

<hazelcast>
    ...
    <map name="default">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
    </map>
    ...
</hazelcast>
  • 數(shù)據(jù)備份會(huì)增加內(nèi)存的使用。
  • map可以同時(shí)擁有同步備份和異步備份。
備份可讀

Hazelcast默認(rèn)只有一個(gè)同步備份。如果backup-count 的值大于1,那么集群中的節(jié)點(diǎn)需要保存屬于本節(jié)點(diǎn)的數(shù)據(jù)和備份數(shù)據(jù)。因此,在某個(gè)節(jié)點(diǎn)上調(diào)用map.get(key) 方法時(shí),該節(jié)點(diǎn)可能擁有該key對(duì)應(yīng)的備份數(shù)據(jù),默認(rèn)map.get(key) 方法會(huì)從真正擁有該數(shù)據(jù)的節(jié)點(diǎn)讀取數(shù)據(jù)(這點(diǎn)和kafka中從partition的leader讀取數(shù)據(jù)類似)。

通過(guò)設(shè)置read-backup-datatrue,可以允許備份數(shù)據(jù)可讀,Hazelcast從數(shù)據(jù)一致性考慮觸發(fā),將該參數(shù)的默認(rèn)值為false。備份數(shù)據(jù)可讀可以提高讀操作性能,但是可能產(chǎn)生臟讀問(wèn)題。

設(shè)置備份可讀的簡(jiǎn)單聲明式配置如下:

<hazelcast>
    ...
    <map name="default">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
        <read-backup-data>true</read-backup-data>
    </map>
    ...
</hazelcast>

只有在至少有一個(gè)同步或異步備份的條件下備份數(shù)據(jù)可讀這個(gè)功能才可用。如果從備份讀取數(shù)據(jù),需要考慮key命中問(wèn)題,key 在備份數(shù)據(jù)中命中,可能在真正的數(shù)據(jù)成員節(jié)點(diǎn)不會(huì)命中。這會(huì)影響IMap統(tǒng)計(jì)中的最大空閑時(shí)間和過(guò)期時(shí)間。因此在備份中命中的key,在真正擁有數(shù)據(jù)的成員可能已經(jīng)過(guò)期。

4. 驅(qū)逐map數(shù)據(jù)

Hazelcast使用了新的基于數(shù)據(jù)采樣的驅(qū)逐機(jī)制。

map中的數(shù)據(jù)會(huì)一直存在除非手動(dòng)刪除或使用驅(qū)逐策略驅(qū)逐。IMap支持基于策略的數(shù)據(jù)驅(qū)逐,目前支持兩種驅(qū)逐策略:LRU和LFU。

理解map驅(qū)逐

Hazelcast實(shí)現(xiàn)了基于分區(qū)的map驅(qū)逐。例如,如果PER_NODE的值為max-size,Hazelcast使用下面的公式計(jì)算每個(gè)分區(qū)的最大數(shù)據(jù)量:

partition-maximum-size = max-size * member-count / partition-count

如果計(jì)算出partition-maximum-size 的值小于1,partition-maximum-size 會(huì)被設(shè)置為1.

當(dāng)向map中插入數(shù)據(jù)時(shí),根據(jù)上面公式計(jì)算出的分區(qū)最大數(shù)量啟動(dòng)驅(qū)逐。如果該分區(qū)存儲(chǔ)的數(shù)據(jù)量超過(guò)最大值,Hazelcast會(huì)在該分區(qū)啟動(dòng)驅(qū)逐。

假設(shè)map的配置信息如下:

  • 分區(qū)數(shù): 200
  • 每個(gè)分區(qū)數(shù)據(jù)量: 100
  • max-size (PER_NODE): 20000

map總的存儲(chǔ)數(shù)據(jù)量為20000,也就是說(shuō)驅(qū)逐的數(shù)據(jù)量閾值為20000.當(dāng)向map插入數(shù)據(jù)時(shí),驅(qū)逐的過(guò)程如下:

  1. 計(jì)算插入的數(shù)據(jù)需要存儲(chǔ)的分區(qū)位置
  2. 檢查分區(qū)是否達(dá)到驅(qū)逐閾值
  3. 驅(qū)逐一條數(shù)據(jù)

上述驅(qū)逐過(guò)程的結(jié)果是map的大小變?yōu)?9999,下一次操作不會(huì)觸發(fā)新的驅(qū)逐操作直到map存儲(chǔ)的數(shù)量再次到達(dá)驅(qū)逐閾值。

Map驅(qū)逐配置

下面是一個(gè)簡(jiǎn)單的map驅(qū)逐配置:

<hazelcast>
    ...
    <map name="default">
        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>0</max-idle-seconds>
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="5000"/>
    </map>
    ...
</hazelcast>

配置參數(shù)的含義如下:

  • time-to-live-seconds:該參數(shù)設(shè)置每條數(shù)據(jù)在map中的保存時(shí)間(TTL)。該參數(shù)限制了一條數(shù)據(jù)自最后一次被操作的存活時(shí)間,如果該參數(shù)的值不為0,數(shù)據(jù)的存活時(shí)間超過(guò)參數(shù)值時(shí)會(huì)被自動(dòng)驅(qū)逐。每條數(shù)據(jù)可以設(shè)置自己的time-to-live-seconds參數(shù),如果不設(shè)置則使用map的參數(shù)值。該參數(shù)值的取值范圍為[0,Integer.MAX_VALUE]。默認(rèn)值為0,表示數(shù)據(jù)永不過(guò)期。如果參數(shù)的值不為0,數(shù)據(jù)驅(qū)逐將不會(huì)受eviction-policy的影響。

  • max-idle-seconds: 該參數(shù)設(shè)置數(shù)據(jù)在map中的最大空閑時(shí)間(自最后一次操作以來(lái)的時(shí)間)數(shù)據(jù)的空閑時(shí)間超過(guò)該值時(shí)會(huì)自動(dòng)被驅(qū)逐。參數(shù)取值范圍[0,Integer.MAX_VALUE],默認(rèn)值為0,表示無(wú)窮大,數(shù)據(jù)一直不被訪問(wèn)也可以保存在map中。

  • eviction-policy: 數(shù)據(jù)量超過(guò)設(shè)置的最大值時(shí)采用的驅(qū)逐策略:

    • NONE: 默認(rèn)策略,如果使用該策略,當(dāng)數(shù)據(jù)量超過(guò)最大值時(shí)不會(huì)驅(qū)逐數(shù)據(jù)??梢院?time-to-live-secondsmax-idle-seconds 配合使用。
    • LRU: 最近最少使用策略.
    • LFU: 最少使用策略.
  • size: map存儲(chǔ)數(shù)據(jù)的最大值。當(dāng)map存儲(chǔ)的數(shù)據(jù)量超過(guò)該參數(shù)的值時(shí),map會(huì)基于設(shè)置的驅(qū)逐策略對(duì)數(shù)據(jù)驅(qū)逐。參數(shù)的取值范圍為[0,Integer.MAX_VALUE],默認(rèn)值為0,表示可以存儲(chǔ)無(wú)窮多數(shù)據(jù)。如果希望該參數(shù)可用, eviction-policy 只能設(shè)置為L(zhǎng)RU或LFU。

    • PER_NODE: 集群節(jié)點(diǎn)存儲(chǔ)的最大數(shù)據(jù)量,該策略為默認(rèn)策略。

      <eviction max-size-policy="PER_NODE" size="5000"/>

    • PER_PARTITION: 每個(gè)分區(qū)存儲(chǔ)的最大數(shù)據(jù)量。

      <eviction max-size-policy="PER_PARTITION" size="27100" />

    • USED_HEAP_SIZE: 每個(gè)Hazelcast實(shí)例使用的最大堆大?。∕B)

      <eviction max-size-policy="USED_HEAP_SIZE" size="4096" />

    • USED_HEAP_PERCENTAGE: 每個(gè)Hazelcast實(shí)例使用的堆內(nèi)存大小比例。

      <eviction max-size-policy="USED_HEAP_PERCENTAGE" size="10" />

    • FREE_HEAP_SIZE: 最小空閑堆內(nèi)存(MB)

      <eviction max-size-policy="FREE_HEAP_SIZE" size="512" />

    • FREE_HEAP_PERCENTAGE: 最小空閑內(nèi)存比例。

      <eviction max-size-policy="FREE_HEAP_PERCENTAGE" size="10" />

    • USED_NATIVE_MEMORY_SIZE: 每個(gè)Hazelcast使用的最大直接內(nèi)存。

      <eviction max-size-policy="USED_NATIVE_MEMORY_SIZE" size="1024" />

    • USED_NATIVE_MEMORY_PERCENTAGE: 每個(gè)實(shí)例使用的最大直接內(nèi)存比例。

      <eviction max-size-policy="USED_NATIVE_MEMORY_PERCENTAGE" size="65" />

    • FREE_NATIVE_MEMORY_SIZE:每個(gè)實(shí)例的最小空閑直接內(nèi)存。

      <eviction max-size-policy="FREE_NATIVE_MEMORY_SIZE" size="256" />

    • FREE_NATIVE_MEMORY_PERCENTAGE:每個(gè)實(shí)例的直接內(nèi)存最小空閑比例。

      <eviction max-size-policy="FREE_NATIVE_MEMORY_PERCENTAGE" size="5" />

數(shù)據(jù)過(guò)期后,數(shù)據(jù)不能再?gòu)膍ap中獲取,在某個(gè)時(shí)間點(diǎn)該數(shù)據(jù)可能會(huì)被清理以釋放內(nèi)存?;谶^(guò)期時(shí)間的驅(qū)逐策略,可以通過(guò) time-to-live-secondsmax-idle-seconds 兩個(gè)參數(shù)設(shè)置。

驅(qū)逐配置
<hazelcast>
    ...
    <map name="documents">
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="10000"/>
        <max-idle-seconds>60</max-idle-seconds>
    </map>
    ...
</hazelcast>

在上面的配置中。名為documents map將在集群成員存儲(chǔ)的數(shù)據(jù)量超過(guò)10000時(shí)啟動(dòng)驅(qū)逐,最近最少被使用的數(shù)據(jù)將會(huì)被驅(qū)逐(LRU),此外,60s內(nèi)沒(méi)有被使用的數(shù)據(jù)也會(huì)被驅(qū)逐。

下面是一個(gè)關(guān)于直接內(nèi)存使用比例的配置:

<hazelcast>
    ...
    <map name="nativeMap*">
        <in-memory-format>NATIVE</in-memory-format>
        <eviction max-size-policy="USED_NATIVE_MEMORY_PERCENTAGE" eviction-policy="LFU" size="99"/>
    </map>
    ...
</hazelcast>
驅(qū)逐特定數(shù)據(jù)

上面介紹的驅(qū)逐策略和配置適用于map中的所有數(shù)據(jù),map中滿足驅(qū)逐條件的所有數(shù)據(jù)都將會(huì)被驅(qū)逐。如果想驅(qū)逐map中一些特定的數(shù)據(jù),可以通過(guò)V put(K var1, V var2, long var3, TimeUnit var5)V put(K var1, V var2, long var3, TimeUnit var5, long var6, TimeUnit var8) 兩個(gè)方法設(shè)置ttl 和最大空閑時(shí)間:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<Integer, String> fruits = instance.getMap("fruits");
fruits.put(1, "apple", 60, TimeUnit.SECONDS);
fruits.put(2, "banana", 60, TimeUnit.SECONDS, 30, TimeUnit.SECONDS);
驅(qū)逐所有數(shù)據(jù)

使用evictAll() 方法可以驅(qū)逐map中所有沒(méi)被鎖的key 。 如果map定義存儲(chǔ)的MapStore,evictAll() 不會(huì)調(diào)用deleteAll() 方法,如果想調(diào)用deleteAll() 方法,可以使用clear() 方法。

下面是一個(gè)簡(jiǎn)單的樣例:

HazelcastInstance node1 = Hazelcast.newHazelcastInstance();
HazelcastInstance node2 = Hazelcast.newHazelcastInstance();
IMap<Integer, Integer> map1 = node1.getMap("data");
for (int i = 0; i < 100; i++) {
    map1.put(i, i);
}
for (int i = 0; i < 4; i++) {
    map1.lock(i);
}
IMap<Integer, Integer> map2 = node2.getMap("data");
System.out.println("map size before evict all = " + map2.size());
map2.evictAll();
System.out.println("map size after evict all = " + map1.size());

5. 設(shè)置內(nèi)存存儲(chǔ)格式

IMap 和一些其他的數(shù)據(jù)結(jié)構(gòu),比如ICache 都有一個(gè)in-memory-format 參數(shù)。Hazelcast默認(rèn)在內(nèi)存中以二進(jìn)制的格式存儲(chǔ)數(shù)據(jù)。有時(shí)候以對(duì)象的形式存儲(chǔ)數(shù)據(jù)可以加速本地處理,尤其對(duì)于查詢操作。

Hazelcast支持以下三中數(shù)據(jù)格式:

  • BINARY (default): 數(shù)據(jù)包括keyvalue都是序列化的二進(jìn)制格式存儲(chǔ)在內(nèi)存中。如果操作都是一些map的常規(guī)操作putget 可以使用這種格式存儲(chǔ)數(shù)據(jù)。
  • OBJECT: 數(shù)據(jù)以對(duì)象存儲(chǔ)在內(nèi)存中。以OBJECT格式存儲(chǔ)數(shù)據(jù)可以減少反序列化的開銷,適用于數(shù)據(jù)復(fù)雜和需要處理大量數(shù)據(jù)的場(chǎng)景。盡管value可以以OBJECT的格式存儲(chǔ),但是key依然以二進(jìn)制的格式存儲(chǔ)。
  • NATIVE: (Hazelcast 企業(yè)版特性) 這種格式和BINARY格式類型,但是存儲(chǔ)在直接內(nèi)存中。

get這樣的常規(guī)操作依賴于對(duì)象實(shí)例。當(dāng)使用OBJECT 存儲(chǔ)格式時(shí),調(diào)用get方法,map不會(huì)返回存儲(chǔ)的實(shí)例,而是返回存儲(chǔ)實(shí)例的一個(gè)克隆。一次get操作需要序列化和反序列,但是使用BINARY 格式存儲(chǔ)只需要一次反序列化,因此BINARY 更快。類似的,對(duì)于put 操作使用BINARY 存儲(chǔ)格式也更快。

6. 元數(shù)據(jù)策略

IMap可以在更新時(shí)自動(dòng)預(yù)處理多種數(shù)據(jù)類型,以加速對(duì)數(shù)據(jù)的查詢,當(dāng)前只有HazelcastJsonValue這種類型支持。啟用創(chuàng)建元數(shù)據(jù)創(chuàng)建后,IMap會(huì)創(chuàng)建有關(guān)受支持類型對(duì)象的元數(shù)據(jù),并在查詢時(shí)使用此元數(shù)據(jù)。這不影響除支持的類型外,操作其他任何類型的對(duì)象的時(shí)延和吞吐量。

Hazelcast已默認(rèn)開啟該功能,可以通過(guò)metadata-policy 關(guān)閉該功能,該參數(shù)的可選值為:OFF,CREATE_ON_UPDATE。

關(guān)閉元數(shù)據(jù)的聲明式配置:

<hazelcast>
    ...
    <map name="map-a">
        <metadata-policy>OFF</metadata-policy>
    </map>
    ...
</hazelcast>

代碼配置:

MapConfig mapConfig = new MapConfig();
mapConfig.setMetadataPolicy(MetadataPolicy.OFF);

7. 鎖map

IMap實(shí)現(xiàn)是線程安全的,可以滿足對(duì)線程安全的基本需求。不過(guò)隨著需求不斷增長(zhǎng)或者你想更多的對(duì)并發(fā)進(jìn)行控制,可以考慮下面Hazelcast提供的解決方案。

考慮下面一個(gè)修改map值的代碼:

public class RacyUpdateMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Value> map = hz.getMap( "map" );
        String key = "1";
        map.put( key, new Value() );
        System.out.println( "Starting" );
        for ( int k = 0; k < 1000; k++ ) {
            if ( k % 100 == 0 ) System.out.println( "At: " + k );
            Value value = map.get( key );
            Thread.sleep( 10 );
            value.amount++;
            map.put( key, value );
        }
        System.out.println( "Finished! Result = " + map.get(key).amount );
    }

    static class Value implements Serializable {
        public int amount;
    }
}

當(dāng)多個(gè)實(shí)例同時(shí)運(yùn)行上述代碼時(shí),就有可能產(chǎn)生“競(jìng)態(tài)”,可以使用Hazelcast提供的悲觀鎖和樂(lè)觀鎖來(lái)解決這個(gè)"競(jìng)態(tài)"問(wèn)題。

悲觀鎖

解決競(jìng)態(tài)問(wèn)題的一種方法是使用悲觀鎖:鎖住要操作map的entry直到操作完成。要使用悲觀鎖可以調(diào)用IMap提供的map.lockmap.unlock 方法。下面是一個(gè)簡(jiǎn)單的樣例:

public class PessimisticUpdateMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Value> map = hz.getMap( "map" );
        String key = "1";
        map.put( key, new Value() );
        System.out.println( "Starting" );
        for ( int k = 0; k < 1000; k++ ) {
            map.lock( key );
            try {
                Value value = map.get( key );
                Thread.sleep( 10 );
                value.amount++;
                map.put( key, value );
            } finally {
                map.unlock( key );
            }
        }
        System.out.println( "Finished! Result = " + map.get( key ).amount );
    }

    static class Value implements Serializable {
        public int amount;
    }
}

IMap 的鎖在已經(jīng)釋放而且沒(méi)有其他線程等待時(shí),可以被垃圾收集器自動(dòng)回收。IMap 的悲觀鎖是可重入的但是不是公平鎖。

樂(lè)觀鎖

Hazelcast中,IMapreplace 方法使用樂(lè)觀鎖。replace 根據(jù)數(shù)據(jù)在內(nèi)存的存儲(chǔ)格式比較值,如果值相等,則使用新的值替換舊的值(和CSA類似)。如果想使用自定義的equals 方法進(jìn)行相等性比較,數(shù)據(jù)在內(nèi)存中的存儲(chǔ)格式必須是OBJECT ,否Hazelcast首先將數(shù)據(jù)序列化然后進(jìn)行比較。

下面是樂(lè)觀鎖的一個(gè)簡(jiǎn)單樣例:

public class OptimisticMember {
    public static void main( String[] args ) throws Exception {
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        IMap<String, Value> map = hz.getMap( "map" );
        String key = "1";
        map.put( key, new Value() );
        System.out.println( "Starting" );
        for ( int k = 0; k < 1000; k++ ) {
            if ( k % 10 == 0 ) System.out.println( "At: " + k );
            for (; ; ) {
                Value oldValue = map.get( key );
                Value newValue = new Value( oldValue );
                Thread.sleep( 10 );
                newValue.amount++;
                if ( map.replace( key, oldValue, newValue ) )
                    break;
            }
        }
        System.out.println( "Finished! Result = " + map.get( key ).amount );
    }

    static class Value implements Serializable {
        public int amount;

        public Value() {
        }

        public Value( Value that ) {
            this.amount = that.amount;
        }

        public boolean equals( Object o ) {
            if ( o == this ) return true;
            if ( !( o instanceof Value ) ) return false;
            Value that = ( Value ) o;
            return that.amount == this.amount;
        }
    }
}
悲觀鎖 vs. 樂(lè)觀鎖

悲觀鎖和樂(lè)觀鎖沒(méi)有絕對(duì)的優(yōu)劣,需要根據(jù)業(yè)務(wù)場(chǎng)景選擇不同的鎖策略。對(duì)于大多數(shù)只讀系統(tǒng),樂(lè)觀鎖更加合適,和悲觀鎖相比樂(lè)觀鎖有更高的性能。對(duì)于同一個(gè)key存在大量更新的場(chǎng)景使用悲觀鎖更好,從數(shù)據(jù)一致性來(lái)看悲觀鎖比樂(lè)觀鎖更加可靠。對(duì)于任務(wù),使用IExecutorService 比使用悲觀鎖或樂(lè)觀鎖技術(shù)更加合適,IExecutorService 有更少的網(wǎng)絡(luò)躍點(diǎn)和輸出傳輸,任務(wù)會(huì)在更加靠近數(shù)據(jù)的地方被執(zhí)行。

解決 ABA 問(wèn)題

什么是ABA問(wèn)題可以參考Wikipedia中的定義 什么是ABA問(wèn)題

在多個(gè)線程更新共享資源的場(chǎng)景就會(huì)引發(fā)ABA問(wèn)題。即使一個(gè)線程在連續(xù)讀取一個(gè)特定key的值時(shí)看到的值是相同的,但是這并意味著在讀之間數(shù)據(jù)沒(méi)有發(fā)生變化。另一個(gè)線程可能會(huì)更改該值,執(zhí)行業(yè)務(wù)邏輯后并將其更改回原來(lái)的值,而第一個(gè)線程認(rèn)為沒(méi)有任何更改。為了解決這類問(wèn)題,可以給每個(gè)數(shù)據(jù)增加一個(gè)版本,在操作之前檢查版本以確保數(shù)據(jù)沒(méi)有被更改。盡管數(shù)據(jù)的其他部分全部相同,但是版本不同也認(rèn)為是數(shù)據(jù)是不同的。這其實(shí)就是樂(lè)觀鎖的機(jī)制,這種機(jī)制在對(duì)特定key更新不頻繁的場(chǎng)景使用更加合適。

給數(shù)據(jù)增加版本是常用的解決問(wèn)題的方法。

使用悲觀鎖避免鎖腦裂

可以配置在鎖之前先檢查集群成員數(shù),如果檢查失敗,鎖操作拋出SplitBrainProtectionException 并失敗。悲觀鎖內(nèi)部也使用了鎖操作,因此也可以配置對(duì)鎖腦裂的保護(hù)。這意味著您可以使用相同名稱或與映射名稱匹配的模式配置鎖裂腦保護(hù)。 請(qǐng)注意,針對(duì)IMap鎖定操作的裂腦保護(hù)可能不同于針對(duì)其他IMap方法的裂腦保護(hù)。

下面的操作支持在使用前進(jìn)行腦裂檢查:

  • IMap.lock(K) 、IMap.lock(K, long, java.util.concurrent.TimeUnit)
  • IMap.isLocked()
  • IMap.tryLock(K), IMap.tryLock(K, long, java.util.concurrent.TimeUnit) and IMap.tryLock(K, long, java.util.concurrent.TimeUnit, long, java.util.concurrent.TimeUnit)
  • IMap.unlock()
  • IMap.forceUnlock()
  • MultiMap.lock(K) and MultiMap.lock(K, long, java.util.concurrent.TimeUnit)
  • MultiMap.isLocked()
  • MultiMap.tryLock(K), MultiMap.tryLock(K, long, java.util.concurrent.TimeUnit) and MultiMap.tryLock(K, long, java.util.concurrent.TimeUnit, long, java.util.concurrent.TimeUnit)
  • MultiMap.unlock()
  • MultiMap.forceUnlock()

一個(gè)簡(jiǎn)單的聲明式配置如下:

<hazelcast>
    ...
    <map name="myMap">
        <split-brain-protection-ref>map-actions-split-brain-protection</split-brain-protection-ref>
    </map>
    <lock name="myMap">
        <split-brain-protection-ref>map-lock-actions-split-brain-protection</split-brain-protection-ref>
    </lock>
    ...
</hazelcast>

map-lock-actions-split-brain-protection 配置用于map鎖定,map-actions-split-brain-protection用于其他map操作。

9. 獲取map統(tǒng)計(jì)信息

可以使用 getLocalMapStats() 方法獲取map的統(tǒng)計(jì)信息,比如entry的主備數(shù)量,最后更新時(shí)間以及被鎖的entry數(shù)量。如果需要集群范圍內(nèi)的map統(tǒng)計(jì)信息,需要獲取每個(gè)集群成員的map統(tǒng)計(jì)信息并將信息合并,或者從Hazelcast管理中心獲取。

為了獲取map的統(tǒng)計(jì)信息需要配置statistics-enabled 的值為true :

<hazelcast>
    ...
    <map name="myMap">
        <statistics-enabled>true</statistics-enabled>
    </map>
    ...
</hazelcast>

如果statistics-enabled 設(shè)置為false,Hazelcast將不會(huì)收集map的統(tǒng)計(jì)信息,統(tǒng)計(jì)信息也無(wú)法從Hazelcast管理中心獲取,方法 getLocalMapStats() 也無(wú)法獲?。ńy(tǒng)計(jì)數(shù)據(jù)都沒(méi)有從何獲取,哈哈)。

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Order> map = instance.getMap("data");
map.put("first order", new Order("car"));
map.get("first order");
LocalMapStats stats = map.getLocalMapStats();
System.out.println ( "size in memory  : " + stats.getHeapCost() );
System.out.println ( "creationTime    : " + stats.getCreationTime() );
System.out.println ( "number of hits  : " + stats.getHits() );
System.out.println ( "lastAccessedTime: " + stats.getLastAccessTime() );
System.out.println ( "lastUpdateTime  : " + stats.getLastUpdateTime() );

Hazelcast還保存了map中entry的統(tǒng)計(jì)信息,包括創(chuàng)建時(shí)間,最后更新時(shí)間,最后訪問(wèn)時(shí)間,命中次數(shù)和版本等。可以使用 IMap.getEntryView(key) 方法獲取map中entry的統(tǒng)計(jì)信息:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Order> map = instance.getMap("data");
map.put("first order", new Order("car"));
map.get("first order");
EntryView entry = map.getEntryView("first order");
System.out.println ( "size in memory  : " + entry.getCost() );
System.out.println ( "creationTime    : " + entry.getCreationTime() );
System.out.println ( "expirationTime  : " + entry.getExpirationTime() );
System.out.println ( "number of hits  : " + entry.getHits() );
System.out.println ( "lastAccessedTime: " + entry.getLastAccessTime() );
System.out.println ( "lastUpdateTime  : " + entry.getLastUpdateTime() );
System.out.println ( "version         : " + entry.getVersion() );
System.out.println ( "key             : " + entry.getKey() );
System.out.println ( "value           : " + entry.getValue() );

10. 使用謂詞監(jiān)聽map數(shù)據(jù)

可以監(jiān)聽map中對(duì)特定數(shù)據(jù)的操作,可以認(rèn)為是使用謂詞的監(jiān)聽(監(jiān)聽滿足所有謂詞的數(shù)據(jù))。從3.7版本開始,Hazelcast提供了hazelcast.map.entry.filtering.natural.event.types 屬性,下表展示配置參數(shù)的值為true 和不配置參數(shù)或值為false時(shí)Hazelcast行為區(qū)別:

Default True
舊值滿足謂詞,新值不滿足謂詞 無(wú)事件發(fā)送 發(fā)送REMOVED
新舊值均滿足謂詞 發(fā)送UPDATED 事件 發(fā)送UPDATED 事件
新舊值均不滿足謂詞 無(wú)事件發(fā)送 無(wú)事件發(fā)送
舊值不滿足謂詞,新值滿足謂詞 發(fā)送UPDATED 事件 發(fā)送ADDED 事件

作為一個(gè)例子,我們監(jiān)聽訂單Order 的變化,Order 類的定義如下:

public class Order implements Serializable {
    private String name;

    public Order(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

創(chuàng)建一個(gè)監(jiān)聽Order添加、更新和刪除的監(jiān)聽器:CustomizeEntryListener

public class CustomizeEntryListener implements EntryAddedListener<String, Order>,
        EntryUpdatedListener<String, Order>,
        EntryRemovedListener<String, Order> {
    @Override
    public void entryAdded(EntryEvent<String, Order> event) {
        System.out.println(event.getValue().getName() + " order added");
    }

    @Override
    public void entryUpdated(EntryEvent<String, Order> event) {
        System.out.println(event.getValue().getName() + " order updated");
    }

    @Override
    public void entryRemoved(EntryEvent<String, Order> event) {
        System.out.println("order removed");
    }
}

創(chuàng)建一個(gè)檢查訂單名為car 的謂詞,并和CustomizeEntryListener 配合使用實(shí)現(xiàn)監(jiān)聽:

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setProperty("hazelcast.map.entry.filtering.natural.event.types", "true");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();
        IMap<String, Order> map = instance.getMap("data");
        map.addEntryListener(new CustomizeEntryListener(), (Predicate<String, Order>) entry -> "car".equals(entry.getValue().getName()), true);
        map.put("first order", new Order("car"));
        map.put("first order", new Order("car"));
        map.remove("first order");

    }

上述代碼的輸出如下:

car order added
car order updated
order removed

11. 使用謂詞批量刪除

Hazelcast提供了removeAll() 方法以實(shí)現(xiàn)根據(jù)謂詞刪除所有數(shù)據(jù),方法的定義如下:

void removeAll(Predicate<K, V> predicate);

一般來(lái)說(shuō),要獲取所有滿足謂詞的數(shù)據(jù)需要掃描map全部數(shù)據(jù)。如果map中的數(shù)據(jù)添加了索引,Hazelcast可以使用索引來(lái)查詢所有滿足謂詞的數(shù)據(jù),使用索引會(huì)加速查詢(有木有感覺(jué)和數(shù)據(jù)庫(kù)一樣?)。

調(diào)用removeAll()方法會(huì)同時(shí)移除Near Cache中的數(shù)據(jù)。

下面的代碼樣例向map中添加了8條數(shù)據(jù),調(diào)用removeAll 刪除所有keyhazelcast 開始的數(shù)據(jù):

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Integer> map = instance.getMap("data");
for (int i = 0; i < 4; i++) {
    map.put("hazelcast" + i, i);
}
for (int i = 0; i < 4; i++) {
    map.put("map" + i, i);
}
System.out.println("map size = " + map.size());
map.removeAll((Predicate<String, Integer>) entry -> entry.getKey().startsWith("hazelcast"));
System.out.println("map size = " + map.size());

代碼輸出如下:

map size = 8
map size = 4

12. 添加攔截器

你可以添加攔截操作,并執(zhí)行自定義的業(yè)務(wù)邏輯以同步阻塞操作??梢孕薷?code>get 方法的返回值,改變put 方法的值或者通過(guò)拋出一個(gè)異常來(lái)取消操作。攔截器不同于監(jiān)聽器,使用監(jiān)聽器可以在操作完成以后執(zhí)行一些操作,攔截器是同步的可以修改操作的行為,改變操作的值甚至取消操作。

map的攔截器組成一個(gè)鏈,因此多次添加一個(gè)攔截器會(huì)導(dǎo)致相同的攔截器被執(zhí)行多次。在成員初始化的時(shí)候添加攔截器會(huì)輕易的導(dǎo)致這種場(chǎng)景,因?yàn)槎鄠€(gè)成員會(huì)添加相同的攔截器。當(dāng)以這種方式添加攔截器時(shí)確保攔截器實(shí)現(xiàn)hashCode() 方法以保證每個(gè)成員的攔截器都可以返回相同的值。雖然實(shí)現(xiàn)equals() 方法不是必須的,但是這是一個(gè)更好的實(shí)踐,可以確保map可以安全的刪除攔截器。

IMap 提供了兩個(gè)方法用于添加和刪除攔截器:addInterceptorremoveInterceptor 。下面是一個(gè)使用攔截器的簡(jiǎn)單樣例:

HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Integer> map = instance.getMap("data");
map.addInterceptor(new MapInterceptor() {
    @Override
    public Object interceptGet(Object o) {
        System.out.println("begin get " + o);
        return o;
    }

    @Override
    public void afterGet(Object o) {
        System.out.println("get finished " + o);
    }

    @Override
    public Object interceptPut(Object oldVal, Object newVal) {
        System.out.println("old value = " + oldVal + ",new value = " + newVal);
        return newVal;
    }

    @Override
    public void afterPut(Object o) {
        System.out.println("after put value = " + o);
    }

    @Override
    public Object interceptRemove(Object o) {
        System.out.println("begin remove " + o);
        return null;
    }

    @Override
    public void afterRemove(Object o) {
        System.out.println("remove " + o + " finished");
    }
});
map.put("hazelcast", 2);
map.get("hazelcast");
map.remove("hazelcast");

13. 防止內(nèi)存溢出

使用map的查詢方法很容易觸發(fā)內(nèi)存溢出異常,尤其在集群規(guī)模較大或堆很大的條件下。例如,一個(gè)集群有5個(gè)節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)堆最大值為25GB,每個(gè)成員保存10GB的數(shù)據(jù),調(diào)用IMap.entrySet() 方法將會(huì)拉取50GB的數(shù)據(jù),進(jìn)而導(dǎo)致實(shí)例內(nèi)存溢出而故障。對(duì)于單個(gè)節(jié)點(diǎn)來(lái)說(shuō)IMap.values() 返回了太多的數(shù)據(jù),一個(gè)真正的查詢或謂詞選擇錯(cuò)誤的查詢也肯能導(dǎo)致這種情況,尤其在應(yīng)用選擇參數(shù)的時(shí)候(應(yīng)用自定義查詢等場(chǎng)景)。

為了阻止這種異常的發(fā)生,可以配置每個(gè)查詢返回的最大數(shù)據(jù)量。這和SQL中的SELECT * FROM map LIMIT 100 不同,在SQL中你可以使用分頁(yè)查詢獲得全部數(shù)據(jù)?;诓樵兊牟僮鞯淖畲蠼Y(jié)果限制旨在作為最后一道防線,以防止檢索超出其處理能力的數(shù)據(jù)。Hazelcast 中的 QueryResultSizeLimiter組件負(fù)責(zé)計(jì)算這個(gè)大小限制。

設(shè)置查詢結(jié)果大小限制

如果 QueryResultSizeLimiter 組件被激活,它將計(jì)算每個(gè)分區(qū)的結(jié)果大小限制。每個(gè) QueryResultSizeLimiter組件運(yùn)行在集群成員所有的分區(qū)之上,因此只要集群成員沒(méi)有超過(guò)限制組件就會(huì)一直收集信息。如果超過(guò)限制會(huì)返回客戶端一個(gè)QueryResultSizeExceededException異常。該功能依賴數(shù)據(jù)在集群成員之間的均等分布,依賴計(jì)算每個(gè)成員的大小限制,因此在QueryResultSizeLimiter.MINIMUM_MAX_RESULT_LIMIT中定義了一個(gè)最小值。設(shè)置低于最小的值將會(huì)被加到最小值之上,比如最小值為5,設(shè)置的值為3,則最小值變?yōu)?.

本地預(yù)檢查

除去 QueryOperations中分布式的結(jié)果大小檢查,還可以在被調(diào)用實(shí)例上執(zhí)行本地預(yù)檢查。如果客戶端調(diào)用一個(gè)方法,本地預(yù)檢查會(huì)在調(diào)用QueryOperations的成員上執(zhí)行。由于本地預(yù)檢查會(huì)增加QueryOperation的延遲,因此可以配置本地多少個(gè)分區(qū)執(zhí)行本地預(yù)檢查或者完全關(guān)閉該功能。

結(jié)果大小限制范圍

除了指定的查詢操作外,在內(nèi)部還有一些使用謂詞的其他操作。這些操作也會(huì)拋出 QueryResultSizeExceededException 異常,下面的表格展示了那些操作受查詢結(jié)果大小的限制:

Methods Covered by Query Result Size Limit
通過(guò)系統(tǒng)屬性配置結(jié)果大小限制

可以通過(guò)下面的兩個(gè)系統(tǒng)屬性配置查詢結(jié)果的大小限制:

  • hazelcast.query.result.size.limit: map查詢返回結(jié)果的最大值。該值定義了單次查詢返回的最大數(shù)據(jù)量,如果單次查詢返回的數(shù)據(jù)量超過(guò)了該值則會(huì)拋出一個(gè)QueryResultSizeExceededException 異常。
  • hazelcast.query.max.local.partition.limit.for.precheck: 本地分區(qū)最大值。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容