Hadoop 源碼學習筆記(5)--Hdfs 之 BlockManager 解析

之前的章節(jié)說過 FSDirectory 中記錄了所有的文件節(jié)點信息,而具體的文件內(nèi)容則被分布式的存儲在各個 DataNode 上。盡管通過 FSDirectory 我們能夠知道每個節(jié)點對應的路徑的真實路徑,但假如我們需要對整個存儲塊信息進行統(tǒng)一歸屬,那么 FSDirectory 由于是樹狀結(jié)構(gòu)的形式,每次查找信息都需要遍歷樹中的每一個節(jié)點,效率太低,因此產(chǎn)生了 BlockManager 負責統(tǒng)一調(diào)度 DataNode 的存儲消息。

如果把 Hdfs 比喻成一個人的軀干,那么 NameNode 就是他的大腦,了解每一個節(jié)點的狀態(tài)信息,控制并管理四肢( DataNode )的運作。而 BlockManager 則是整個軀干中的心臟,他源源不斷的接收來自四肢的血液( BlockInfo), 再反將血液( Command )傳輸回四肢中,讓其能夠正常運作。

BlockManager

BlockManager 主要的成員有以下這些:

  1. blocksMap: 雖然已經(jīng)有了 FSDirectory 類負責維護整個文件系統(tǒng)的樹狀結(jié)構(gòu),但在樹狀結(jié)構(gòu)中進行數(shù)據(jù)查找的效率較低,在 BlockManager 內(nèi)部在 blocksMap 中維護了一個定長的數(shù)組,在 Block 類中通過重載 hashcode() 函數(shù),實現(xiàn) blockId 和 hash 碼的一一對應,確保不會出現(xiàn)多個 block 對應同一個 hash 碼的情況,使得從 blocksMap 中取數(shù)據(jù)的時間復雜度為 O(1) 。
  2. DatanodeManager: BlockManager 負責接收管理來自 DataNode 的消息,具體的管理操作由 DatanodeManager 接管,他負責監(jiān)控 DataNode 節(jié)點的狀態(tài)變化以及消費 Block 信息變化指令。
  3. DocommissionManager: 管理需要退役或檢修的節(jié)點信息,在確保這些節(jié)點上的數(shù)據(jù)都被成功轉(zhuǎn)移后,才將節(jié)點置為退役和檢修狀態(tài),避免直接設置導致的數(shù)據(jù)丟失。

DataNode 的三板斧 register, heartbeat & reportBlock

DataNode 同 NameNode 之間是一個單向通信模型。NameNode 為了保證自身的運行效率,不會主動向 DataNode 發(fā)起通信請求,因此所有通信行為都由 DataNode 主動觸發(fā)。

DataNode 中針對每個 NameNode 節(jié)點會單獨啟動一個 BPServiceActor 的線程對象,這個對象負責同 NameNode 建立通信鏈接,并定時發(fā)送心跳和存儲塊信息。

BPServiceActor

如圖所示,啟動 BPServiceActor 后,首先向 NameNode 進行 register, 之后會進行一次完整的節(jié)點存儲 block 信息上報,然后進入心跳流程,定時通過 sendHeartbeat 告知 NameNode 當前節(jié)點存活,如果有發(fā)生塊信息變動,則在發(fā)送心跳之后,會嘗試 sendIBRs (increment block report) 發(fā)送增量的塊信息變動情況。

DatanodeManager 節(jié)點管理

DatanodeManager

Register

當接收到來自 DataNode 的 register 請求后,會依據(jù)傳遞過來的 DatanodeRegistration 構(gòu)造出一個 DatanodeDescriptor 對象,并放入 HeartbeatManager 中的數(shù)組中,此時我們認為 DataNode 節(jié)點已經(jīng)在集群中注冊完畢,但這個節(jié)點中究竟有哪些 Block 信息仍然是未知的。

blockReport

DataNode 通過 blockReport 之后,在 BlockManager::processReport 對上報的 Block 信息進行消費

