ZooKeeper standalone mode: data recovery

Preface

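For every transactional operation it receives (creating, updating, or deleting a node), the ZooKeeper server first writes an entry to the transaction log. The server also periodically persists its in-memory data to disk as a snapshot, triggered by the number of transactions processed since the previous one.
On startup, the ZooKeeper server restores its data from the log and snapshot files.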

Data recovery process

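ZooKeeper data recovery has two parts:

  • Recovery from the snapshot
  • Recovery from the log

Why are both parts needed?
As the preface describes, every transactional operation is written to the log first, so the log holds a record of every transaction. A snapshot is a persisted copy of the server's data that ZooKeeper produces when certain conditions are met. Can we restore from the snapshot alone and ignore the log? No. ZooKeeper may have processed some transactions that have not yet triggered a new snapshot. If the server crashes at that point and recovery uses only the newest snapshot, every transaction between that snapshot and the crash is lost, so the log is needed to restore the transactions the snapshot did not capture. Can we restore from the log alone? Also no. That would require keeping the complete log forever, and replaying everything from the log is slow.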
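The argument above can be made concrete with a small simulation. This is only a sketch under simplifying assumptions: the tree is a flat map from path to value, and the hypothetical Txn class encodes a delete as a null value. None of these names come from ZooKeeper itself.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RecoverySketch {
    /** One logged write. A null value stands for a delete (hypothetical encoding). */
    static final class Txn {
        final long zxid;
        final String path;
        final String value;

        Txn(long zxid, String path, String value) {
            this.zxid = zxid;
            this.path = path;
            this.value = value;
        }
    }

    /** Start from the snapshot's contents, then replay every logged txn newer than it. */
    static Map<String, String> recover(long snapshotZxid,
                                       Map<String, String> snapshotNodes,
                                       List<Txn> log) {
        Map<String, String> tree = new TreeMap<>(snapshotNodes);
        for (Txn t : log) {
            if (t.zxid <= snapshotZxid) {
                continue; // already reflected in the snapshot
            }
            if (t.value == null) {
                tree.remove(t.path);
            } else {
                tree.put(t.path, t.value);
            }
        }
        return tree;
    }

    public static void main(String[] args) {
        // Snapshot taken after zxid 1; txns 2 and 3 exist only in the log.
        List<Txn> log = List.of(
                new Txn(1, "/a", "1"),
                new Txn(2, "/b", "2"),
                new Txn(3, "/a", null));
        System.out.println(recover(1, Map.of("/a", "1"), log)); // prints {/b=2}
    }
}
```

Restoring from the snapshot alone would yield {/a=1}, which misses both later transactions. Restoring from the log alone would work here only because the log still starts at zxid 1.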

Log and snapshot file formats

log_snap_filename_format.png

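The figure above lists the log and snapshot files generated by ZooKeeper on my machine.
Note that log files are named log.y and snapshot files are named snapshot.x. Every transaction in ZooKeeper is assigned a transaction id (zxid). The server generates zxids centrally, and they increase monotonically. For example, if transaction B happens after transaction A and A was assigned zxid 1, then B's zxid must be greater than 1. It might be 2, 3, and so on; the exact value depends on how many other transactions ZooKeeper processed between A and B.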

  • Meaning of the snapshot and log file name suffixes

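As we saw, snapshot files are named snapshot.x and log files log.y, where x and y are zxids written in hexadecimal. Every transaction recorded in log.y has a zxid greater than or equal to y; in other words, the first transaction in log.y has zxid y. How many transactions does one log file hold? There is no fixed count. When ZooKeeper creates a new log file it preallocates disk space for it, 64 MB by default, and extends the file by another preallocated block when it fills up. Preallocation speeds up log writes: the space tends to be contiguous on disk, which avoids the seek time of scattered writes, and the operating system can cache the log file in large blocks, which reduces how often cache pages are swapped in and out. For snapshot.x, every transaction it records has a zxid less than or equal to x; that is, the last transaction stored in snapshot.x has zxid x.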
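Because the suffix is a hexadecimal zxid, reading it back is a short parse. The helper below is a hypothetical stand-in written for illustration, not ZooKeeper's own utility; it returns -1 when the name does not have the expected prefix.

```java
final class ZxidNames {
    /** Parse the hexadecimal zxid suffix of a name such as "log.27"; -1 if it does not match. */
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length == 2 && parts[0].equals(prefix)) {
            try {
                return Long.parseLong(parts[1], 16);
            } catch (NumberFormatException e) {
                return -1L; // suffix is not a hex number
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println(zxidFromName("log.27", "log"));           // prints 39
        System.out.println(zxidFromName("snapshot.30", "snapshot")); // prints 48
    }
}
```

So a file named log.27 starts at zxid 39 in decimal, and snapshot.30 covers transactions up to zxid 48.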

  • Log file contents
log_after_parse.png

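The figure above shows a ZooKeeper log file from my machine after parsing. We use the boxed record to walk through the main information a log entry contains:

  • Time the transaction was sent
  • Session id of the transaction
  • cxid, the id the client assigned to the operation
  • zxid, the id the server assigned to the transaction
  • The transaction itself
  1. Operation type (create2)
  2. Node name /test/hsbxxxxxxx
  3. Data stored in the node #xxxxxx
  4. Access control information v{s{31,s{'world,'anyone}}}
  • Digest information
  1. Digest version 2
  2. Digest value 10546528799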
  • Snapshot file contents
snapshot_znode_parse.png

snapshot_session_parse.png

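The two figures above show a snapshot file from my machine after parsing. A snapshot contains two kinds of information: znodes and sessions.

  1. znode: at the moment the snapshot is taken, ZooKeeper persists every node that clients have created on the server.
  2. session: the sessions created by clients are also persisted in the snapshot.

znode data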

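A znode mainly stores the following information:

  • cZxid: zxid of the transaction that created the node
  • ctime: creation time
  • mZxid: zxid of the transaction that last modified the node
  • mtime: last modification time
  • pZxid: zxid of the latest change to the node's children
  • cversion: version of the node's children, incremented when children are added or removed
  • dataVersion: version of the node's data
  • aclVersion: version of the node's ACL
  • ephemeralOwner: for an ephemeral node, the id of the owning session
  • dataLength: length of the data stored in the node

Some information is not shown here, such as the node's data and its ACL.

Session information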

A session mainly stores the following information:

  • sessionid
  • Timeout
  • Number of ephemeral nodes owned by the session

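With that background in place, we can now walk through the ZooKeeper data recovery flow.
From the above we can infer the following. Find the newest undamaged snapshot, snapshot.x_newest. Then, based on x, collect every log.m whose suffix m is greater than x (call this set log<m>), together with the log whose suffix is the largest one below x, log.y_less_but_max. From snapshot.x_newest, log<m>, and log.y_less_but_max we can restore ZooKeeper's data. For the files in the figure log_snap_filename_format.png, snapshot.30 and log.27 are enough to complete the recovery.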
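The file selection rule just described can be sketched as follows. This is an illustration under assumptions: the class and method names are hypothetical, the inputs are the already-parsed numeric suffixes, and a log whose suffix equals x is also kept because including it is harmless.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class LogFileSelection {
    /**
     * Given the zxid suffixes of all log files and the snapshot's zxid x,
     * return the suffixes of the logs needed for replay, in ascending order.
     */
    static List<Long> logsNeeded(List<Long> logZxids, long snapshotZxid) {
        // Largest log suffix at or below the snapshot zxid: this file may
        // still contain transactions written after the snapshot was taken.
        long start = 0;
        for (long z : logZxids) {
            if (z <= snapshotZxid && z > start) {
                start = z;
            }
        }
        // Keep that file and every later one.
        List<Long> needed = new ArrayList<>();
        for (long z : logZxids) {
            if (z >= start) {
                needed.add(z);
            }
        }
        Collections.sort(needed);
        return needed;
    }

    public static void main(String[] args) {
        // Suffixes as they would appear in file names: log.1, log.27, log.31
        List<Long> needed = logsNeeded(List.of(0x1L, 0x27L, 0x31L), 0x30L);
        for (long z : needed) {
            System.out.println("log." + Long.toHexString(z));
        }
    }
}
```

With snapshot.30 and logs log.1, log.27, and log.31, the sketch keeps log.27 and log.31 and skips log.1, whose transactions are all older than the snapshot.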

ZooKeeperServer.startdata

At startup, ZooKeeper restores its data through the ZooKeeperServer.startdata method.

  public void startdata() throws IOException, InterruptedException {
        //check to see if zkDb is not null  
        //ZKDatabase is ZooKeeper's data storage object
        if (zkDb == null) {
            zkDb = new ZKDatabase(this.txnLogFactory);
        }
        if (!zkDb.isInitialized()) {
            //restore the data
            loadData();
        }
    }
loadData
  public void loadData() throws IOException, InterruptedException {
      
        if (zkDb.isInitialized()) {
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        } else {
            //restore the data through zkDb.loadDataBase()
            setZxid(zkDb.loadDataBase());
        }

        // Clean up dead sessions
        List<Long> deadSessions = new ArrayList<>();
        for (Long session : zkDb.getSessions()) {
            if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                deadSessions.add(session);
            }
        }

        for (long session : deadSessions) {
            // TODO: Is lastProcessedZxid really the best thing to use?
            killSession(session, zkDb.getDataTreeLastProcessedZxid());
        }
       
        // Make a clean snapshot of the database once recovery has finished
        takeSnapshot();
    }
ZKDatabase.loadDataBase()

ZooKeeper data recovery essentially restores the fields of ZKDatabase.

public long loadDataBase() throws IOException {
        long startTime = Time.currentElapsedTime();
        //snapLog restores the data into dataTree and sessionsWithTimeouts and returns the highest zxid seen during recovery
        //dataTree stores ZooKeeper's node information; sessionsWithTimeouts stores the session information
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        long loadTime = Time.currentElapsedTime() - startTime;
        ServerMetrics.getMetrics().DB_INIT_TIME.add(loadTime);
        LOG.info("Snapshot loaded in {} ms, highest zxid is 0x{}, digest is {}",
                loadTime, Long.toHexString(zxid), dataTree.getTreeDigest());
        return zxid;
    }

Before going further, let us look at DataTree.

DataTree

DataTree is ZooKeeper's data storage engine class. The figure below shows its key fields.


dataTree.png

From the figure above we can see that the nodes field stores all of ZooKeeper's nodes.


NodeHashMap.png

Internally, ZooKeeper uses a ConcurrentHashMap as the node container. The key is the node's path and the value is a DataNode, the class that represents a node. The fields of DataNode are shown below.


data_node.png
snapLog.restore

Next in the recovery call chain is the snapLog.restore method.

 public long restore(DataTree dt, Map<Long, Integer> sessions, PlayBackListener listener) throws IOException {
        long snapLoadingStartTime = Time.currentElapsedTime();
        //snapLog is the class representing snapshots; snapLog.deserialize restores the snapshot data
        long deserializeResult = snapLog.deserialize(dt, sessions);
        ServerMetrics.getMetrics().STARTUP_SNAP_LOAD_TIME.add(Time.currentElapsedTime() - snapLoadingStartTime);
        //create the object representing the transaction log files
        FileTxnLog txnLog = new FileTxnLog(dataDir);
        boolean trustEmptyDB;
        File initFile = new File(dataDir.getParent(), "initialize");
        if (Files.deleteIfExists(initFile.toPath())) {
            LOG.info("Initialize file found, an empty database will not block voting participation");
            trustEmptyDB = true;
        } else {
            trustEmptyDB = autoCreateDB;
        }
        //the log replay logic is implemented in RestoreFinalizer.run
        RestoreFinalizer finalizer = () -> {
            long highestZxid = fastForwardFromEdits(dt, sessions, listener);
            // The snapshotZxidDigest will reset after replaying the txn of the
            // zxid in the snapshotZxidDigest, if it's not reset to null after
            // restoring, it means either there are not enough txns to cover that
            // zxid or that txn is missing
            DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
            if (snapshotZxidDigest != null) {
                LOG.warn(
                        "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                                + "which might lead to inconsistent state",
                        Long.toHexString(highestZxid),
                        Long.toHexString(snapshotZxidDigest.getZxid()));
            }
            return highestZxid;
        };

        if (-1L == deserializeResult) {
            /* this means that we couldn't find any snapshot, so we need to
             * initialize an empty database (reported in ZOOKEEPER-2325) */
            if (txnLog.getLastLoggedZxid() != -1) {
                // ZOOKEEPER-3056: provides an escape hatch for users upgrading
                // from old versions of zookeeper (3.4.x, pre 3.5.3).
                if (!trustEmptySnapshot) {
                    throw new IOException(EMPTY_SNAPSHOT_WARNING + "Something is broken!");
                } else {
                    LOG.warn("{}This should only be allowed during upgrading.", EMPTY_SNAPSHOT_WARNING);
                    return finalizer.run();
                }
            }

            if (trustEmptyDB) {
                /* TODO: (br33d) we should either put a ConcurrentHashMap on restore()
                 *       or use Map on save() */
                save(dt, (ConcurrentHashMap<Long, Integer>) sessions, false);

                /* return a zxid of 0, since we know the database is empty */
                return 0L;
            } else {
                /* return a zxid of -1, since we are possibly missing data */
                LOG.warn("Unexpected empty data tree, setting zxid to -1");
                dt.lastProcessedZxid = -1L;
                return -1L;
            }
        }

        return finalizer.run();
    }

SnapShot.deserialize

Let us look at the source code that restores data from a snapshot.

public long deserialize(DataTree dt, Map<Long, Integer> sessions) throws IOException {
        // we run through 100 snapshots (not all of them)
        // if we cannot get it running within 100 snapshots
        // we should  give up
        //findNValidSnapshots sorts the files in the snapshot directory by their zxid suffix, largest first, and returns at most the first 100
        List<File> snapList = findNValidSnapshots(100);
        if (snapList.size() == 0) {
            return -1L;
        }
        File snap = null;
        long snapZxid = -1;
        boolean foundValid = false;
        //try each snapshot.x in snapList in turn; once one is restored successfully, the remaining ones are not parsed
        for (int i = 0, snapListSize = snapList.size(); i < snapListSize; i++) {
            //the snapshot file being parsed
            snap = snapList.get(i);
            LOG.info("Reading snapshot {}", snap);
            //the suffix of this snapshot file, i.e. the highest zxid stored in the snapshot
            snapZxid = Util.getZxidFromName(snap.getName(), SNAPSHOT_FILE_PREFIX);
            try (CheckedInputStream snapIS = SnapStream.getInputStream(snap)) {
                //create an input archive that reads from the snapshot file
                InputArchive ia = BinaryInputArchive.getArchive(snapIS);
                //parse the node and session information stored in the snapshot
                deserialize(dt, sessions, ia);
                //check whether the snapshot data is corrupted
                SnapStream.checkSealIntegrity(snapIS, ia);

                // Digest feature was added after the CRC to make it backward
                // compatible, the older code can still read snapshots which
                // includes digest.
                //
                // To check the intact, after adding digest we added another
                // CRC check.
                //parse the digest information
                if (dt.deserializeZxidDigest(ia, snapZxid)) {
                    //check again that the snapshot data is not corrupted
                    SnapStream.checkSealIntegrity(snapIS, ia);
                }
                //parsed successfully; skip the remaining snapshot files
                foundValid = true;
                break;
            } catch (IOException e) {
                LOG.warn("problem reading snap file {}", snap, e);
            }
        }
        if (!foundValid) {
            throw new IOException("Not able to find valid snapshots in " + snapDir);
        }
        //record snapZxid as the latest zxid in the DataTree
        dt.lastProcessedZxid = snapZxid;
        lastSnapshotInfo = new SnapshotInfo(dt.lastProcessedZxid, snap.lastModified() / 1000);

        // compare the digest if this is not a fuzzy snapshot, we want to compare
        // and find inconsistent asap.
        if (dt.getDigestFromLoadedSnapshot() != null) {
            dt.compareSnapshotDigests(dt.lastProcessedZxid);
        }
        return dt.lastProcessedZxid;
    }
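The loop in the listing above follows a "newest first, fall back on failure" pattern. Here is a minimal sketch of that pattern in isolation. The Loader interface and all names are hypothetical; the real method additionally records the zxid and compares digests.

```java
import java.io.IOException;
import java.util.List;

final class SnapshotFallback {
    /** Hypothetical loader: throws IOException when a snapshot cannot be read. */
    interface Loader {
        void load(String fileName) throws IOException;
    }

    /**
     * Try the candidates newest first and return the name of the first one that
     * loads. Returns null when there are no candidates at all, and throws when
     * candidates exist but none is readable.
     */
    static String loadFirstValid(List<String> newestFirst, Loader loader) {
        if (newestFirst.isEmpty()) {
            return null; // no snapshot on disk: the caller decides what that means
        }
        for (String name : newestFirst) {
            try {
                loader.load(name);
                return name; // success: older snapshots are never touched
            } catch (IOException e) {
                // corrupt or truncated file: fall back to the next older one
            }
        }
        throw new IllegalStateException("Not able to find valid snapshots in " + newestFirst);
    }

    public static void main(String[] args) {
        String used = loadFirstValid(List.of("snapshot.30", "snapshot.27"), name -> {
            if (name.equals("snapshot.30")) {
                throw new IOException("simulated corruption");
            }
        });
        System.out.println(used); // prints snapshot.27
    }
}
```

Falling back to an older snapshot is safe only because the log can replay everything newer than whichever snapshot was actually loaded.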
deserialize

The deserialize method that parses a snapshot file:

public void deserialize(DataTree dt, Map<Long, Integer> sessions, InputArchive ia) throws IOException {
        
        FileHeader header = new FileHeader();
        //deserialize the FileHeader first
        header.deserialize(ia, "fileheader");   
        if (header.getMagic() != SNAP_MAGIC) {
            throw new IOException("mismatching magic headers " + header.getMagic() + " !=  " + FileSnap.SNAP_MAGIC);
        }
        //then deserialize the sessions and the data tree
        SerializeUtils.deserializeSnapshot(dt, ia, sessions);
    }
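A magic-number check like the one above is a cheap way to reject a file that is not a snapshot at all. The sketch below assumes the magic value is the four ASCII bytes "ZKSN" read as a big-endian int; treat that constant, and the helper name, as assumptions to verify against the FileSnap source.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class MagicCheck {
    // Assumed value: the ASCII bytes "ZKSN" interpreted as a big-endian int.
    static final int SNAP_MAGIC =
            ByteBuffer.wrap("ZKSN".getBytes(StandardCharsets.US_ASCII)).getInt();

    /** True when the first four bytes of the file match the snapshot magic number. */
    static boolean hasSnapshotMagic(byte[] fileStart) {
        return fileStart.length >= 4 && ByteBuffer.wrap(fileStart).getInt() == SNAP_MAGIC;
    }

    public static void main(String[] args) {
        System.out.println(Integer.toHexString(SNAP_MAGIC)); // prints 5a4b534e
        System.out.println(hasSnapshotMagic("ZKSN....".getBytes(StandardCharsets.US_ASCII))); // prints true
    }
}
```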
SerializeUtils.deserializeSnapshot

public static void deserializeSnapshot(DataTree dt, InputArchive ia, Map<Long, Integer> sessions) throws IOException {
        //restore the session information first
        int count = ia.readInt("count");
        while (count > 0) {
            //each session has two parts, id and timeout, deserialized from the file in turn
            long id = ia.readLong("id");
            int to = ia.readInt("timeout");
            sessions.put(id, to);
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(
                    LOG,
                    ZooTrace.SESSION_TRACE_MASK,
                    "loadData --- session in archive: " + id + " with timeout: " + to);
            }
            count--;
        }
        //deserialize the dataTree object
        dt.deserialize(ia, "tree");
    }
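The session table layout read above is a count followed by that many (id, timeout) pairs. The round trip below sketches the same layout with the JDK's DataOutputStream and DataInputStream. It assumes big-endian fixed-width fields and is not the jute archive classes ZooKeeper actually uses; the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

final class SessionTableCodec {
    /** Write [count: int] followed by count pairs of [id: long][timeout: int]. */
    static byte[] write(Map<Long, Integer> sessions) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(sessions.size());
            for (Map.Entry<Long, Integer> e : sessions.entrySet()) {
                out.writeLong(e.getKey());
                out.writeInt(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    /** Read the table back: the count first, then one (id, timeout) pair per session. */
    static Map<Long, Integer> read(byte[] data) {
        Map<Long, Integer> sessions = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            while (count > 0) {
                long id = in.readLong();
                int timeout = in.readInt();
                sessions.put(id, timeout);
                count--;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sessions;
    }

    public static void main(String[] args) {
        Map<Long, Integer> sessions = new LinkedHashMap<>();
        sessions.put(0x1000L, 30000);
        sessions.put(0x1001L, 40000);
        System.out.println(read(write(sessions)).equals(sessions)); // prints true
    }
}
```

Writing the count up front is what lets the reader stop at the right place and hand the rest of the stream to the tree deserializer.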

DataTree.deserialize()
public void deserialize(InputArchive ia, String tag) throws IOException {
        //parse the ACL information first
        aclCache.deserialize(ia);
        nodes.clear();
        pTrie.clear();
        nodeDataSize.set(0);
        //read the node information
        String path = ia.readString("path");
        while (!"/".equals(path)) {
            DataNode node = new DataNode();
            //deserialize the DataNode object
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
                    throw new IOException("Invalid Datatree, unable to find "
                                          + "parent "
                                          + parentPath
                                          + " of path "
                                          + path);
                }
                //add this node to its parent's children
                parent.addChild(path.substring(lastSlash + 1));
                long eowner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(eowner);
                if (ephemeralType == EphemeralType.CONTAINER) {
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
                    ttls.add(path);
                } else if (eowner != 0) {
                    HashSet<String> list = ephemerals.get(eowner);
                    if (list == null) {
                        list = new HashSet<String>();
                        ephemerals.put(eowner, list);
                    }
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        // have counted digest for root node with "", ignore here to avoid
        // counting twice for root node
        nodes.putWithoutDigest("/", root);

        nodeDataSize.set(approximateDataSize());

        // we are done with deserializing the
        // the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        //update the quota information configured for each node
        setupQuota();
        //purge the ACL entries that are no longer used
        aclCache.purgeUnused();
    }
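The parent lookup in the listing above works only because a snapshot lists each parent before its children. The sketch below isolates that step: it rebuilds the parent-to-children links from a parent-first list of paths and fails the same way when a parent is missing. It is a simplification with hypothetical names; the root is keyed by the empty string, and ACLs, stats, and ephemeral bookkeeping are left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TreeRebuild {
    /** Rebuild parent -> child-name links from absolute paths listed parent first. */
    static Map<String, Set<String>> childrenOf(List<String> paths) {
        Map<String, Set<String>> children = new LinkedHashMap<>();
        children.put("", new TreeSet<>()); // the root is keyed by the empty path
        for (String path : paths) {
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                throw new IllegalArgumentException("not an absolute path: " + path);
            }
            String parentPath = path.substring(0, lastSlash);
            Set<String> siblings = children.get(parentPath);
            if (siblings == null) {
                throw new IllegalStateException(
                        "unable to find parent " + parentPath + " of path " + path);
            }
            siblings.add(path.substring(lastSlash + 1)); // register under the parent
            children.put(path, new TreeSet<>());         // this node may have children later
        }
        return children;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> tree = childrenOf(List.of("/test", "/test/a", "/test/b"));
        System.out.println(tree.get("/test")); // prints [a, b]
    }
}
```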

That covers recovery from the snapshot. Next, let us look at recovery from the log files.

RestoreFinalizer.run
   
long highestZxid = fastForwardFromEdits(dt, sessions, listener);
            // The snapshotZxidDigest will reset after replaying the txn of the
            // zxid in the snapshotZxidDigest, if it's not reset to null after
            // restoring, it means either there are not enough txns to cover that
            // zxid or that txn is missing
            DataTree.ZxidDigest snapshotZxidDigest = dt.getDigestFromLoadedSnapshot();
            if (snapshotZxidDigest != null) {
                LOG.warn(
                        "Highest txn zxid 0x{} is not covering the snapshot digest zxid 0x{}, "
                                + "which might lead to inconsistent state",
                        Long.toHexString(highestZxid),
                        Long.toHexString(snapshotZxidDigest.getZxid()));
            }
            return highestZxid;

fastForwardFromEdits
 public long fastForwardFromEdits(
        DataTree dt,
        Map<Long, Integer> sessions,
        PlayBackListener listener) throws IOException {
        // txnLog.read(dt.lastProcessedZxid + 1) returns an iterator over every log file whose suffix is greater than lastProcessedZxid, plus the log file with the largest suffix below lastProcessedZxid + 1
        TxnIterator itr = txnLog.read(dt.lastProcessedZxid + 1);
        long highestZxid = dt.lastProcessedZxid;
        TxnHeader hdr;
        int txnLoaded = 0;
        long startTime = Time.currentElapsedTime();
        try {
            while (true) {
                // iterator points to
                // the first valid txn when initialized
                hdr = itr.getHeader();
                if (hdr == null) {
                    //empty logs
                    return dt.lastProcessedZxid;
                }
                if (hdr.getZxid() < highestZxid && highestZxid != 0) {
                    //a logged zxid lower than the current highestZxid is out of order: log an error
                    LOG.error("{}(highestZxid) > {}(next log) for type {}", highestZxid, hdr.getZxid(), hdr.getType());
                } else {
                    //advance highestZxid to this transaction's zxid
                    highestZxid = hdr.getZxid();
                }
                try {
                    //apply this transaction to the DataTree; the details are left for the analysis of node creation
                    processTransaction(hdr, dt, sessions, itr.getTxn());
                    dt.compareDigest(hdr, itr.getTxn(), itr.getDigest());
                    txnLoaded++;
                } catch (KeeperException.NoNodeException e) {
                    throw new IOException("Failed to process transaction type: "
                                          + hdr.getType()
                                          + " error: "
                                          + e.getMessage(),
                                          e);
                }
                listener.onTxnLoaded(hdr, itr.getTxn(), itr.getDigest());
                
                if (!itr.next()) {
                    break;
                }
            }
        } finally {
            if (itr != null) {
                itr.close();
            }
        }

        long loadTime = Time.currentElapsedTime() - startTime;
        LOG.info("{} txns loaded in {} ms", txnLoaded, loadTime);
        ServerMetrics.getMetrics().STARTUP_TXNS_LOADED.add(txnLoaded);
        ServerMetrics.getMetrics().STARTUP_TXNS_LOAD_TIME.add(loadTime);
        //return the highest zxid ZooKeeper has processed
        return highestZxid;
    }

TxnIterator.next()

next() parses the next record in the log file.

public boolean next() throws IOException {
            if (ia == null) {
                return false;
            }
            try {
                //read the record's checksum
                long crcValue = ia.readLong("crcvalue");
                //read one log record
                byte[] bytes = Util.readTxnBytes(ia);
                // Since we preallocate, we define EOF to be an
                // empty transaction
                if (bytes == null || bytes.length == 0) {
                    throw new EOFException("Failed to read " + logFile);
                }
                // EOF or corrupted record
                // validate CRC
                Checksum crc = makeChecksumAlgorithm();
                crc.update(bytes, 0, bytes.length);
                if (crcValue != crc.getValue()) {
                    throw new IOException(CRC_ERROR);
                }
                //convert the record's bytes into a TxnLogEntry object
                TxnLogEntry logEntry = SerializeUtils.deserializeTxn(bytes);
                hdr = logEntry.getHeader();
                //record is the transaction body
                record = logEntry.getTxn();
                //the record's digest
                digest = logEntry.getDigest();
            } catch (EOFException e) {
                LOG.debug("EOF exception", e);
                inputStream.close();
                inputStream = null;
                ia = null;
                hdr = null;
                // this means that the file has ended
                // we should go to the next file
                //this log file is exhausted; open a stream on the next file
                if (!goToNextLog()) {
                    return false;
                }
                // if we went to the next log file, we should call next() again
                //parse the first record of the next file
                return next();
            } catch (IOException e) {
                inputStream.close();
                throw e;
            }
            return true;
        }
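The record handling in next() combines two ideas: a checksum stored ahead of each record, and an empty record that marks the end of the preallocated file. The sketch below shows both on a simplified frame of [checksum][length][payload]. It assumes Adler32 as the checksum algorithm and uses hypothetical names; ZooKeeper's real on-disk framing goes through its own archive classes and differs in detail.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

final class ChecksummedRecord {
    private static final int HEADER_BYTES = 12; // 8-byte checksum + 4-byte length

    /** Frame a payload as [checksum: long][length: int][payload bytes]. */
    static byte[] encode(byte[] payload) {
        Checksum crc = new Adler32();
        crc.update(payload, 0, payload.length);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeLong(crc.getValue());
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    /**
     * Returns the payload, or null when the frame is empty or truncated, which
     * this sketch treats as end of file. Throws IllegalStateException when the
     * stored checksum does not match the payload.
     */
    static byte[] decode(byte[] frame) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame))) {
            long expected = in.readLong();
            int length = in.readInt();
            if (length <= 0 || length > frame.length - HEADER_BYTES) {
                return null; // zero-filled preallocated space or a cut-off record
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            Checksum crc = new Adler32();
            crc.update(payload, 0, payload.length);
            if (crc.getValue() != expected) {
                throw new IllegalStateException("CRC check failed");
            }
            return payload;
        } catch (EOFException e) {
            return null; // fewer bytes than a header: nothing more to read
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Decoding twelve zero bytes returns null, which mirrors how the zero-filled tail of a preallocated log file is read as the end of that file rather than as corruption.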

Conclusion

That is the flow and the source code of ZooKeeper data recovery.
