Raft協(xié)議實(shí)現(xiàn)之etcd(二):心跳及選舉

前言

選舉是Raft實(shí)現(xiàn)數(shù)據(jù)一致性的安全保證,一個(gè)raft集群能夠正常運(yùn)行,必須有且僅有一個(gè)Leader存在,一次成功選舉是集群能夠正常運(yùn)行的前提。Raft協(xié)議對(duì)選舉的定義和安全性保證請(qǐng)參考之前的Raft選舉原理解析[傳送門]。這篇文章通過(guò)解析etcd的源碼來(lái)看一下Raft集群選舉的具體實(shí)現(xiàn)。

心跳

Raft集群在正常運(yùn)行中是不會(huì)觸發(fā)選舉的,選舉只會(huì)發(fā)生在集群初次啟動(dòng)或者其它節(jié)點(diǎn)無(wú)法收到Leader心跳的情況下。初次啟動(dòng)比較好理解,因?yàn)閞aft節(jié)點(diǎn)在啟動(dòng)時(shí),默認(rèn)都是將自己設(shè)置為Follower。收不到Leader心跳有兩種情況,一種是原來(lái)的Leader機(jī)器Crash了,還有一種是發(fā)生網(wǎng)絡(luò)分區(qū),F(xiàn)ollower跟Leader之間的網(wǎng)絡(luò)斷了,F(xiàn)ollower以為L(zhǎng)eader宕機(jī)了。下面先看一下集群一切正常時(shí),心跳是怎么流轉(zhuǎn)的。

心跳觸發(fā)

EtcdServer定時(shí)觸發(fā)
Raft集群運(yùn)行時(shí)Leader需要定時(shí)的發(fā)送心跳給所有Follower。在etcd中,通過(guò)EtcdServer中的定時(shí)器定時(shí)觸發(fā)Raft模塊來(lái)實(shí)現(xiàn),這個(gè)定時(shí)器在raftNode.start()中啟動(dòng)的。

func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second
    go func() {
        defer r.onStop()
        islead := false

        for {
            select {
           //通過(guò)go中的ticker觸發(fā)
            case <-r.ticker.C:
                r.tick()
            case rd := <-r.Ready():
                ...
                ...
           }
        }
       ...
   }()
}

func (r *raftNode) tick() {
    r.tickMu.Lock()
    r.Tick() //調(diào)用node.Tick()
    r.tickMu.Unlock()
}

raftNode在啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)go routine循環(huán)監(jiān)聽(tīng)ticker定時(shí)器的channel,時(shí)間到時(shí)則調(diào)用它的tick()方法。這個(gè)ticker定時(shí)器的周期即用戶設(shè)置的raft集群的心跳周期。raftNode中組合了一個(gè)raft.Node,所以tick()方法只是加了互斥鎖之后就調(diào)用的raft.Node.Tick()方法。也就是說(shuō)觸發(fā)由EtcdServer來(lái)做,邏輯由raft模塊來(lái)處理。
raft模塊處理
Node的Tick()方法只是簡(jiǎn)單的寫個(gè)空對(duì)象到tickc的channel中,這個(gè)前一篇講過(guò),Node接口的實(shí)現(xiàn)類大部分操作都是異步完成的。

func (n *node) Tick() {
    select {
    //寫空對(duì)象到tickc channel異步執(zhí)行
    case n.tickc <- struct{}{}:
    case <-n.done:
    default:
        n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
    }
}

node在啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)go routine監(jiān)聽(tīng)tickc(在node.run()方法中)。收到觸發(fā)后直接調(diào)用的RawNode.Tick()方法,而RawNode又調(diào)用了raft.tick()。

//node啟動(dòng)時(shí)會(huì)在一個(gè)新的go routine中運(yùn)行run()
func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready
    r := n.rn.raft
    lead := None
   //一直循環(huán)查看各個(gè)channel
    for {
            ...
            ...
            select {
                //監(jiān)聽(tīng)到觸發(fā),則調(diào)用RawNode.Tick()
                case <-n.tickc:
                    n.rn.Tick()
                ...
            }
     }
}

//RawNode.Tick()
func (rn *RawNode) Tick() {
    //直接調(diào)用raft.tick()
    rn.raft.tick()
}

Leader tick()邏輯
在上一篇文章中講過(guò),raft的tick屬性是函數(shù)類型,當(dāng)節(jié)點(diǎn)的角色是Leader時(shí),tick指向的是raft.tickHeartbeat()。

