Zookeeper之Leader選舉

????Zookeeper是采用的zab協(xié)議進(jìn)行實(shí)現(xiàn)的,而不是完全Paxos實(shí)現(xiàn)的。在主備系統(tǒng)架構(gòu)模式下,采用zab協(xié)議來保證集群中所有副本的數(shù)據(jù)一致性。主系統(tǒng)接受處理所有的事務(wù)性請(qǐng)求,然后將數(shù)據(jù)變更狀態(tài)以proposal提案的形式同步給所有的副本進(jìn)程。所以在這個(gè)過程中,Leader機(jī)器顯得格外重要。

????Leader選舉就是在集群中選舉出一個(gè)主進(jìn)程,用來接收處理所有客戶端的事務(wù)性請(qǐng)求。有個(gè)隱式條件就是集群中的服務(wù)器大于等于2臺(tái)才能開始Leader選舉。

Leader選舉時(shí)機(jī):
  • 服務(wù)器啟動(dòng)時(shí)期的Leader選舉
  • 服務(wù)器運(yùn)行期間的Leader選舉
選舉流程:
  • 發(fā)送當(dāng)前自己機(jī)器的選票信息給集群中的其它機(jī)器
  • 接收集群中其它機(jī)器發(fā)送過來的選票信息
  • 處理接收到的投票信息
  • 統(tǒng)計(jì)投票信息
  • 改變當(dāng)前服務(wù)器的狀態(tài)
選票PK規(guī)則:
  • 首先比對(duì)zxId
  • 再比對(duì)sid
Zookeeper中l(wèi)eader選舉的實(shí)現(xiàn)

????zookeeper中的leader選舉由FastLeaderElection具體實(shí)現(xiàn)。其中有幾個(gè)重要的類:

  • Notification:代表收到的投票信息類
  • ToSend:發(fā)送給其它服務(wù)器的投票信息
  • WorkerReceiver和WorkerSender以及Messager
protected class Messenger {
        // 選票發(fā)送器
        WorkerSender ws;
        // 選票接收器
        WorkerReceiver wr;
    }
  • recvqueue:收票隊(duì)列
  • sendqueue:發(fā)送選票隊(duì)列

????WorkerReceiver和WorkerSender不停地從QuorumCnxManager中獲取收到的選票信息,以及向集群中所有其它looking機(jī)器發(fā)送選票信息。

FastLeaderElection繼承自Election,實(shí)現(xiàn)了其中的選舉leader的方法

