近期工作中使用hbase bulkload向hbase導(dǎo)入2TB數(shù)據(jù)(10000個hfiles),我們發(fā)現(xiàn)將hfiles加載到hbase 表的過程用了將近一個小時。這和我對bulk load過程的理解不太相符,在我的理解中,hbase bulkload并不會產(chǎn)生數(shù)據(jù)copy,數(shù)據(jù)導(dǎo)入通過hdfs的mv操作完成。那么,
問題1 :是什么操作消耗了一個小時呢?
另外,對bulkload一直有個疑問,
問題2 :基于某個hbase cluster中的表生成的hfiles能否導(dǎo)入到其他hbase cluster中的相同表(表名和列簇都相同)中,bulkload會自動處理兩個集群中表的region分布差異嗎?
帶著上述兩個問題閱讀了hbase中LoadIncrementalHFiles類的代碼,本文對此做個梳理。
run方法
LoadIncrementalHFiles的main函數(shù)調(diào)用run方法:

run方法做了三件事:
1. 初始化
2. 判斷要導(dǎo)入的表是否存在,不存在且參數(shù)create.table為yes, 則創(chuàng)建該表;不存在且create.table不為yes則拋出異常TableNotFoundException
3. 調(diào)用doBulkLoad
初始化
LoadIncrementalHFiles的初始化過程比較簡單,主要是對hbase admin等對象的初始化:

doBulkLoad
doBulkLoad方法執(zhí)行以下步驟:
1. 創(chuàng)建線程池
創(chuàng)建用于bulkload的線程池, 線程池大小由參數(shù)hbase.loadincremental.threads.max控制,默認(rèn)為當(dāng)前機器的core數(shù)量。源碼如下:

2. 初始化加載項隊列
遍歷指定目錄,為每個hfile生成一個LoadQueueItem對象并添加到隊列中(下文我們稱此隊列為LQI隊列,稱隊列中的元素為LQI)中,該步驟由discoverLoadQueue方法完成。

單個hfile的大小不應(yīng)超過HREGION_MAX_FILESIZE, 該值由參數(shù)hbase.hregion.max.filesize控制,默認(rèn)為10GB。
一個LQI代表一個加載項,LoadQueueItem類的源碼如下 :

需要加載的文件在HDFS上按照column family被分配在不同的子目錄下,每個子目錄下的一個文件就對應(yīng)一個LQI。
discoverLoadQueue方法中調(diào)用了visitBulkHFiles方法遍歷hfile所在的HDFS目錄,visitBulkHFiles方法對每個hfile會做一系列validation :

過濾掉reference,link, 以'_'開頭的,以及非hfile格式的文件。
3. 檢查column family的有效性
在discoverLoadQueue完成對所有hfiles的遍歷后,會對queue中所有的items進行column family的check,如果存在某個item的column family不屬于目標(biāo)表,則拋出異常:

4. 循環(huán)分組加載

while循環(huán)的每次迭代主要執(zhí)行g(shù)roupOrSplit和bulkLoad兩個phase的操作:
a) groupOrSplitPhase
把queue中的所有文件根據(jù)目標(biāo)表的region metadata進行分組,把每個文件劃分到其所屬region。

如果某個hfile的[firstkey, lastkey]不在任何region的[starkey, endkey]范圍內(nèi),則將此hfile拆分成兩個文件(拆分后的文件后綴為.top和.bottom),拆分的split key就是firstkey所在region的endkey。

拆分后得到的兩個hfile會被封裝成LQI再添加回LQI隊列,這就是為什么需要一個while循環(huán)判斷LQI隊列是否為空。需注意,拆分后,第一個LQI肯定會在某個region范圍內(nèi)(除非在下次迭代加載該LQI之前目標(biāo)region又發(fā)生了split),第二個LQI有可能仍需拆分。另外,還要注意,做完split之后,老的文件并不會刪除,所以bulkload過程結(jié)束后hfile的目錄下可能會有一些殘留文件,就是那些做了split之后留下的原始文件。

groupOrSplitPhase完成之后,所有可加載的LQI都會被放到regionGroups中。regionGroups是一個Multimap,key為region的startkey,value為對應(yīng)的LQI,一個region可對應(yīng)多個LQI。
b) bulkLoadPhase :?
對于regionGroups中的每個key(即region的startkey),調(diào)用方法tryAtomicRegionLoad將其對應(yīng)的所有LQI加載到目標(biāo)table中。如果加載失敗,則將failed LQI再加入到LQI隊列中,供下一循環(huán)檢測和加載。tryAtomicRegionLoad方法會連接hbase region server,發(fā)送SecureBulkLoadHFilesRequest請求。
groupOrSplit和bulkLoad的操作都是通過上面創(chuàng)建的線程池對所有hfile并發(fā)執(zhí)行的。除了這兩個phase的操作外,while循環(huán)中還會檢測一些異常情況:
a) 對于doBulkLoad中while(!queue.isEmpty)循環(huán),如果經(jīng)過maxRetries次嘗試后,LQI隊列仍不為空,則拋出異常。maxRetries由參數(shù)hbase.bulkload.retries.number控制,默認(rèn)為10 :

b) 經(jīng)過groupOrSplitPhase后,如果regionGroups中單個region單個column family對應(yīng)的hfile個數(shù)超過了maxFilesPerRegionPerFamily,則拋出異常:

maxFilesPerRegionPerFamily由參數(shù)hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily控制,默認(rèn)為32。
問題1解答
通過上面的分析,我們知道bulkload過程對hfile目錄進行遍歷,每個hfile都會進行一系列validation,生成LQI,最終調(diào)用tryAtomicRegionLoad進行加載。我們通過打印每個步驟的耗時,發(fā)現(xiàn)檢測hfile文件格式,即visitBulkHFiles中調(diào)用的isHFileFormat方法是主要的耗時步驟, 這是因為visitBulkHFiles方法是在主線程串行執(zhí)行的,我們有10000個文件,并且每次isHFileFormat都會讀取hfile的file trailer,所以累計時間很長。
我們通過添加一個hbase配置項hbase.client.bulk.load.validate.hfile.format來控制是否進行hfile格式檢測,當(dāng)將其設(shè)置為false時,加載2TB數(shù)據(jù)(10000個hfile)從之前的1個小時縮短為10分鐘。繞過文件格式檢查的前提是我們確定hfile的format都是正確的。我們還可以通過減少hfile的個數(shù)來減少bulkload在客戶端運行的時間。還有一個可能的解決方案是將visitBulkHFiles方法修改成多線程執(zhí)行,以后有時間可以嘗試一下。
問題2解答
答案是肯定的。如上文所述,bulkload會將hfile的[firstkey, lastkey]和目標(biāo)表region的[startkey, endkey]進行匹配,如果匹配失敗則會進行文件拆分,所以不用擔(dān)心不同集群表中region的差異。
總結(jié)
本文對hbase bulkload的客戶端過程進行了分析,詳述了hfile的遍歷,檢測,分組,拆分,加載等步驟,并對文中開頭提出的兩個問題進行了解答。
水平有限,若有誤解,望讀者指正。
說明
hbase源碼版本:1.1.2