func (r *raft) tickHeartbeat() {
    //1. 心跳計(jì)數(shù)+1
    r.heartbeatElapsed++
    r.electionElapsed++
    // 選舉超時(shí)控制  
    if r.electionElapsed >= r.electionTimeout {
        r.electionElapsed = 0
        if r.checkQuorum {
            r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
        }
        // Leader轉(zhuǎn)讓
        if r.state == StateLeader && r.leadTransferee != None {
            r.abortLeaderTransfer()
        }
    }
     //2. 如果當(dāng)前已經(jīng)不是Leader了,跳過(guò)
    if r.state != StateLeader {
        return
    }
     //3. 如果心跳計(jì)數(shù)大于心跳超時(shí),則發(fā)送心跳消息
    if r.heartbeatElapsed >= r.heartbeatTimeout {
        //心跳計(jì)數(shù)清0
        r.heartbeatElapsed = 0
        //調(diào)用Step()發(fā)送心跳
        r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
    }
}
  • 第1步首先Leader將心跳計(jì)數(shù)加1,上一篇講過(guò),etcd在記錄超時(shí)時(shí)不是以標(biāo)準(zhǔn)時(shí)間記錄的,而是記錄的心跳間隔的倍數(shù)。所以EtcdServer每觸發(fā)一次tick(),心跳計(jì)數(shù)+1,代表距離上次發(fā)送心跳又過(guò)了1個(gè)心跳時(shí)間
  • 第2步關(guān)于選舉超時(shí)和轉(zhuǎn)讓的邏輯先跳過(guò)
  • 第3步中判斷是否應(yīng)該發(fā)送心跳,heartbeatTimeout的意思是Leader應(yīng)該經(jīng)過(guò)幾次心跳時(shí)間后必須發(fā)送一次心跳。etcd中heartbeatTimeout的默認(rèn)值是1,也就是說(shuō)其實(shí)每次進(jìn)到這個(gè)方法heartbeatElapsed >= heartbeatTimeout都是成立的。當(dāng)判斷需要發(fā)送心跳時(shí),會(huì)封裝一個(gè)MsgBeat的消息提交Step方法處理,處理邏輯下面再說(shuō),先看下Follower的tick()方法。

用戶也可以把heartbeatTimeout這個(gè)值設(shè)的很大,當(dāng)然這樣在Leader宕機(jī)到觸發(fā)重新選舉的間隔會(huì)長(zhǎng)一些。在網(wǎng)絡(luò)狀況不好的時(shí)候可以這樣設(shè)置。

Follower tick()邏輯
當(dāng)節(jié)點(diǎn)角色是Follower或者Candidate的時(shí)候,tick指向的是tickElection()

func (r *raft) tickElection() {
    //選舉計(jì)數(shù)加1
    r.electionElapsed++
    //判斷是否超時(shí),要發(fā)起重新選舉
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

以上的邏輯很簡(jiǎn)單,因?yàn)閷?duì)于Follower來(lái)說(shuō),唯一需要關(guān)心得就是是不是很久都沒(méi)收到Leader的心跳了。所以每次tick都將選舉計(jì)數(shù)+1,當(dāng)Follower收到Leader心跳的時(shí)候會(huì)將electionElapsed清0。如果Follower收不到Leader的心跳,electionElapsed就會(huì)一直加到超過(guò)選舉超時(shí),就發(fā)起選舉。發(fā)起選舉的邏輯下面再說(shuō)。

Leader心跳發(fā)送

raft消息封裝
上面講到Leader在收到Tick請(qǐng)求后,會(huì)提交一個(gè)MsgBeat的消息給到Step()方法,對(duì)于心跳消息,會(huì)直接調(diào)用step指向的函數(shù)。跟tick一樣,step也是個(gè)函數(shù)類型,在節(jié)點(diǎn)為L(zhǎng)eader時(shí),它指向的是stepLeader(),該函數(shù)中對(duì)于MsgBeat的消息會(huì)調(diào)用bcastHeartbeat()來(lái)給集群中每個(gè)Follower發(fā)送心跳消息。

func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
    //判斷對(duì)于心跳消息,則廣播心跳
    case pb.MsgBeat:
        r.bcastHeartbeat()
        return nil
        case pb.MsgCheckQuorum:
        ...
        ...
}

func (r *raft) bcastHeartbeat() {
    lastCtx := r.readOnly.lastPendingRequestCtx()
    if len(lastCtx) == 0 {
        r.bcastHeartbeatWithCtx(nil)
    } else {
        r.bcastHeartbeatWithCtx([]byte(lastCtx))
    }
}
// 廣播心跳消息至所有節(jié)點(diǎn)
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        //排除Leader自己
        if id == r.id {
            return
        }
       //發(fā)送心跳
        r.sendHeartbeat(id, ctx)
    })
}

