GroupCoordinator handles general group membership and offset management.
Each Kafka server instantiates a coordinator which is responsible for a set of groups. Groups are assigned to coordinators based on their group names.
Put simply, GroupCoordinator manages the membership and offsets of consumer groups, although each GroupCoordinator instance manages only a subset of the consumer groups and their offsets. How it does this is clear from the client request types it handles, which are mainly the following:
ApiKeys.OFFSET_COMMIT;
ApiKeys.OFFSET_FETCH;
ApiKeys.JOIN_GROUP;
ApiKeys.LEAVE_GROUP;
ApiKeys.SYNC_GROUP;
ApiKeys.DESCRIBE_GROUPS;
ApiKeys.LIST_GROUPS;
ApiKeys.HEARTBEAT;
The Kafka server handles the following 21 request types in total, 8 of which are handled by GroupCoordinator.
ApiKeys.forId(request.requestId) match {
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
Introduction to GroupCoordinator
Starting GroupCoordinator
When a broker starts, the startup() method of KafkaServer runs the following code, which means that every broker starts a GroupCoordinator service at startup.
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, Time.SYSTEM)
groupCoordinator.startup() // note: start the groupCoordinator
When the GroupCoordinator service is started through startup(), it does the following: it starts a background thread that deletes expired group metadata and sets the isActive flag to true.
/**
* Startup logic executed at the same time when the server starts up.
*/
def startup(enableMetadataExpiration: Boolean = true) {
info("Starting up.")
if (enableMetadataExpiration)
groupManager.enableMetadataExpiration()
isActive.set(true)
info("Startup complete.")
}
How a group selects its GroupCoordinator
To explain this, we first need to introduce the __consumer_offsets topic. It is an internal Kafka topic dedicated to storing the consumption state of groups. By default it has 50 partitions with three replicas each. The partition that stores the consumption state of a given group is computed as abs(GroupId.hashCode()) % NumPartitions, where NumPartitions is the number of partitions of __consumer_offsets (50 by default).
For a consumer group, the group.id is hashed to obtain the corresponding partition. The broker that hosts the leader of that partition is the GroupCoordinator for the group, and it stores all metadata related to the group.
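The partition lookup described above can be sketched as follows. This is a minimal illustration, not the broker's code: `CoordinatorLookupSketch`, `safeAbs`, and `partitionFor` are hypothetical names, and mapping `Int.MinValue` to 0 is an assumption made so the result is never negative.

```scala
object CoordinatorLookupSketch {
  // Non-negative absolute value. Int.MinValue has no positive counterpart,
  // so this sketch maps it to 0 (an assumption, see the lead-in).
  def safeAbs(n: Int): Int = if (n == Int.MinValue) 0 else math.abs(n)

  // Partition of __consumer_offsets that would hold the state of this group.
  // numPartitions defaults to 50, the default partition count named in the text.
  def partitionFor(groupId: String, numPartitions: Int = 50): Int =
    safeAbs(groupId.hashCode) % numPartitions

  def main(args: Array[String]): Unit =
    Seq("test", "billing-service", "audit").foreach { g =>
      println(s"group '$g' -> partition ${partitionFor(g)}")
    }
}
```

The broker that leads the resulting partition acts as the coordinator for the group, so every member of the same group is routed to the same broker.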
GroupCoordinator metadata
For a consumer group, the metadata mainly consists of the following:
/**
* Group contains the following metadata:
*
* Membership metadata:
* 1. Members registered in this group
* 2. Current protocol assigned to the group (e.g. partition assignment strategy for consumers)
* 3. Protocol metadata associated with group members
*
* State metadata:
* 1. group state
* 2. generation id
* 3. leader id
*/
@nonthreadsafe
//NOTE: group-level metadata; each group has one instance of this class
private[coordinator] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) {
private var state: GroupState = initialState // state of the group
private val members = new mutable.HashMap[String, MemberMetadata] // members of the group
private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // committed offsets
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] // moved into the map above once the offset commit succeeds
var protocolType: Option[String] = None
var generationId = 0 // generation id
var leaderId: String = null // leader consumer id
var protocol: String = null
}
For each consumer, the metadata mainly consists of the following:
/**
* Member metadata contains the following metadata:
*
* Heartbeat metadata:
* 1. negotiated heartbeat session timeout
* 2. timestamp of the latest heartbeat
*
* Protocol metadata:
* 1. the list of supported protocols (ordered by preference), i.e. the partition assignment protocols
* 2. the metadata associated with each protocol
*
* In addition, it also contains the following state information:
*
* 1. Awaiting rebalance callback: when the group is in the prepare-rebalance state,
* its rebalance callback will be kept in the metadata if the
* member has sent the join group request
* 2. Awaiting sync callback: when the group is in the awaiting-sync state, its sync callback
* is kept in metadata until the leader provides the group assignment
* and the group transitions to stable
*/
@nonthreadsafe
//NOTE: records the state of each member of the group
private[coordinator] class MemberMetadata(val memberId: String,
val groupId: String,
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {}
Request handling in GroupCoordinator
As noted above, 8 of the 21 request types that the Kafka server can handle are processed by GroupCoordinator. This section describes how GroupCoordinator handles them.
Handling offset requests
There are two offset-related requests:
- OFFSET_FETCH: query offsets;
- OFFSET_COMMIT: commit offsets.
Handling OFFSET_FETCH requests
The server handles OFFSET_FETCH requests as shown below. Newer versions store offsets in Kafka by default, so this walkthrough covers that case. As the implementation shows, fetching committed offsets falls into two cases:
- fetch the offsets of every topic-partition the group consumes;
- fetch the offsets of the specified topic-partitions.
Both cases are implemented by calling coordinator.handleFetchOffsets().
/**
* Handle an offset fetch request
*/
def handleOffsetFetchRequest(request: RequestChannel.Request) {
val header = request.header
val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
def authorizeTopicDescribe(partition: TopicPartition) =
authorize(request.session, Describe, new Resource(auth.Topic, partition.topic)) //note: check the Describe permission
val offsetFetchResponse =
// reject the request if not authorized to the group
if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId)))
offsetFetchRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED)
else {
if (header.apiVersion == 0) {
val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
.partition(authorizeTopicDescribe)
// version 0 reads offsets from ZK
val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
try {
if (!metadataCache.contains(topicPartition.topic))
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else {
val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
payloadOpt match {
case Some(payload) =>
(topicPartition, new OffsetFetchResponse.PartitionData(
payload.toLong, OffsetFetchResponse.NO_METADATA, Errors.NONE))
case None =>
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
}
}
} catch {
case e: Throwable =>
(topicPartition, new OffsetFetchResponse.PartitionData(
OffsetFetchResponse.INVALID_OFFSET, OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
}
}.toMap
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
} else {
// versions 1 and above read offsets from Kafka
if (offsetFetchRequest.isAllPartitions) { //note: fetch the offsets of every topic-partition this group consumes
val (error, allPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId)
if (error != Errors.NONE)
offsetFetchRequest.getErrorResponse(error)
else {
// clients are not allowed to see offsets for topics that are not authorized for Describe
//note: without the Describe permission, the client cannot see the corresponding offsets
val authorizedPartitionData = allPartitionData.filter { case (topicPartition, _) => authorizeTopicDescribe(topicPartition) }
new OffsetFetchResponse(Errors.NONE, authorizedPartitionData.asJava, header.apiVersion)
}
} else { //note: fetch the offsets of the specified topic-partitions
val (authorizedPartitions, unauthorizedPartitions) = offsetFetchRequest.partitions.asScala
.partition(authorizeTopicDescribe)
val (error, authorizedPartitionData) = coordinator.handleFetchOffsets(offsetFetchRequest.groupId,
Some(authorizedPartitions))
if (error != Errors.NONE)
offsetFetchRequest.getErrorResponse(error)
else {
val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNKNOWN_PARTITION).toMap
new OffsetFetchResponse(Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava, header.apiVersion)
}
}
}
}
trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
requestChannel.sendResponse(new Response(request, offsetFetchResponse))
}
coordinator.handleFetchOffsets() mainly calls groupManager.getOffsets() to obtain the offsets. The lookup takes the group lock, presumably so that the offsets are not updated while the query is in progress.
def getOffsets(groupId: String, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
val group = groupMetadataCache.get(groupId)
if (group == null) {
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
}.toMap
} else {
group synchronized {
if (group.is(Dead)) { //note: the group is already dead, so return -1 (INVALID_OFFSET) for each offset
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE))
}.toMap
} else {
topicPartitionsOpt match {
case None => //note: return the offsets of every topic-partition the group consumes (only those that already have an offset here)
// Return offsets for all partitions owned by this consumer group. (this only applies to consumers
// that commit offsets to Kafka.)
group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
}
case Some(topicPartitions) =>
topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
val partitionData = group.offset(topicPartition) match {
case None => //note: return -1 when there is no offset
new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE)
case Some(offsetAndMetadata) =>
new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE)
}
topicPartition -> partitionData
}.toMap
}
}
}
}
}
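The lookup semantics of getOffsets() can be modelled with plain immutable collections. The sketch below is a simplification under stated assumptions: it ignores the Dead state and the group lock, and `OffsetFetchSketch`, `fetchOffsets`, and the local `TopicPartition` case class are hypothetical stand-ins rather than Kafka types.

```scala
object OffsetFetchSketch {
  final case class TopicPartition(topic: String, partition: Int)

  // Stand-in for OffsetFetchResponse.INVALID_OFFSET, which the source notes is -1.
  val InvalidOffset: Long = -1L

  // groupCache maps groupId -> committed offsets of that group.
  // partitionsOpt = None means "all partitions the group has offsets for".
  def fetchOffsets(groupCache: Map[String, Map[TopicPartition, Long]],
                   groupId: String,
                   partitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, Long] =
    groupCache.get(groupId) match {
      case None =>
        // Unknown group: every requested partition gets the invalid offset.
        partitionsOpt.getOrElse(Seq.empty[TopicPartition]).map(tp => tp -> InvalidOffset).toMap
      case Some(offsets) =>
        partitionsOpt match {
          case None      => offsets
          case Some(tps) => tps.map(tp => tp -> offsets.getOrElse(tp, InvalidOffset)).toMap
        }
    }
}
```

Note that a missing offset is reported as -1 with no error, so a client cannot distinguish "never committed" from "unknown group" through this path alone.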
Handling OFFSET_COMMIT requests
private def doCommitOffsets(group: GroupMetadata,
memberId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
var delayedOffsetStore: Option[DelayedStore] = None
group synchronized {
if (group.is(Dead)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
} else if (generationId < 0 && group.is(Empty)) { //note: the consumer uses manual partition assignment (assign)
// the group is only using Kafka to store offsets
delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
offsetMetadata, responseCallback)
} else if (group.is(AwaitingSync)) {
responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS.code))
} else if (!group.has(memberId)) { //note: possibly a conflict between a simple consumer and a high-level consumer; reject the request
responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID.code))
} else if (generationId != group.generationId) {
responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
} else {
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member) //note: update the next heartbeat deadline
delayedOffsetStore = groupManager.prepareStoreOffsets(group, memberId, generationId,
offsetMetadata, responseCallback) //note: commit offset
}
}
// store the offsets without holding the group lock
delayedOffsetStore.foreach(groupManager.store)
}
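The order of the checks in doCommitOffsets() matters, so it may help to see it as a pure function. The following is only a sketch: `CommitCheckSketch` and `checkCommit` are hypothetical names, error codes are represented as plain strings, and the group is reduced to its state, member set, and generation.

```scala
object CommitCheckSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // Right(()) means the commit may be stored; Left carries the error name.
  // The branches follow the same order as the if/else chain above.
  def checkCommit(state: GroupState,
                  members: Set[String],
                  groupGeneration: Int,
                  memberId: String,
                  generationId: Int): Either[String, Unit] =
    if (state == Dead) Left("UNKNOWN_MEMBER_ID")
    else if (generationId < 0 && state == Empty) Right(()) // group only stores offsets in Kafka
    else if (state == AwaitingSync) Left("REBALANCE_IN_PROGRESS")
    else if (!members.contains(memberId)) Left("UNKNOWN_MEMBER_ID")
    else if (generationId != groupGeneration) Left("ILLEGAL_GENERATION")
    else Right(())
}
```

Because the AwaitingSync check comes before the membership check, even a valid member is told to rejoin while the group is waiting for the leader's assignment.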
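The method to focus on here is groupManager.prepareStoreOffsets(). Its logic is shown below; in brief, the offsets are stored as follows:
- first, filter out the partitions whose offset metadata exceeds the size limit;
- build the records for the remaining offsets so they can be appended to the replicated log (the append itself is carried out afterwards by groupManager.store(), without holding the group lock);
- call prepareOffsetCommit() to put the offsets into the group's pendingOffsetCommits (at this point nothing is committed yet, so the change can be rolled back if the append fails later);
- in the putCacheCallback callback, if the offsets were appended to the replicated log successfully, update the cache by moving the entries from the group's pendingOffsetCommits into the offsets map.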
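The stage-then-confirm pattern in these steps can be illustrated with a small model. This is a sketch under assumptions, not the Kafka implementation: `PendingOffsetSketch`, `GroupOffsets`, and `commit` are hypothetical, and the `appendToLog` function stands in for the replicated log append and simply reports success or failure.

```scala
import scala.collection.mutable

object PendingOffsetSketch {
  final case class TopicPartition(topic: String, partition: Int)

  final class GroupOffsets {
    private val offsets = mutable.HashMap.empty[TopicPartition, Long]
    private val pending = mutable.HashMap.empty[TopicPartition, Long]

    // Step 1: stage the offsets; readers of `committed` do not see them yet.
    def prepare(commits: Map[TopicPartition, Long]): Unit = synchronized { pending ++= commits }

    // Step 2a: the log append succeeded, so publish the offset and clear the staged entry.
    def complete(tp: TopicPartition, offset: Long): Unit = synchronized {
      offsets.put(tp, offset)
      if (pending.get(tp).contains(offset)) pending.remove(tp)
    }

    // Step 2b: the log append failed, so drop the staged entry and keep the old offset.
    def fail(tp: TopicPartition, offset: Long): Unit = synchronized {
      if (pending.get(tp).contains(offset)) pending.remove(tp)
    }

    def committed(tp: TopicPartition): Option[Long] = synchronized { offsets.get(tp) }
    def hasPending: Boolean = synchronized { pending.nonEmpty }
  }

  // Stage, append outside the group's lock, then confirm or roll back.
  def commit(group: GroupOffsets,
             commits: Map[TopicPartition, Long],
             appendToLog: Map[TopicPartition, Long] => Boolean): Boolean = {
    group.prepare(commits)
    val ok = appendToLog(commits)
    commits.foreach { case (tp, off) => if (ok) group.complete(tp, off) else group.fail(tp, off) }
    ok
  }
}
```

Keeping the append outside the lock means a slow log write does not block heartbeats or fetches for the same group, at the cost of needing the pending map to track in-flight commits.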
/**
* Store offsets by appending it to the replicated log and then inserting to cache
*/
//note: records the committed offsets
def prepareStoreOffsets(group: GroupMetadata,
consumerId: String,
generationId: Int,
offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
responseCallback: immutable.Map[TopicPartition, Short] => Unit): Option[DelayedStore] = {
// first filter out partitions with offset metadata size exceeding limit
//note: first filter out the partitions whose offset metadata exceeds the size limit
val filteredOffsetMetadata = offsetMetadata.filter { case (_, offsetAndMetadata) =>
validateOffsetMetadataLength(offsetAndMetadata.metadata)
}
// construct the message set to append
//note: build the message set to append
getMagicAndTimestamp(partitionFor(group.groupId)) match {
case Some((magicValue, timestampType, timestamp)) =>
val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
Record.create(magicValue, timestampType, timestamp,
GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition), //note: the key is a triple: group, topic, partition
GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
}.toSeq
val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId))
//note: the entries to append to the replicated log
val entries = Map(offsetTopicPartition -> MemoryRecords.withRecords(timestampType, compressionType, records:_*))
// set the callback function to insert offsets into cache after log append completed
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]) {
// the append response should only contain the topics partition
if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
throw new IllegalStateException("Append status %s should only have one partition %s"
.format(responseStatus, offsetTopicPartition))
// construct the commit response status and insert
// the offset and metadata to cache if the append status has no error
val status = responseStatus(offsetTopicPartition)
val responseCode =
group synchronized {
if (status.error == Errors.NONE) { //note: the append to the replicated log succeeded, so update the cache
if (!group.is(Dead)) { //note: update the group's offsets
filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
}
}
Errors.NONE.code
} else {
if (!group.is(Dead)) {
filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
group.failPendingOffsetWrite(topicPartition, offsetAndMetadata)
}
}
debug(s"Offset commit $filteredOffsetMetadata from group ${group.groupId}, consumer $consumerId " +
s"with generation $generationId failed when appending to log due to ${status.error.exceptionName}")
// transform the log append error code to the corresponding the commit status error code
val responseError = status.error match {
case Errors.UNKNOWN_TOPIC_OR_PARTITION
| Errors.NOT_ENOUGH_REPLICAS
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.GROUP_COORDINATOR_NOT_AVAILABLE
case Errors.NOT_LEADER_FOR_PARTITION =>
Errors.NOT_COORDINATOR_FOR_GROUP
case Errors.MESSAGE_TOO_LARGE
| Errors.RECORD_LIST_TOO_LARGE
| Errors.INVALID_FETCH_SIZE =>
Errors.INVALID_COMMIT_OFFSET_SIZE
case other => other
}
responseError.code
}
}
// compute the final error codes for the commit response
val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
(topicPartition, responseCode)
else
(topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
}
// finally trigger the callback logic passed from the API layer
responseCallback(commitStatus)
}
group synchronized {
group.prepareOffsetCommit(offsetMetadata) //note: add to the group's pendingOffsetCommits
}
Some(DelayedStore(entries, putCacheCallback))
case None =>
val commitStatus = offsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
(topicPartition, Errors.NOT_COORDINATOR_FOR_GROUP.code)
}
responseCallback(commitStatus)
None
}
}
Handling group requests
This section describes how GroupCoordinator handles group-related requests.
Handling JOIN_GROUP and SYNC_GROUP requests
The handling of these two requests is covered in detail in the article "Kafka Source Code Analysis: How a Consumer Joins a Group", so it is not repeated here.
Handling DESCRIBE_GROUPS requests
DESCRIBE_GROUPS handling is implemented as shown below. It mainly returns detailed information about each member of the group, namely memberId, clientId, clientHost, metadata (protocol), and assignment.
def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
if (!isActive.get) {
(Errors.GROUP_COORDINATOR_NOT_AVAILABLE, GroupCoordinator.EmptyGroup)
} else if (!isCoordinatorForGroup(groupId)) {
(Errors.NOT_COORDINATOR_FOR_GROUP, GroupCoordinator.EmptyGroup)
} else if (isCoordinatorLoadingInProgress(groupId)) {
(Errors.GROUP_LOAD_IN_PROGRESS, GroupCoordinator.EmptyGroup)
} else {
groupManager.getGroup(groupId) match { //note: return the group details, mainly the details of its members
case None => (Errors.NONE, GroupCoordinator.DeadGroup)
case Some(group) =>
group synchronized {
(Errors.NONE, group.summary)
}
}
}
}
Handling LEAVE_GROUP requests
When does the server receive a LEAVE_GROUP request? Typically in the following cases:
- the consumer calls unsubscribe() and cancels its subscription to all topics;
- the consumer's heartbeat thread detects a timeout, in which case the consumer proactively sends a LEAVE_GROUP request;
- on the server side, if no heartbeat request arrives from the client within the given time, the same leave-group handling is triggered automatically.
def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Short => Unit) {
if (!isActive.get) {
responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) {
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else if (isCoordinatorLoadingInProgress(groupId)) {
responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
} else {
groupManager.getGroup(groupId) match {
case None =>
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the consumer to retry
// joining without specified consumer id,
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case Some(group) =>
group synchronized {
if (group.is(Dead) || !group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else {
val member = group.get(memberId)
removeHeartbeatForLeavingMember(group, member) //NOTE: treat the pending heartbeat as complete
onMemberFailure(group, member) //NOTE: remove this member from the group and trigger a rebalance
responseCallback(Errors.NONE.code)
}
}
}
}
}
private def onMemberFailure(group: GroupMetadata, member: MemberMetadata) {
trace("Member %s in group %s has failed".format(member.memberId, group.groupId))
group.remove(member.memberId) //NOTE: remove this member from the group
group.currentState match {
case Dead | Empty =>
case Stable | AwaitingSync => maybePrepareRebalance(group) //NOTE: trigger a rebalance
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) //NOTE: check whether the join-group can complete
}
}
As the code above shows, GroupCoordinator handles a LEAVE_GROUP request by calling onMemberFailure(), which removes the failed member from the group and then performs the corresponding state transition:
- if the group was in the Dead or Empty state, it had no members to begin with, so nothing more is done;
- if the group was in the Stable or AwaitingSync state, maybePrepareRebalance() is called to start a rebalance;
- if the group is already in the PreparingRebalance state, the coordinator checks whether the delayed join-group operation can now complete; if it can, GroupCoordinator sends the join-group response to the members of the group and moves the group to AwaitingSync.
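The three branches above amount to a small lookup from the current state to a follow-up action. A minimal sketch, in which `MemberFailureSketch`, the `Action` values, and `onMemberRemoved` are hypothetical names used only for illustration:

```scala
object MemberFailureSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  sealed trait Action
  case object NoOp extends Action             // nothing to do
  case object PrepareRebalance extends Action // start a new rebalance
  case object TryCompleteJoin extends Action  // re-check the delayed join-group

  // What the coordinator does after removing a member, given the group's current state.
  def onMemberRemoved(state: GroupState): Action = state match {
    case Dead | Empty          => NoOp
    case Stable | AwaitingSync => PrepareRebalance
    case PreparingRebalance    => TryCompleteJoin
  }
}
```

Modelling the states as a sealed trait lets the compiler warn if a state is left out of the match, which is useful when a state machine like this grows.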
Handling HEARTBEAT requests
The heartbeat request is one of the most important requests:
- for the server, it is how GroupCoordinator decides whether a consumer member is still alive; if a consumer does not send a heartbeat request within the given time, it is removed from the group and a rebalance is triggered;
- for the client, it is the main channel through which it learns about changes in group state. For example, when a new consumer joins the consumer group, a rebalance starts and the state of the group changes; when the other members of the group send their heartbeat requests, GroupCoordinator tells them that the group is rebalancing so that they rejoin the group.
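The server-side liveness rule can be sketched as a deadline check. This is an assumption-laden illustration: it takes the deadline to be the latest heartbeat time plus the session timeout, and `HeartbeatDeadlineSketch`, `Member`, `deadline`, and `expiredMembers` are hypothetical names, not the coordinator's actual API.

```scala
object HeartbeatDeadlineSketch {
  // Only the two heartbeat fields named in the member metadata are modelled here.
  final case class Member(memberId: String, sessionTimeoutMs: Int, latestHeartbeatMs: Long)

  // Time by which the next heartbeat must arrive (assumed: last heartbeat + session timeout).
  def deadline(m: Member): Long = m.latestHeartbeatMs + m.sessionTimeoutMs

  def isExpired(m: Member, nowMs: Long): Boolean = nowMs >= deadline(m)

  // Members the coordinator would remove from the group at time nowMs.
  def expiredMembers(members: Seq[Member], nowMs: Long): Seq[String] =
    members.filter(isExpired(_, nowMs)).map(_.memberId)
}
```

Each accepted heartbeat effectively moves the deadline forward by one session timeout, so a member stays in the group as long as its heartbeats keep arriving in time.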
GroupCoordinator handles heartbeat requests as follows.
//NOTE: the server handles a heartbeat request
def handleHeartbeat(groupId: String,
memberId: String,
generationId: Int,
responseCallback: Short => Unit) {
if (!isActive.get) { //NOTE: the GroupCoordinator is not active
responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
} else if (!isCoordinatorForGroup(groupId)) { //NOTE: this GroupCoordinator does not own the group
responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
} else if (isCoordinatorLoadingInProgress(groupId)) { //NOTE: the group state is still loading, so return success directly
// the group is still loading, so respond just blindly
responseCallback(Errors.NONE.code)
} else {
groupManager.getGroup(groupId) match {
case None => //NOTE: this GroupCoordinator has no metadata for the group
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case Some(group) => //NOTE: the group exists
group synchronized {
group.currentState match {
case Dead => //NOTE: the group is dead, meaning its metadata has been removed; return UNKNOWN_MEMBER_ID
// if the group is marked as dead, it means some other thread has just removed the group
// from the coordinator metadata; this is likely that the group has migrated to some other
// coordinator OR the group is in a transient unstable phase. Let the member retry
// joining without the specified member id,
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case Empty => //NOTE: the group is Empty, meaning it has no members; return UNKNOWN_MEMBER_ID
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
case AwaitingSync => //NOTE: the group is in AwaitingSync, meaning the join phase of the rebalance has just finished
if (!group.has(memberId)) //NOTE: the group does not contain this member; return UNKNOWN_MEMBER_ID
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
else //NOTE: report that the group is rebalancing and ask the client to rejoin the group
responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
case PreparingRebalance => //NOTE: the group is in PreparingRebalance
if (!group.has(memberId)) { //NOTE: the group does not contain this member; return UNKNOWN_MEMBER_ID
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
} else { //NOTE: process the heartbeat normally and return REBALANCE_IN_PROGRESS
val member = group.get(memberId)
//note: update the heartbeat time, treat the heartbeat as complete, and schedule the next expiration check (on timeout, the member is removed from the group)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
}
case Stable =>
if (!group.has(memberId)) {
responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
} else if (generationId != group.generationId) {
responseCallback(Errors.ILLEGAL_GENERATION.code)
} else { //NOTE: process the heartbeat normally
val member = group.get(memberId)
//note: update the heartbeat time, treat the heartbeat as complete, and schedule the next expiration check (on timeout, the member is removed from the group)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE.code)
}
}
}
}
}
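Once the coordinator-level checks have passed, the response to a heartbeat depends only on the group state, the membership, and the generation. The sketch below condenses that decision table; `HeartbeatResponseSketch` and `heartbeatError` are hypothetical names, and error codes are shown as strings for readability.

```scala
object HeartbeatResponseSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState
  case object Dead extends GroupState

  // Error name returned to the member; "NONE" means the heartbeat was accepted.
  def heartbeatError(state: GroupState,
                     members: Set[String],
                     groupGeneration: Int,
                     memberId: String,
                     generationId: Int): String = state match {
    case Dead | Empty => "UNKNOWN_MEMBER_ID"
    case AwaitingSync =>
      if (!members.contains(memberId)) "UNKNOWN_MEMBER_ID" else "REBALANCE_IN_PROGRESS"
    case PreparingRebalance =>
      if (!members.contains(memberId)) "UNKNOWN_MEMBER_ID"
      else if (generationId != groupGeneration) "ILLEGAL_GENERATION"
      else "REBALANCE_IN_PROGRESS" // heartbeat is recorded, but the member must rejoin
    case Stable =>
      if (!members.contains(memberId)) "UNKNOWN_MEMBER_ID"
      else if (generationId != groupGeneration) "ILLEGAL_GENERATION"
      else "NONE"
  }
}
```

Only the Stable branch ever answers with no error, which is why a REBALANCE_IN_PROGRESS reply to a heartbeat is the usual way a member finds out that it has to rejoin.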
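The group state machine
When GroupCoordinator manages groups and offsets, one of its important jobs is to handle and maintain changes in group state. The state machine of a group, whose states are Empty, PreparingRebalance, AwaitingSync, Stable, and Dead, is shown in the figure below.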

The core of this state machine is the rebalance operation. In brief, a rebalance proceeds as follows:
- when certain conditions occur, the group moves from Stable to PreparingRebalance;
- the coordinator then waits for all consumer members of the group to send join-group requests; once they all have, GroupCoordinator sends a join-group response to every member and the group state becomes AwaitingSync;
- the leader consumer receives the topic subscription details of every member, assigns the partitions, and sends the result to GroupCoordinator in a sync-group request (the sync-group requests sent by non-leader consumers carry empty data);
- once GroupCoordinator receives the sync-group request from the leader consumer and obtains the topic-partition list assigned to each member, the group state becomes Stable.
This completes one full rebalance.
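The rebalance cycle described above can be written down as a transition function. This sketch covers only the transitions mentioned in this article (it leaves out Empty and Dead), and `RebalanceCycleSketch`, the `Event` values, `next`, and `run` are hypothetical names chosen for illustration.

```scala
object RebalanceCycleSketch {
  sealed trait GroupState
  case object PreparingRebalance extends GroupState
  case object AwaitingSync extends GroupState
  case object Stable extends GroupState

  sealed trait Event
  case object MembershipChanged extends Event // e.g. a member joins, leaves, or times out
  case object AllMembersJoined extends Event  // every member has sent join-group
  case object LeaderSynced extends Event      // the leader sent its assignment via sync-group

  // Next state for a (state, event) pair; unrelated events leave the state unchanged.
  def next(state: GroupState, event: Event): GroupState = (state, event) match {
    case (Stable, MembershipChanged)            => PreparingRebalance
    case (AwaitingSync, MembershipChanged)      => PreparingRebalance
    case (PreparingRebalance, AllMembersJoined) => AwaitingSync
    case (AwaitingSync, LeaderSynced)           => Stable
    case (s, _)                                 => s
  }

  // Replays a sequence of events starting from a Stable group.
  def run(events: Seq[Event]): GroupState = events.foldLeft[GroupState](Stable)(next)
}
```

Replaying MembershipChanged, AllMembersJoined, LeaderSynced from a Stable group ends in Stable again, which corresponds to one complete rebalance.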