品味Zookeeper之選舉及數(shù)據(jù)一致性
本文思維導(dǎo)圖

前言
為了高可用和數(shù)據(jù)安全起見,zk集群一般都是由幾個節(jié)點構(gòu)成(由n/2+1,投票機制決定,肯定是奇數(shù)個節(jié)點)。多節(jié)點證明它們之間肯定會有數(shù)據(jù)的通信,同時,為了能夠使zk集群對外是透明的,一個整體對外提供服務(wù),那么客戶端訪問zk服務(wù)器的數(shù)據(jù)肯定是要數(shù)據(jù)同步,也即數(shù)據(jù)一致性。
zk集群是Leader/Follower模式來保證數(shù)據(jù)同步的。整個集群同一時刻只能有一個Leader,其他都是Follower或Observer。Leader是通過選舉選出來的,這里涉及到ZAB協(xié)議(原子消息廣播協(xié)議)。
1.ZAB協(xié)議
1.1 概念理解
為了更好理解下文,先說ZAB協(xié)議,它是選舉過程和數(shù)據(jù)寫入過程的基石。ZAB的核心是定義會改變zk服務(wù)器數(shù)據(jù)狀態(tài)的事務(wù)請求的處理方式。
ZAB的理解:所有事務(wù)請求是由一個全局唯一的服務(wù)器來協(xié)調(diào)處理,這個的服務(wù)器就是Leader服務(wù)器,
其它服務(wù)器都是Follower服務(wù)器或Observer服務(wù)器。Leader服務(wù)器負(fù)責(zé)將一個客戶端的請求轉(zhuǎn)換成那個一個事務(wù)Proposal?(提議),將該Proposal分發(fā)給集群中所有的Follower服務(wù)器。然后Leader服務(wù)器需要等待所有Follower服務(wù)器的應(yīng)答,當(dāng)Leader服務(wù)器收到超過半數(shù)的Follower服務(wù)器進行了明確的應(yīng)答后,Leader會再次向所有的Follower服務(wù)器分發(fā)Commit消息,要求其將前一個Proposal進行提交。
注意事務(wù)提議這個詞,就類似 人大代表大會提議 ,提議就代表會有應(yīng)答,之間有通信。因此在zk的ZAB協(xié)議為了可靠性和可用性,會有投票,應(yīng)答等操作來保證整個zk集群的正常運行。
總的來說就是,涉及到客戶端對zk集群數(shù)據(jù)改變的行為都先由Leader統(tǒng)一響應(yīng),然后再把請求轉(zhuǎn)換為事務(wù)轉(zhuǎn)發(fā)給其他所有的Follower,F(xiàn)ollower應(yīng)答并處理事務(wù),最后再反饋。如果客戶端只是讀請求,那么zk集群所有的節(jié)點都可以響應(yīng)這個請求。
1.2 ZAB協(xié)議三個階段
- 1.發(fā)現(xiàn)(選舉Leader過程)
- 2.同步(選出Leader后,F(xiàn)ollower和Observer需進行數(shù)據(jù)同步)
- 3.廣播(同步之后,集群對外工作響應(yīng)請求,并進行消息廣播,實現(xiàn)數(shù)據(jù)在集群節(jié)點的副本存儲)
下面會逐點分析,但是在這之前先來了解了解zookeeper服務(wù)器的知識吧。
2.Zookeeper服務(wù)器
2.1 zk服務(wù)器角色
- Leader
- 事務(wù)請求的唯一調(diào)度和處理者,保證集群事務(wù)處理的順序序性
- 集群內(nèi)部各服務(wù)器的調(diào)度者
- Follower
- 處理客戶端非事務(wù)請求,轉(zhuǎn)發(fā)事務(wù)請求給Leader服務(wù)器
- 參與事務(wù)請求Proposal的投票
- 參與Leader的選舉投票
- Observer
- 處理客戶端非事務(wù)請求,轉(zhuǎn)發(fā)事務(wù)請求給Leader服務(wù)器
- 不參加任何形式的投票,包括選舉和事務(wù)投票(超過半數(shù)確認(rèn))
- Observer的存在是為了提高zk集群對外提供讀性能的能力
整個zk集群的角色作用如下圖:

2.2 zk服務(wù)器狀態(tài)
- LOOKING
- 尋找Leader狀態(tài)
- 當(dāng)服務(wù)器處于這種狀態(tài)時,表示當(dāng)前沒有Leader,需要進入選舉流程
- FOLLOWING
- 從機狀態(tài),表明當(dāng)前服務(wù)器角色是Follower
- OBSERVING
- 觀察者狀態(tài),表明當(dāng)前服務(wù)器角色是Observer
- LEADING
- 領(lǐng)導(dǎo)者狀態(tài),表明當(dāng)前服務(wù)器角色是Leader
-
ServerState 類維護服務(wù)器四種狀態(tài)。
image
zk服務(wù)器的狀態(tài)是隨著機器的變化而變化的。比如Leader宕機了,服務(wù)器狀態(tài)就變?yōu)長OOKING,通過選舉后,某機器成為Leader,服務(wù)器狀態(tài)就轉(zhuǎn)換為LEADING。其他情況類似。
2.3 zk服務(wù)器通信
集群嘛,節(jié)點之間肯定是要通信的。zokeeper通信有兩個特點:
1.使用的通信協(xié)議是TCP協(xié)議。在集群中到底是怎么連接的呢?還記得在配置zookeeper時要創(chuàng)建一個data目錄并在其他創(chuàng)建一個myid文件并寫入唯一的數(shù)字嗎?zk服務(wù)器的TCP連接方向就是依賴這個myid文件里面的數(shù)字大小排列。數(shù)小的向數(shù)大的發(fā)起TCP連接。比如有3個節(jié)點,myid文件內(nèi)容分別為1,2,3。zk集群的tcp連接順序是1向2發(fā)起TCP連接,2向3發(fā)起TCP連接。如果有n個節(jié)點,那么tcp連接順序也以此類推。這樣整個zk集群就會連接起來。
-
2.zk服務(wù)器是多端口的。例如配置如下:
tickTime=2000 dataDir=/home/liangjf/app/zookeeper/data dataLogDir=/home/liangjf/app/zookeeper/log clientPort=2181 initLimit=5 syncLimit=2 server.1=192.168.1.1:2888:3888 server.2=192.168.1.2:2888:3888 server.3=192.168.1.3:2888:3888 第1個端口是通信和數(shù)據(jù)同步端口,默認(rèn)是2888
第2個端口是投票端口,默認(rèn)是3888
3.選舉機制
3.1 選舉算法
從zookeeper開始發(fā)布以來,選舉的算法也慢慢優(yōu)化?,F(xiàn)在為了可靠性和高可用,從3.4.0版本開始zookeeper只支持基于Tcp的FastLeaderElection選舉協(xié)議。
- LeaderElection
- Udp協(xié)議
- AuthFastLeaderElection
- udp
- FastLeaderElection
- Udp
- Tcp
FastLeaderElection選舉協(xié)議使用TCP實現(xiàn)Leader投票選舉算法。它使用了類對象quorumcnxmanager管理連接。該算法是基于推送的,可以通過調(diào)節(jié)參數(shù)來改變選舉的過程。第一,finalizewait決定等到?jīng)Q定Leader的時間。這是Leader選舉算法的一部分。
final static int finalizeWait = 200;(選舉Leader過程的進程時間)
final static int maxNotificationInterval = 60000;(通知檢查選中Leader的時間間隔)
final static int IGNOREVALUE = -1;
這里先不詳細分析,下面3.5 選舉算法源碼分析及舉栗子才分析。
3.2 何時觸發(fā)選舉
選舉Leader不是隨時選舉的,畢竟選舉有產(chǎn)生大量的通信,造成網(wǎng)絡(luò)IO的消耗。因此下面情況才會出現(xiàn)選舉:
- 集群啟動
- 服務(wù)器處于尋找Leader狀態(tài)
- 當(dāng)服務(wù)器處于LOOKING狀態(tài)時,表示當(dāng)前沒有Leader,需要進入選舉流程
- 崩潰恢復(fù)
- Leader宕機
- 網(wǎng)絡(luò)原因?qū)е逻^半節(jié)點與Leader心跳中斷
3.3 如何成為Leader
- 數(shù)據(jù)新舊程度
- 只有擁有最新數(shù)據(jù)的節(jié)點才能有機會成為Leader
- 通過zxid的大小來表示數(shù)據(jù)的新,zxid越大代表數(shù)據(jù)越新
- myid
- 集群啟動時,會在data目錄下配置myid文件,里面的數(shù)字代表當(dāng)前zk服務(wù)器節(jié)點的編號
- 當(dāng)zk服務(wù)器節(jié)點數(shù)據(jù)一樣新時, myid中數(shù)字越大的就會被選舉成ОLeader
- 當(dāng)集群中已經(jīng)有Leader時,新加入的節(jié)點不會影響原來的集群
- 投票數(shù)量
- 只有得到集群中多半的投票,才能成為Leader
- 多半即:n/2+1,其中n為集群中的節(jié)點數(shù)量
3.4 重要的zxid
由3.3知道zxid是判斷能否成為Leader的條件之一,它代表服務(wù)器的數(shù)據(jù)版本的新舊程度。
zxid由兩部分構(gòu)成:主進程周期epoch和事務(wù)單調(diào)遞增的計數(shù)器。zxid是一個64位的數(shù),高32位代表主進程周期epoch,低32位代表事務(wù)單調(diào)遞增的計數(shù)器。
主進程周期epoch也叫epoch,是選舉的輪次,每選舉一次就遞增1。事務(wù)單調(diào)遞增的計數(shù)器在每次選舉完成之后就會從0開始。
如果是比較數(shù)據(jù)新舊的話,直接比較就可以了。因為如果是主進程周期越大,即高32位越大,那么低32位就不用再看了。如果主進程周期一致,低32位越大,整個zxid就越大。所以直接比較整個64位就可以了,不必高32位于高32位對比,低32位與低32位比較。
3.5 選舉算法源碼分析及舉栗子
3.5.1 舉栗子
zookeeper選舉有兩種情況:
- 1.集群首次啟動
- 2.集群在工作時Leader宕機
選主原則如下(在選舉時,對比次序是從上往下)
- 1.
New epoch is higher - 主周期更大,代所有一切是最新,就成為leader
- 2.
New epoch is the same as current epoch, but new zxid is higher - 主周期一致就是在同一輪選票中,zxid越大就成為leader,因為數(shù)據(jù)更新
- 3.
New epoch is the same as current epoch, new zxid is the same as current zxid, but server id is higher - 主周期和zxid一致,就看機器的id(myid),myid越大就成為leader
同時,在選舉的時候是投票方式進行的,除主進程周期外,投票格式為(myid,zxid)。
第一種情況,比較容易理解,下面以3臺機器為例子。
- 三個zk節(jié)點A,B,C,三者開始都沒有數(shù)據(jù),即Zxid一致,對應(yīng)的myid為1,2,3。
- A啟動myid為1的節(jié)點,zxid為0,此時只有一臺服務(wù)器無法選舉出Leader
- B啟動myid為2的節(jié)點,zxid為0,B的zxid與A一樣,比較myid,B的myid為2比A為1大,B成Leader
- C啟動myid為3的節(jié)點,因為已經(jīng)有Leader節(jié)點,則C直接加入集群,承認(rèn)B是leader
第二種情況,已5臺機器為例子。
- 五個節(jié)點A,B,C,D,E,B是Leader,其他是Follower,myid分別為1,2,3,4,5,zxid分別為3,4,5,6,6。運行到某個時刻時A,B掉線或宕機,此時剩下C D E。在同一輪選舉中,C,D,E分別投自己和交叉投票。
- 第一次投票,都是投自己。
- 投票情況為:C:(3,5) D:(4,6) E:(5,6)。
- 同時也會收到其他機器的投票。
- 投票情況為:C:(3,5)(4,6)(5,6),D:(4,6)(3,5)(5,6),E:(5,6)(4,6)(3,5)
- 機器內(nèi)部會根據(jù)選主原則對比投票,變更投票,投票情況為:C:(3,5)(4,6)(5,6)【不變更】。 D:(4,6)(4,6)(5,6)【變更】。E:(5,6)(5,6)(5,6)【變更】
- 統(tǒng)計票數(shù),C-1票,D-3票,E-5票。因此E成為Leader。
接下來就是對新Leader節(jié)點的檢查,數(shù)據(jù)同步,廣播,對外提供服務(wù)。
3.5.1 選舉算法源碼分析
選舉算法的全部代碼在FastLeaderElection類中。其他的lookForLeader函數(shù)是選舉Leader的入口函數(shù)。
//每一輪選舉就會增大一次邏輯時鐘,同時更新事務(wù)
synchronized(this){
logicalclock++;
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//一直循環(huán)選舉直到找到leader,這里把打印和不相關(guān)的都刪除了,方便分析。
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
//從通知隊列拉取一個投票通知
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
if(n == null){
//看是否選舉時通知發(fā)送/接收超時
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
}
else if(self.getVotingView().containsKey(n.sid)) {
switch (n.state) {
case LOOKING://只有zk服務(wù)器狀態(tài)為LOOKING時才會進行選舉
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock) {
//如果選舉時的邏輯時鐘大于發(fā)送通知來源的機器的邏輯時鐘,就把對方的修改為自己的。
logicalclock = n.electionEpoch;
recvset.clear();
//并統(tǒng)計票數(shù),如果能成為leader就更新事務(wù)
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//否者更新事務(wù)為對方的投票信息
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock) {
//如果通知來演的機器的邏輯時鐘比本次我的選舉時鐘低,直接返回,什么都不做。因為對方?jīng)]機會成為leader
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//如果Epoch一樣,就看zxid的比較。不過還是會更新事務(wù)和回傳通知
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
//把所有接收到的投票信息都放到recvset集合
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//統(tǒng)計誰的投票超過半數(shù),就成為leader
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock, proposedEpoch))) {
//驗證一下,被選舉的leader是否有變化,就是看符不符合
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
//符合就放進recvqueue集合
recvqueue.put(n);
break;
}
}
//改變選舉為leader的機器的狀態(tài)為LEADING
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case FOLLOWING:
case LEADING:
//在同一輪選舉中,判斷所有的通知,并確認(rèn)自己是leader
if(n.electionEpoch == logicalclock){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//在對外提供服務(wù)前,先廣播一次自己是leader的消息給所有follower,讓大家認(rèn)同我為leader。
outofelection.put(n.sid, new Vote(n.leader, n.zxid,
n.electionEpoch, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized(this){
logicalclock = n.electionEpoch;
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
}
}
}
比較重要的子函數(shù)有以下這些:
- 1.totalOrderPredicate。(投票比較變更原則,選舉的核心)
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
//按照這樣的順序比較優(yōu)先:Epoch > Zxid > myid
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
- 2.termPredicate。(最終的計算票數(shù)。先把投票放到集合中,然后再統(tǒng)計。集合能去重)
private boolean termPredicate(
HashMap<Long, Vote> votes,
Vote vote) {
HashSet<Long> set = new HashSet<Long>();
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
if (vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
}
- 3.Messenger。(構(gòu)造Messenger的時候創(chuàng)建2條線程WorkerSender和WorkerReceiver用于整個選舉的集群投票通信)
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
t.setDaemon(true);
t.start();
}
其他細節(jié)不多說了,主要是sendqueue和recvqueue隊列存放待發(fā)送投票通知和接收投票通知,WorkerSender和WorkerReceiver兩條線程用于投票的通信,QuorumCnxManager manager用于真正和其他機器的tcp連接維護管理,Messenger是整個投票通信的管理者。
3.數(shù)據(jù)同步機制
3.1 同步準(zhǔn)備
完成選舉之后,為了數(shù)據(jù)一致性,需要進行數(shù)據(jù)同步流程。
3.1.1 Leader準(zhǔn)備
- Leader告訴其它follower當(dāng)前最新數(shù)據(jù)是什么即zxid
- Leader構(gòu)建一個NEWLEADER的包,包括當(dāng)前最大的zxid,發(fā)送給所有的follower或者Observer
- Leader給每個follower創(chuàng)建一個線程LearnerHandler來負(fù)責(zé)處理每個follower的數(shù)據(jù)同步請求,同時主線程開始阻塞,等到超過一半的follwer同步完成,同步過程才完成,leader才真正成為leader
3.1.2 Follower準(zhǔn)備
- 選舉完成后,嘗試與leader建立同步連接,如果一段時間沒有連接上就報連接超時,重新回到選舉狀態(tài)FOLLOWING
- 向leader發(fā)送FOLLOWERINFO包,帶上follower自己最大的zxid
3.2 同步初始化
同步初始化涉及到三個東西:minCommittedLog、maxCommittedLog、zxid
– minCommittedLog:最小的事務(wù)日志id,即zxid沒有被快照存儲的日志文件的第一條,每次快照存儲
完,會重新生成一個事務(wù)日志文件
– maxCommittedLog:事務(wù)日志中最大的事務(wù),即zxid
4.數(shù)據(jù)同步場景
- 直接差異化同步(DIFF同步)
- 僅回滾同步TRUNC?,即刪除多余的事務(wù)日志,比如原來的Leader宕機后又重新加入,可能存在它自己寫
入提交但是別的節(jié)點還沒來得及提交 - 先回滾再差異化同步(TRUNC+DIFF同步)
- 全量同步(SNAP同步)
不同的數(shù)據(jù)同步算法適用不同的場景。
5.廣播流程
- 集群選舉完成,并且完成數(shù)據(jù)同步后,開始對外服務(wù),接收讀寫請求
- 當(dāng)leader接收到客戶端新的事務(wù)請求后,會生成對新的事務(wù)proposal,并根據(jù)zxid的順序向所有的
follower分發(fā)事務(wù)proposal - 當(dāng)follower收到leader的proposal時,根據(jù)接收的先后順序處理proposal
- 當(dāng)Leader收到follower針對某個proposal過半的ack后,則發(fā)起事務(wù)提交,重新發(fā)起一個commit的
proposal - Follower收到commit的proposal后,記錄事務(wù)提交,并把數(shù)據(jù)更新到內(nèi)存數(shù)據(jù)庫
- 補充說明
- 由于只有過半的機器給出反饋,則可能存在某時刻某些節(jié)點數(shù)據(jù)不是最新的
- 如果需要確定讀取到的數(shù)據(jù)是最新的,則可以在讀取之前,調(diào)用sync方法進行數(shù)據(jù)同步
6.小結(jié)
在zookeeper中,除了watcher機制,會話管理,最重要的就是選舉了。它是zookeeper集群的核心,也是廣泛應(yīng)用在商業(yè)中的前提。洋洋灑灑一大篇,可能存在一些不足,后面更加深入理解再來補充吧。