前一篇講過(guò)raft在prs屬性中保存了所有Follower的進(jìn)度信息,包含F(xiàn)ollower的id、同步日志的進(jìn)度等。所以上面的方法就是遍歷prs所有節(jié)點(diǎn),發(fā)送心跳消息。

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    //計(jì)算消息中帶的commitIndex
    commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
       //封裝成一個(gè)Type是MsgHeartbeat的消息,并帶上commitIndex
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }
       //發(fā)送消息
    r.send(m)
}

Raft協(xié)議中定義心跳消息和日志消息其實(shí)是一個(gè)格式的,只是心跳消息沒(méi)有帶日志條目,只會(huì)攜帶CommitIndex。Leader首先看Follower已經(jīng)接收成功的日志條目的Index,即Progress.Match字段,然后跟自己的CommitIndex比較,取值較小的那個(gè)。這是為了防止Follower的日志同步落后太多,CommitIndex處的日志還沒(méi)有同步到。
封裝好消息后,調(diào)用send()方法發(fā)送。raft本身并不負(fù)責(zé)消息發(fā)送,所以這個(gè)方法只是把消息放到一個(gè)隊(duì)列中,等待EtcdServer來(lái)獲取。

func (r *raft) send(m pb.Message) {
    m.From = r.id
    //數(shù)據(jù)校驗(yàn),選舉類消息必須帶term屬性
    if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
        if m.Term == 0 {
            panic(fmt.Sprintf("term should be set when sending %s", m.Type))
        }
    } else {
         //其它類消息不能帶term屬性
        if m.Term != 0 {
            panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
        }
       //除了日志和MsgReadIndex消息外,設(shè)置term為raft當(dāng)前周期
        if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
            m.Term = r.Term
        }
    }
    //將消息放入隊(duì)列
    r.msgs = append(r.msgs, m)
}

EtcdServer消息傳輸
上一步中,心跳消息被放入隊(duì)列,那這些消息是什么時(shí)候被發(fā)給集群中其它節(jié)點(diǎn)呢?發(fā)送操作是由EtcdServer啟動(dòng)時(shí)的go routine處理的。具體實(shí)現(xiàn)還是在raftNode.start()中。

func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second
    go func() {
        defer r.onStop()
        islead := false
        for {
            select {
            case <-r.ticker.C:
                r.tick()
              //調(diào)用Node.Ready(),從返回的channel中獲取數(shù)據(jù)
            case rd := <-r.Ready():
                if rd.SoftState != nil {
                    // SoftState不為空的處理邏輯
                }
                if len(rd.ReadStates) != 0 {
                    //ReadStates不為空的處理邏輯
                }
                // 如果是Leader發(fā)送消息給Follower
                if islead {
                    r.transport.Send(r.processMessages(rd.Messages))
                }
                ...
                ...
                //處理完畢調(diào)用Advance()方法
                r.Advance()
            case <-r.stopped:
                return
            }
        }
    }()
}

以上的邏輯中只包含心跳相關(guān)的,當(dāng)從Ready channel中讀到數(shù)據(jù)后,直接通過(guò)transport發(fā)送出去,這里的processMessages()除了對(duì)消息封裝成傳輸協(xié)議要求的格式,還會(huì)做超時(shí)控制。
發(fā)送完畢后無(wú)論成功失敗都會(huì)調(diào)用raft的Advance()方法處理后續(xù)邏輯。Leader一次心跳發(fā)送就算結(jié)束了。
Ready數(shù)據(jù)獲取
上面的邏輯中,心跳的message是被打包在Ready數(shù)據(jù)結(jié)構(gòu)中返回的,下面看一下數(shù)據(jù)打包的過(guò)程。既然Node.Ready()返回的是個(gè)channel,則必然有地方將Ready塞進(jìn)channel中,這段邏輯是在node.run()方法中。

func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready
    r := n.rn.raft
    lead := None
    for {
        if advancec != nil {
            readyc = nil
        } else if n.rn.HasReady() { //判斷是否有Ready數(shù)據(jù)
            // 獲取Ready數(shù)據(jù)
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }
        ....
        select {
            ....
            case readyc <- rd: //數(shù)據(jù)放入ready channel中
                n.rn.acceptReady(rd)  //告訴raft,ready數(shù)據(jù)已被接收
                advancec = n.advancec  //賦值A(chǔ)dvance channel等待Ready處理完成的消息
            }
    }
}

上面的代碼中Ready數(shù)據(jù)是通過(guò)調(diào)用RawNode.readyWithoutAccept()獲取到的。

