kafka Consumer — offset的控制

前言

在N久之前,曾寫過kafka 生產(chǎn)者使用詳解
今天補上關(guān)于 offset 相關(guān)的內(nèi)容。
那么本文主要涉及:

  1. Kafka 消費者的兩個大版本
  2. 消費者的基本使用流程
  3. 重點:offset 的控制

消費者版本

  1. 開源之初使用Scala 語言編寫的客戶端,
    我們可以稱之為舊消費者客戶端(Old Consumer)
    或 Scala 消費者客戶端;

  2. 第二個是從Kafka 0.9. x 版本開始推出的使用Java 編寫的客戶端,
    我們可以稱之為新消費者客戶端( New Consumer )
    或Java 消費者客戶端,
    它彌補了舊客戶端中存在的諸多設(shè)計缺陷,
    不過我不建議你在0.9.x 使用該客戶端,
    該新客戶端再 0.10.0 才算比較穩(wěn)定了

這里額外提一句就是,客戶端從scala 語言轉(zhuǎn)向 java,
并不是 java 比 scala 要怎么怎么樣,
僅僅只是因為社區(qū)的開發(fā)者換人了~~~~

開發(fā)一個消費者的正常流程

一個正常的消費邏輯需要具備以下幾個步驟:

  1. 配置消費者客戶端參數(shù)及創(chuàng)建相應(yīng)的消費者實例。
  2. 訂閱主題。
  3. 拉取消息并消費。
  4. 提交消費位移。
  5. 關(guān)閉消費者實例。

消費者可以訂閱多個Topic,
consumer.subscribe(Arrays.asList("t1","t2"))),
如果訂閱多次,后面的會覆蓋前面的,
所以取消訂閱其實也可以去訂閱一個空集合。

訂閱支持正則表達(dá)式:
consumer.subscribe(Pattern.compile("topic .*"));
這樣訂閱后,如果kafka后面新增了滿足該正則的 Topic也會被該消費者消費

消費者也可以直接訂閱某個分區(qū)的數(shù)據(jù),
這里我們貼下代碼,如下:

List<TopicPartition> partitions = new ArrayList<>();
// 查詢kafka分區(qū)信息
List<Partitioninfo> partitioninfos = consumer.partitionsFor( topic );
if (partitioninfos != null) {
for (Partitioninfo tpinfo : partitioninfos) {
partitions.add(new TopicPartition( tpinfo.topic(), tpinfo.partition() )) ;
consumer.assign( partitions ) ;

值得注意的是:
subscribe訂閱是具有分區(qū)在均衡能力的,
而 assign 是沒有的

這里我們只是簡單的過了一下 消費者,
因為不是本文的重點,
如果要詳細(xì)了解的話,
還是去看看這篇 kafka 生產(chǎn)者使用詳解

Offset 提交

這里指的是消費者消費的位移,
而不是Kafka端儲存的消息的 offset,
這其中的區(qū)別希望讀者清楚,不要混淆了。
對于offset 的提交,
我們要清楚一點
如果我們消費到了 offset=x 的消息
那么提交的應(yīng)該是 offset=x+1,
而不是 offset=x

kafka的提交方式分為兩種:

自動提交

在Kafka 中默認(rèn)的消費位移的提交方式是自動提交,
這個由消費者客戶端參數(shù)enable.auto.commit 配置,
默認(rèn)值為true。
當(dāng)然這個默認(rèn)的自動提交不是每消費一條消息就提交一次,
而是定期提交,
這個定期的周期時間由客戶端參數(shù)auto.commit.interval.ms配置,
默認(rèn)值為5 秒,
此參數(shù)生效的前提是enable.auto.commit 參數(shù)為true。
自動位移提交的動作是在poll()方法的邏輯里完成的,
在每次真正向服務(wù)端發(fā)起拉取請求之前會檢查是否可以進(jìn)行位移提交,
如果可以,那么就會提交上一輪消費的位移。

手動提交

  • commitSync() 同步提交

    • 批量提交
      該方式的最大問題在于數(shù)據(jù)是批量處理,
      當(dāng)部分?jǐn)?shù)據(jù)完成消費,
      還沒來得及提交offset就被中斷,
      則會使得下次消費會重復(fù)消費那部分已經(jīng)消費過的數(shù)據(jù)。

      consumer.commitSync()會在消費完數(shù)據(jù)后,
      將消費完消費的 offset+1 提交.
      直接使用如下:

       final int minBatchSize = 200;
       List<ConsumerRecord> buffer= new ArrayList<>() ;
       while ( isRunning.get() ) {
           ConsumerRecords<String , String> records = consumer . poll(1000) ;
           for (ConsumerRecord<String , String> record : records) {
           buffer.add(record);
           if (buffer.size() >= minBatchSize) {
               //do some logical processing with buffer .
               consumer.commitSync() ;
               buffer.clear();
          }
       }
      
    • 單條消息提交一次
      該方式每消費一次,就保存一次。
      雖然在很大程度上避免了重復(fù)消費,
      但是其性能是極其低下的,
      基本不在企業(yè)級考慮的范圍,
      并且也不是完全的能做到精準(zhǔn)一次消費

      while ( isRunning. get () ) {
          ConsumerRecords<String , String> records= consumer.poll(1000) ;
          for (ConsumerRecord<String , String> record : records) {
              //do some logical processing.
              //讀取消費的消息的 offset
              long offset= record.offset() ;
              TopicPartition partition =new TopicPartition(record.topic() , record.partition()) ;
              // 提交位移
              consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1))) ;
          }
      }
      
    • 按分區(qū)提交
      該方式其實是綜合了 批量提交 和 單條消息提交一次,
      按分區(qū)的小批次提交,
      如果你要使用同步提交的方式,
      那么建議你使用該方式

      try {
        while (isRunning.get() ) {
            ConsumerRecords<String , String> records= consumer .poll(1000);
            for (TopicPartition partition : records.partitions( )) {
                //取出每個分區(qū)的消息
                List<ConsumerRecord<String, String> partitionRecords = records . records(partition)
                for (ConsumerRecord<String , String> record : partitionRecords) {
                        //消費該分區(qū)的消息*****
                        //*********
                       
                        //將該分區(qū)的 offset 提交
                        long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1) .offset() ;
                        consumer.commitSync(Collections.singletonMap ( partition , 
                        new OffsetAndMetadata(lastConsumedOffset + 1))
                        );
                  }
              }
          }
      }finally {
          consumer.close();
      }
      


  • commitAsync() 異步提交

    //三個重載方法
    public void commitAsync()
    public void commitAsync(OffsetCommitCallback callback)
    public void commitAsync(final Map<TopicPartition , OffsetAndMetadata> offsets ,OffsetCommitCallback callback)
    

    commitAsync屬于異步提交,
    也就是不會阻塞線程,
    比起同步提交commitSync具有更好的性能。
    這里我們主要來討論下OffsetCommitCallback callback回調(diào)的使用,
    理解起來很簡單,我們每提交一次 Offset,
    callback 都會告訴我們是否提交成功。
    那么如果我們提交失敗了怎么辦呢??

    • 一般的想法就是:失敗了?那重新提交唄。
      這種方式是否可行?我們看下面這個列子。
      如果一個消費者消費到了 offset=10,
      我們就異步提交了 offset=11,
      繼續(xù)拉取消息 offset=11-20,
      這個時候 提交的 offset=11 還沒有返回成功,
      我們提交 offset=21,
      返回 offset=21 提交成功。
      OK,現(xiàn)在提交 offset=1的那條消息返回了,
      并且是失敗的,
      那么如果你去重試,
      提交 offset=11 就會覆蓋掉 已經(jīng)提交的 offset=21
      很明顯這不是我們想要的。

    • 正確的做法:
      這個時候需要客戶端維護一個序列號,
      每次提交成功都 +1,
      重試的時候進(jìn)行對比,
      不合法就不需要重試了。
      當(dāng)然實際情況,
      一般提交offset不會失敗,
      并且就算失敗一次也不會有問題,
      因為后面每次消費一樣會進(jìn)行offset提交,
      而對于消費者正常退出,
      我們可以使用,commitSync同步提交,
      保證offset的正確。

      try {
          while(isRunning.get()) {
              //poll records and do some log 工cal processing .
              consumer . commitAsync() ;
          }
      ) finally {
          try {
              consumer.commitSync() ;
          ) finally {
              consumer.close() ;
      }}
      
    • 再均衡導(dǎo)致的重復(fù)消費:
      再均衡發(fā)生的時候也可能會導(dǎo)致消費者的offset來不及提交,
      這時候我們需要在監(jiān)聽到再均衡發(fā)生的時候進(jìn)行一次offset提交:

      //該對象需要保存該消費者消費的分區(qū)的最新的 offset
      //本段代碼中沒有體現(xiàn),可以在消費數(shù)據(jù)之后 進(jìn)行更新該對象
      Map<TopicPartition , OffsetAndMetadata> currentOffsets =new HashMap<>() ;
      
      consumer.subscribe(Arrays .asList( topic) , new ConsumerRebalanceListener () {
          //發(fā)生在 再均衡 之前,并且消費者停止讀取消息的時候
          @Override
          public void onPartitionsRevoked(Collection<TopicPart ition> partitions) {
              consume.commitSync(currentOffsets) ;
              currentOffsets.clear();
          }
      
          @Override
           public void onPartitions Assigned(Collection<TopicPartition > partitions) {
              //do nothing .
          }
      } );
      


最后,我們來總結(jié)下:

  1. 一般來說,我們不會使用自動提交的方式管理 offset,
    雖然簡單,但是缺乏很好的控制,
    不過如果能滿足業(yè)務(wù)要求,
    那么還是果斷的使用起來吧

  2. 對于手動提交,
    一般我們都是使用異步提交的方式,
    在考慮準(zhǔn)確的消費的情況下,兼顧的效率。

  3. 同步提交一般用來輔助異步提交,
    對于一些特殊情況,保證offset的正確提交。

  4. 我們考慮到了再均衡的影響,并做了相關(guān)的處理

  5. 對于消費者異常退出 和 崩潰:
    很遺憾的是如果出現(xiàn)異常和崩潰,
    我們的消費還是很難做到精準(zhǔn)的一次消費,
    不過一般來說,
    以上這些方法是絕對滿足大部分企業(yè)大部分的業(yè)務(wù)的需求。
    如果你實在要保證精準(zhǔn)的一次消費,
    你可能還需要一些其他的輔助,
    比如:消費和提交 當(dāng)做一次事務(wù),
    或者 重復(fù)消費是冪等 等等方式。

    要精準(zhǔn)一次消費,
    還得依靠開發(fā)人員來自己保證,
    當(dāng)然,如果你使用 Kafka 的stream 方式消費,
    是可以做到精準(zhǔn)一次消費的,
    不過這不在本文的討論范圍了...

最后,感謝你的閱讀,如果可以,留個贊支持下作者!??!嘿嘿嘿~~~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容