kafka 消費(fèi)者offset兩種存儲(chǔ)方式

kafka消費(fèi)者在會(huì)保存其消費(fèi)的進(jìn)度,也就是offset,存儲(chǔ)的位置根據(jù)選用的kafka api不同而不同。
首先來(lái)說(shuō)說(shuō)消費(fèi)者如果是根據(jù)javaapi來(lái)消費(fèi),也就是【kafka.javaapi.consumer.ConsumerConnector】,我們會(huì)配置參數(shù)【zookeeper.connect】來(lái)消費(fèi)。這種情況下,消費(fèi)者的offset會(huì)更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目錄下,例如:

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

如果是根據(jù)kafka默認(rèn)的api來(lái)消費(fèi),即【org.apache.kafka.clients.consumer.KafkaConsumer】,我們會(huì)配置參數(shù)【bootstrap.servers】來(lái)消費(fèi)。而其消費(fèi)者的offset會(huì)更新到一個(gè)kafka自帶的topic【__consumer_offsets】下面,查看當(dāng)前group的消費(fèi)進(jìn)度,則要依靠kafka自帶的工具【kafka-consumer-offset-checker】,例如:

[root@localhost data]# kafka-consumer-offset-checker --zookeeper localhost :2181/kafka --group test-consumer-group  --topic stable-test
[2017-08-22 19:24:24,222] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group stable-test                    0   601808          601808          0               none
test-consumer-group stable-test                    1   602826          602828          2               none
test-consumer-group stable-test                    2   602136          602136          0               none

offset更新的方式,不區(qū)分是用的哪種api,大致分為兩類:

1、自動(dòng)提交,設(shè)置enable.auto.commit=true,更新的頻率根據(jù)參數(shù)【auto.commit.interval.ms】來(lái)定。這種方式也被稱為【at most once】,fetch到消息后就可以更新offset,無(wú)論是否消費(fèi)成功。
2、手動(dòng)提交,設(shè)置enable.auto.commit=false,這種方式稱為【at least once】。fetch到消息后,等消費(fèi)完成再調(diào)用方法【consumer.commitSync()】,手動(dòng)更新offset;如果消費(fèi)失敗,則offset也不會(huì)更新,此條消息會(huì)被重復(fù)消費(fèi)一次。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容