func (rn *RawNode) readyWithoutAccept() Ready {
    return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
    rd := Ready{
        Entries:          r.raftLog.unstableEntries(),  //未持久化的日志
        CommittedEntries: r.raftLog.nextEnts(),   //已提交可以apply的日志
        Messages:         r.msgs,  //raft隊(duì)列中所有的message
    }
    //判斷softState有沒(méi)有變化,有則賦值
    if softSt := r.softState(); !softSt.equal(prevSoftSt) {
        rd.SoftState = softSt
    }
    //判斷hardState有沒(méi)有變化,有則賦值
    if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
        rd.HardState = hardSt
    }
    //判斷是不是收到snapshot
    if r.raftLog.unstable.snapshot != nil {
        rd.Snapshot = *r.raftLog.unstable.snapshot
    }
    if len(r.readStates) != 0 {
        rd.ReadStates = r.readStates
    }
    //處理該Ready后是否需要做fsync,將數(shù)據(jù)強(qiáng)制刷盤
    rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
    return rd
}

對(duì)于心跳來(lái)說(shuō),上面最關(guān)鍵的操作就是生成Ready的時(shí)候,將msg放到Ready中。

Follower心跳處理

現(xiàn)在來(lái)到心跳的接收方,心跳消息到達(dá)Follower后,傳輸層會(huì)回調(diào)EtcdServer.Process方法,將心跳消息交給raft狀態(tài)機(jī)處理。

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    if s.cluster.IsIDRemoved(types.ID(m.From)) {
        //發(fā)送方已經(jīng)從集群中移除
    }
    if m.Type == raftpb.MsgApp {
        //收到日志消息記錄metrics
    }
    //調(diào)用raft.Step處理消息
    return s.r.Step(ctx, m)
}

對(duì)于Follower來(lái)說(shuō)Step仍然進(jìn)入的是stepFollower方法,第一步是將選舉計(jì)時(shí)清0,防止發(fā)起選舉流程。

func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    ...
    case pb.MsgHeartbeat:
        r.electionElapsed = 0 //選舉超時(shí)清0
        r.lead = m.From  //設(shè)置Lead為心跳發(fā)送方ID
        r.handleHeartbeat(m) //處理心跳消息
    ...
    }
    return nil
}

func (r *raft) handleHeartbeat(m pb.Message) {
     //設(shè)置commitIndex為L(zhǎng)eader傳來(lái)的最新值
    r.raftLog.commitTo(m.Commit)
     //發(fā)送Response給Leader
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Follower對(duì)心跳消息的處理很簡(jiǎn)單,1)選舉超時(shí)計(jì)時(shí)清0;2)設(shè)置commitIndex(會(huì)檢查本地的commitIndex是不是比leader發(fā)過(guò)來(lái)的?。?)回復(fù)Leader,回復(fù)的時(shí)候按照raft協(xié)議的要求帶上自己日志的進(jìn)度。

Leader心跳回復(fù)處理

Leader收到Follower的心跳回復(fù)后,跟所有消息的處理邏輯一樣,會(huì)進(jìn)入stepLeader()方法處理

func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
      ...
      ...
    case pb.MsgHeartbeatResp:
        //記錄Follower為Active狀態(tài)
        pr.RecentActive = true
        pr.ProbeSent = false
        if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
            pr.Inflights.FreeFirstOne()
        }
         //有日志要發(fā)送,繼續(xù)發(fā)送
        if pr.Match < r.raftLog.lastIndex() {
            r.sendAppend(m.From)
        }
        if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
            return nil
        }
         //處理線性讀的邏輯
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }
        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            req := rs.req
            if req.From == None || req.From == r.id { // from local member
                r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
            } else {
                r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
            }
        }
    }
}

Leader收到心跳回復(fù)后,會(huì)判斷是否有新的日志要發(fā)給Follower,有的話就繼續(xù)發(fā)送。線性讀的邏輯放在后面的文章解析。

選舉

正常情況下,Leader在每次tick()方法時(shí)發(fā)送心跳,網(wǎng)絡(luò)一切正常,F(xiàn)ollower收到心跳后將選舉計(jì)時(shí)清0,集群就這樣愉快的運(yùn)行下去了。但是分布式系統(tǒng)中,意外時(shí)必然會(huì)發(fā)生的,這時(shí)候Leader宕機(jī)了,就需要集群中其它節(jié)點(diǎn)站出來(lái),競(jìng)選成為新的Leader。

選舉觸發(fā)

首先來(lái)看一下Follower是怎么認(rèn)定Leader掛了的。當(dāng)raft節(jié)點(diǎn)角色是Follower的時(shí)候,EtcdServer每次觸發(fā)tick(),進(jìn)入的是tickElection()方法:

func (r *raft) tickElection() {
    r.electionElapsed++
     //判斷是否要發(fā)起選舉
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

func (r *raft) promotable() bool {
    pr := r.prs.Progress[r.id]
    return pr != nil && !pr.IsLearner
}
func (r *raft) pastElectionTimeout() bool {
    //每次tick加1后和隨機(jī)選舉超時(shí)比較
    return r.electionElapsed >= r.randomizedElectionTimeout
}

在每次tick()時(shí),都會(huì)檢查是否符合發(fā)起一次新的選舉的條件。其中promotable()比較簡(jiǎn)單,就是判斷自己當(dāng)前是不是還在集群中并且不能是Learner。pastElectionTimeout()判斷是否已經(jīng)超過(guò)選舉超時(shí)時(shí)間還沒(méi)收到Leader的心跳。這里比較的值是randomizedElectionTimeout,代表一個(gè)隨機(jī)選舉超時(shí)時(shí)間,使用隨機(jī)時(shí)間的原因是防止Leader心跳超時(shí)后所有Follower同時(shí)發(fā)起選舉。我們看下etcd中這個(gè)時(shí)間是怎么算的:

func (r *raft) resetRandomizedElectionTimeout() {
    r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}

可以看到這個(gè)時(shí)間是1個(gè)選舉超時(shí)到2個(gè)選舉超時(shí)之間的隨機(jī)值。每次開(kāi)啟一個(gè)新的term,這個(gè)reset方法都會(huì)被調(diào)用一次,所以在每個(gè)選舉周期這個(gè)隨機(jī)值都是不同的,最大限度防止重復(fù)。

發(fā)送選票

狀態(tài)轉(zhuǎn)換
上一步中Follower發(fā)現(xiàn)過(guò)了選舉超時(shí)還沒(méi)收到Leader心跳,觸發(fā)Step()方法讓raft狀態(tài)機(jī)進(jìn)行狀態(tài)轉(zhuǎn)換。

func (r *raft) Step(m pb.Message) error {
    ...
    ...
    switch m.Type {
    case pb.MsgHup:
        if r.state != StateLeader {
            if !r.promotable() {
                //write log
                return nil
            }
            ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
            if err != nil {
                r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
            }
            //判斷有配置變更日志,如果集群正在做配置變更,則不發(fā)起選舉
            if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
                //write log
                return nil
            }
            //發(fā)起選舉,根據(jù)配置判斷選舉之前是否要做預(yù)投票
            if r.preVote {
                r.campaign(campaignPreElection)
            } else {
                r.campaign(campaignElection)
            }
        } else {
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }
}

Step方法收到要發(fā)起選舉的消息后(MsgHup),會(huì)首先判斷已經(jīng)提交的還沒(méi)生效的日志中有沒(méi)有集群變更,有的話說(shuō)明集群正在變更,則不發(fā)起選舉。這么做的原因是有可能當(dāng)前節(jié)點(diǎn)在集群變更后已經(jīng)被從集群中移除了。
然后,根據(jù)配置中設(shè)置的選舉是否需要先預(yù)選,來(lái)調(diào)用campaign()方法發(fā)起選舉。預(yù)選的原理放在本文最后說(shuō)。

func (r *raft) campaign(t CampaignType) {
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection { 
        //需要預(yù)選的情況
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        term = r.Term + 1
    } else {
        //1. 變?yōu)楹蜻x人,并初始化一條拉票的消息
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    //2.判斷是否已經(jīng)贏得選舉
    if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    //3. 收集所有集群中節(jié)點(diǎn)id
    var ids []uint64
    {
        idMap := r.prs.Voters.IDs()
        ids = make([]uint64, 0, len(idMap))
        for id := range idMap {
            ids = append(ids, id)
        }
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
    }
   //4. 輪詢所有節(jié)點(diǎn),給除自己之外的節(jié)點(diǎn)發(fā)送一條拉票的消息
    for _, id := range ids {
        if id == r.id {
            continue
        }
        var ctx []byte
        if t == campaignTransfer {
            ctx = []byte(t)
        }
        //5.根據(jù)raft協(xié)議的定義組合投票消息
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

上面的代碼分為如下幾步:

  1. 在發(fā)送投票之前,節(jié)點(diǎn)首先將自己的狀態(tài)置為候選人,這一步會(huì)把term加1,然后修改自己的Vote屬性為自己的id,表示當(dāng)前周期選票投給自己。becomeCandidate() 方法如下:
func (r *raft) becomeCandidate() {
    if r.state == StateLeader {
        panic("invalid transition [leader -> candidate]")
    }
    r.step = stepCandidate  //step方法指向stepCandidate
    r.reset(r.Term + 1)  //選舉周期+1
    r.tick = r.tickElection //tick方法指向tickElection
    r.Vote = r.id  //投票給自己
    r.state = StateCandidate  //節(jié)點(diǎn)狀態(tài)轉(zhuǎn)為候選人
}
  1. 第二步的判斷是為了兼容單節(jié)點(diǎn)集群的場(chǎng)景,不是真正的判斷是否收到半數(shù)以上選票。對(duì)于單個(gè)節(jié)點(diǎn),只要自己給自己投票了就已經(jīng)是Leader了。

3-5. 給集群中所有節(jié)點(diǎn)發(fā)送選票, 根據(jù)raft協(xié)議的定義,投票請(qǐng)求需要包含新的選舉周期,節(jié)點(diǎn)id和最新日志的Index。然后跟其它消息一樣調(diào)用send方法提交一條消息。

選票發(fā)送
消息發(fā)送和心跳消息一樣,也是放到raft的msg隊(duì)列中。EtcdServer拿到Ready后發(fā)送給集群中其它的節(jié)點(diǎn),整個(gè)步驟中沒(méi)有針對(duì)voteMsg做特殊處理。

Follower投票

當(dāng)候選人節(jié)點(diǎn)將選票消息發(fā)出以后,在node中會(huì)放入recvc,最終會(huì)調(diào)用raft.Step(m pb.Message)處理這條消息。到Step()方法之前,邏輯跟心跳沒(méi)有區(qū)別,就不重復(fù)了,下面看下Step方法的處理。

func (r *raft) Step(m pb.Message) error {
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        //1. 消息的term比當(dāng)前節(jié)點(diǎn)的大
        if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
            //2. 判斷是否是人工操作的強(qiáng)制transfer
            force := bytes.Equal(m.Context, []byte(campaignTransfer))
            //3. 根據(jù)本地記錄判斷Leader是否已超時(shí)
            inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
            if !force && inLease {
                // 如果沒(méi)超時(shí),則直接不回復(fù)候選人
                return nil
            }
        }
        switch {
        case m.Type == pb.MsgPreVote:
            // 預(yù)選不修改term
        case m.Type == pb.MsgPreVoteResp && !m.Reject:
            // 預(yù)選的response
        default:
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                //4-1. 如果收到了新的term的心跳、append或者snapshot,代表新的周期開(kāi)始,自己變成Follower
                r.becomeFollower(m.Term, m.From)
            } else {
                //4-2. 如果收到了候選人 的投票請(qǐng)求,則說(shuō)明當(dāng)前進(jìn)入重新選舉階段,將Leader設(shè)置成None
                r.becomeFollower(m.Term, None)
            }
        }
    case m.Term < r.Term:
        //處理收到的消息term小于當(dāng)前節(jié)點(diǎn)
    }

    switch m.Type {
    case pb.MsgHup:
        ...
    case pb.MsgVote, pb.MsgPreVote:
        // 5-1. 判斷是否可以投票給候選人
        canVote := r.Vote == m.From ||
            (r.Vote == None && r.lead == None) ||
            (m.Type == pb.MsgPreVote && m.Term > r.Term)
        // 5-2. 判斷候選人的日志比當(dāng)前節(jié)點(diǎn)的新
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
            //6-1. 回復(fù)候選人同意
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            if m.Type == pb.MsgVote {
                // 將Vote屬性改為候選人的id
                r.electionElapsed = 0
                r.Vote = m.From
            }
        } else {
            //6-2. 回復(fù)候選人不同意
            r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
        }

    default:
        ...
    }
    return nil
}

