聊聊 Kafka: Consumer 源碼解析之 Consumer 如何加入 Consumer Group

一、前言

今天這一篇我們來說一下 Consumer 是如何加入 Consumer Group 的,我們前面有一篇 Kafka 的架構文章有說到,Consumer 有消費組(Consumer Group)的概念,而 Producer 沒有生產(chǎn)組的概念。所以說 Consumer 側(cè)會比 Producer 側(cè)復雜點,除了消費者有消費組的概念,還需要維護管理 offset 偏移量、重復消費等問題。

與消費組相關的兩個組件,一個是消費者客戶端的 ConsumerCoordinator,一個是 Kafka Broker 服務端的 GroupCoordinator。ConsumerCoordinator 負責與 GroupCoordinator 通信,Broker 啟動的時候,都會啟動一個 GroupCoordinator 實例,而一個集群中,會有多個 Broker,那么如何確定一個新的 Consumer 加入 Consumer Group 后,到底和哪個 Broker 上的 GroupCoordinator 進行交互呢?

別急,聰明的程序員肯定是有辦法的,我們還是先來說一下 GroupCoordinator 吧。

二、GroupCoordinator

別問,問就是有相應的算法和策略。那我們就來看下是啥算法和策略實現(xiàn) Consumer 正確找到 GroupCoordinator 的,這就和 Kafka 內(nèi)部的 Topic __consumer_offsets 有關系了。

2.1 __consumer_offsets

__consumer_offsets 這個內(nèi)部 Topic,專門用來存儲 Consumer Group 消費的情況,默認情況下有 50 個 partition,每個 partition 默認三個副本。如下圖所示:

在這里插入圖片描述

2.2 Consumer 如何找到 GroupCoordinator 的?

每個 Consumer Group 都有其對應的 GroupCoordinator,當一個新的 Consumer 要尋找和它交互的 GroupCoordinator 時,需要先對它的 GroupId 進行 hash,然后取模 __consumer_offsets 的 partition 數(shù)量,最后得到的值就是對應 partition,那么這個 partition 的 leader 所在的 broker 即為這個 Consumer Group 要交互的 GroupCoordinator 所在的節(jié)點。獲取 partition 公式如下:

abs(GroupId.hashCode()) % NumPartitions

舉個例子,假設一個 GroupId 計算出來的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我們要找的那個節(jié)點。這個 Consumer Group 后面都會直接和該 broker 上的 GroupCoordinator 交互。

三、Group 狀態(tài)變更

說 Consumer 加入 Consumer Group 流程之前,老周覺得有必要先說一下 Consumer Group 的狀態(tài)變更。

3.1 消費端

在協(xié)調(diào)器 AbstractCoordinator 中的內(nèi)部類 MemberState 中我們可以看到協(xié)調(diào)器的四種狀態(tài),分別是未注冊、重分配后沒收到響應、重分配后收到響應但還沒有收到分配、穩(wěn)定狀態(tài)。


在這里插入圖片描述

上述消費端的四種狀態(tài)的轉(zhuǎn)換如下圖所示:


在這里插入圖片描述

3.2 服務端

對于 Kafka 服務端的組則有五種狀態(tài) Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。他們的狀態(tài)轉(zhuǎn)換如下圖所示:

在這里插入圖片描述

在這里插入圖片描述

四、Consumer 加入 Consumer Group 流程

說 Consumer 如何加入 Consumer Group 之前,我們還是先來回顧下上一篇消息消費的測試案例。

在這里插入圖片描述

核心方法是 poll() 方法,我們這里簡單提一下,后面我們會詳細介紹 Consumer 關于 poll 的網(wǎng)絡模型。


在這里插入圖片描述

Consumer 如何加入 Consumer Group 的,我們得來看啥時候與 GroupCoordinator 交互通信的,不難發(fā)現(xiàn)在消息拉取請求做準備 updateAssignmentMetadataIfNeeded() 這個方法里。

然后關于對 ConsumerCoordinator 的處理都集中在 coordinator.poll() 方法中。

我們來跟一下這兩個方法:

在這里插入圖片描述

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean) 方法中,具體可以分為以下幾個步驟:

  • 檢測心跳線程運行是否正常 (需要定時向 GroupCoordinator 發(fā)送心跳,在建立連接之后,建立連接之前不會做任何事情)
  • 如果不存在協(xié)調(diào)器和協(xié)調(diào)器已斷開連接,則返回 false,結(jié)束本次拉取。如果 coordinator 未知,就初始化 ConsumerCoordinator (在 ensureCoordinatorReady() 中實現(xiàn))
  • 判斷是否需要觸發(fā)重平衡,即消費組內(nèi)的所有消費者重新分配 topic 中的分區(qū)信息。
  • 通過 ensureActiveGroup() 發(fā)送 join-group、sync-group 請求,加入 group 并獲取其 assign 的 TopicPartition list。
  • 如果需要更新元數(shù)據(jù),并且還沒有分區(qū)準備好,則同步阻塞等待元數(shù)據(jù)更新完畢。
  • 如果開啟了自動提交消費進度,并且已到下一次提交時間,則提交。

其中,有幾個地方需要詳細介紹,那就是 ensureCoordinatorReady() 方法、rejoinNeededOrPending() 方法和 ensureActiveGroup() 方法。

五、ensureCoordinatorReady()

這個方法的作用是:選擇一個連接數(shù)最小的 broker,向其發(fā)送 GroupCoordinator 請求,并建立相應的 TCP 連接。

  • 方法調(diào)用流程是:ensureCoordinatorReady() -> lookupCoordinator() -> sendFindCoordinatorRequest()。
  • 如果 client 獲取到 Server response,那么就會與 GroupCoordinator 建立連接。

5.1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady

在這里插入圖片描述

5.2 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

在這里插入圖片描述

5.3 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendFindCoordinatorRequest

在這里插入圖片描述

5.4 小結(jié)

  • 選擇一個連接最小的節(jié)點,發(fā)送 FindCoordinator request 請求,并對 response 進行處理。
  • FindCoordinatorRequest 這個請求會使用 group id 通過 ConsumerNetworkClient.send() 來查找對應的 GroupCoordinator 節(jié)點。(當然 ConsumerNetworkClient.send() 也是采用的 Java NIO 的機制,我們前面的文章有說到過)
  • 如果正確獲取 GroupCoordinator 時(會返回其對應的 node id、host 和 port 信息),建立連接,并更新心跳時間。

六、rejoinNeededOrPending()

在這里插入圖片描述

關于 rejoin, 下列幾種情況會觸發(fā)再均衡 reblance 操作

  • 新的消費者加入消費組 (第一次進行消費也屬于這種情況)
  • 消費者宕機下線 (長時間未發(fā)送心跳包)
  • 消費者主動退出消費組,比如調(diào)用 unsubscrible() 方法取消對主題的訂閱
  • 消費組對應的 GroupCoordinator 節(jié)點發(fā)生了變化
  • 消費組內(nèi)所訂閱的任一主題或者主題的分區(qū)數(shù)量發(fā)生了變化

七、ensureActiveGroup()

現(xiàn)在我們已經(jīng)知道了 GroupCoordinator 節(jié)點,并建立了連接。ensureActiveGroup() 這個方法的主要作用是向 GroupCoordinator 發(fā)送 join-group、sync-group 請求,獲取 assign 的 TopicPartition list。

  • 方法調(diào)用流程是:ensureActiveGroup() -> ensureCoordinatorReady() -> startHeartbeatThreadIfNeeded() -> joinGroupIfNeeded()
  • joinGroupIfNeeded() 方法中最重要的方法是 initiateJoinGroup(),它的調(diào)用流程是 sendJoinGroupRequest() -> JoinGroupResponseHandler.handle() -> onJoinLeader()、onJoinFollower() -> sendSyncGroupRequest()
/**
 * 確保 Group 是 active,并且加入該 group。
 * Ensure the group is active (i.e., joined and synced)
 *
 * @param timer Timer bounding how long this method can block
 * @throws KafkaException if the callback throws exception
 * @return true iff the group is active
 */
boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    // 確保 GroupCoordinator 已經(jīng)連接
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    // 啟動心跳發(fā)送線程(并不一定發(fā)送心跳,滿足條件后才會發(fā)送心跳)
    startHeartbeatThreadIfNeeded();
    // 發(fā)送 JoinGroup 請求,并對返回的信息進行處理。
    return joinGroupIfNeeded(timer);
}

7.1 joinGroupIfNeeded()

join-group 的請求是在 joinGroupIfNeeded() 中實現(xiàn)的。

在這里插入圖片描述

7.2 initiateJoinGroup()

joinGroupIfNeeded() 方法中最重要的方法是 initiateJoinGroup(),我們來看下:

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // 狀態(tài)標記為 rebalance
        state = MemberState.PREPARING_REBALANCE;
        // a rebalance can be triggered consecutively if the previous one failed,
        // in this case we would not update the start time.
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        // 發(fā)送 JoinGroup 請求
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // do nothing since all the handler logic are in SyncGroupResponseHandler already
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin;
                // this can be triggered when either join or sync request failed
                synchronized (AbstractCoordinator.this) {
                    sensors.failedRebalanceSensor.record();
                }
            }
        });
    }
    return joinFuture;
}

7.3 sendJoinGroupRequest() :join-group 請求

繼續(xù)跟 sendJoinGroupRequest() 方法

在這里插入圖片描述

sendJoinGroupRequest():向 GroupCoordinator 發(fā)送 join-group 請求

  • 如果 group 是新的 group.id,那么此時 group 初始化的狀態(tài)為 Empty。
  • 當 GroupCoordinator 接收到 consumer 的 join-group 請求后,由于此時這個 group 的 member 列表還是空(group 是新建的,每個 consumer 實例被稱為這個 group 的一個 member),第一個加入的 member 將被選為 leader,也就是說,對于一個新的 consumer group 而言,當?shù)谝粋€ consumer 實例加入后將會被選為 leader。
  • 如果 GroupCoordinator 接收到 leader 發(fā)送 join-group 請求,將會觸發(fā) rebalance,group 的狀態(tài)變?yōu)?PreparingRebalance。
  • 此時,GroupCoordinator 將會等待一定的時間,如果在一定時間內(nèi),接收到 join-group 請求的 consumer 將被認為是依然存活的,此時 group 會變?yōu)?AwaitSync 狀態(tài),并且 GroupCoordinator 會向這個 group 的所有 member 返回其 response。
  • consumer 在接收到 GroupCoordinator 的 response 后,如果這個 consumer 是 group 的 leader,那么這個 consumer 將會負責為整個 group assign partition 訂閱安排(默認是按 range 的策略,目前也可選 RoundRobin),然后 leader 將分配后的信息以 sendSyncGroupRequest() 請求的方式發(fā)給 GroupCoordinator,而作為 follower 的 consumer 實例會發(fā)送一個空列表。
  • GroupCoordinator 在接收到 leader 發(fā)來的請求后,會將 assign 的結(jié)果返回給所有已經(jīng)發(fā)送 sync-group 請求的 consumer 實例,并且 group 的狀態(tài)將會轉(zhuǎn)變?yōu)?Stable,如果后續(xù)再收到 sync-group 請求,由于 group 的狀態(tài)已經(jīng)是 Stable,將會直接返回其分配結(jié)果。

7.4 sendSyncGroupRequest() :sync-group 請求

sync-group 發(fā)送請求核心代碼如下:


在這里插入圖片描述

7.5 onJoinComplete()

經(jīng)過上面的步驟,一個 consumer 實例就已經(jīng)加入 group 成功了,加入 group 成功后,將會觸發(fā)ConsumerCoordinator 的 onJoinComplete() 方法,其作用就是:更新訂閱的 tp 列表以及更新其對應的 metadata。

在這里插入圖片描述

至此,一個 consumer 實例算是真正上意義上加入 group 成功。然后消費者就進入正常工作狀態(tài),同時消費者也通過向 GroupCoordinator 發(fā)送心跳來維持它們與消費者的從屬關系以及它們對分區(qū)的所有權關系。只要以正常的間隔發(fā)送心跳,就被認為是活躍的,但是如果 GroupCoordinator 沒有響應,那么就會發(fā)送 LeaveGroup 請求退出消費組。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容