前言
在N久之前,曾寫過kafka 生產(chǎn)者使用詳解,
今天補上關(guān)于 offset 相關(guān)的內(nèi)容。
那么本文主要涉及:
- Kafka 消費者的兩個大版本
- 消費者的基本使用流程
- 重點:offset 的控制
消費者版本
開源之初使用Scala 語言編寫的客戶端,
我們可以稱之為舊消費者客戶端(Old Consumer)
或 Scala 消費者客戶端;第二個是從Kafka 0.9. x 版本開始推出的使用Java 編寫的客戶端,
我們可以稱之為新消費者客戶端( New Consumer )
或Java 消費者客戶端,
它彌補了舊客戶端中存在的諸多設(shè)計缺陷,
不過我不建議你在0.9.x 使用該客戶端,
該新客戶端再 0.10.0 才算比較穩(wěn)定了
這里額外提一句就是,客戶端從scala 語言轉(zhuǎn)向 java,
并不是 java 比 scala 要怎么怎么樣,
僅僅只是因為社區(qū)的開發(fā)者換人了~~~~
開發(fā)一個消費者的正常流程
一個正常的消費邏輯需要具備以下幾個步驟:
- 配置消費者客戶端參數(shù)及創(chuàng)建相應(yīng)的消費者實例。
- 訂閱主題。
- 拉取消息并消費。
- 提交消費位移。
- 關(guān)閉消費者實例。
消費者可以訂閱多個Topic,
consumer.subscribe(Arrays.asList("t1","t2"))),
如果訂閱多次,后面的會覆蓋前面的,
所以取消訂閱其實也可以去訂閱一個空集合。
訂閱支持正則表達(dá)式:
consumer.subscribe(Pattern.compile("topic .*"));
這樣訂閱后,如果kafka后面新增了滿足該正則的 Topic也會被該消費者消費
消費者也可以直接訂閱某個分區(qū)的數(shù)據(jù),
這里我們貼下代碼,如下:
List<TopicPartition> partitions = new ArrayList<>();
// 查詢kafka分區(qū)信息
List<Partitioninfo> partitioninfos = consumer.partitionsFor( topic );
if (partitioninfos != null) {
for (Partitioninfo tpinfo : partitioninfos) {
partitions.add(new TopicPartition( tpinfo.topic(), tpinfo.partition() )) ;
consumer.assign( partitions ) ;
值得注意的是:
subscribe訂閱是具有分區(qū)在均衡能力的,
而 assign 是沒有的
這里我們只是簡單的過了一下 消費者,
因為不是本文的重點,
如果要詳細(xì)了解的話,
還是去看看這篇 kafka 生產(chǎn)者使用詳解
Offset 提交
這里指的是消費者消費的位移,
而不是Kafka端儲存的消息的 offset,
這其中的區(qū)別希望讀者清楚,不要混淆了。
對于offset 的提交,
我們要清楚一點
如果我們消費到了 offset=x 的消息
那么提交的應(yīng)該是 offset=x+1,
而不是 offset=x
kafka的提交方式分為兩種:
自動提交
在Kafka 中默認(rèn)的消費位移的提交方式是自動提交,
這個由消費者客戶端參數(shù)enable.auto.commit 配置,
默認(rèn)值為true。
當(dāng)然這個默認(rèn)的自動提交不是每消費一條消息就提交一次,
而是定期提交,
這個定期的周期時間由客戶端參數(shù)auto.commit.interval.ms配置,
默認(rèn)值為5 秒,
此參數(shù)生效的前提是enable.auto.commit 參數(shù)為true。
自動位移提交的動作是在poll()方法的邏輯里完成的,
在每次真正向服務(wù)端發(fā)起拉取請求之前會檢查是否可以進(jìn)行位移提交,
如果可以,那么就會提交上一輪消費的位移。
手動提交
-
commitSync() 同步提交
-
批量提交
該方式的最大問題在于數(shù)據(jù)是批量處理,
當(dāng)部分?jǐn)?shù)據(jù)完成消費,
還沒來得及提交offset就被中斷,
則會使得下次消費會重復(fù)消費那部分已經(jīng)消費過的數(shù)據(jù)。consumer.commitSync()會在消費完數(shù)據(jù)后,
將消費完消費的offset+1提交.
直接使用如下:final int minBatchSize = 200; List<ConsumerRecord> buffer= new ArrayList<>() ; while ( isRunning.get() ) { ConsumerRecords<String , String> records = consumer . poll(1000) ; for (ConsumerRecord<String , String> record : records) { buffer.add(record); if (buffer.size() >= minBatchSize) { //do some logical processing with buffer . consumer.commitSync() ; buffer.clear(); } } -
單條消息提交一次
該方式每消費一次,就保存一次。
雖然在很大程度上避免了重復(fù)消費,
但是其性能是極其低下的,
基本不在企業(yè)級考慮的范圍,
并且也不是完全的能做到精準(zhǔn)一次消費while ( isRunning. get () ) { ConsumerRecords<String , String> records= consumer.poll(1000) ; for (ConsumerRecord<String , String> record : records) { //do some logical processing. //讀取消費的消息的 offset long offset= record.offset() ; TopicPartition partition =new TopicPartition(record.topic() , record.partition()) ; // 提交位移 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1))) ; } } -
按分區(qū)提交
該方式其實是綜合了 批量提交 和 單條消息提交一次,
按分區(qū)的小批次提交,
如果你要使用同步提交的方式,
那么建議你使用該方式try { while (isRunning.get() ) { ConsumerRecords<String , String> records= consumer .poll(1000); for (TopicPartition partition : records.partitions( )) { //取出每個分區(qū)的消息 List<ConsumerRecord<String, String> partitionRecords = records . records(partition) for (ConsumerRecord<String , String> record : partitionRecords) { //消費該分區(qū)的消息***** //********* //將該分區(qū)的 offset 提交 long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1) .offset() ; consumer.commitSync(Collections.singletonMap ( partition , new OffsetAndMetadata(lastConsumedOffset + 1)) ); } } } }finally { consumer.close(); }
-
-
commitAsync() 異步提交
//三個重載方法 public void commitAsync() public void commitAsync(OffsetCommitCallback callback) public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets ,OffsetCommitCallback callback)commitAsync屬于異步提交,
也就是不會阻塞線程,
比起同步提交commitSync具有更好的性能。
這里我們主要來討論下OffsetCommitCallback callback回調(diào)的使用,
理解起來很簡單,我們每提交一次 Offset,
callback 都會告訴我們是否提交成功。
那么如果我們提交失敗了怎么辦呢??一般的想法就是:失敗了?那重新提交唄。
這種方式是否可行?我們看下面這個列子。
如果一個消費者消費到了 offset=10,
我們就異步提交了 offset=11,
繼續(xù)拉取消息 offset=11-20,
這個時候 提交的 offset=11 還沒有返回成功,
我們提交 offset=21,
返回 offset=21 提交成功。
OK,現(xiàn)在提交 offset=1的那條消息返回了,
并且是失敗的,
那么如果你去重試,
提交 offset=11 就會覆蓋掉 已經(jīng)提交的 offset=21
很明顯這不是我們想要的。-
正確的做法:
這個時候需要客戶端維護一個序列號,
每次提交成功都 +1,
重試的時候進(jìn)行對比,
不合法就不需要重試了。
當(dāng)然實際情況,
一般提交offset不會失敗,
并且就算失敗一次也不會有問題,
因為后面每次消費一樣會進(jìn)行offset提交,
而對于消費者正常退出,
我們可以使用,commitSync同步提交,
保證offset的正確。try { while(isRunning.get()) { //poll records and do some log 工cal processing . consumer . commitAsync() ; } ) finally { try { consumer.commitSync() ; ) finally { consumer.close() ; }} -
再均衡導(dǎo)致的重復(fù)消費:
再均衡發(fā)生的時候也可能會導(dǎo)致消費者的offset來不及提交,
這時候我們需要在監(jiān)聽到再均衡發(fā)生的時候進(jìn)行一次offset提交://該對象需要保存該消費者消費的分區(qū)的最新的 offset //本段代碼中沒有體現(xiàn),可以在消費數(shù)據(jù)之后 進(jìn)行更新該對象 Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ; consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () { //發(fā)生在 再均衡 之前,并且消費者停止讀取消息的時候 @Override public void onPartitionsRevoked(Collection<TopicPart ition> partitions) { consume.commitSync(currentOffsets) ; currentOffsets.clear(); } @Override public void onPartitions Assigned(Collection<TopicPartition > partitions) { //do nothing . } } );
最后,我們來總結(jié)下:
一般來說,我們不會使用自動提交的方式管理 offset,
雖然簡單,但是缺乏很好的控制,
不過如果能滿足業(yè)務(wù)要求,
那么還是果斷的使用起來吧對于手動提交,
一般我們都是使用異步提交的方式,
在考慮準(zhǔn)確的消費的情況下,兼顧的效率。同步提交一般用來輔助異步提交,
對于一些特殊情況,保證offset的正確提交。我們考慮到了再均衡的影響,并做了相關(guān)的處理
-
對于消費者異常退出 和 崩潰:
很遺憾的是如果出現(xiàn)異常和崩潰,
我們的消費還是很難做到精準(zhǔn)的一次消費,
不過一般來說,
以上這些方法是絕對滿足大部分企業(yè)大部分的業(yè)務(wù)的需求。
如果你實在要保證精準(zhǔn)的一次消費,
你可能還需要一些其他的輔助,
比如:消費和提交 當(dāng)做一次事務(wù),
或者 重復(fù)消費是冪等 等等方式。要精準(zhǔn)一次消費,
還得依靠開發(fā)人員來自己保證,
當(dāng)然,如果你使用 Kafka 的stream 方式消費,
是可以做到精準(zhǔn)一次消費的,
不過這不在本文的討論范圍了...