Kakfa客戶端pause采坑

[技術(shù)分享] Kafka Pause采坑記錄

Kafka在使用中,我們可能會(huì)需要有主動(dòng)暫停消費(fèi)的業(yè)務(wù)需求,等待一個(gè)信號(hào)再主動(dòng)恢復(fù),但實(shí)際使用中,客戶端提供的
暫停功能可能和你想象的不一樣,當(dāng)使用不當(dāng)時(shí),可能會(huì)引起pause機(jī)制失效,自動(dòng)恢復(fù)消費(fèi),一定要特別注意!

[采坑條件]

  1. 使用kafka 客戶端提供的pause功能
  2. kafka消費(fèi)者出現(xiàn)rebalance情況

[采坑復(fù)現(xiàn)]

1.創(chuàng)建一個(gè)test的topic,分區(qū)數(shù)為1
2.啟動(dòng)進(jìn)程A,訂閱test,顯式調(diào)用KafkaConsumer pause
3.啟動(dòng)進(jìn)程B,訂閱test,觸發(fā)kafka 消費(fèi)者rebalance情況
4.觀察進(jìn)程A,繼續(xù)消費(fèi)test topic的消息,pause機(jī)制失效

[Kafka 客戶端實(shí)現(xiàn)pause的機(jī)制原理]

1.puase功能是在kafka 客戶端實(shí)現(xiàn)的,服務(wù)端并沒有維護(hù)分區(qū)是否暫停消費(fèi)的狀態(tài)
2.kafka 客戶端本地維護(hù)一個(gè)消費(fèi)者訂閱分區(qū)的狀態(tài)列表,key為分區(qū),value為分區(qū)消費(fèi)者狀態(tài),屬性pause為true或fause
3.當(dāng)新的kafka 消費(fèi)者加入消費(fèi)組或者退出消費(fèi)組,都會(huì)引起kafka 消費(fèi)組 rebalance
4.當(dāng)rebalance之后,kafka客戶端會(huì)重新分配消費(fèi)的分區(qū),原先的狀態(tài)位丟失,導(dǎo)致pause失效

[源碼分析]

客戶端通過TopicPartitionState維護(hù)每個(gè)分區(qū)狀態(tài),包含pause屬性,顯式調(diào)用后為true

private static class TopicPartitionState {
        private Long position; // last consumed position
        private Long highWatermark; // the high watermark from last fetch
        private Long lastStableOffset;
        private OffsetAndMetadata committed;  // last committed position
        private boolean paused;  // whether this partition has been paused by the user
        private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting

kafka 消費(fèi)者通過AbstractCoordinator類,作用是維護(hù)心跳和分區(qū)等相關(guān)信息
這里主要分析onJoinComplete方法,此方法會(huì)在消費(fèi)者加入group成功后調(diào)用

 /**
     * Invoked when a group member has successfully joined a group.
     * @param generation The generation that was joined
     * @param memberId The identifier for the local member in the group
     * @param protocol The protocol selected by the coordinator
     * @param memberAssignment The assignment propagated from the group leader
     */
    protected abstract void onJoinComplete(int generation,
                                           String memberId,
                                           String protocol,
                                           ByteBuffer memberAssignment);

具體的實(shí)現(xiàn)類為ConsumerCoordinator,實(shí)現(xiàn)細(xì)節(jié)中,主要關(guān)注方法subscriptions.assignFromSubscribed(assignment.partitions());

 @Override
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
        if (!isLeader)
            assignmentSnapshot = null;

        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

        // set the flag to refresh last committed offsets
        subscriptions.needRefreshCommits();

        // update partition assignment 此處是重點(diǎn)
        subscriptions.assignFromSubscribed(assignment.partitions());
        .......

assignFromSubscribed是最終維護(hù)分區(qū)的方法

 public void assignFromSubscribed(Collection<TopicPartition> assignments) {
        if (!this.partitionsAutoAssigned())
            throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use");

        Map<TopicPartition, TopicPartitionState> assignedPartitionStates = partitionToStateMap(assignments);
        fireOnAssignment(assignedPartitionStates.keySet());
        ......

可以看到最終的TopicPartitionState在這重新生成了一次,導(dǎo)致原先保存的各個(gè)分區(qū)是否暫停消費(fèi)的狀態(tài)丟失

private static Map<TopicPartition, TopicPartitionState> partitionToStateMap(Collection<TopicPartition> assignments) {
        Map<TopicPartition, TopicPartitionState> map = new HashMap<>(assignments.size());
        for (TopicPartition tp : assignments)
            map.put(tp, new TopicPartitionState());
        return map;
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容