public boolean processReport(final DatanodeID nodeID,
      final DatanodeStorage storage,
      final BlockListAsLongs newReport,
    DatanodeStorageInfo storageInfo = node.getStorageInfo(storage.getStorageID());

    if (storageInfo == null) {
        // We handle this for backwards compatibility.
        storageInfo = node.updateStorage(storage);
    }
    
    if (storageInfo.getBlockReportCount() == 0) {
        processFirstBlockReport(storageInfo, newReport);
    } else {
        invalidatedBlocks = processReport(storageInfo, newReport, context);
    }
    storageInfo.receivedBlockReport();
}

DataNode 節(jié)點上可能有多個存儲位置用于存放 Block 數(shù)據(jù),對于每個存儲位置,都有一個 DataStorage 對象和他對應。在 BlockManager::processReport 會以 DataStorage 為單位進行消費。

private Collection<Block> processReport(
      final DatanodeStorageInfo storageInfo,
      final BlockListAsLongs report,
      BlockReportContext context) throws IOException {
    Iterable<BlockReportReplica> sortedReport;
    if (!sorted) {
        Set<BlockReportReplica> set = new FoldedTreeSet<>();
        for (BlockReportReplica iblk : report) {
            set.add(new BlockReportReplica(iblk));
        }
        sortedReport = set;
    } else {
        sortedReport = report;
    }
    // 篩選出需要進行額外處理的 Block
    reportDiffSorted(storageInfo, sortedReport,
                     toAdd, toRemove, toInvalidate, toCorrupt, toUC);
}

需要留意的是 FoldedTreeSet 是一個基于紅黑樹構(gòu)造的 BlockReportReplica 遍歷器,如果發(fā)現(xiàn)在 DataNode 端沒有預先對 report 進行排序,則在這里會對 report 進行再次排序,方便在 reportDiffSorted 中對上報的 Block 信息進行篩選。

在 reportDiffSorted 中會對當前上報的 Block 進行分類,分別拆到不同的 List 中,其中:

  1. toAdd: 被認為是正式數(shù)據(jù),需要同 BlockInfo 進行關(guān)聯(lián)的數(shù)據(jù)
  2. toRemove: replica 的 blockId 比節(jié)點中的 blockId 更大,認為是無效數(shù)據(jù)
  3. toInvalidate: 已經(jīng)被 NameNode 移除的節(jié)點的 Replica 文件,需要通知 DataNode 移除數(shù)據(jù)
  4. toCorrupt: 與其對應的 Block 對應的節(jié)點存在,但數(shù)據(jù)和節(jié)點中的描述數(shù)據(jù)存在差異,被認為是無效數(shù)據(jù)
  5. toUC: 數(shù)據(jù)正處于寫入過程中,等待后續(xù)寫入完畢

對上報的 Block 進行解析處理之后,會根據(jù)其具體類型作出對應的處理操作,所有數(shù)據(jù)處理完畢之后,整個 Storage 的 block 信息對于 NameNode 就是已知的。

heartbeat

每次 DataNode 發(fā)送心跳后,都會在 HeartbeatManager 中更新其最近的訪問時間,用以識別該節(jié)點仍舊存活。

在 DatanodeDescriptor 中有多個集合,以 invalidateBlocks, replicateBlocks 為例,他們分別代表著已經(jīng)廢棄的 Block 和 需要向其他節(jié)點進行備份的 Block。 在更新完畢心跳信息后,會根據(jù)集合信息建立返回指令,告訴 DataNode 在本地節(jié)點需要做的行為操作,例如刪除無效的 Block 以及向其他節(jié)點進行備份操作保證數(shù)據(jù)冗余度。

同時在 HeartbeatManager 中還有一個 HeartbeatManager.Monitor 線程,通過輪詢 heartCheck() 檢測是否有 DataNode 過期。一旦發(fā)現(xiàn)有節(jié)點過期,則將其從 DatanodeManager 中移除,同時也會將其存儲空間中的 Block 信息從對應節(jié)點中移除,避免無效訪問。

Block 容災備份

為了避免由于節(jié)點異常導致的數(shù)據(jù)丟失, Hdfs 采取多地備份的策略,同時在多個 DataNode 中持有同一份數(shù)據(jù)。

在 Hdfs 中多副本容災的策略由兩部分組成,首先是在文件首次創(chuàng)建時,通過數(shù)據(jù)管道同時在多個 DataNode 中創(chuàng)建 Block ,保證創(chuàng)建時就擁有多個副本;其次是在節(jié)點因為斷開鏈接或則文件異常損壞的情況下,通過復制現(xiàn)有的副本文件到另一個節(jié)點中,保證副本的冗余度。