節(jié)點(diǎn)收到投票的消息,處理前提是收到的Term比當(dāng)前的Term要大。

  1. ,如果是候選人發(fā)送的投票消息,首先會(huì)做一次校驗(yàn)。1到3步是判斷這次投票的消息不是Transfer消息,并且選舉也沒(méi)有超時(shí),則直接忽略掉。再次檢查最小選舉超時(shí)是為了防止集群中只有少部分節(jié)點(diǎn)收不到心跳,而其它節(jié)點(diǎn)心跳正常的情況,減少重新選舉的次數(shù)。

Transfer消息是etcd支持的人工發(fā)起的Leader轉(zhuǎn)移請(qǐng)求,這是為了在Leader機(jī)器性能不夠或者準(zhǔn)備下線時(shí),人工發(fā)起切換Leader

  1. 收到term比自己大的消息時(shí),有可能是有新的Leader當(dāng)選了,發(fā)送日志或者心跳消息出來(lái)。這種情況當(dāng)前節(jié)點(diǎn)無(wú)論處于什么狀態(tài)都應(yīng)該切換成Follower。如果是候選人的投票消息,則將自己的Leader設(shè)置成None,進(jìn)入選舉中階段。

5-1. 在收到候選人的投票消息后,必須滿足3種情況下的一種才可以投同意票,① 之前已經(jīng)投給這個(gè)候選人了,可能由于網(wǎng)絡(luò)的原因再次收到重復(fù)的消息;②當(dāng)前未給任何節(jié)點(diǎn)投過(guò)票,而且當(dāng)前的Leader是None(在低4步中設(shè)置的);③ 預(yù)選消息只需要判斷term就可以了
5-2.投同意票還有一個(gè)條件就是候選人的日志比當(dāng)前節(jié)點(diǎn)的新,raft中新的標(biāo)準(zhǔn)就是最后一條日志要么term更大,要么term相同Index更大

