前言
選舉是Raft實(shí)現(xiàn)數(shù)據(jù)一致性的安全保證,一個(gè)raft集群能夠正常運(yùn)行,必須有且僅有一個(gè)Leader存在,一次成功選舉是集群能夠正常運(yùn)行的前提。Raft協(xié)議對(duì)選舉的定義和安全性保證請(qǐng)參考之前的Raft選舉原理解析[傳送門]。這篇文章通過(guò)解析etcd的源碼來(lái)看一下Raft集群選舉的具體實(shí)現(xiàn)。
心跳
Raft集群在正常運(yùn)行中是不會(huì)觸發(fā)選舉的,選舉只會(huì)發(fā)生在集群初次啟動(dòng)或者其它節(jié)點(diǎn)無(wú)法收到Leader心跳的情況下。初次啟動(dòng)比較好理解,因?yàn)閞aft節(jié)點(diǎn)在啟動(dòng)時(shí),默認(rèn)都是將自己設(shè)置為Follower。收不到Leader心跳有兩種情況,一種是原來(lái)的Leader機(jī)器Crash了,還有一種是發(fā)生網(wǎng)絡(luò)分區(qū),F(xiàn)ollower跟Leader之間的網(wǎng)絡(luò)斷了,F(xiàn)ollower以為L(zhǎng)eader宕機(jī)了。下面先看一下集群一切正常時(shí),心跳是怎么流轉(zhuǎn)的。
心跳觸發(fā)
EtcdServer定時(shí)觸發(fā)
Raft集群運(yùn)行時(shí)Leader需要定時(shí)的發(fā)送心跳給所有Follower。在etcd中,通過(guò)EtcdServer中的定時(shí)器定時(shí)觸發(fā)Raft模塊來(lái)實(shí)現(xiàn),這個(gè)定時(shí)器在raftNode.start()中啟動(dòng)的。
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
//通過(guò)go中的ticker觸發(fā)
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
...
...
}
}
...
}()
}
func (r *raftNode) tick() {
r.tickMu.Lock()
r.Tick() //調(diào)用node.Tick()
r.tickMu.Unlock()
}
raftNode在啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)go routine循環(huán)監(jiān)聽(tīng)ticker定時(shí)器的channel,時(shí)間到時(shí)則調(diào)用它的tick()方法。這個(gè)ticker定時(shí)器的周期即用戶設(shè)置的raft集群的心跳周期。raftNode中組合了一個(gè)raft.Node,所以tick()方法只是加了互斥鎖之后就調(diào)用的raft.Node.Tick()方法。也就是說(shuō)觸發(fā)由EtcdServer來(lái)做,邏輯由raft模塊來(lái)處理。
raft模塊處理
Node的Tick()方法只是簡(jiǎn)單的寫個(gè)空對(duì)象到tickc的channel中,這個(gè)前一篇講過(guò),Node接口的實(shí)現(xiàn)類大部分操作都是異步完成的。
func (n *node) Tick() {
select {
//寫空對(duì)象到tickc channel異步執(zhí)行
case n.tickc <- struct{}{}:
case <-n.done:
default:
n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
}
}
node在啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)go routine監(jiān)聽(tīng)tickc(在node.run()方法中)。收到觸發(fā)后直接調(diào)用的RawNode.Tick()方法,而RawNode又調(diào)用了raft.tick()。
//node啟動(dòng)時(shí)會(huì)在一個(gè)新的go routine中運(yùn)行run()
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
//一直循環(huán)查看各個(gè)channel
for {
...
...
select {
//監(jiān)聽(tīng)到觸發(fā),則調(diào)用RawNode.Tick()
case <-n.tickc:
n.rn.Tick()
...
}
}
}
//RawNode.Tick()
func (rn *RawNode) Tick() {
//直接調(diào)用raft.tick()
rn.raft.tick()
}
Leader tick()邏輯
在上一篇文章中講過(guò),raft的tick屬性是函數(shù)類型,當(dāng)節(jié)點(diǎn)的角色是Leader時(shí),tick指向的是raft.tickHeartbeat()。
func (r *raft) tickHeartbeat() {
//1. 心跳計(jì)數(shù)+1
r.heartbeatElapsed++
r.electionElapsed++
// 選舉超時(shí)控制
if r.electionElapsed >= r.electionTimeout {
r.electionElapsed = 0
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// Leader轉(zhuǎn)讓
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
//2. 如果當(dāng)前已經(jīng)不是Leader了,跳過(guò)
if r.state != StateLeader {
return
}
//3. 如果心跳計(jì)數(shù)大于心跳超時(shí),則發(fā)送心跳消息
if r.heartbeatElapsed >= r.heartbeatTimeout {
//心跳計(jì)數(shù)清0
r.heartbeatElapsed = 0
//調(diào)用Step()發(fā)送心跳
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
- 第1步首先Leader將心跳計(jì)數(shù)加1,上一篇講過(guò),etcd在記錄超時(shí)時(shí)不是以標(biāo)準(zhǔn)時(shí)間記錄的,而是記錄的心跳間隔的倍數(shù)。所以EtcdServer每觸發(fā)一次
tick(),心跳計(jì)數(shù)+1,代表距離上次發(fā)送心跳又過(guò)了1個(gè)心跳時(shí)間 - 第2步關(guān)于選舉超時(shí)和轉(zhuǎn)讓的邏輯先跳過(guò)
- 第3步中判斷是否應(yīng)該發(fā)送心跳,
heartbeatTimeout的意思是Leader應(yīng)該經(jīng)過(guò)幾次心跳時(shí)間后必須發(fā)送一次心跳。etcd中heartbeatTimeout的默認(rèn)值是1,也就是說(shuō)其實(shí)每次進(jìn)到這個(gè)方法heartbeatElapsed >= heartbeatTimeout都是成立的。當(dāng)判斷需要發(fā)送心跳時(shí),會(huì)封裝一個(gè)MsgBeat的消息提交Step方法處理,處理邏輯下面再說(shuō),先看下Follower的tick()方法。
用戶也可以把
heartbeatTimeout這個(gè)值設(shè)的很大,當(dāng)然這樣在Leader宕機(jī)到觸發(fā)重新選舉的間隔會(huì)長(zhǎng)一些。在網(wǎng)絡(luò)狀況不好的時(shí)候可以這樣設(shè)置。
Follower tick()邏輯
當(dāng)節(jié)點(diǎn)角色是Follower或者Candidate的時(shí)候,tick指向的是tickElection()
func (r *raft) tickElection() {
//選舉計(jì)數(shù)加1
r.electionElapsed++
//判斷是否超時(shí),要發(fā)起重新選舉
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
以上的邏輯很簡(jiǎn)單,因?yàn)閷?duì)于Follower來(lái)說(shuō),唯一需要關(guān)心得就是是不是很久都沒(méi)收到Leader的心跳了。所以每次tick都將選舉計(jì)數(shù)+1,當(dāng)Follower收到Leader心跳的時(shí)候會(huì)將electionElapsed清0。如果Follower收不到Leader的心跳,electionElapsed就會(huì)一直加到超過(guò)選舉超時(shí),就發(fā)起選舉。發(fā)起選舉的邏輯下面再說(shuō)。
Leader心跳發(fā)送
raft消息封裝
上面講到Leader在收到Tick請(qǐng)求后,會(huì)提交一個(gè)MsgBeat的消息給到Step()方法,對(duì)于心跳消息,會(huì)直接調(diào)用step指向的函數(shù)。跟tick一樣,step也是個(gè)函數(shù)類型,在節(jié)點(diǎn)為L(zhǎng)eader時(shí),它指向的是stepLeader(),該函數(shù)中對(duì)于MsgBeat的消息會(huì)調(diào)用bcastHeartbeat()來(lái)給集群中每個(gè)Follower發(fā)送心跳消息。
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
//判斷對(duì)于心跳消息,則廣播心跳
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
...
...
}
func (r *raft) bcastHeartbeat() {
lastCtx := r.readOnly.lastPendingRequestCtx()
if len(lastCtx) == 0 {
r.bcastHeartbeatWithCtx(nil)
} else {
r.bcastHeartbeatWithCtx([]byte(lastCtx))
}
}
// 廣播心跳消息至所有節(jié)點(diǎn)
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
//排除Leader自己
if id == r.id {
return
}
//發(fā)送心跳
r.sendHeartbeat(id, ctx)
})
}
前一篇講過(guò)raft在prs屬性中保存了所有Follower的進(jìn)度信息,包含F(xiàn)ollower的id、同步日志的進(jìn)度等。所以上面的方法就是遍歷prs所有節(jié)點(diǎn),發(fā)送心跳消息。
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
//計(jì)算消息中帶的commitIndex
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
//封裝成一個(gè)Type是MsgHeartbeat的消息,并帶上commitIndex
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
//發(fā)送消息
r.send(m)
}
Raft協(xié)議中定義心跳消息和日志消息其實(shí)是一個(gè)格式的,只是心跳消息沒(méi)有帶日志條目,只會(huì)攜帶CommitIndex。Leader首先看Follower已經(jīng)接收成功的日志條目的Index,即Progress.Match字段,然后跟自己的CommitIndex比較,取值較小的那個(gè)。這是為了防止Follower的日志同步落后太多,CommitIndex處的日志還沒(méi)有同步到。
封裝好消息后,調(diào)用send()方法發(fā)送。raft本身并不負(fù)責(zé)消息發(fā)送,所以這個(gè)方法只是把消息放到一個(gè)隊(duì)列中,等待EtcdServer來(lái)獲取。
func (r *raft) send(m pb.Message) {
m.From = r.id
//數(shù)據(jù)校驗(yàn),選舉類消息必須帶term屬性
if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
if m.Term == 0 {
panic(fmt.Sprintf("term should be set when sending %s", m.Type))
}
} else {
//其它類消息不能帶term屬性
if m.Term != 0 {
panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
}
//除了日志和MsgReadIndex消息外,設(shè)置term為raft當(dāng)前周期
if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
m.Term = r.Term
}
}
//將消息放入隊(duì)列
r.msgs = append(r.msgs, m)
}
EtcdServer消息傳輸
上一步中,心跳消息被放入隊(duì)列,那這些消息是什么時(shí)候被發(fā)給集群中其它節(jié)點(diǎn)呢?發(fā)送操作是由EtcdServer啟動(dòng)時(shí)的go routine處理的。具體實(shí)現(xiàn)還是在raftNode.start()中。
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
case <-r.ticker.C:
r.tick()
//調(diào)用Node.Ready(),從返回的channel中獲取數(shù)據(jù)
case rd := <-r.Ready():
if rd.SoftState != nil {
// SoftState不為空的處理邏輯
}
if len(rd.ReadStates) != 0 {
//ReadStates不為空的處理邏輯
}
// 如果是Leader發(fā)送消息給Follower
if islead {
r.transport.Send(r.processMessages(rd.Messages))
}
...
...
//處理完畢調(diào)用Advance()方法
r.Advance()
case <-r.stopped:
return
}
}
}()
}
以上的邏輯中只包含心跳相關(guān)的,當(dāng)從Ready channel中讀到數(shù)據(jù)后,直接通過(guò)transport發(fā)送出去,這里的processMessages()除了對(duì)消息封裝成傳輸協(xié)議要求的格式,還會(huì)做超時(shí)控制。
發(fā)送完畢后無(wú)論成功失敗都會(huì)調(diào)用raft的Advance()方法處理后續(xù)邏輯。Leader一次心跳發(fā)送就算結(jié)束了。
Ready數(shù)據(jù)獲取
上面的邏輯中,心跳的message是被打包在Ready數(shù)據(jù)結(jié)構(gòu)中返回的,下面看一下數(shù)據(jù)打包的過(guò)程。既然Node.Ready()返回的是個(gè)channel,則必然有地方將Ready塞進(jìn)channel中,這段邏輯是在node.run()方法中。
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() { //判斷是否有Ready數(shù)據(jù)
// 獲取Ready數(shù)據(jù)
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
....
select {
....
case readyc <- rd: //數(shù)據(jù)放入ready channel中
n.rn.acceptReady(rd) //告訴raft,ready數(shù)據(jù)已被接收
advancec = n.advancec //賦值A(chǔ)dvance channel等待Ready處理完成的消息
}
}
}
上面的代碼中Ready數(shù)據(jù)是通過(guò)調(diào)用RawNode.readyWithoutAccept()獲取到的。
func (rn *RawNode) readyWithoutAccept() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(), //未持久化的日志
CommittedEntries: r.raftLog.nextEnts(), //已提交可以apply的日志
Messages: r.msgs, //raft隊(duì)列中所有的message
}
//判斷softState有沒(méi)有變化,有則賦值
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
//判斷hardState有沒(méi)有變化,有則賦值
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
}
//判斷是不是收到snapshot
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
}
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
//處理該Ready后是否需要做fsync,將數(shù)據(jù)強(qiáng)制刷盤
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
return rd
}
對(duì)于心跳來(lái)說(shuō),上面最關(guān)鍵的操作就是生成Ready的時(shí)候,將msg放到Ready中。
Follower心跳處理
現(xiàn)在來(lái)到心跳的接收方,心跳消息到達(dá)Follower后,傳輸層會(huì)回調(diào)EtcdServer.Process方法,將心跳消息交給raft狀態(tài)機(jī)處理。
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
//發(fā)送方已經(jīng)從集群中移除
}
if m.Type == raftpb.MsgApp {
//收到日志消息記錄metrics
}
//調(diào)用raft.Step處理消息
return s.r.Step(ctx, m)
}
對(duì)于Follower來(lái)說(shuō)Step仍然進(jìn)入的是stepFollower方法,第一步是將選舉計(jì)時(shí)清0,防止發(fā)起選舉流程。
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
...
case pb.MsgHeartbeat:
r.electionElapsed = 0 //選舉超時(shí)清0
r.lead = m.From //設(shè)置Lead為心跳發(fā)送方ID
r.handleHeartbeat(m) //處理心跳消息
...
}
return nil
}
func (r *raft) handleHeartbeat(m pb.Message) {
//設(shè)置commitIndex為L(zhǎng)eader傳來(lái)的最新值
r.raftLog.commitTo(m.Commit)
//發(fā)送Response給Leader
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
Follower對(duì)心跳消息的處理很簡(jiǎn)單,1)選舉超時(shí)計(jì)時(shí)清0;2)設(shè)置commitIndex(會(huì)檢查本地的commitIndex是不是比leader發(fā)過(guò)來(lái)的?。?)回復(fù)Leader,回復(fù)的時(shí)候按照raft協(xié)議的要求帶上自己日志的進(jìn)度。
Leader心跳回復(fù)處理
Leader收到Follower的心跳回復(fù)后,跟所有消息的處理邏輯一樣,會(huì)進(jìn)入stepLeader()方法處理
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
...
...
case pb.MsgHeartbeatResp:
//記錄Follower為Active狀態(tài)
pr.RecentActive = true
pr.ProbeSent = false
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
//有日志要發(fā)送,繼續(xù)發(fā)送
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
//處理線性讀的邏輯
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
}
}
Leader收到心跳回復(fù)后,會(huì)判斷是否有新的日志要發(fā)給Follower,有的話就繼續(xù)發(fā)送。線性讀的邏輯放在后面的文章解析。
選舉
正常情況下,Leader在每次tick()方法時(shí)發(fā)送心跳,網(wǎng)絡(luò)一切正常,F(xiàn)ollower收到心跳后將選舉計(jì)時(shí)清0,集群就這樣愉快的運(yùn)行下去了。但是分布式系統(tǒng)中,意外時(shí)必然會(huì)發(fā)生的,這時(shí)候Leader宕機(jī)了,就需要集群中其它節(jié)點(diǎn)站出來(lái),競(jìng)選成為新的Leader。
選舉觸發(fā)
首先來(lái)看一下Follower是怎么認(rèn)定Leader掛了的。當(dāng)raft節(jié)點(diǎn)角色是Follower的時(shí)候,EtcdServer每次觸發(fā)tick(),進(jìn)入的是tickElection()方法:
func (r *raft) tickElection() {
r.electionElapsed++
//判斷是否要發(fā)起選舉
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner
}
func (r *raft) pastElectionTimeout() bool {
//每次tick加1后和隨機(jī)選舉超時(shí)比較
return r.electionElapsed >= r.randomizedElectionTimeout
}
在每次tick()時(shí),都會(huì)檢查是否符合發(fā)起一次新的選舉的條件。其中promotable()比較簡(jiǎn)單,就是判斷自己當(dāng)前是不是還在集群中并且不能是Learner。pastElectionTimeout()判斷是否已經(jīng)超過(guò)選舉超時(shí)時(shí)間還沒(méi)收到Leader的心跳。這里比較的值是randomizedElectionTimeout,代表一個(gè)隨機(jī)選舉超時(shí)時(shí)間,使用隨機(jī)時(shí)間的原因是防止Leader心跳超時(shí)后所有Follower同時(shí)發(fā)起選舉。我們看下etcd中這個(gè)時(shí)間是怎么算的:
func (r *raft) resetRandomizedElectionTimeout() {
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
可以看到這個(gè)時(shí)間是1個(gè)選舉超時(shí)到2個(gè)選舉超時(shí)之間的隨機(jī)值。每次開(kāi)啟一個(gè)新的term,這個(gè)reset方法都會(huì)被調(diào)用一次,所以在每個(gè)選舉周期這個(gè)隨機(jī)值都是不同的,最大限度防止重復(fù)。
發(fā)送選票
狀態(tài)轉(zhuǎn)換
上一步中Follower發(fā)現(xiàn)過(guò)了選舉超時(shí)還沒(méi)收到Leader心跳,觸發(fā)Step()方法讓raft狀態(tài)機(jī)進(jìn)行狀態(tài)轉(zhuǎn)換。
func (r *raft) Step(m pb.Message) error {
...
...
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
if !r.promotable() {
//write log
return nil
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
//判斷有配置變更日志,如果集群正在做配置變更,則不發(fā)起選舉
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
//write log
return nil
}
//發(fā)起選舉,根據(jù)配置判斷選舉之前是否要做預(yù)投票
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
}
Step方法收到要發(fā)起選舉的消息后(MsgHup),會(huì)首先判斷已經(jīng)提交的還沒(méi)生效的日志中有沒(méi)有集群變更,有的話說(shuō)明集群正在變更,則不發(fā)起選舉。這么做的原因是有可能當(dāng)前節(jié)點(diǎn)在集群變更后已經(jīng)被從集群中移除了。
然后,根據(jù)配置中設(shè)置的選舉是否需要先預(yù)選,來(lái)調(diào)用campaign()方法發(fā)起選舉。預(yù)選的原理放在本文最后說(shuō)。
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
//需要預(yù)選的情況
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1
} else {
//1. 變?yōu)楹蜻x人,并初始化一條拉票的消息
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//2.判斷是否已經(jīng)贏得選舉
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//3. 收集所有集群中節(jié)點(diǎn)id
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
//4. 輪詢所有節(jié)點(diǎn),給除自己之外的節(jié)點(diǎn)發(fā)送一條拉票的消息
for _, id := range ids {
if id == r.id {
continue
}
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
//5.根據(jù)raft協(xié)議的定義組合投票消息
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
上面的代碼分為如下幾步:
- 在發(fā)送投票之前,節(jié)點(diǎn)首先將自己的狀態(tài)置為候選人,這一步會(huì)把term加1,然后修改自己的Vote屬性為自己的id,表示當(dāng)前周期選票投給自己。becomeCandidate() 方法如下:
func (r *raft) becomeCandidate() {
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate //step方法指向stepCandidate
r.reset(r.Term + 1) //選舉周期+1
r.tick = r.tickElection //tick方法指向tickElection
r.Vote = r.id //投票給自己
r.state = StateCandidate //節(jié)點(diǎn)狀態(tài)轉(zhuǎn)為候選人
}
- 第二步的判斷是為了兼容單節(jié)點(diǎn)集群的場(chǎng)景,不是真正的判斷是否收到半數(shù)以上選票。對(duì)于單個(gè)節(jié)點(diǎn),只要自己給自己投票了就已經(jīng)是Leader了。
3-5. 給集群中所有節(jié)點(diǎn)發(fā)送選票, 根據(jù)raft協(xié)議的定義,投票請(qǐng)求需要包含新的選舉周期,節(jié)點(diǎn)id和最新日志的Index。然后跟其它消息一樣調(diào)用send方法提交一條消息。
選票發(fā)送
消息發(fā)送和心跳消息一樣,也是放到raft的msg隊(duì)列中。EtcdServer拿到Ready后發(fā)送給集群中其它的節(jié)點(diǎn),整個(gè)步驟中沒(méi)有針對(duì)voteMsg做特殊處理。
Follower投票
當(dāng)候選人節(jié)點(diǎn)將選票消息發(fā)出以后,在node中會(huì)放入recvc,最終會(huì)調(diào)用raft.Step(m pb.Message)處理這條消息。到Step()方法之前,邏輯跟心跳沒(méi)有區(qū)別,就不重復(fù)了,下面看下Step方法的處理。
func (r *raft) Step(m pb.Message) error {
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
//1. 消息的term比當(dāng)前節(jié)點(diǎn)的大
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
//2. 判斷是否是人工操作的強(qiáng)制transfer
force := bytes.Equal(m.Context, []byte(campaignTransfer))
//3. 根據(jù)本地記錄判斷Leader是否已超時(shí)
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// 如果沒(méi)超時(shí),則直接不回復(fù)候選人
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// 預(yù)選不修改term
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// 預(yù)選的response
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
//4-1. 如果收到了新的term的心跳、append或者snapshot,代表新的周期開(kāi)始,自己變成Follower
r.becomeFollower(m.Term, m.From)
} else {
//4-2. 如果收到了候選人 的投票請(qǐng)求,則說(shuō)明當(dāng)前進(jìn)入重新選舉階段,將Leader設(shè)置成None
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
//處理收到的消息term小于當(dāng)前節(jié)點(diǎn)
}
switch m.Type {
case pb.MsgHup:
...
case pb.MsgVote, pb.MsgPreVote:
// 5-1. 判斷是否可以投票給候選人
canVote := r.Vote == m.From ||
(r.Vote == None && r.lead == None) ||
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// 5-2. 判斷候選人的日志比當(dāng)前節(jié)點(diǎn)的新
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
//6-1. 回復(fù)候選人同意
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// 將Vote屬性改為候選人的id
r.electionElapsed = 0
r.Vote = m.From
}
} else {
//6-2. 回復(fù)候選人不同意
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
...
}
return nil
}
節(jié)點(diǎn)收到投票的消息,處理前提是收到的Term比當(dāng)前的Term要大。
- ,如果是候選人發(fā)送的投票消息,首先會(huì)做一次校驗(yàn)。1到3步是判斷這次投票的消息不是Transfer消息,并且選舉也沒(méi)有超時(shí),則直接忽略掉。再次檢查最小選舉超時(shí)是為了防止集群中只有少部分節(jié)點(diǎn)收不到心跳,而其它節(jié)點(diǎn)心跳正常的情況,減少重新選舉的次數(shù)。
Transfer消息是etcd支持的人工發(fā)起的Leader轉(zhuǎn)移請(qǐng)求,這是為了在Leader機(jī)器性能不夠或者準(zhǔn)備下線時(shí),人工發(fā)起切換Leader
- 收到term比自己大的消息時(shí),有可能是有新的Leader當(dāng)選了,發(fā)送日志或者心跳消息出來(lái)。這種情況當(dāng)前節(jié)點(diǎn)無(wú)論處于什么狀態(tài)都應(yīng)該切換成Follower。如果是候選人的投票消息,則將自己的Leader設(shè)置成None,進(jìn)入選舉中階段。
5-1. 在收到候選人的投票消息后,必須滿足3種情況下的一種才可以投同意票,① 之前已經(jīng)投給這個(gè)候選人了,可能由于網(wǎng)絡(luò)的原因再次收到重復(fù)的消息;②當(dāng)前未給任何節(jié)點(diǎn)投過(guò)票,而且當(dāng)前的Leader是None(在低4步中設(shè)置的);③ 預(yù)選消息只需要判斷term就可以了
5-2.投同意票還有一個(gè)條件就是候選人的日志比當(dāng)前節(jié)點(diǎn)的新,raft中新的標(biāo)準(zhǔn)就是最后一條日志要么term更大,要么term相同Index更大
6-1. 第5步的條件都滿足后就可以回復(fù)同意給候選人了,同時(shí)將自己Vote值改為候選人的ID,這一步很關(guān)鍵,保證了同一個(gè)term中,只能投票給一個(gè)候選人
6-2. 如果第5步中的條件不滿足則拒絕候選人
選舉完成
對(duì)于候選人來(lái)說(shuō),選票發(fā)出去之后無(wú)非面臨如下2種結(jié)果:
- 失敗,規(guī)定時(shí)間內(nèi)沒(méi)有收到超過(guò)半數(shù)同意票
- 成功,規(guī)定時(shí)間內(nèi)收到超過(guò)半數(shù)同意票
處理選票回復(fù)
在集群網(wǎng)絡(luò)正常時(shí),候選人應(yīng)該很快會(huì)收到各個(gè)節(jié)點(diǎn)對(duì)她選票的回復(fù)。消息的處理在stepCandidate()函數(shù)中:
func stepCandidate(r *raft, m pb.Message) error {
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
//cadidate狀態(tài)下不接受客戶端數(shù)據(jù)修改請(qǐng)求
return ErrProposalDropped
case pb.MsgApp:
//投票期間收到日志,說(shuō)明其它節(jié)點(diǎn)成為L(zhǎng)eader
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
//投票期間收到心跳,說(shuō)明其它節(jié)點(diǎn)成為L(zhǎng)eader
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
//投票期間收到日志快照,說(shuō)明其它節(jié)點(diǎn)成為L(zhǎng)eader
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
//收到投票反饋,則記錄并判斷是否超過(guò)半數(shù)
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
//贏得選舉,則成為L(zhǎng)eader,開(kāi)始廣播心跳和日志
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// 輸?shù)暨x舉,重新變成Follower
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
...
}
return nil
}
當(dāng)發(fā)起選舉的節(jié)點(diǎn)收到消息時(shí),如果消息是日志、快照或者心跳消息,說(shuō)明別的節(jié)點(diǎn)已經(jīng)成為L(zhǎng)eader,它已經(jīng)輸?shù)袅?,則直接成為Follower。
如果收到的是其它節(jié)點(diǎn)的投票回復(fù),則會(huì)統(tǒng)計(jì)自己的選票,如果超過(guò)半數(shù)支持,是則成為L(zhǎng)eader;超過(guò)半數(shù)拒絕,則回到Follower狀態(tài)等待下個(gè)選舉超時(shí)。如果都沒(méi)到,則繼續(xù)等。
在贏得選舉成為L(zhǎng)eader的情況下,根據(jù)raft協(xié)議,需要馬上開(kāi)始發(fā)送心跳,以防止其它Follower開(kāi)始新的選舉。
超時(shí)失敗
除了以上候選人節(jié)點(diǎn)在收到明確的消息時(shí),可以判斷自己是否成功之外,還有另外一種場(chǎng)景。比如一個(gè)節(jié)點(diǎn)和其它節(jié)點(diǎn)網(wǎng)絡(luò)狀況不佳或者是多個(gè)節(jié)點(diǎn)同時(shí)成為候選人。這種場(chǎng)景下,即不會(huì)收到足夠的投票,也沒(méi)有收到別人成為L(zhǎng)eader消息,為了防止節(jié)點(diǎn)一直等下去,需要一個(gè)超時(shí)的機(jī)制。
etcd在整個(gè)選票發(fā)送及等待選票的過(guò)程中,tick()方法是一直在運(yùn)行的,如果自己一直沒(méi)有當(dāng)選,別人也沒(méi)當(dāng)選超時(shí)的話。候選人會(huì)發(fā)起重新一輪的選舉,邏輯跟第一輪是一樣的?;仡櫹逻@個(gè)方法,其實(shí)無(wú)論是處于Follower還是Candidate狀態(tài),tick的邏輯是一樣的。
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
預(yù)選舉
什么是預(yù)選
etcd中,raft狀態(tài)機(jī)新增了一個(gè)狀態(tài)叫做預(yù)候選人。如果用戶在啟動(dòng)etcd時(shí)配置了PreVote屬性為true,則每次選舉開(kāi)始之前,都會(huì)先來(lái)一輪預(yù)選。所謂預(yù)選,就是節(jié)點(diǎn)在成為正式候選人之前,先發(fā)送一個(gè)預(yù)選的消息給集群內(nèi)所有節(jié)點(diǎn)(MsgPreVote),如果超過(guò)半數(shù)節(jié)點(diǎn)都同意,候選人才會(huì)開(kāi)始一次正式的選舉。
在預(yù)選階段,候選節(jié)點(diǎn)的狀態(tài)變?yōu)?code>PreCadidate。而其它節(jié)點(diǎn)仍然保持原來(lái)的狀態(tài),也就是說(shuō)這時(shí)候又有其它Follower要發(fā)起選舉并發(fā)送預(yù)選請(qǐng)求,其它節(jié)點(diǎn)也是會(huì)同意的。
當(dāng)進(jìn)入預(yù)選狀態(tài)的節(jié)點(diǎn),收到超過(guò)半數(shù)同意后,則正式進(jìn)入候選人狀態(tài)(Candidate)
為什么需要預(yù)選
添加預(yù)選的原因是為了在網(wǎng)絡(luò)狀況不佳時(shí),減少選舉次數(shù)。舉個(gè)具體場(chǎng)景,當(dāng)集群中的網(wǎng)絡(luò)不穩(wěn)定時(shí),會(huì)有部分Follower不能及時(shí)地收到Leader的心跳,這時(shí)候就會(huì)有Follower發(fā)起選舉。但是網(wǎng)絡(luò)原因,它自身也很難拿到超過(guò)半數(shù)選票當(dāng)選,或者當(dāng)選之后也很快就會(huì)有別的節(jié)點(diǎn)因?yàn)槭詹坏叫奶俅伟l(fā)起選舉,這就導(dǎo)致了集群經(jīng)常處于選舉狀態(tài)而不可用。為了防止這種情況的頻繁發(fā)生,添加預(yù)選階段,等于把Leader掛掉這件事從單個(gè)節(jié)點(diǎn)自己判斷,變成了半數(shù)節(jié)點(diǎn)一起判斷,大大減少了誤判。
凡事都有利有弊,當(dāng)Leader雖然沒(méi)掛掉,但性能有問(wèn)題時(shí),可能只影響了不到一半的節(jié)點(diǎn)。添加預(yù)選之后可能會(huì)導(dǎo)致性能不佳的Leader很難被選下去,從而影響讀寫性能。
總結(jié)
選舉是保證raft安全性的基礎(chǔ),心跳是保證集群能夠盡快從Leader宕機(jī)或者網(wǎng)絡(luò)分區(qū)中恢復(fù)的關(guān)鍵。etcd中將心跳和計(jì)時(shí)做了集成,抽象成tick。tick操作在Leader端用來(lái)觸發(fā)raft定時(shí)發(fā)送心跳,而在Follower端是為了觸發(fā)檢查L(zhǎng)eader是否超時(shí)。
Raft模塊通過(guò)tick操作來(lái)觸發(fā)狀態(tài)機(jī)在不同狀態(tài)中的轉(zhuǎn)換,通過(guò)綁定不同的函數(shù)來(lái)對(duì)消息進(jìn)行處理和反饋。
選舉完成后,etcd就可以通過(guò)在集群中復(fù)制日志來(lái)保證用戶對(duì)數(shù)據(jù)讀寫的分布式保存和一致性保證了。下一篇將重點(diǎn)解析etcd中日志復(fù)制和提交生效的過(guò)程。
【鏈接】
Raft協(xié)議實(shí)現(xiàn)之etcd(一):基本架構(gòu)
分布式一致性協(xié)議-Raft詳解 (一) 選舉