Partition States
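A partition can be in one of the following four states:
- NonExistentPartition: the partition has never been created, or it was created and has since been deleted;
- NewPartition: the partition has been created and its replicas have been assigned, but its leader and ISR are not set up yet;
- OnlinePartition: a leader has been elected for the partition;
- OfflinePartition: the partition's leader has died, so the partition is in the OfflinePartition state;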
- State transition diagram:

[Figure: PartitionStateTransaction.png]
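The arrows in the diagram can be cross-checked against the assertValidPreviousStates calls in handleStateChange. The Scala sketch below encodes those checks as a lookup table of allowed previous states. PartitionState, validPreviousStates and isValidTransition are hypothetical names used for illustration. They are not Kafka's own definitions.

```scala
// Hypothetical model of the four partition states (illustration only, not Kafka's definitions).
sealed trait PartitionState extends Product with Serializable
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Target state -> previous states accepted by handleStateChange's assertValidPreviousStates calls.
val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  NewPartition         -> Set[PartitionState](NonExistentPartition),
  OnlinePartition      -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  OfflinePartition     -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  NonExistentPartition -> Set[PartitionState](OfflinePartition)
)

// True when moving a partition from `from` to `to` would pass the precondition check.
def isValidTransition(from: PartitionState, to: PartitionState): Boolean =
  validPreviousStates(to).contains(from)
```

For example, isValidTransition(OnlinePartition, NonExistentPartition) is false. A partition must go offline before it can be deleted.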
PartitionStateMachine
- Source file: core/src/main/scala/kafka/controller/PartitionStateMachine.scala
- startup: starts the PartitionStateMachine.
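- initializePartitionState(): initializes the state of every partition that already exists. A partition with no leader/ISR information is NewPartition. A partition whose leader is on a live broker is OnlinePartition. All other partitions are OfflinePartition.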
for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
  // check if leader and isr path exists for partition. If not, then it is in NEW state
  controllerContext.partitionLeadershipInfo.get(topicPartition) match {
    case Some(currentLeaderIsrAndEpoch) =>
      // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
      controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader) match {
        case true => // leader is alive
          partitionState.put(topicPartition, OnlinePartition)
        case false =>
          partitionState.put(topicPartition, OfflinePartition)
      }
    case None =>
      partitionState.put(topicPartition, NewPartition)
  }
}
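Without the controller context, initializePartitionState reduces to a pure classification. The following is a minimal sketch under simplifying assumptions. TopicPartition and initialStates are hypothetical. The leaderOf map, which maps a partition to its leader broker id and has an entry only when the leader/ISR path exists, is a hypothetical stand-in for controllerContext.partitionLeadershipInfo.

```scala
// Simplified, hypothetical stand-ins for the controller's data (not Kafka types).
final case class TopicPartition(topic: String, partition: Int)

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Same decision as initializePartitionState:
// no leader/ISR info -> NewPartition; leader on a live broker -> OnlinePartition; else OfflinePartition.
def initialStates(
    assignedPartitions: Set[TopicPartition],
    leaderOf: Map[TopicPartition, Int], // entry present only if the leader/ISR path exists
    liveBrokerIds: Set[Int]
): Map[TopicPartition, PartitionState] =
  assignedPartitions.iterator.map { tp =>
    val state: PartitionState = leaderOf.get(tp) match {
      case Some(leader) if liveBrokerIds.contains(leader) => OnlinePartition
      case Some(_)                                        => OfflinePartition
      case None                                           => NewPartition
    }
    tp -> state
  }.toMap
```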
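- triggerOnlinePartitionStateChange: tries to move every partition that is currently in the NewPartition or OfflinePartition state to OnlinePartition, skipping partitions whose topic is queued up for deletion. This involves partition leader election, ISR assignment and similar operations. The resulting LeaderAndIsrRequest and UpdateMetadataRequest messages are sent to each broker node through a ControllerBrokerRequestBatch.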
brokerRequestBatch.newBatch()
for((topicAndPartition, partitionState) <- partitionState
    if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) {
  if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
    handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
                      (new CallbackBuilder).build)
}
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
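brokerRequestBatch exists so that the requests produced by every handleStateChange call are collected first and then sent together at the end. The sketch below imitates that flow with a hypothetical RequestBatch that buffers plain strings instead of real LeaderAndIsrRequest objects. triggerOnline and the queuedForDeletion set are also illustrative stand-ins, and leader election is reduced to a state update.

```scala
import scala.collection.mutable

sealed trait PartitionState extends Product with Serializable
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

final case class TopicPartition(topic: String, partition: Int)

// Hypothetical batch: requests are buffered, then flushed in one go.
final class RequestBatch {
  private val pending = mutable.ArrayBuffer.empty[String]
  def newBatch(): Unit = pending.clear()
  def add(request: String): Unit = pending += request
  // Returns what would be sent; a real controller would send the requests to brokers.
  def sendRequestsToBrokers(epoch: Int): List[String] = {
    val sent = pending.toList.map(r => s"epoch=$epoch $r")
    pending.clear()
    sent
  }
}

// Same selection rule as triggerOnlinePartitionStateChange: skip topics queued for deletion,
// and only move New and Offline partitions to OnlinePartition.
def triggerOnline(
    states: mutable.Map[TopicPartition, PartitionState],
    queuedForDeletion: Set[String],
    batch: RequestBatch,
    epoch: Int
): List[String] = {
  batch.newBatch()
  for ((tp, state) <- states.toList // snapshot, so the map can be updated inside the loop
       if !queuedForDeletion.contains(tp.topic)) {
    if (state == OfflinePartition || state == NewPartition) {
      // Stand-in for handleStateChange(..., OnlinePartition, ...): queue a request, update state.
      batch.add(s"LeaderAndIsr(${tp.topic}-${tp.partition})")
      states(tp) = OnlinePartition
    }
  }
  batch.sendRequestsToBrokers(epoch)
}
```

Nothing is sent from inside the loop. All requests leave in the single sendRequestsToBrokers call, which mirrors the newBatch / sendRequestsToBrokers pair around the original loop.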
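- handleStateChange: handles partition state transitions. There is a fair amount of code, but the implementation is straightforward. It performs the partition leader election and ISR assignment covered earlier, generates LeaderAndIsrRequest and UpdateMetadataRequest messages, and adds them to the ControllerBrokerRequestBatch. The requests stay there until brokerRequestBatch.sendRequestsToBrokers(controller.epoch) sends them to the broker nodes.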
targetState match {
  case NewPartition =>
    // pre: partition did not exist before this
    assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
    partitionState.put(topicAndPartition, NewPartition)
    // post: partition has been assigned replicas
  case OnlinePartition =>
    assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
    partitionState(topicAndPartition) match {
      case NewPartition =>
        // initialize leader and isr path for new partition
        initializeLeaderAndIsrForPartition(topicAndPartition) // assign the initial leader
      case OfflinePartition =>
        electLeaderForPartition(topic, partition, leaderSelector) // elect a leader with a PartitionLeaderSelector (see http://m.itdecent.cn/p/505fa1f9b61a)
      case OnlinePartition => // invoked when the leader needs to be re-elected
        electLeaderForPartition(topic, partition, leaderSelector)
      case _ => // should never come here since illegal previous states are checked above
    }
    partitionState.put(topicAndPartition, OnlinePartition)
    // post: partition has a leader
  case OfflinePartition =>
    // pre: partition should be in New or Online state
    assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
    // should be called when the leader for a partition is no longer alive
    partitionState.put(topicAndPartition, OfflinePartition)
    // post: partition has no alive leader
  case NonExistentPartition =>
    // pre: partition should be in Offline state
    assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
    partitionState.put(topicAndPartition, NonExistentPartition)
    // post: partition state is deleted from all brokers and zookeeper
}
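handleStateChange always follows the same steps: check the previous state, do the leader work, then record the new state. The following is a hypothetical, simplified version of that pattern. A partition is a String key, and unknown partitions default to NonExistentPartition. The sketch does not call initializeLeaderAndIsrForPartition or electLeaderForPartition. It returns the name of the action it would take instead. Throwing IllegalStateException on an illegal transition is an assumption of this sketch.

```scala
import scala.collection.mutable

sealed trait PartitionState extends Product with Serializable
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Hypothetical, simplified state holder; not Kafka's PartitionStateMachine.
final class PartitionStates {
  private val state =
    mutable.Map.empty[String, PartitionState].withDefaultValue(NonExistentPartition)

  def current(tp: String): PartitionState = state(tp)

  // Rejects the transition unless the current state is one of the allowed predecessors.
  private def assertValidPreviousStates(tp: String, from: List[PartitionState], target: PartitionState): Unit =
    if (!from.contains(state(tp)))
      throw new IllegalStateException(s"$tp: illegal transition ${state(tp)} -> $target")

  // Returns the leader action the real code would perform for this transition.
  def handleStateChange(tp: String, target: PartitionState): String = {
    val action = target match {
      case NewPartition =>
        assertValidPreviousStates(tp, List(NonExistentPartition), NewPartition)
        "none"
      case OnlinePartition =>
        assertValidPreviousStates(tp, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
        if (state(tp) == NewPartition) "initializeLeaderAndIsr" // first leader for a new partition
        else "electLeader" // Offline: leader died; Online: re-election requested
      case OfflinePartition =>
        assertValidPreviousStates(tp, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
        "none"
      case NonExistentPartition =>
        assertValidPreviousStates(tp, List(OfflinePartition), NonExistentPartition)
        "none"
    }
    state(tp) = target // only reached if the precondition check passed
    action
  }
}
```

The new state is recorded only after the check and the leader work have succeeded, so a rejected transition leaves the partition where it was.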
- registerListeners: the other important job of PartitionStateMachine is to listen for topic changes and deletions in ZooKeeper. In practice this means setting watches on the relevant zk nodes:
- Watch zk node /brokers/topics for new topics; changes are handled by the TopicChangeListener callback:
private def registerTopicChangeListener() = {
  zkUtils.zkClient.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
}
- Watch zk node /admin/delete_topics for topic deletions; changes are handled by the DeleteTopicsListener callback:
private def registerDeleteTopicListener() = {
  zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
}
- Watch zk node /brokers/topics/[topic] for partitions added to the topic; changes are handled by the AddPartitionsListener callback:
def registerPartitionChangeListener(topic: String) = {
  addPartitionsListener.put(topic, new AddPartitionsListener(topic))
  zkUtils.zkClient.subscribeDataChanges(getTopicPath(topic), addPartitionsListener(topic))
}
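All three registrations use the same watch-and-callback pattern. The real zkClient needs a running ZooKeeper, so the sketch below uses a hypothetical in-memory FakeZk. Its subscribeChildChanges takes a plain Scala function, not a zkclient listener object. TopicChangeTracker shows one plausible way a /brokers/topics child callback could find new and deleted topics: it diffs the current children against the topics it already knows. This is an assumption made for illustration and is not a copy of TopicChangeListener.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a ZooKeeper client that only supports child watches.
final class FakeZk {
  private val children = mutable.Map.empty[String, Set[String]].withDefaultValue(Set.empty)
  private val watchers =
    mutable.Map.empty[String, List[(String, Set[String]) => Unit]].withDefaultValue(Nil)

  def getChildren(path: String): Set[String] = children(path)

  def subscribeChildChanges(path: String, listener: (String, Set[String]) => Unit): Unit =
    watchers(path) = listener :: watchers(path)

  // Replaces the children of `path` and notifies every watcher registered on it.
  def setChildren(path: String, newChildren: Set[String]): Unit = {
    children(path) = newChildren
    watchers(path).foreach(listener => listener(path, newChildren))
  }
}

// Hypothetical callback: diff the current children against the topics seen so far.
final class TopicChangeTracker {
  var allTopics: Set[String] = Set.empty
  var lastNew: Set[String] = Set.empty
  var lastDeleted: Set[String] = Set.empty

  def handleChildChange(parentPath: String, current: Set[String]): Unit = {
    lastNew = current -- allTopics
    lastDeleted = allTopics -- current
    allTopics = current
  }
}
```

Register tracker.handleChildChange on "/brokers/topics", then change that node's children from Set("a", "b") to Set("b", "c"). After the second change, lastNew is Set("c") and lastDeleted is Set("a").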
One more diagram:

[Figure: 2017500806.jpg]