6-1. 第5步的條件都滿足后就可以回復(fù)同意給候選人了,同時(shí)將自己Vote值改為候選人的ID,這一步很關(guān)鍵,保證了同一個(gè)term中,只能投票給一個(gè)候選人
6-2. 如果第5步中的條件不滿足則拒絕候選人

選舉完成

對(duì)于候選人來(lái)說(shuō),選票發(fā)出去之后無(wú)非面臨如下2種結(jié)果:

  • 失敗,規(guī)定時(shí)間內(nèi)沒(méi)有收到超過(guò)半數(shù)同意票
  • 成功,規(guī)定時(shí)間內(nèi)收到超過(guò)半數(shù)同意票

處理選票回復(fù)

在集群網(wǎng)絡(luò)正常時(shí),候選人應(yīng)該很快會(huì)收到各個(gè)節(jié)點(diǎn)對(duì)她選票的回復(fù)。消息的處理在stepCandidate()函數(shù)中:

func stepCandidate(r *raft, m pb.Message) error {
    var myVoteRespType pb.MessageType
    if r.state == StatePreCandidate {
        myVoteRespType = pb.MsgPreVoteResp
    } else {
        myVoteRespType = pb.MsgVoteResp
    }
    switch m.Type {
    case pb.MsgProp:
        //cadidate狀態(tài)下不接受客戶端數(shù)據(jù)修改請(qǐng)求
        return ErrProposalDropped
    case pb.MsgApp:
        //投票期間收到日志,說(shuō)明其它節(jié)點(diǎn)成為L(zhǎng)eader
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
       //投票期間收到心跳,說(shuō)明其它節(jié)點(diǎn)成為L(zhǎng)eader
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
       //投票期間收到日志快照,說(shuō)明其它節(jié)點(diǎn)成為L(zhǎng)eader
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:
        //收到投票反饋,則記錄并判斷是否超過(guò)半數(shù)
        gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
        switch res {
        case quorum.VoteWon:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                //贏得選舉,則成為L(zhǎng)eader,開(kāi)始廣播心跳和日志
                r.becomeLeader()
                r.bcastAppend()
            }
        case quorum.VoteLost:
            // 輸?shù)暨x舉,重新變成Follower
            r.becomeFollower(r.Term, None)
        }
    case pb.MsgTimeoutNow:
        ...
    }
    return nil
}

當(dāng)發(fā)起選舉的節(jié)點(diǎn)收到消息時(shí),如果消息是日志、快照或者心跳消息,說(shuō)明別的節(jié)點(diǎn)已經(jīng)成為L(zhǎng)eader,它已經(jīng)輸?shù)袅?,則直接成為Follower。
如果收到的是其它節(jié)點(diǎn)的投票回復(fù),則會(huì)統(tǒng)計(jì)自己的選票,如果超過(guò)半數(shù)支持,是則成為L(zhǎng)eader;超過(guò)半數(shù)拒絕,則回到Follower狀態(tài)等待下個(gè)選舉超時(shí)。如果都沒(méi)到,則繼續(xù)等。
在贏得選舉成為L(zhǎng)eader的情況下,根據(jù)raft協(xié)議,需要馬上開(kāi)始發(fā)送心跳,以防止其它Follower開(kāi)始新的選舉。

超時(shí)失敗

