// Gossip leader election module
// Algorithm properties:
// - Peers break symmetry by comparing IDs
// - Each peer is either a leader or a follower,
//   and the aim is to have exactly 1 leader if the membership view
//   is the same for all peers
// - If the network is partitioned into 2 or more sets, the number of leaders
//   is the number of network partitions, but when the partition heals,
//   only 1 leader should be left eventually
// - Peers communicate by gossiping leadership proposal or declaration messages
//
// The Algorithm, in pseudo code:
//
// variables:
//   leaderKnown = false
//
// Invariant:
//   Peer listens for messages from remote peers
//   and whenever it receives a leadership declaration,
//   leaderKnown is set to true
//
// Startup():
//   wait for the membership view to stabilize, for a leadership declaration
//   to be received, or for the startup timeout to expire.
//   goto SteadyState()
//
// SteadyState():
//   while true:
//     If leaderKnown is false:
//       LeaderElection()
//     If you are the leader:
//       Broadcast leadership declaration
//       If a leadership declaration was received from
//       a peer with a lower ID,
//       become a follower
//     Else, you're a follower:
//       If haven't received a leadership declaration within
//       a time threshold:
//         set leaderKnown to false
//
// LeaderElection():
//   Gossip leadership proposal message
//   Collect messages from other peers sent within a time period
//   If received a leadership declaration:
//     return
//   Iterate over all proposal messages collected.
//   If a proposal message from a peer with an ID lower
//   than yourself was received, return.
//   Else, declare yourself a leader

基本上注釋寫得很清楚。思路上類似簡化版的Raft選主(leader靠周期性廣播聲明消息來維持地位),但當選規則是直接比較peer ID、ID最小者當選,並沒有Raft的任期(term)和投票機制。
初始化
// NewLeaderElectionService builds a leader election service for the peer
// identified by id, communicating through adapter, and starts it in the
// background. callback is invoked on leadership changes; when it is nil the
// service falls back to noopCallback. It panics if id is empty.
func NewLeaderElectionService(adapter LeaderElectionAdapter, id string, callback leadershipCallback) LeaderElectionService {
	if id == "" {
		panic("Empty id")
	}

	svc := &leaderElectionSvcImpl{
		id:            peerID(id),
		proposals:     util.NewSet(),
		adapter:       adapter,
		stopChan:      make(chan struct{}, 1),
		interruptChan: make(chan struct{}, 1),
		logger:        util.GetLogger(util.ElectionLogger, ""),
		callback:      noopCallback,
	}
	// Only override the no-op default when the caller supplied a callback.
	if callback != nil {
		svc.callback = callback
	}

	// start blocks while waiting for the membership view to settle, so it runs
	// asynchronously and the service is handed back immediately.
	go svc.start()
	return svc
}
隨Gossip服務初始化而創建,最後以goroutine的方式調用start()啟動。
啟動
// start drives the service's startup sequence. It launches the inbound message
// handler, blocks until the membership view settles (bounded by the startup
// grace period), and then launches the main election loop.
func (le *leaderElectionSvcImpl) start() {
	// Both long-lived goroutines (handleMessages and run) are registered up
	// front. run marks itself done via defer; handleMessages presumably does
	// the same — confirm.
	le.stopWG.Add(2)
	go le.handleMessages()

	grace := getStartupGracePeriod()
	le.waitForMembershipStabilization(grace)

	go le.run()
}
這里便是election模塊的核心起手式了。下面我們具體來分析下里面的實現(xiàn)。
handleMessages
// handleMessage processes a single leadership message from a remote peer.
//
// A proposal records the sender as a candidate for the next election round.
// A declaration marks a leader as known. It also wakes a sleeping waiter
// through interruptChan, unless a wake-up is already pending. If this peer is
// itself the leader and the declaring peer has a lower ID, this peer steps
// down. All state changes happen while holding the service lock.
func (le *leaderElectionSvcImpl) handleMessage(msg Msg) {
	kind := "proposal"
	if msg.IsDeclaration() {
		kind = "declaration"
	}
	le.logger.Debug(le.id, ":", msg.SenderID(), "sent us", kind)

	le.Lock()
	defer le.Unlock()

	switch {
	case msg.IsProposal():
		le.proposals.Add(string(msg.SenderID()))
	case msg.IsDeclaration():
		atomic.StoreInt32(&le.leaderExists, int32(1))
		// interruptChan has a buffer of one; never queue a second wake-up.
		if le.sleeping && len(le.interruptChan) == 0 {
			le.interruptChan <- struct{}{}
		}
		// Lowest ID wins: yield to a declaring peer whose ID sorts before ours.
		if bytes.Compare(msg.SenderID(), le.id) < 0 && le.IsLeader() {
			le.stopBeingLeader()
		}
	default:
		// We shouldn't get here
		le.logger.Error("Got a message that's not a proposal and not a declaration")
	}
}
首先,跟選舉有關(guān)的消息類型是LeadershipMessage
而消息又根據行為分為兩種:proposal和declaration。前者是提案,表達"我要參加選舉";後者是聲明,表示"我已經當選"。
搞清楚這些,那么這里就知道是什么意思了。
- 首先如果是提案消息的話,先收集下來,為選舉做準(zhǔn)備,具體怎么用后面再講
- 如果收到聲明消息,說明已經有leader選出,於是把leaderExists置為1。此時如果接收節點自己也是leader,並且發送方的ID比自己小,就需要交出leader權限(stopBeingLeader)。
- interruptChan這里需要注意:只有在le.sleeping為true且通道中沒有待處理通知時才會發送通知,而有兩個地方會等待它。一是選舉過程中(leaderElection中的waitForInterrupt),如果收到這個通知,說明已經有其他節點聲明自己當選,不必再等到超時。
- 二是成為leader之後(leader()中的waitForInterrupt),如果收到這個通知,說明有其他節點也在聲明自己是leader,可能需要換屆。
func (le *leaderElectionSvcImpl) stopBeingLeader() { le.logger.Info(le.id, "Stopped being a leader") atomic.StoreInt32(&le.isLeader, int32(0)) le.callback(false) }
- 看起來也很簡單,只是把isLeader標志位清零,然後通過callback(false)通知外部。
- 下面我們看下callback在干嘛?
callback
// onStatusChangeFactory returns the leadership callback for channel chainID.
// When this peer becomes leader, the callback starts block delivery for the
// channel. When it stops being leader, the callback stops that delivery.
func (g *gossipServiceImpl) onStatusChangeFactory(chainID string, committer blocksprovider.LedgerInfo) func(bool) {
	return func(isLeader bool) {
		if !isLeader {
			// TODO(review): the error is deliberately discarded in this excerpt;
			// it should at least be logged.
			_ = g.deliveryService[chainID].StopDeliverForChannel(chainID)
			return
		}

		// yield is handed to the delivery service, which may invoke it to make
		// this peer's election service yield leadership. When it is invoked is
		// up to the delivery service; confirm there.
		yield := func() {
			g.lock.RLock()
			le := g.leaderElection[chainID]
			g.lock.RUnlock()
			le.Yield()
		}
		// TODO(review): the error is deliberately discarded in this excerpt;
		// it should at least be logged.
		_ = g.deliveryService[chainID].StartDeliverForChannel(chainID, committer, yield)
	}
}
- 如果看過orderer篇,應該知道Delivery service是用來從orderer拉取block的。這里很關鍵:不做leader不是簡單改個標識就完了,還需要交出代表本組織成員接收block的權力。
- 同一個組織內的peer節點之間的block同步是靠leader來運作的。
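- 這里需要注意yield:它只在當前節點成為leader(isLeader為true)時才被構造,並作為回調傳給StartDeliverForChannel。之後由delivery service在需要時調用它,進而調用le.Yield(),讓當前leader主動讓出leader地位(處於yielding狀態的節點不參與選舉,見後文leaderElection)。而正常卸任(callback(false))時只會調用StopDeliverForChannel停止delivery,並不會設置yield。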
waitForMembershipStabilization
// waitForMembershipStabilization blocks until the membership view looks
// stable. It samples the number of known peers every membership sample
// interval and returns as soon as any of the following holds:
//   - two consecutive samples report the same size,
//   - timeLimit has elapsed,
//   - a leader is known,
//   - the service is stopping.
func (le *leaderElectionSvcImpl) waitForMembershipStabilization(timeLimit time.Duration) {
	le.logger.Debug(le.id, ": Entering")
	// Arguments of a deferred call are evaluated when the defer statement
	// executes. A plain `defer le.logger.Debug(..., len(le.adapter.Peers()))`
	// would therefore report the view size at entry. The closure defers the
	// Peers() call to exit time, so the log shows the final view.
	defer func() {
		le.logger.Debug(le.id, ": Exiting, peers found", len(le.adapter.Peers()))
	}()

	deadline := time.Now().Add(timeLimit)
	viewSize := len(le.adapter.Peers())
	for !le.shouldStop() {
		time.Sleep(getMembershipSampleInterval())
		newSize := len(le.adapter.Peers())
		if newSize == viewSize || time.Now().After(deadline) || le.isLeaderExists() {
			return
		}
		viewSize = newSize
	}
}
這個方法很有意思,其目的是等待成員視圖穩定。除了shouldStop()為真時直接退出之外,有三種方式退出這個循環:
- 每隔一段時間(membershipSampleInterval)比對前後兩次採樣的peer個數,一致就說明已經穩定
- 當(dāng)然了,不能沒完沒了的等待一致,如果到達(dá)deadline,也強(qiáng)行退出
- 或者已經有leader選出,也直接退出
run
// run is the main election loop. Each iteration does three things:
//   - holds an election if no leader is known,
//   - leaves the yielding state once some leader exists,
//   - spends one period acting as leader (heartbeat) or follower (wait for
//     heartbeats).
// It returns once shouldStop reports true, marking itself done on stopWG.
func (le *leaderElectionSvcImpl) run() {
	defer le.stopWG.Done()
	for {
		if le.shouldStop() {
			return
		}

		if !le.isLeaderExists() {
			le.leaderElection()
		}

		// If we are yielding and some leader has been elected,
		// stop yielding
		if le.isLeaderExists() && le.isYielding() {
			le.stopYielding()
		}

		// The election above may have taken a while; re-check before acting.
		if le.shouldStop() {
			return
		}

		if le.IsLeader() {
			le.leader()
			continue
		}
		le.follower()
	}
}
- 我們下面分步來拆解
- 這里的邏輯基於上面handleMessage的結論做進一步處理
- 如果當前沒有leader,那麼立即發起選舉,並且向其他節點自薦。這里後面詳細講。
- 如果當前已經有leader存在,而且當前節點處於yield狀態,就結束這個過渡狀態。
- 如果自己是leader,就執行leader()
- 否則執行follower(),這些後面會詳細分析
leaderElection
// leaderElection runs a single election round for this peer.
//
// The peer gossips its own proposal and then collects proposals for the
// election duration. The wait may end early via interruptChan. If a leader has
// been declared meanwhile, or this peer is yielding, it gives up. Otherwise it
// becomes leader unless some collected proposal came from a peer with a lower
// ID. A yielding peer does not take part at all.
func (le *leaderElectionSvcImpl) leaderElection() {
	le.logger.Debug(le.id, ": Entering")
	defer le.logger.Debug(le.id, ": Exiting")

	// A peer that is yielding to others sits out the election entirely.
	if le.isYielding() {
		return
	}

	// Announce our candidacy, then give the other peers a window in which to
	// send their own proposals (or a declaration).
	le.propose()
	le.waitForInterrupt(getLeaderElectionDuration())

	switch {
	case le.isLeaderExists():
		le.logger.Info(le.id, ": Some peer is already a leader")
		return
	case le.isYielding():
		le.logger.Debug(le.id, ": Aborting leader election because yielding")
		return
	}

	// Lowest ID wins: defer to any proposer whose ID sorts before ours.
	for _, candidate := range le.proposals.ToArray() {
		if bytes.Compare(peerID(candidate.(string)), le.id) < 0 {
			return
		}
	}

	// Nobody better proposed themselves, so take the lead.
	le.beLeader()
	atomic.StoreInt32(&le.leaderExists, int32(1))
}
如果是yield狀態,說明它正在把權力讓給別人,所以不參與選舉。
向其他人自薦,發出選舉提案。
等待interruptChan,這里前面提到過,等到的話,說明已經有節點聲明當選。當然了,如果等不到,也會在leaderElectionDuration超時後返回。
接下來,如果當前已經有leader產生,直接返回,因為選舉已經結束。
這里又判斷一次yield,因為處在新的階段:前面是發起選舉前,這里是收集完提案後。
重點來了,接下來是怎麼從眾多提案中找到有資格擔任leader的節點。
算法也很簡單,就是比大小:用bytes.Compare(peerID(id), le.id)檢查收集到的proposals里是否有ID比自己小的節點。如果沒有,說明自己的ID最小,就調用beLeader,然後標識leader已經產生。
func (le *leaderElectionSvcImpl) beLeader() { le.logger.Info(le.id, ": Becoming a leader") atomic.StoreInt32(&le.isLeader, int32(1)) le.callback(true) }
- 成為leader的結果:一是通過callback(true)接管從orderer拉取block的任務,二是標識自己是leader。
leader
// leader performs one leadership period. It gossips a leadership declaration
// (the heartbeat) and then waits for up to the declaration interval. The wait
// presumably ends early when woken through interruptChan; confirm in
// waitForInterrupt.
func (le *leaderElectionSvcImpl) leader() {
	le.adapter.Gossip(le.adapter.CreateMessage(true))
	le.waitForInterrupt(getLeadershipDeclarationInterval())
}
- 當leader不光要承擔前面說的那些任務,還要讓別人知道自己是leader。回憶一下Raft選舉算法的核心:leader要維持自己的地位,需要定時發心跳。而這里心跳變成了leaderDeclaration。
- 接下來是waitForInterrupt:有其他節點發來Declaration消息時,這里會被通知提前喚醒,否則會等待一段時間(getLeadershipDeclarationInterval)。再加上外面run的for循環,效果就是每隔一段時間發一次declaration來維持leader的地位,這不就是心跳麼?
follower
// follower performs one follower period. It discards the proposals collected
// so far and marks the leader as unknown. It then waits for the leader-alive
// threshold. If a declaration arrives meanwhile, handleMessage marks the
// leader as known again; otherwise the next loop iteration starts an
// election. A pending stop signal cuts the wait short and is re-queued so
// other observers still see it.
func (le *leaderElectionSvcImpl) follower() {
	le.logger.Debug(le.id, ": Entering")
	defer le.logger.Debug(le.id, ": Exiting")

	le.proposals.Clear()
	atomic.StoreInt32(&le.leaderExists, int32(0))

	aliveTimeout := time.After(getLeaderAliveThreshold())
	select {
	case <-le.stopChan:
		// Put the signal back: we only peeked at it.
		le.stopChan <- struct{}{}
	case <-aliveTimeout:
	}
}
- 這里還是有講究的:走到這里說明這一輪自己不是leader(沒選上,或者已經有別的leader)。沒關係,再接再厲,先清理掉這一輪的選舉提案,為下一輪做準備。
- 這里很關鍵地把leaderExists設置為0,即默認認為leader不存在。有兩個時間參數需要搞清楚:
- peer.gossip.election.leaderElectionDuration=5s
- peer.gossip.election.leaderAliveThreshold=10s
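- 注意,leaderElectionDuration並不是心跳間隔。它是選舉時發出提案後收集其他節點提案的等待時間,對應leaderElection中的waitForInterrupt(getLeaderElectionDuration())。leader發心跳(declaration)的間隔是getLeadershipDeclarationInterval(),Fabric源碼中默認取leaderAliveThreshold的一半,請以所用版本為準。而這里用到的leaderAliveThreshold,是follower等待leader心跳的時長,也就是判定leader是否存活的閾值。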
- 這里就很清楚了:follower先把leaderExists清零,然後等待leaderAliveThreshold(10s)。只要這期間收到過leader的declaration,handleMessage就會把leaderExists重新置為1,認為leader還活著。如果10s內一次心跳都沒收到,leaderExists保持為0,說明leader大概率掛了;run循環回到頂部時,就會重新進入leaderElection。