Kafka Source Code Analysis: GroupCoordinator in Detail


GroupCoordinator handles general group membership and offset management.

Each Kafka server instantiates a coordinator which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.

In short, GroupCoordinator manages the membership and offsets of consumer groups. Each GroupCoordinator manages only a subset of the groups, so it holds members and offsets only for those groups. How does it do this? The client request types it handles give a good picture. They are mainly the following:

ApiKeys.OFFSET_COMMIT;
ApiKeys.OFFSET_FETCH;
ApiKeys.JOIN_GROUP;
ApiKeys.LEAVE_GROUP;
ApiKeys.SYNC_GROUP;
ApiKeys.DESCRIBE_GROUPS;
ApiKeys.LIST_GROUPS;
ApiKeys.HEARTBEAT;

The Kafka server handles 21 request types in total. Eight of them are handled by GroupCoordinator:

ApiKeys.forId(request.requestId) match {
  case ApiKeys.PRODUCE => handleProducerRequest(request)
  case ApiKeys.FETCH => handleFetchRequest(request)
  case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
  case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
  case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
  case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
  case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
  case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
  case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
  case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
  case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
  case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
  case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
  case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
  case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
  case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
  case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
  case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
  case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
  case requestId => throw new KafkaException("Unknown api code " + requestId)
}

GroupCoordinator Overview

Starting GroupCoordinator

When a broker starts, KafkaServer.startup() runs the following snippet. This means every broker starts a GroupCoordinator service at startup.

/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup()//note: start the groupCoordinator

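Once started via startup(), GroupCoordinator does very little, as shown below. It sets the isActive flag to true. If enableMetadataExpiration is true (the default), it also calls groupManager.enableMetadataExpiration(), which starts a background task that periodically deletes expired group metadata.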

/**
* Startup logic executed at the same time when the server starts up.
*/
def startup(enableMetadataExpiration: Boolean = true) {
  info("Starting up.")
  if (enableMetadataExpiration)
    groupManager.enableMetadataExpiration()
  isActive.set(true)
  info("Startup complete.")
}

How a group chooses its GroupCoordinator

To explain this, we first need to introduce the __consumer_offsets topic. It is an internal Kafka topic dedicated to storing the consumption state of groups.

By default it has 50 partitions, each with three replicas (broker configs offsets.topic.num.partitions and offsets.topic.replication.factor). A group's consumption state is stored in the partition given by abs(GroupId.hashCode()) % NumPartitions, where NumPartitions is the number of partitions of __consumer_offsets (50 by default).

For a consumer group, the group.id is hashed to obtain this partition. The broker hosting that partition's leader is the group's GroupCoordinator, and it stores all the metadata related to the group.
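The partition lookup described above can be sketched in a few lines of Scala. This is a minimal illustration, not Kafka's code. The object OffsetsPartitioner and its methods are hypothetical names, and the default of 50 partitions simply mirrors the default mentioned above.

One edge case is worth noting: String.hashCode can return Int.MinValue, and math.abs still returns a negative number for that input. This sketch therefore maps Int.MinValue to 0.

```scala
// Hypothetical helper (not Kafka's API): maps a group.id to a __consumer_offsets
// partition using abs(hashCode) % numPartitions, as described in the text.
object OffsetsPartitioner {
  // math.abs(Int.MinValue) overflows and stays negative, so this sketch maps it to 0.
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // numPartitions defaults to 50, the default partition count of __consumer_offsets.
  def partitionFor(groupId: String, numPartitions: Int = 50): Int = {
    require(numPartitions > 0, "numPartitions must be positive")
    safeAbs(groupId.hashCode) % numPartitions
  }
}

// The broker that leads this partition acts as the group's GroupCoordinator.
println(OffsetsPartitioner.partitionFor("my-group"))
```

The mapping depends only on group.id and the partition count, so every broker and client computes the same answer. It also means that changing the partition count of __consumer_offsets afterwards would move groups to different partitions.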

GroupCoordinator metadata

For a consumer group, the metadata mainly consists of the following:

/**
 * Group contains the following metadata:
 *
 *  Membership metadata:
 *  1. Members registered in this group
 *  2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
 *  3. Protocol metadata associated with group members
 *
 *  State metadata:
 *  1. group state
 *  2. generation id
 *  3. leader id
 */
@nonthreadsafe
//NOTE: group-level metadata; every group has one instance of this object
private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {

  private var state: GroupState = initialState // the group's state
  private val members = new mutable.HashMap[String, MemberMetadata] // the group's members
  private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // committed offsets
  private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // in-flight commits; moved into offsets above once the commit succeeds

  var protocolType: Option[String] = None
  var generationId = 0 // generation id
  var leaderId: String = null // leader consumer id
  var protocol: String = null
}

For each consumer (group member), the metadata mainly includes the following:

/**
 * Member metadata contains the following metadata:
 *
 * Heartbeat metadata:
 * 1. negotiated heartbeat session timeout
 * 2. timestamp of the latest heartbeat
 *
 * Protocol metadata:
 * 1. the list of supported protocols (ordered by preference), i.e. the supported partition assignment strategies
 * 2. the metadata associated with each protocol
 *
 * In addition, it also contains the following state information:
 *
 * 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
 *                                 its rebalance callback will be kept in the metadata if the
 *                                 member has sent the join group request
 * 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
 *                            is kept in metadata until the leader provides the group assignment
 *                            and the group transitions to stable
 */
@nonthreadsafe
//NOTE: records the state of each member of the group
private[coordinator] class MemberMetadata(val memberId: String,
                                          val groupId: String,
                                          val clientId: String,
                                          val clientHost: String,
                                          val rebalanceTimeoutMs: Int,
                                          val sessionTimeoutMs: Int,
                                          val protocolType: String,
                                          var supportedProtocols: List[(String, Array[Byte])]) {}

GroupCoordinator request handling

As mentioned above, the Kafka server handles 21 request types, and 8 of them are handled by GroupCoordinator. This section walks through how GroupCoordinator handles them.

Offset request handling
There are two offset-related requests:

  • OFFSET_FETCH: query committed offsets;
  • OFFSET_COMMIT: commit offsets.

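OFFSET_FETCH request handling

The server-side handling of OFFSET_FETCH is shown below. Version 0 of the request reads offsets from ZooKeeper. Version 1 and above read them from Kafka, which is where newer clients store offsets by default, and this article focuses on that case. As the implementation shows, fetching committed offsets falls into two cases:

  • fetching the offsets of all topic-partitions consumed by the group;
  • fetching the offsets of the specified topic-partitions.

Both cases are implemented by calling coordinator.handleFetchOffsets().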

/**
 * Handle an offset fetch request
 */
def handleOffsetFetchRequest(request: RequestChannel.Request) {
  val header = request.header
  val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]

  def authorizeTopicDescribe(partition: TopicPartition) =
    authorize(request.session, Describe, new Resource(auth.Topic, partition.topic)) //note: check Describe permission on the topic

  val offsetFetchResponse =
    // reject the request if not authorized to the group
    if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
      offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
    else {
      if (header.apiVersion == 0) {
        val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
          .partition(authorizeTopicDescribe)

        // version 0 reads offsets from ZK
        val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
          val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
          try {
            if (!metadataCache.contains(topicPartition.topic))
              (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
            else {
              val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
              payloadOpt match {
                case Some(payload) =>
                  (topicPartition, new OffsetFetchResponse.PartitionData(
                      payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
                case None =>
                  (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
              }
            }
          } catch {
            case e: Throwable =>
              (topicPartition, new OffsetFetchResponse.PartitionData(
                  OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
          }
        }.toMap

        val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
        new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
      } else {
        // versions 1 and above read offsets from Kafka
        if (offsetFetchRequest.isAllPartitions) {//note: fetch the offsets of all tps consumed by this group
          val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
          if (error != Errors.NONE)
            offsetFetchRequest.getErrorResponse(error)
          else {
            // clients are not allowed to see offsets for topics that are not authorized for Describe
            //note: offsets of topics without Describe permission are filtered out
            val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
            new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava, header.apiVersion)
          }
        } else { //note: fetch the offsets of the specified list of tps
          val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
            .partition(authorizeTopicDescribe)
          val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
            Some(authorizedPartitions))
          if (error != Errors.NONE)
            offsetFetchRequest.getErrorResponse(error)
          else {
            val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
            new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
          }
        }
      }
    }

  trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
  requestChannel.sendResponse(new Response(request, offsetFetchResponse))
}

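coordinator.handleFetchOffsets() mainly calls groupManager.getOffsets() to obtain the offset information. The lookup holds the group lock (group synchronized), presumably because GroupMetadata is not thread-safe (@nonthreadsafe). Holding the lock keeps the read from racing with concurrent offset updates.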

def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
  trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
  val group = groupMetadataCache.get(groupId)
  if (group == null) {
    topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
      (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
    }.toMap
  } else {
    group synchronized {
      if (group.is(Dead)) { //note: the group is already Dead; return -1 (INVALID_OFFSET) as the offset
        topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
          (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
        }.toMap
      } else {
          topicPartitionsOpt match {
            case None => //note: return the offsets of all tps consumed by the group (only tps that have a committed offset)
              // Return offsets for all partitions owned by this consumer group. (this only applies to consumers
              // that commit offsets to Kafka.)
              group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
                topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
              }

            case Some(topicPartitions) =>
              topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
                val partitionData = group.offset(topicPartition) match {
                  case None => //note: return -1 if there is no committed offset
                    new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
                  case Some(offsetAndMetadata) =>
                    new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
                }
                topicPartition -> partitionData
              }.toMap
          }
      }
    }
  }
}
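Stripped of the Kafka types, the lookup rules of getOffsets() above come down to a few cases. The following sketch models them over an in-memory map.

TopicPartition, OffsetAndMetadata and Group here are simplified stand-ins defined in the snippet, not Kafka's classes. The sketch also returns only the offset value rather than a full PartitionData.

```scala
// Simplified, hypothetical model of the getOffsets() lookup rules.
object OffsetLookupSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class OffsetAndMetadata(offset: Long, metadata: String)

  val InvalidOffset: Long = -1L

  // Stand-in for a group: a "dead" flag plus its committed offsets.
  final class Group(var dead: Boolean = false) {
    val offsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetAndMetadata]
  }

  def getOffsets(groups: scala.collection.Map[String, Group],
                 groupId: String,
                 requested: Option[Seq[TopicPartition]]): Map[TopicPartition, Long] = {
    // Unknown or dead group: every requested partition gets -1; "all partitions" yields nothing.
    def allInvalid: Map[TopicPartition, Long] =
      requested.getOrElse(Seq.empty[TopicPartition]).map(tp => tp -> InvalidOffset).toMap

    groups.get(groupId) match {
      case None => allInvalid
      case Some(group) =>
        group.synchronized { // read the mutable map under the group's lock
          if (group.dead) allInvalid
          else requested match {
            case None => // only partitions that actually have a committed offset
              group.offsets.map { case (tp, om) => tp -> om.offset }.toMap
            case Some(tps) => // missing partitions fall back to -1
              tps.map(tp => tp -> group.offsets.get(tp).map(_.offset).getOrElse(InvalidOffset)).toMap
          }
        }
    }
  }
}
```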

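OFFSET_COMMIT request handling

On the server side, an OFFSET_COMMIT request ends up in GroupCoordinator.doCommitOffsets(). This method first validates the request against the group's state and then prepares the offset store: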

private def doCommitOffsets(group: GroupMetadata,
                    memberId: String,
                    generationId: Int,
                    offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
  var delayedOffsetStore: Option[DelayedStore] = None

  group synchronized {
    if (group.is(Dead)) {
      responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
    } else if (generationId < 0 && group.is(Empty)) {//note: consumers using assign() (manual assignment, no group management)
      // the group is only using Kafka to store offsets
      delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
        offsetMetadata, responseCallback)
    } else if (group.is(AwaitingSync)) {
      responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
    } else if (!group.has(memberId)) {//note: possibly a conflict between a simple consumer and a group-managed (high-level) consumer; reject the request
      responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
    } else if (generationId != group.generationId) {
      responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
    } else {
      val member = group.get(memberId)
      completeAndScheduleNextHeartbeatExpiration(group, member)//note: a commit also counts as a heartbeat; reschedule the next heartbeat deadline
      delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
        offsetMetadata, responseCallback) //note: prepare the offset commit
    }
  }

  // store the offsets without holding the group lock
  delayedOffsetStore.foreach(groupManager.store)
}
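The order of the checks in doCommitOffsets() matters, so it may help to see them as a pure function. In this sketch the group states are plain case objects. Errors are returned as strings named after the Errors constants above. CommitCheckSketch, checkCommit and the Outcome types are hypothetical names used only for illustration.

```scala
// Hypothetical model of the validation order in doCommitOffsets().
object CommitCheckSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  sealed trait Outcome
  case object StoreOffsets extends Outcome
  final case class Reject(error: String) extends Outcome

  def checkCommit(state: GroupState, isMember: Boolean,
                  requestGeneration: Int, groupGeneration: Int): Outcome =
    if (state == Dead) Reject("UNKNOWN_MEMBER_ID")
    // No group management (e.g. assign()): the group only uses Kafka to store offsets.
    else if (requestGeneration < 0 && state == Empty) StoreOffsets
    else if (state == AwaitingSync) Reject("REBALANCE_IN_PROGRESS")
    else if (!isMember) Reject("UNKNOWN_MEMBER_ID")
    else if (requestGeneration != groupGeneration) Reject("ILLEGAL_GENERATION")
    // The real method also refreshes the member's heartbeat deadline at this point.
    else StoreOffsets
}
```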

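The key method here is groupManager.prepareStoreOffsets(), shown below. Its offset storage process works briefly as follows:

  1. First, filter out the entries whose offset metadata exceeds the size limit.
  2. Build the message set to be appended to the replicated log (__consumer_offsets). The append itself is performed later by groupManager.store(), outside the group lock.
  3. Call group.prepareOffsetCommit() to record the offsets in the group's pendingOffsetCommits. At this point they are not yet committed, so they can be rolled back if the append fails.
  4. In the putCacheCallback callback, if the append to the replicated log succeeded, update the cache by moving the entries from pendingOffsetCommits into the group's offsets. Otherwise, discard the pending entries.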
/**
 * Store offsets by appending it to the replicated log and then inserting to cache
 */
//note: record the committed offsets
def prepareStoreOffsets(group: GroupMetadata,
                        consumerId: String,
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
  // first filter out partitions with offset metadata size exceeding limit
  //note: first filter out entries whose offset metadata exceeds the size limit
  val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
    validateOffsetMetadataLength(offsetAndMetadata.metadata)
  }

  // construct the message set to append
  //note: build the message set to append
  getMagicAndTimestamp(partitionFor(group.groupId)) match {
    case Some((magicValue, timestampType, timestamp)) =>
      val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
        Record.create(magicValue, timestampType, timestamp,
          GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition), //note: the key is a triple: group, topic, partition
          GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
      }.toSeq

      val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))

      //note: the offset records to be appended to the replicated log
      val entries = Map(offsetTopicPartition -> MemoryRecords.withRecords(timestampType, compressionType, records:_*))

      // set the callback function to insert offsets into cache after log append completed
      def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
        // the append response should only contain the topics partition
        if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
          throw new IllegalStateException("Append status %s should only have one partition %s"
            .format(responseStatus, offsetTopicPartition))

        // construct the commit response status and insert
        // the offset and metadata to cache if the append status has no error
        val status = responseStatus(offsetTopicPartition)

        val responseCode =
          group synchronized {
            if (status.error == Errors.NONE) { //note: the records are in the replicated log, so update the cache
              if (!group.is(Dead)) { //note: move them into the group's offsets
                filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                  group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
                }
              }
              Errors.NONE.code
            } else {
              if (!group.is(Dead)) {
                filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
                  group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
                }
              }

              debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
                s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")

              // transform the log append error code to the corresponding the commit status error code
              val responseError = status.error match {
                case Errors.UNKNOWN_TOPIC_OR_PARTITION
                     | Errors.NOT_ENOUGH_REPLICAS
                     | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                  Errors.GROUP_COORDINATOR_NOT_AVAILABLE

                case Errors.NOT_LEADER_FOR_PARTITION =>
                  Errors.NOT_COORDINATOR_FOR_GROUP

                case Errors.MESSAGE_TOO_LARGE
                     | Errors.RECORD_LIST_TOO_LARGE
                     | Errors.INVALID_FETCH_SIZE =>
                  Errors.INVALID_COMMIT_OFFSET_SIZE

                case other => other
              }

              responseError.code
            }
          }

        // compute the final error codes for the commit response
        val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
          if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
            (topicPartition, responseCode)
          else
            (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
        }

        // finally trigger the callback logic passed from the API layer
        responseCallback(commitStatus)
      }

      group synchronized {
        group.prepareOffsetCommit(offsetMetadata) //note: add to the group's pendingOffsetCommits
      }

      Some(DelayedStore(entries, putCacheCallback)) //note: returned to the caller, which performs the append via groupManager.store

    case None =>
      val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
        (topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
      }
      responseCallback(commitStatus)
      None
  }
}
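The pending-then-complete protocol in steps 3 and 4 can be illustrated with a small in-memory model. This is a simplified sketch that assumes offsets are plain Long values. The method names echo those called above (prepareOffsetCommit, completePendingOffsetWrite, failPendingOffsetWrite), but the class and its method bodies were written for illustration and are not copied from Kafka.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a group's committed and pending offsets.
object PendingCommitSketch {
  final case class TopicPartition(topic: String, partition: Int)

  final class GroupOffsets {
    val offsets = mutable.HashMap.empty[TopicPartition, Long]              // what offset fetches read
    val pendingOffsetCommits = mutable.HashMap.empty[TopicPartition, Long] // appended, not yet acknowledged

    // Step 3: stage the commit before the log append.
    def prepareOffsetCommit(commits: Map[TopicPartition, Long]): Unit =
      pendingOffsetCommits ++= commits

    // Step 4, success path: publish the offset and clear the matching pending entry.
    def completePendingOffsetWrite(tp: TopicPartition, offset: Long): Unit = {
      offsets(tp) = offset
      if (pendingOffsetCommits.get(tp).contains(offset)) pendingOffsetCommits.remove(tp)
    }

    // Step 4, failure path: drop the pending entry and keep the previous committed offset.
    def failPendingOffsetWrite(tp: TopicPartition, offset: Long): Unit =
      if (pendingOffsetCommits.get(tp).contains(offset)) pendingOffsetCommits.remove(tp)
  }
}
```

In this model, a fetch that reads only `offsets` never sees a commit whose append has not been acknowledged, and a failed append leaves the previously committed offset untouched.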

Group-related request handling

This subsection covers how GroupCoordinator handles group-related requests.

JOIN_GROUP and SYNC_GROUP request handling
These two requests were covered in detail in the earlier article "Kafka Source Code Analysis: How a Consumer Joins a Group", so they are not repeated here.

DESCRIBE_GROUPS request handling
The DESCRIBE_GROUPS handling is implemented as follows. It mainly returns details about each member of the group: memberId, clientId, clientHost, metadata (protocol) and assignment.

def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
  if (!isActive.get) {
    (Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
  } else if (!isCoordinatorForGroup(groupId)) {
    (Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
  } else if (isCoordinatorLoadingInProgress(groupId)) {
    (Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
  } else {
    groupManager.getGroup(groupId) match { //note: return the group's details, mainly the details of its members
      case None => (Errors.NONE, GroupCoordinator.DeadGroup)
      case Some(group) =>
        group synchronized {
          (Errors.NONE, group.summary)
        }
    }
  }
}

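LEAVE_GROUP request handling

When does the server receive a LEAVE_GROUP request, or perform the equivalent operation? Generally in these cases:

  1. The consumer calls unsubscribe(), dropping its subscription to all topics.
  2. The consumer's heartbeat thread detects a timeout, because the application has not called poll() within max.poll.interval.ms. The consumer then proactively sends a LEAVE_GROUP request.
  3. On the server side, if no heartbeat arrives from a client within the session timeout, the coordinator performs the equivalent of a LEAVE_GROUP operation on its own. No request is involved; the member is removed through the same onMemberFailure() logic.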
def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {
  if (!isActive.get) {
    responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
  } else if (!isCoordinatorForGroup(groupId)) {
    responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
  } else if (isCoordinatorLoadingInProgress(groupId)) {
    responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
  } else {
    groupManager.getGroup(groupId) match {
      case None =>
        // if the group is marked as dead, it means some other thread has just removed the group
        // from the coordinator metadata; this is likely that the group has migrated to some other
        // coordinator OR the group is in a transient unstable phase. Let the consumer to retry
        // joining without specified consumer id,
        responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

      case Some(group) =>
        group synchronized {
          if (group.is(Dead) || !group.has(memberId)) {
            responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
          } else {
            val member = group.get(memberId)
            removeHeartbeatForLeavingMember(group, member)//NOTE: mark the member as leaving and complete its pending heartbeat check
            onMemberFailure(group, member)//NOTE: remove the member from the group and rebalance if needed
            responseCallback(Errors.NONE.code)
          }
        }
    }
  }
}

private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
  trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
  group.remove(member.memberId)//NOTE: remove this member's metadata from the group
  group.currentState match {
    case Dead | Empty =>
    case Stable | AwaitingSync => maybePrepareRebalance(group)//NOTE: start a rebalance
    case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))//NOTE: check whether the pending join-group can complete
  }
}

As shown above, when GroupCoordinator handles a LEAVE_GROUP request, it simply calls onMemberFailure(). That method removes the failed member from the group and then reacts according to the group's current state:

  1. If the group is Dead or Empty, it has no members, so nothing more is done.
  2. If the group is Stable or AwaitingSync, maybePrepareRebalance() is called to start a rebalance.
  3. If the group is already PreparingRebalance, the coordinator checks whether the delayed join-group operation can now complete. If it can, GroupCoordinator sends join-group responses to the group's members and moves the group to AwaitingSync.
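The state-dependent reaction in onMemberFailure() is small enough to express as a lookup. In the sketch below, the group states are modeled as case objects. The returned Action values are hypothetical names that stand for the calls made in the real method.

```scala
// Hypothetical model of what onMemberFailure() does after removing the member.
object MemberFailureSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  sealed trait Action
  case object NoOp extends Action                // nothing to rebalance
  case object PrepareRebalance extends Action    // stands for maybePrepareRebalance(group)
  case object CheckJoinCompletion extends Action // stands for joinPurgatory.checkAndComplete(...)

  def afterMemberRemoved(state: GroupState): Action = state match {
    case Dead | Empty          => NoOp
    case Stable | AwaitingSync => PrepareRebalance
    case PreparingRebalance    => CheckJoinCompletion
  }
}
```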

HEARTBEAT request handling
Heartbeats are among the most important requests:

  1. On the server side, they are GroupCoordinator's main signal for deciding whether a consumer member is alive. If a consumer does not send a heartbeat within the given time (the session timeout), it is removed from the group and a rebalance is triggered.
  2. On the client side, heartbeats are the main channel through which a client learns about group state changes. For example, when a new consumer joins the group, a rebalance starts and the group's state changes. When the other members next send heartbeats, GroupCoordinator replies that the group is rebalancing (REBALANCE_IN_PROGRESS), so they rejoin the group.
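The liveness rule in point 1 amounts to a deadline check: each heartbeat moves the member's deadline to the heartbeat time plus its session timeout. The sketch below models that rule with a hypothetical MemberLiveness record.

Kafka actually tracks liveness with delayed heartbeat operations in a purgatory rather than by scanning members. Treat this only as an illustration of the timing logic.

```scala
// Hypothetical model of session-timeout liveness; field names are illustrative.
object HeartbeatDeadlineSketch {
  final case class MemberLiveness(memberId: String, sessionTimeoutMs: Long, lastHeartbeatMs: Long) {
    def deadlineMs: Long = lastHeartbeatMs + sessionTimeoutMs
    def isExpired(nowMs: Long): Boolean = nowMs > deadlineMs
    // A heartbeat pushes the deadline forward by resetting the last heartbeat time.
    def heartbeat(nowMs: Long): MemberLiveness = copy(lastHeartbeatMs = nowMs)
  }

  // Members the coordinator would remove from the group (followed by a rebalance).
  def expiredMembers(members: Seq[MemberLiveness], nowMs: Long): Seq[String] =
    members.filter(_.isExpired(nowMs)).map(_.memberId)
}
```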

GroupCoordinator handles heartbeat requests as follows.

//NOTE: server-side handling of heartbeat requests
def handleHeartbeat(groupId: String,
                    memberId: String,
                    generationId: Int,
                    responseCallback: Short => Unit) {
  if (!isActive.get) {//NOTE: GroupCoordinator is not active (e.g. it has been shut down)
    responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
  } else if (!isCoordinatorForGroup(groupId)) {//NOTE: this GroupCoordinator is not responsible for this group
    responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
  } else if (isCoordinatorLoadingInProgress(groupId)) {//NOTE: the group's metadata is still loading; reply with success directly
    // the group is still loading, so respond just blindly
    responseCallback(Errors.NONE.code)
  } else {
    groupManager.getGroup(groupId) match {
      case None => //NOTE: this GroupCoordinator has no metadata for the group
        responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

      case Some(group) => //NOTE: the group is known
        group synchronized {
          group.currentState match {
            case Dead => //NOTE: the group is Dead, i.e. its metadata has been removed; return UNKNOWN_MEMBER_ID
              // if the group is marked as dead, it means some other thread has just removed the group
              // from the coordinator metadata; this is likely that the group has migrated to some other
              // coordinator OR the group is in a transient unstable phase. Let the member retry
              // joining without the specified member id,
              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

            case Empty => //NOTE: the group is Empty, i.e. it has no members; return UNKNOWN_MEMBER_ID
              responseCallback(Errors.UNKNOWN_MEMBER_ID.code)

            case AwaitingSync => //NOTE: the join phase is done and the group is waiting for the leader's SyncGroup
              if (!group.has(memberId)) //NOTE: the member is not in the group; return UNKNOWN_MEMBER_ID
                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
              else //NOTE: tell the client the group is rebalancing, asking it to rejoin
                responseCallback(Errors.REBALANCE_IN_PROGRESS.code)

            case PreparingRebalance => //NOTE: the group is PreparingRebalance
              if (!group.has(memberId)) { //NOTE: the member is not in the group; return UNKNOWN_MEMBER_ID
                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
              } else if (generationId != group.generationId) {
                responseCallback(Errors.ILLEGAL_GENERATION.code)
              } else { //NOTE: process the heartbeat normally, but return REBALANCE_IN_PROGRESS
                val member = group.get(memberId)
                //note: update the heartbeat time, complete this heartbeat and schedule the next expiration check (on timeout the member is removed from the group)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
              }

            case Stable =>
              if (!group.has(memberId)) {
                responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
              } else if (generationId != group.generationId) {
                responseCallback(Errors.ILLEGAL_GENERATION.code)
              } else { //NOTE: process the heartbeat normally
                val member = group.get(memberId)
                //note: update the heartbeat time, complete this heartbeat and schedule the next expiration check (on timeout the member is removed from the group)
                completeAndScheduleNextHeartbeatExpiration(group, member)
                responseCallback(Errors.NONE.code)
              }
          }
        }
    }
  }
}
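To summarize handleHeartbeat(): once the coordinator-level checks (isActive, isCoordinatorForGroup, group loading) have passed, the response depends only on the group state, membership and generation. The following sketch encodes that table.

The names are hypothetical. Error codes are returned as strings named after the Errors constants, and None stands for a group this coordinator does not know.

```scala
// Hypothetical decision table mirroring the group-state branch of handleHeartbeat().
object HeartbeatDecisionSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  def heartbeatResult(state: Option[GroupState], isMember: Boolean,
                      requestGeneration: Int, groupGeneration: Int): String = state match {
    case None | Some(Dead) | Some(Empty) => "UNKNOWN_MEMBER_ID"
    case Some(AwaitingSync) =>
      if (!isMember) "UNKNOWN_MEMBER_ID" else "REBALANCE_IN_PROGRESS"
    case Some(PreparingRebalance) =>
      if (!isMember) "UNKNOWN_MEMBER_ID"
      else if (requestGeneration != groupGeneration) "ILLEGAL_GENERATION"
      else "REBALANCE_IN_PROGRESS" // the real code also refreshes the heartbeat deadline
    case Some(Stable) =>
      if (!isMember) "UNKNOWN_MEMBER_ID"
      else if (requestGeneration != groupGeneration) "ILLEGAL_GENERATION"
      else "NONE"
  }
}
```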

Group state machine

While GroupCoordinator manages groups and offsets, one of its key jobs is handling and maintaining changes in group state. The group state machine is shown in the figure below.

[Figure: group state machine]

The core of this state machine is the rebalance operation. Briefly, a rebalance proceeds as follows:

  1. When certain conditions occur, the group moves from Stable to PreparingRebalance. Examples include a new member joining, or an existing member leaving or timing out.
  2. The coordinator then waits for all consumer members of the group to send join-group requests. Once all of them have done so, GroupCoordinator sends join-group responses to all members and the group moves to AwaitingSync.
  3. In its join-group response, the leader consumer receives the subscribed-topic details of every member. It computes the partition assignment and sends the result to GroupCoordinator in a sync-group request. The sync-group requests sent by non-leader consumers carry empty assignment data.
  4. Once GroupCoordinator receives the leader consumer's sync-group request, it has the topic-partition list assigned to each member, and the group moves to Stable.

That is one complete rebalance.
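The allowed transitions can be written as a table of valid previous states. To my understanding, Kafka's GroupMetadata keeps a similar validPreviousStates map. The table below is a sketch written for this article, though, and should be checked against the source version you are reading. GroupStateMachineSketch and transition are hypothetical names.

Besides the rebalance path described above, the table allows two other kinds of move: Empty to PreparingRebalance and back, and a move to Dead from any state.

```scala
// Hypothetical state-transition table for a consumer group.
object GroupStateMachineSketch {
  sealed trait GroupState extends Product with Serializable
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // target state -> states it may be entered from
  val validPreviousStates: Map[GroupState, Set[GroupState]] = Map(
    Dead -> Set[GroupState](Stable, PreparingRebalance, AwaitingSync, Empty, Dead),
    AwaitingSync -> Set[GroupState](PreparingRebalance),
    Stable -> Set[GroupState](AwaitingSync),
    PreparingRebalance -> Set[GroupState](Stable, AwaitingSync, Empty),
    Empty -> Set[GroupState](PreparingRebalance)
  )

  // Returns the new state, or throws IllegalArgumentException for an illegal move.
  def transition(from: GroupState, to: GroupState): GroupState = {
    require(validPreviousStates(to).contains(from), s"illegal transition $from -> $to")
    to
  }
}
```

For example, one complete rebalance goes through transition(Stable, PreparingRebalance), then AwaitingSync, then Stable. A direct jump from Stable to AwaitingSync fails the require check.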
