
在前一片文章中我們介紹了Hazelcast中的主題,但是由于沒有數(shù)據(jù)的備份,主題中的事件可能丟失,為了提高數(shù)據(jù)可靠性Hazelcast提供了可靠主題。可靠主題也使用ITopic 接口,為了保證數(shù)據(jù)的可靠性,可靠主題使用Ringbuffer 數(shù)據(jù)結(jié)構(gòu)備份主題的事件。和普通主題相比,可靠主題有以下優(yōu)勢:
- 主題中的事件不會丟失。
Ringbuffer默認有一個同步備份。 - 每個可靠主題使用獨立的
Ringbuffer,各個主題之間互不影響。 - 在腦裂環(huán)境中可靠主題無法工作。
下面是使用可靠主題發(fā)布事件的代碼樣例:
import com.hazelcast.core.Hazelcast
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val instance = Hazelcast.newHazelcastInstance()
val topic = instance.getReliableTopic<Long>("reliable")
var messageId = 1L
while (true) {
topic.publish(messageId)
messageId++
kotlinx.coroutines.delay(200L)
}
}
一個可靠主題的訂閱者:
import com.hazelcast.core.Hazelcast
import kotlinx.coroutines.runBlocking
fun main() = runBlocking<Unit> {
val instance = Hazelcast.newHazelcastInstance();
val topic = instance.getReliableTopic<Long>("reliable")
topic.addMessageListener {
println(it.messageObject)
}
}
在創(chuàng)建一個可靠主題時,Hazelcast會為主題自動創(chuàng)建一個Ringbuffer。Ringbuffer的名字和可靠主題的名字相同,可以通過添加一個Ringbuffer配置對可靠主題對應的Ringbuffer進行配置。對于Ringbuffer可以配置容量、主題消息的TTL,設置可以配置持久存儲。下面是一個對名字為“reliable”可靠主題使用的Ringbuffer配置的樣例:
<hazelcast>
...
<ringbuffer name="reliable">
<capacity>1000</capacity>
<time-to-live-seconds>5</time-to-live-seconds>
</ringbuffer>
<reliable-topic name="reliable">
<topic-overload-policy>BLOCK</topic-overload-policy>
</reliable-topic>
...
</hazelcast>
默認,可靠主題使用一個共享的線城池,如果想得到更好的隔離,可以通過ReliableTopicConfig 配置。Ringbuffer讀是非破壞性的,因此實現(xiàn)批量讀更加簡單,ITopic 每次最多讀取10條數(shù)據(jù)。
緩慢的消費者
可靠主題提供了對消費速度慢的消費的控制和管理方法。因為不知道速度慢的消費者何時能趕上,因此將對應的事件無限期保存在內(nèi)存中是不明智的??梢酝ㄟ^Ringbuffer的容量來對內(nèi)存中的數(shù)量進行限制,當Ringbuffer存儲的數(shù)據(jù)量超過容量限制,可以選擇下面的四種策略進行處理:
-
DISCARD_OLDEST: 丟棄老數(shù)據(jù),即使設置了TTL。 -
DISCARD_NEWEST: 丟棄新數(shù)據(jù)。 -
BLOCK: 阻塞直到有數(shù)據(jù)過期。 -
ERROR: 立即拋出TopicOverloadException異常。
配置可靠主題
聲明式配置:
<hazelcast>
...
<reliable-topic name="default">
<statistics-enabled>true</statistics-enabled>
<message-listeners>
<message-listener>
...
</message-listener>
</message-listeners>
<read-batch-size>10</read-batch-size>
<topic-overload-policy>BLOCK</topic-overload-policy>
</reliable-topic>
...
</hazelcast>
可靠主題的配置參數(shù)有以下四項:
-
statistics-enabled: 默認值true,是否開啟統(tǒng)計。 -
message-listener: 事件監(jiān)聽器。 -
read-batch-size: 批量讀大小,默認10. -
topic-overload-policy: 事件超過容量限制時的處理策略。