第一點的數(shù)據(jù)管道在前一篇文章中已經(jīng)做過介紹,在這里不再多說。第二點的副本復制策略是通過 RedundancyMonitor 線程進行實現(xiàn)的。

public void run() {
    while (namesystem.isRunning()) {
        computeDatanodeWork();
        processPendingReconstructions();
        rescanPostponedMisreplicatedBlocks();
    }
}

RedundancyMonitor 的 while 循環(huán)中,主要執(zhí)行了 computeDatanodeWork, processPendingReconstructions, rescanPostonedMisreplicatedBlocks 三個方法

computeDatanodeWork

int computeDatanodeWork() {
    int workFound = this.computeBlockReconstructionWork(blocksToProcess);
    workFound += this.computeInvalidateWork(nodesToProcess);
}

BlockManager 中有一個 LowRedundancyBlocks 對象負責記錄那些副本數(shù)量較低的 Block 信息,在 LowRedundancyBlocks 中有一個 priorityQueues 的優(yōu)先級隊列,隊列按照當前副本數(shù)量進行排列,急需備份的 Block 會被優(yōu)先取出來中進行消費。

這里我們需要知道,當前節(jié)點異常或某個 Storage 上保存的文件異常時,會對受到影響的 BlockInfo 進行分析,如果發(fā)現(xiàn)副本數(shù)量不足,則會加入 priorityQueues 隊列中,等待節(jié)點進行備份。

computeDatanodeWork 會先從 priorityQueues 取出指定個數(shù)的高優(yōu)任務,將其加入 DatanodeDescriptor 的 replicateBlocks 集合中。然后再從 BlockManager.invalidateBlocks 中拿到被移除的無效文件的 Block 信息,加入 DatanodeDescriptor 的 invalidateBlocks 集合中。

當 DataNode 發(fā)送心跳信息到 NameNode 時,會從對應的 DatanodeDescriptor 中取出 replicateBlocks 以及 invalidateBlocks,分別對應需要進行再次備份和已經(jīng)無效的 Block 塊,將其作為返回信息,返回給 DataNode 進行內(nèi)部處理

processPendingReconstructions

private void processPendingReconstructions() {
    BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
    for (int i = 0; i < timedOutItems.length; i++) {
        if (isNeededReconstruction(bi, num)) {
            neededReconstruction.add(bi, num.liveReplicas(),
                num.readOnlyReplicas(), num.outOfServiceReplicas(),
                getExpectedRedundancyNum(bi));
        }
    }
}

pendingReconstruction 中存放著一些 NameNode 認為正在進行數(shù)據(jù)生成的 Block 信息,如果長時間等待之后,發(fā)現(xiàn) Block 的數(shù)據(jù)還沒有生成完畢,就將對應的 Block 信息再次放入 pendingReconstruction 中,等待其他 DataNode 進行數(shù)據(jù)生成。

rescanPostponedMisreplicatedBlocks

void rescanPostponedMisreplicatedBlocks() {
    Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
    for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
        MisReplicationResult res = processMisReplicatedBlock(bi);
        if (res == MisReplicationResult.POSTPONE) {
            rescannedMisreplicatedBlocks.add(b);
        }
    }
}

由于數(shù)據(jù)在多個節(jié)點中進行傳輸,無法實時在 NameNode 的內(nèi)存中進行查看,因此有時候我們無法確定某些 Block 具體處于什么狀態(tài),此時就會返回一個 MisReplicationResult.POSTPONE 結(jié)果,將 Block 再次加入回等待隊列,直到能夠完全確定塊狀態(tài),再在 processMisReplicatedBlock 中對其進行處理。

DataNode 節(jié)點狀態(tài)變化

DataNode State

DataNode 注冊到 NameNode 之后,默認是 Normal 狀態(tài)。但是某些特定情況下,我們?nèi)绻M麑?jié)點信息操作,則可能將其變化成其他的狀態(tài)。

Docommission 退役

有時候處于業(yè)務發(fā)展需求,部分機器可能無法滿足性能要求,因此我們選擇將其下線,但此時這些機器上可能正存放著一些關(guān)鍵的 Block 塊,為了確保下線機器不會影響線上的正常服務。