除了以上候選人節(jié)點(diǎn)在收到明確的消息時(shí),可以判斷自己是否成功之外,還有另外一種場(chǎng)景。比如一個(gè)節(jié)點(diǎn)和其它節(jié)點(diǎn)網(wǎng)絡(luò)狀況不佳或者是多個(gè)節(jié)點(diǎn)同時(shí)成為候選人。這種場(chǎng)景下,即不會(huì)收到足夠的投票,也沒(méi)有收到別人成為L(zhǎng)eader消息,為了防止節(jié)點(diǎn)一直等下去,需要一個(gè)超時(shí)的機(jī)制。
etcd在整個(gè)選票發(fā)送及等待選票的過(guò)程中,tick()方法是一直在運(yùn)行的,如果自己一直沒(méi)有當(dāng)選,別人也沒(méi)當(dāng)選超時(shí)的話。候選人會(huì)發(fā)起重新一輪的選舉,邏輯跟第一輪是一樣的?;仡櫹逻@個(gè)方法,其實(shí)無(wú)論是處于Follower還是Candidate狀態(tài),tick的邏輯是一樣的。

func (r *raft) tickElection() {
    r.electionElapsed++
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

預(yù)選舉

什么是預(yù)選
etcd中,raft狀態(tài)機(jī)新增了一個(gè)狀態(tài)叫做預(yù)候選人。如果用戶在啟動(dòng)etcd時(shí)配置了PreVote屬性為true,則每次選舉開(kāi)始之前,都會(huì)先來(lái)一輪預(yù)選。所謂預(yù)選,就是節(jié)點(diǎn)在成為正式候選人之前,先發(fā)送一個(gè)預(yù)選的消息給集群內(nèi)所有節(jié)點(diǎn)(MsgPreVote),如果超過(guò)半數(shù)節(jié)點(diǎn)都同意,候選人才會(huì)開(kāi)始一次正式的選舉。
在預(yù)選階段,候選節(jié)點(diǎn)的狀態(tài)變?yōu)?code>PreCadidate。而其它節(jié)點(diǎn)仍然保持原來(lái)的狀態(tài),也就是說(shuō)這時(shí)候又有其它Follower要發(fā)起選舉并發(fā)送預(yù)選請(qǐng)求,其它節(jié)點(diǎn)也是會(huì)同意的。
當(dāng)進(jìn)入預(yù)選狀態(tài)的節(jié)點(diǎn),收到超過(guò)半數(shù)同意后,則正式進(jìn)入候選人狀態(tài)(Candidate)
為什么需要預(yù)選
添加預(yù)選的原因是為了在網(wǎng)絡(luò)狀況不佳時(shí),減少選舉次數(shù)。舉個(gè)具體場(chǎng)景,當(dāng)集群中的網(wǎng)絡(luò)不穩(wěn)定時(shí),會(huì)有部分Follower不能及時(shí)地收到Leader的心跳,這時(shí)候就會(huì)有Follower發(fā)起選舉。但是網(wǎng)絡(luò)原因,它自身也很難拿到超過(guò)半數(shù)選票當(dāng)選,或者當(dāng)選之后也很快就會(huì)有別的節(jié)點(diǎn)因?yàn)槭詹坏叫奶俅伟l(fā)起選舉,這就導(dǎo)致了集群經(jīng)常處于選舉狀態(tài)而不可用。為了防止這種情況的頻繁發(fā)生,添加預(yù)選階段,等于把Leader掛掉這件事從單個(gè)節(jié)點(diǎn)自己判斷,變成了半數(shù)節(jié)點(diǎn)一起判斷,大大減少了誤判。
凡事都有利有弊,當(dāng)Leader雖然沒(méi)掛掉,但性能有問(wèn)題時(shí),可能只影響了不到一半的節(jié)點(diǎn)。添加預(yù)選之后可能會(huì)導(dǎo)致性能不佳的Leader很難被選下去,從而影響讀寫性能。

總結(jié)

選舉是保證raft安全性的基礎(chǔ),心跳是保證集群能夠盡快從Leader宕機(jī)或者網(wǎng)絡(luò)分區(qū)中恢復(fù)的關(guān)鍵。etcd中將心跳和計(jì)時(shí)做了集成,抽象成tick。tick操作在Leader端用來(lái)觸發(fā)raft定時(shí)發(fā)送心跳,而在Follower端是為了觸發(fā)檢查L(zhǎng)eader是否超時(shí)。
Raft模塊通過(guò)tick操作來(lái)觸發(fā)狀態(tài)機(jī)在不同狀態(tài)中的轉(zhuǎn)換,通過(guò)綁定不同的函數(shù)來(lái)對(duì)消息進(jìn)行處理和反饋。
選舉完成后,etcd就可以通過(guò)在集群中復(fù)制日志來(lái)保證用戶對(duì)數(shù)據(jù)讀寫的分布式保存和一致性保證了。下一篇將重點(diǎn)解析etcd中日志復(fù)制和提交生效的過(guò)程。

【鏈接】
Raft協(xié)議實(shí)現(xiàn)之etcd(一):基本架構(gòu)
分布式一致性協(xié)議-Raft詳解 (一) 選舉

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容