public Vote lookForLeader() throws InterruptedException {

            try {
                //TODO 所有收到的選票集合
                Map<Long, Vote> recvset = new HashMap<Long, Vote>();

                synchronized (this) {
                    //TODO 邏輯時(shí)鐘++
                    logicalclock.incrementAndGet();
                    //TODO 更新選票 推選的leaderId、zxId 和 選舉周期
                    updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                }

                //TODO 給集群中的其它服務(wù)器發(fā)送當(dāng)前服務(wù)器的投票信息
                sendNotifications();

                SyncedLearnerTracker voteSet;

                //當(dāng)前服務(wù)器是選舉狀態(tài)
                while ((self.getPeerState() == QuorumPeer.ServerState.LOOKING) && (!stop)) {
                    //TODO 從QuorumCnxManager中獲取收到的外部 投票信息
                    FastLeaderElection.Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                    //TODO 收到的外部投票信息為空
                    if (n == null) {
                        //TODO 當(dāng)前服務(wù)器選票信息是否發(fā)送完
                        if (manager.haveDelivered()) {
                            //TODO 發(fā)送完了繼續(xù)發(fā)送當(dāng)前服務(wù)器的選票信息給集群中的其它服務(wù)器
                            sendNotifications();
                        } else {
                            //TODO 沒有發(fā)送完就當(dāng)前服務(wù)器建立和其它服務(wù)器的鏈接信息
                            manager.connectAll();
                        }
                    } else if (validVoter(n.sid) && validVoter(n.leader)) {
                        //TODO 收到的選票中,投票者和被推選者都是 屬于投票集合中

                        //TODO 查看收到的選票的狀態(tài)
                        switch (n.state) {
                            case LOOKING:
                                if (getInitLastLoggedZxid() == -1) {
                                    LOG.debug("Ignoring notification as our zxid is -1");
                                    break;
                                }
                                if (n.zxid == -1) {
                                    LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                                    break;
                                }
                                // If notification > current, replace and send messages out
                                //TODO 如果收到的投票選舉周期大于當(dāng)前機(jī)器的時(shí)鐘周期
                                if (n.electionEpoch > logicalclock.get()) {
                                    //TODO 更新當(dāng)前機(jī)器的時(shí)鐘周期
                                    logicalclock.set(n.electionEpoch);
                                    //TODO 清空所有收到的投票信息
                                    recvset.clear();
                                    //TODO 如果收到的選票信息優(yōu)于當(dāng)前服務(wù)器選票信息,變更當(dāng)前服務(wù)器的投票信息
                                    if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    } else {
                                        //TODO 更新選票為自己當(dāng)前服務(wù)器的選票信息
                                        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                                    }
                                    //TODO 發(fā)送選票信息
                                    sendNotifications();
                                } else if (n.electionEpoch < logicalclock.get()) {
                                    //TODO 如果邏輯時(shí)鐘小于當(dāng)前邏輯時(shí)鐘,忽略
                                    LOG.debug(
                                            "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                            Long.toHexString(n.electionEpoch),
                                            Long.toHexString(logicalclock.get()));
                                    break;
                                } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    //TODO 選舉周期相同,但是收到的選票更優(yōu),更新選票信息
                                    updateProposal(n.leader, n.zxid, n.peerEpoch);
                                    //TODO 發(fā)送選票信息
                                    sendNotifications();
                                }

                                //TODO 收到的選票信息放入集合中
                                recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                                //TODO 投票歸檔,查看是否已leader選舉完成
                                voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                                if (voteSet.hasAllQuorums()) {

                                    while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                            recvqueue.put(n);
                                            break;
                                        }
                                    }

                                    if (n == null) {
                                        setPeerState(proposedLeader, voteSet);
                                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                        leaveInstance(endVote);
                                        return endVote;
                                    }
                                }
                                break;
                            case OBSERVING:
                                LOG.debug("Notification from observer: {}", n.sid);
                                break;
                            case FOLLOWING:
                            case LEADING:

                                if (n.electionEpoch == logicalclock.get()) {
                                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                    voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                    if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                        setPeerState(n.leader, voteSet);
                                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                        leaveInstance(endVote);
                                        return endVote;
                                    }
                                }

                                outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                                voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                                if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                    synchronized (this) {
                                        logicalclock.set(n.electionEpoch);
                                        setPeerState(n.leader, voteSet);
                                    }
                                    Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                    leaveInstance(endVote);
                                    return endVote;
                                }
                                break;
                            default:
                                LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                                break;
                        }
                    } else {
                        if (!validVoter(n.leader)) {
                            LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                        }
                        if (!validVoter(n.sid)) {
                            LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                        }
                    }
                }
                return null;
            } finally {
                try {
                    if (self.jmxLeaderElectionBean != null) {
                        MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                    }
                } catch (Exception e) {
                    LOG.warn("Failed to unregister with JMX", e);
                }
                self.jmxLeaderElectionBean = null;
                LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
            }
        }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當(dāng)Zookeeper集群中的一臺(tái)服務(wù)器出現(xiàn)以下兩種情況之一時(shí),需...
    tracy_668閱讀 1,326評(píng)論 1 11
  • 【轉(zhuǎn)自】http://www.cnblogs.com/leesf456/p/6107600.html 一、前言 前...
    lxqfirst閱讀 898評(píng)論 0 0
  • 一、Leader選舉過程 Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當(dāng)Zookeeper集群中的一臺(tái)服務(wù)器...
    yannhuang閱讀 1,304評(píng)論 0 2
  • 一、前言 前面學(xué)習(xí)了Zookeeper服務(wù)端的相關(guān)細(xì)節(jié),其中對(duì)于集群?jiǎn)?dòng)而言,很重要的一部分就是Leader選舉,...
    數(shù)據(jù)萌新閱讀 1,327評(píng)論 0 0
  • 推薦指數(shù): 6.0 書籍主旨關(guān)鍵詞:特權(quán)、焦點(diǎn)、注意力、語言聯(lián)想、情景聯(lián)想 觀點(diǎn): 1.統(tǒng)計(jì)學(xué)現(xiàn)在叫數(shù)據(jù)分析,社會(huì)...
    Jenaral閱讀 6,038評(píng)論 0 5

友情鏈接更多精彩內(nèi)容