我們會在 NameNode 的節(jié)點中配置一個 dfs.hosts.exclude ,用于標識那些需要廢棄的節(jié)點。在 NameNode 運行過程中,我們也可以通過 DFSAdmin 實時通知集群需要下線的節(jié)點信息。

當發(fā)現(xiàn)有機器命中了廢棄節(jié)點之后,首先檢查立刻移除該節(jié)點是否會造成 Block 丟失,如果會,則進入 DECOMMISSION_INPROGRESS 狀態(tài)。

當節(jié)點處于 DECOMMISSION_INPROGRESS 狀態(tài)時,不會再有新的 Block 塊被放到這個節(jié)點中,同時這個節(jié)點中那些需要保留下的 Block 會放入 LowRedundancyBlocks.priorityQueues 隊列中等待被備份。

在 BlockManger 中還有一個 DecommissionManager.Monitor 線程不斷的檢查那些處于 DECOMMISSION_INPROGRESS 狀態(tài)的節(jié)點,如果發(fā)現(xiàn)節(jié)點不再影響線上數(shù)據(jù),則將其設置為 DECOMMISIONED 狀態(tài),此時節(jié)點可以正式從集群中移除。

Maintenance 維護

和 Docommission 比較類似, Maintenance 也是在 NameNode 的節(jié)點中進行注冊的,用于對那些需要進行檢修的節(jié)點進行聲明。

為了避免在檢修過程中,由于當前檢修節(jié)點擁有唯一的 Replicate,導致 Block 無法訪問,和 Docommission 一樣,節(jié)點會先進入一個 ENTERING_MAINTENANCE 的狀態(tài),此時節(jié)點中那些唯一的 Block 會被逐步轉(zhuǎn)移到其他 Node 中,直到節(jié)點中的 Block 已經(jīng)被轉(zhuǎn)移,能夠確保每個 Block 在其他的 DataNode 中都能被找到,才能夠進入下一個狀態(tài) IN_MAINTENANCE,此時可以放心對節(jié)點進行處理。

但是如果始終都無法轉(zhuǎn)移 Block 完畢,導致檢修超時,則會退出 MAINTENANCE 狀態(tài)回到 NORMAL 狀態(tài)。

BlockManager 中的其他工作線程

除了 HeartbeatManager.Monitor,RedundancyMonitorDecommissionManager.Monitor 之外,BlockManager 中還有幾個獨立線程,負責周期性遍歷數(shù)據(jù)情況,優(yōu)化節(jié)點間的 Block 分布情況。

PendingReconstructionMonitor

當我們進行文件傳輸和副本備份的時候,會默認挑選出一些目標節(jié)點進行傳輸,此時我們認為這些目標節(jié)點之后會擁有特定的 Block,這時候我們會在 PendingReconstructionBlocks 的 pendingReconstructions 中存放 BlockInfo 和期待的儲存位置之間的關(guān)系。

PendingReconstructionMonitor 中會定時檢查 timeout 的 pendingConstruction 事件,放入 timedOutItems 中。

RedundancyMonitor 在進行周期檢查的時候,會取出這些過期的數(shù)據(jù),然后重新分配創(chuàng)建任務。

StorageInfoDefragmenter

最后一個獨立的工作線程是 StorageInfoDefragmenter。這個類負責優(yōu)化每個 Storage 中的 block 內(nèi)存占用。

DataNode 中的每一個存儲路徑都會被抽象成為一個 StorageInfo 對象,在這個對象中會有一棵紅黑樹 FoldedTreeSet 用來保存存儲路徑中的節(jié)點信息。隨著 DataNode 節(jié)點的不斷運行,不斷有新的 Replica 被創(chuàng)建,有舊的 Replica 被移除,如果長時間不對 FoldedTreeSet 的內(nèi)存做優(yōu)化,則其內(nèi)部的數(shù)據(jù)占用空間會越來越來,影響節(jié)點性能。

為了解決這個問題,在 BlockManager 中會啟動一個 StorageInfoDefragmenter 的線程,定期通過 scanAndCompactStorages() 方法找到紅黑樹中 fillRatio 占用比較低的樹,然后通過 compat 對樹的節(jié)點進行壓縮,減少內(nèi)存占用情況。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容