[ZooKeeper] ZooKeeper Request Flow, Part 1 (Learner Side)

Contents:

  • Data synchronization and initialization (after the leader has been elected)
  • Per-role request handling (leader, follower, observer)

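1. Data synchronization and initialization

After a leader has been elected, a server can serve clients only once its data is in sync with the leader.

Data synchronization between servers takes one of three forms:

  1. SNAP (snapshot, the whole database is transferred)
  2. DIFF
  3. TRUNC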

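1.1 Basic flow

Step 3: the Follower replies to the Leader with ACKEPOCH, which carries the Follower's current largest zxid. The Leader compares that zxid with its own minZxid and maxZxid.
The range [minZxid, maxZxid] is the queue of recent transactions that the Leader keeps cached.

Step 6: the Leader sends NEWLEADER, which signals that the data has been synchronized (the Leader has sent the Follower everything it needed to send).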

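How the three modes differ:

  1. If the Follower's zxid is smaller than minZxid, the gap between Leader and Follower is too large, so SNAP is used: the Follower receives the snapshot that the Leader sends.

  2. If the Follower's zxid lies between minZxid and maxZxid, DIFF is used: the Leader only sends the transactions the Follower is missing, that is, those in (zxid, maxZxid]. The Follower persists them and updates its in-memory state.

  3. If the Follower's zxid is larger than maxZxid, TRUNC is used: the Follower deletes the transaction log entries that are newer than maxZxid.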
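
The three rules above can be sketched as a single comparison. This is a minimal illustration, not the Leader's real code path: the class SyncModeChooser, the SyncMode enum, and the choose method are hypothetical names, and the real LearnerHandler handles additional cases that are not covered here.

```java
public class SyncModeChooser {
    // Hypothetical enum naming the three sync modes described in the text.
    enum SyncMode { SNAP, DIFF, TRUNC }

    // Hypothetical helper: compares the follower's last zxid with the
    // leader's cached window [minZxid, maxZxid] and picks a sync mode.
    static SyncMode choose(long peerZxid, long minZxid, long maxZxid) {
        if (peerZxid < minZxid) {
            return SyncMode.SNAP;   // too far behind: send a full snapshot
        }
        if (peerZxid > maxZxid) {
            return SyncMode.TRUNC;  // follower is ahead: it must truncate its log
        }
        return SyncMode.DIFF;       // inside the window: send only the missing txns
    }

    public static void main(String[] args) {
        System.out.println(choose(5, 10, 20));   // SNAP
        System.out.println(choose(15, 10, 20));  // DIFF
        System.out.println(choose(25, 10, 20));  // TRUNC
    }
}
```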

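1.2 Classes

  1. Learner
    Learner covers both Follower and Observer. Its most important fields are leaderIs and leaderOs, the input and output streams of the connection to the Leader.

  2. LearnerHandler (extends ZooKeeperThread)

1.3 Details

In QuorumPeer, once FastLeaderElection has finished, control continues in the main loop of QuorumPeer's run method: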

       try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                                logFactory, this,
                                new ZooKeeperServer.BasicDataTreeBuilder(),
                                this.zkDb);
    
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception",e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );                        
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case FOLLOWING:
                    try {
                        LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        follower.shutdown();
                        setFollower(null);
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        setPeerState(ServerState.LOOKING);
                    }
                    break;
                }
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            try {
                MBeanRegistry.getInstance().unregisterAll();
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
        }
    }

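getPeerState() returns the current state of this server. In the FOLLOWING state, the peer creates a Follower and calls followLeader().

followLeader(): before it starts serving clients, the Follower registers with the Leader.
Registration has three main steps:

  1. Call connectToLeader to connect to the Leader.
  2. Call registerWithLeader to register with the Leader. Both sides exchange their sid, zxid, and epoch, and the Leader uses this information to choose the sync mode.
  3. Call syncWithLeader to synchronize transaction data with the Leader, handling the SNAP/DIFF/TRUNC packet.

  • connectToLeader: opens a Socket to the Leader. The method is defined in Learner, the parent class of Follower, and retries the connection up to 5 times. Once the connection succeeds, the Leader creates a LearnerHandler dedicated to exchanging QuorumPacket messages with this Follower.

  • registerWithLeader: first sends a FOLLOWERINFO packet to tell the Leader who the Follower is (its zxid and sid). It then waits for the Leader's LEADERINFO reply, reads the Leader's epoch and zxid from it, and updates the Follower's own epoch and zxid to match the Leader's.
    Finally it sends an ACKEPOCH packet to confirm that the Follower has accepted the Leader's epoch and zxid.

  • syncWithLeader: synchronizes data with the Leader, that is, brings the Leader's transactions over to the Follower.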
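
The retry behaviour of connectToLeader can be illustrated with a small sketch. It is not the Learner source: the Connector interface and connectWithRetry are hypothetical names introduced so the loop can run without a real socket, and the sleep interval is an assumption.

```java
import java.io.IOException;

public class ConnectRetrySketch {
    // Hypothetical stand-in for "open a socket to the leader".
    interface Connector {
        void connect() throws IOException;
    }

    // Tries to connect up to maxAttempts times and returns the attempt
    // number that succeeded. Rethrows the last failure if all attempts fail.
    static int connectWithRetry(Connector connector, int maxAttempts, long sleepMs)
            throws IOException, InterruptedException {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                connector.connect();
                return attempt;
            } catch (IOException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(sleepMs);  // back off before the next try
                }
            }
        }
        throw new IOException("Could not connect after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated leader that refuses the first two connection attempts.
        int used = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection refused");
            }
        }, 5, 10);
        System.out.println("connected on attempt " + used);
    }
}
```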

3.1
First, read the sync packet. The main code is:

        QuorumPacket qp = new QuorumPacket();
        long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
        // In the DIFF case we don't need to do a snapshot because the transactions will sync on top of any existing snapshot
        // For SNAP and TRUNC the snapshot is needed to save that history
        boolean snapshotNeeded = true;
        readPacket(qp);
        // packets that have been committed
        LinkedList<Long> packetsCommitted = new LinkedList<Long>();
        // packets that have not been committed yet
        LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
        synchronized (zk) {
            // in DIFF mode no snapshot is needed
            if (qp.getType() == Leader.DIFF) {
                LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
                snapshotNeeded = false;
            }
            else if (qp.getType() == Leader.SNAP) {
                LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
                // The leader is going to dump the database
                // clear our own database and read
                // clear the database: minZxid and maxZxid are reset to 0 and a new DataTree is built
                zk.getZKDatabase().clear();
                zk.getZKDatabase().deserializeSnapshot(leaderIs);
                String signature = leaderIs.readString("signature");
                if (!signature.equals("BenWasHere")) {
                    LOG.error("Missing signature. Got " + signature);
                    throw new IOException("Missing signature");                   
                }
                // record the last processed zxid
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            } else if (qp.getType() == Leader.TRUNC) {
                //we need to truncate the log to the lastzxid of the leader
                LOG.warn("Truncating log to get in sync with the leader 0x"
                        + Long.toHexString(qp.getZxid()));
                boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
                if (!truncated) {
                    // not able to truncate the log
                    LOG.error("Not able to truncate the log "
                            + Long.toHexString(qp.getZxid()));
                    System.exit(13);
                }
                zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
            }

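<1> SNAP: snapshot mode. The Leader sends its entire database to the Follower.
<2> TRUNC: truncate mode. The Follower holds more data than the Leader, so to stay consistent it must delete the surplus.
<3> DIFF: diff mode. The Follower has fewer transactions than the Leader and must catch up. The Leader wraps the missing transactions in PROPOSAL and COMMIT packets and sends them to the Follower to execute.

3.2
Handle the packets that follow (each is a QuorumPacket),
such as PROPOSAL, COMMIT, and NEWLEADER. A PROPOSAL here is a write request sent by the Leader during synchronization; it is buffered in packetsNotCommitted.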

            outerLoop:
            while (self.isRunning()) {
                readPacket(qp);
                switch(qp.getType()) {
                case Leader.PROPOSAL:
                    PacketInFlight pif = new PacketInFlight();
                    pif.hdr = new TxnHeader();
                    pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                    if (pif.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(pif.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = pif.hdr.getZxid();
                    packetsNotCommitted.add(pif);
                    break;
                case Leader.COMMIT:
                    if (!writeToTxnLog) {
                        pif = packetsNotCommitted.peekFirst();
                        if (pif.hdr.getZxid() != qp.getZxid()) {
                            LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                        } else {
                            zk.processTxn(pif.hdr, pif.rec);
                            packetsNotCommitted.remove();
                        }
                    } else {
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.INFORM:
                    /*
                     * Only observer get this type of packet. We treat this
                     * as receiving PROPOSAL and COMMMIT.
                     */
                    PacketInFlight packet = new PacketInFlight();
                    packet.hdr = new TxnHeader();
                    packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                    // Log warning message if txn comes out-of-order
                    if (packet.hdr.getZxid() != lastQueued + 1) {
                        LOG.warn("Got zxid 0x"
                                + Long.toHexString(packet.hdr.getZxid())
                                + " expected 0x"
                                + Long.toHexString(lastQueued + 1));
                    }
                    lastQueued = packet.hdr.getZxid();
                    if (!writeToTxnLog) {
                        // Apply to db directly if we haven't taken the snapshot
                        zk.processTxn(packet.hdr, packet.rec);
                    } else {
                        packetsNotCommitted.add(packet);
                        packetsCommitted.add(qp.getZxid());
                    }
                    break;
                case Leader.UPTODATE:
                    if (isPreZAB1_0) {
                        zk.takeSnapshot();
                        self.setCurrentEpoch(newEpoch);
                    }
                    self.cnxnFactory.setZooKeeperServer(zk);                
                    break outerLoop;
                case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                 
                    File updating = new File(self.getTxnFactory().getSnapDir(),
                                        QuorumPeer.UPDATING_EPOCH_FILENAME);
                    if (!updating.exists() && !updating.createNewFile()) {
                        throw new IOException("Failed to create " +
                                              updating.toString());
                    }
                    if (snapshotNeeded) {
                        zk.takeSnapshot();
                    }
                    self.setCurrentEpoch(newEpoch);
                    if (!updating.delete()) {
                        throw new IOException("Failed to delete " +
                                              updating.toString());
                    }
                    writeToTxnLog = true; 
                    isPreZAB1_0 = false;
                    writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                    break;
                }
            }
        }

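Next, the Leader sends a NEWLEADER packet, and the Follower replies to it with an ACK.
Finally, the Leader sends an UPTODATE packet to signal that synchronization is complete. The Follower then starts its server, breaks out of the loop, and finishes the registration process.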
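
The packet order described in this section can be traced with a small state machine. This is a simplified sketch under stated assumptions: packets are modelled as plain strings, and the class HandshakeSketch, the Phase enum, and followerReplies are hypothetical names, not part of ZooKeeper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HandshakeSketch {
    // Hypothetical phases of the follower during registration and sync.
    enum Phase { AWAIT_LEADERINFO, AWAIT_SYNC_MODE, SYNCING, SERVING }

    // Given the packet types the leader sends, returns the packet types
    // the follower sends. Throws if a packet arrives out of order.
    static List<String> followerReplies(List<String> fromLeader) {
        List<String> sent = new ArrayList<>();
        sent.add("FOLLOWERINFO");  // registration starts with the follower
        Phase phase = Phase.AWAIT_LEADERINFO;
        for (String type : fromLeader) {
            switch (phase) {
                case AWAIT_LEADERINFO:
                    require(type.equals("LEADERINFO"), type, phase);
                    sent.add("ACKEPOCH");
                    phase = Phase.AWAIT_SYNC_MODE;
                    break;
                case AWAIT_SYNC_MODE:
                    require(type.equals("DIFF") || type.equals("SNAP")
                            || type.equals("TRUNC"), type, phase);
                    phase = Phase.SYNCING;
                    break;
                case SYNCING:
                    if (type.equals("NEWLEADER")) {
                        sent.add("ACK");        // acknowledge the new leader
                    } else if (type.equals("UPTODATE")) {
                        phase = Phase.SERVING;  // sync finished, start serving
                    } else {
                        require(type.equals("PROPOSAL") || type.equals("COMMIT"),
                                type, phase);
                    }
                    break;
                default:
                    require(false, type, phase);  // nothing expected after UPTODATE
            }
        }
        require(phase == Phase.SERVING, "<end of stream>", phase);
        return sent;
    }

    static void require(boolean ok, String type, Phase phase) {
        if (!ok) {
            throw new IllegalStateException("unexpected " + type + " in phase " + phase);
        }
    }

    public static void main(String[] args) {
        System.out.println(followerReplies(Arrays.asList(
                "LEADERINFO", "DIFF", "PROPOSAL", "COMMIT", "NEWLEADER", "UPTODATE")));
        // prints [FOLLOWERINFO, ACKEPOCH, ACK]
    }
}
```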

3.3 Follower main flow
Follower is a subclass of Learner, and its entry point is followLeader.

            // locate the Leader
            QuorumServer leaderServer = findLeader();
            try {
                // connect to the Leader (up to 5 attempts)
                connectToLeader(leaderServer.addr, leaderServer.hostname);
                // register with the Leader as a Follower
                long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

                //check to see if the leader zxid is lower than ours
                //this should never happen but is just a safety check
                long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
                if (newEpoch < self.getAcceptedEpoch()) {
                    LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                            + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                    throw new IOException("Error: Epoch of leader is lower");
                }
                // synchronize data with the Leader
                syncWithLeader(newEpochZxid);
                QuorumPacket qp = new QuorumPacket();
                while (this.isRunning()) {
                    readPacket(qp);
                    processPacket(qp);
                }

On startup, the Follower first synchronizes with the Leader and then starts the FollowerZooKeeperServer. While the FollowerZooKeeperServer runs, the Follower stays in a while loop that reads QuorumPacket messages and passes each one to processPacket.
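
The epoch check in followLeader relies on the layout of a zxid: the high 32 bits hold the epoch and the low 32 bits hold a per-epoch counter. The sketch below reproduces that layout with hypothetical helper names (ZxidSketch, epochOf, counterOf, makeZxid); it is meant to illustrate what ZxidUtils.getEpochFromZxid extracts, not to replace it.

```java
public class ZxidSketch {
    // Epoch lives in the high 32 bits of the zxid.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Counter lives in the low 32 bits of the zxid.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Combines an epoch and a counter into one 64-bit zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 7);
        System.out.println(Long.toHexString(zxid));  // 300000007
        System.out.println(epochOf(zxid));           // 3
        System.out.println(counterOf(zxid));         // 7
    }
}
```

Because the epoch occupies the high bits, a zxid from a newer epoch always compares greater than any zxid from an older one.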

processPacket handles the QuorumPacket messages sent by the Leader. The two most important types are PROPOSAL and COMMIT, though it also handles others such as PING and COMMITANDACTIVATE.

PROPOSAL is a write transaction that the Leader is about to execute; COMMIT is the instruction to commit it. A Follower applies the content of a PROPOSAL only after it receives the matching COMMIT.

The same write transaction is executed once on the Leader and once on every Follower, which keeps the data in the cluster consistent.

        case Leader.PROPOSAL:            
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            if (hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = hdr.getZxid();
            fzk.logRequest(hdr, txn);
            break;
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());
            break;

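When the Follower receives a PROPOSAL it calls FollowerZooKeeperServer's logRequest method; when it receives a COMMIT it calls FollowerZooKeeperServer's commit method.

  • PROPOSAL packet
    The write request that the Leader sends to every Follower in the cluster.
    When the Leader performs a write it must tell the Learners so that they perform the same write, which keeps the cluster consistent. PROPOSALs are executed in strict order, which is one of ZooKeeper's core design ideas.

  • COMMIT packet
    Once the Leader sees that a majority of Followers have persisted a Proposal and are waiting to execute it, it sends a COMMIT packet telling each Follower to commit that Proposal. The write is finally executed in FinalRequestProcessor. This mechanism guarantees that a write is executed by more than half of the machines in the cluster.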
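
A minimal sketch of the Follower side of this two-step protocol follows. It assumes a transaction is identified only by its zxid, and the class FollowerCommitSketch is hypothetical. Its logRequest and commit methods borrow the names used above but are greatly simplified: for example, on a mismatch this sketch returns false instead of treating it as a fatal error.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class FollowerCommitSketch {
    // Proposals received but not yet committed, in arrival order.
    private final Deque<Long> pending = new ArrayDeque<>();
    // Transactions that have been committed and applied, in order.
    private final List<Long> applied = new ArrayList<>();

    // Called for each PROPOSAL: remember it, but do not apply it yet.
    void logRequest(long zxid) {
        pending.addLast(zxid);
    }

    // Called for each COMMIT: apply the proposal only if it is the
    // oldest pending one, which enforces strict ordering.
    boolean commit(long zxid) {
        Long head = pending.peekFirst();
        if (head == null || head != zxid) {
            return false;  // nothing pending, or commit is out of order
        }
        applied.add(pending.removeFirst());
        return true;
    }

    List<Long> applied() {
        return applied;
    }

    public static void main(String[] args) {
        FollowerCommitSketch follower = new FollowerCommitSketch();
        follower.logRequest(1L);
        follower.logRequest(2L);
        System.out.println(follower.commit(2L));  // false: 1 must commit first
        System.out.println(follower.commit(1L));  // true
        System.out.println(follower.commit(2L));  // true
        System.out.println(follower.applied());   // [1, 2]
    }
}
```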

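3.4 Observer main flow
An Observer works much like a Follower. The main difference is that it does not take part in elections.

The entry point of an Observer is observeLeader. When the QuorumPeer state is OBSERVING, it creates an Observer and calls observeLeader.

Like the Follower's followLeader, observeLeader first registers with the Leader, and after the transaction sync it enters a loop that reads QuorumPacket messages and handles each one with processPacket.

The Observer's processPacket is much simpler than the Follower's. Its main job is to handle INFORM packets, which carry the Leader's write requests to execute.

When the Leader broadcasts a write transaction, it sends a PROPOSAL to each Follower and waits for the Follower's acknowledgment, whereas it sends an INFORM to each Observer and does not need an ACK from the Observer.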
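
The difference between voting members and Observers can be sketched from the Leader's point of view. This is a hypothetical model, not the Leader class: QuorumAckSketch and its ack method are invented for illustration, and in the real protocol an Observer never sends an ACK for an INFORM at all. The sketch only shows that Observers do not count toward the majority.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class QuorumAckSketch {
    // sids of the voting members (the leader and its followers).
    private final Set<Long> voters;
    // sids of voting members that have acknowledged the proposal.
    private final Set<Long> acks = new HashSet<>();

    QuorumAckSketch(Set<Long> voters) {
        this.voters = voters;
    }

    // Records an ACK from the given sid and reports whether a strict
    // majority of voters has now acknowledged. Non-voters are ignored.
    boolean ack(long sid) {
        if (voters.contains(sid)) {
            acks.add(sid);
        }
        return acks.size() > voters.size() / 2;
    }

    public static void main(String[] args) {
        // Three voters (sids 1-3); sid 4 is an observer and is not listed.
        QuorumAckSketch proposal =
                new QuorumAckSketch(new HashSet<>(Arrays.asList(1L, 2L, 3L)));
        System.out.println(proposal.ack(1L));  // false: 1 of 3
        System.out.println(proposal.ack(4L));  // false: observer does not count
        System.out.println(proposal.ack(2L));  // true: 2 of 3 is a majority
    }
}
```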
