背景
對于Upsert操作,Hudi需要定位到數(shù)據(jù)所在的File Group。當(dāng)File Group很多的時候,定位File Group的過程會成為性能瓶頸。
Hudi 提供了索引的方式,保存了每個record key和他所屬的file id的對應(yīng)關(guān)系。然后將這些對應(yīng)關(guān)系保存到外部存儲系統(tǒng)(HBase, Flink狀態(tài)后端等)。這種方式需要引入外部系統(tǒng),運(yùn)維的復(fù)雜度較高且索引數(shù)據(jù)量較大。除此之外Hudi還提供了Bloom filter方式。每個parquet文件都對應(yīng)一個bloom filter。通過這個bloom filter可以很容易確定數(shù)據(jù)不在這個parquet文件。有助于在掃描parquet文件的時候快速跳過無關(guān)的文件。但是在確認(rèn)數(shù)據(jù)在某個parquet的時候,因bloom filter存在誤判的可能性,需要逐條比對數(shù)據(jù),存在較大的性能消耗。
在這個背景下提出了Hudi bucket Index。它是一種優(yōu)化措施,將每個partition中的file group分為N份,N為bucket個數(shù)。每個分區(qū)下的File group個數(shù)一旦確定不再會變化(除了Clustering的時候)。未啟用bucket index的情況下file group的file id使用UUID標(biāo)識。啟用了bucket index之后。每個file id的前8為被替換為bucket number(同一個partition中的不同bucket使用bucket number標(biāo)識)。通過數(shù)據(jù)的record key取hash運(yùn)算可以將數(shù)據(jù)映射到不同的bucket上。也就是說bucket index通過partition -> bucket number兩個層級來定位record所屬的file group。這兩級查找時間復(fù)雜度都是O(1),無需遍歷數(shù)據(jù)文件,極大的提高了查找的速度。
除此之外,在查詢的時候如果使用bucket字段作為查詢篩選條件,由于bucket字段相同的數(shù)據(jù)一定位于同一個bucket中,可以跳過其他的file group,減少掃描的數(shù)據(jù)量。
使用bucket index需要注意的是,每個partition的bucket數(shù)量一旦確定就無法更改。Hudi的小文件處理策略和大文件分塊不再有效。所以說使用前需要預(yù)估數(shù)據(jù)量。如果bucket數(shù)量過少,每個file group文件大小會過大,不利于并發(fā)處理。如果bucket數(shù)量過多,會遇到大量小文件問題,會增大分布式文件系統(tǒng)元數(shù)據(jù)負(fù)載,降低持續(xù)讀寫性能。
Bucket index配置項(xiàng)
- index.type(Flink) / hoodie.index.type(Spark)。使用的索引類型。如果要使用bucket index,需要配置為
BUCKET。 - hoodie.bucket.index.num.buckets。bucket個數(shù),默認(rèn)為256。在Flink中默認(rèn)為4。
- hoodie.bucket.index.hash.field。按照哪個資源hash分桶。不配置默認(rèn)使用record key。
Bucket Index的原理
Pipelines
我們從構(gòu)建bucket寫入邏輯的BucketStreamWriteOperator所在的Pipelines的hoodieStreamWrite方法開始分析。它的代碼如下:
public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
// 如果表的index類型是BUCKET。對應(yīng)配置項(xiàng)index.type
if (OptionsResolver.isBucketIndexType(conf)) {
// 使用Bucket類型的StreamWriter
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
// 獲取bucket個數(shù),對應(yīng)配置項(xiàng)hoodie.bucket.index.num.buckets。在Flink中默認(rèn)為4
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
// 獲取index key字段,對應(yīng)配置項(xiàng)hoodie.bucket.index.hash.field。如果沒有配置,使用record key
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
// 使用bucket index分區(qū)器,根據(jù)record key,partition和flink channel數(shù)量分區(qū)
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid(opUID("bucket_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
// 其他數(shù)據(jù)類型使用StreamWriteOperator
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
// 先按照record key分區(qū)
// 這里的bucket概念和bucket index中的bucket不同
// 這里的bucket是根據(jù)file id和partition分組的概念,同時還考慮到了小文件聚合(將insert的數(shù)據(jù)優(yōu)先分配到小文件)
// bucket作為整體flush到磁盤上
.keyBy(HoodieRecord::getRecordKey)
// 分配bucket
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid(opUID("bucket_assigner", conf))
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
// shuffle by fileId(bucket id)
// 確定好數(shù)據(jù)所屬的file id,分區(qū)寫入
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid(opUID("stream_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
}
BucketStreamWriteOperator代碼較少,在構(gòu)造函數(shù)中創(chuàng)建出了BucketStreamWriteFunction。我們接下來分析它。
BucketStreamWriteFunction
在processElement方法中,Hudi根據(jù)record的key計算出record對應(yīng)的bucket number。結(jié)合record所在的partition可以很快的確定數(shù)據(jù)所在的file group。
@Override
public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
HoodieRecord<?> record = (HoodieRecord<?>) i;
// 獲取key
final HoodieKey hoodieKey = record.getKey();
// 獲取partition path
final String partition = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// 在同一個partition中,從Hudi表中讀取bucket number和file id的對應(yīng)關(guān)系,放入索引中(bucketIndex)
// 后面分析
bootstrapIndexIfNeed(partition);
// 從索引讀取該分區(qū)中bucket number和file id的對應(yīng)關(guān)系
Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
// 獲取record對應(yīng)的bucket number
// 代碼為getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets
// indexKeyField為配置項(xiàng)hoodie.bucket.index.hash.field的值,如果沒有配置,使用record key
final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
// 組裝bucket id
final String bucketId = partition + "/" + bucketNum;
// incBucketIndex是新增數(shù)據(jù)的bucketIndex緩存
if (incBucketIndex.contains(bucketId)) {
// 如果是新增數(shù)據(jù)
// 根據(jù)bucket number找到對應(yīng)的file group的id
location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
} else if (bucketToFileId.containsKey(bucketNum)) {
// 如果索引中有,說明是修改的數(shù)據(jù)
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
// 如果任何索引中都沒有,該bucket number還沒有對應(yīng)的file group,需要創(chuàng)建一個
// 生成新的file id,替換前8位為bucket number
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
// 加入索引緩存中
bucketToFileId.put(bucketNum, newFileId);
incBucketIndex.add(bucketId);
}
record.unseal();
// 設(shè)置數(shù)據(jù)的location
record.setCurrentLocation(location);
record.seal();
// 將數(shù)據(jù)緩存起來
bufferRecord(record);
}
bootstrapIndexIfNeed方法在指定的partition中,從Hudi表中讀取bucket number和file id的對應(yīng)關(guān)系放入索引。代碼如下:
private void bootstrapIndexIfNeed(String partition) {
// 如果是insert overwrite,跳過
if (OptionsResolver.isInsertOverwrite(config)) {
// skips the index loading for insert overwrite operation.
return;
}
// 如果partition已被索引,返回
if (bucketIndex.containsKey(partition)) {
return;
}
LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
this.metaClient.getBasePath() + "/" + partition));
// Load existing fileID belongs to this task
// 索引的數(shù)據(jù)類型為map,key為bucket number
// valu為file id
Map<Integer, String> bucketToFileIDMap = new HashMap<>();
// 遍歷partition中的所有file slice
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
// 獲取file id
String fileId = fileSlice.getFileId();
// file id的前8位是bucket number,獲取它
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
// 檢查這個bucket是否歸本task處理
// 每個task只緩存自己需要處理的bucket的索引
if (isBucketToLoad(bucketNumber, partition)) {
LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
// Validate that one bucketId has only ONE fileId
// 檢查一個bucket number只對應(yīng)一個file id
if (bucketToFileIDMap.containsKey(bucketNumber)) {
throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found "
+ "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
} else {
LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, partition));
// 對應(yīng)關(guān)系加入緩存中
bucketToFileIDMap.put(bucketNumber, fileId);
}
}
});
// 加入緩存
bucketIndex.put(partition, bucketToFileIDMap);
}
Flink Hudi默認(rèn)的state索引
作為對比,我們再去分析下Flink state索引的實(shí)現(xiàn)方式。Flink state索引保存了record key和file id的對應(yīng)關(guān)系,保存在Flink的狀態(tài)后端中。
接下來我們分別分析使用索引和加載索引的方式。
使用索引方式
按照前面Pipelines的分析,在數(shù)據(jù)流向BucketAssignFunction的processElement方法之前已經(jīng)按照record key分區(qū)。所以索引和record key是一一對應(yīng)關(guān)系。
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
if (value instanceof IndexRecord) {
// 如果讀進(jìn)來的數(shù)據(jù)是IndexRecord類型,說明處于加載索引的階段
IndexRecord<?> indexRecord = (IndexRecord<?>) value;
// 更新保存的索引狀態(tài)
this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
} else {
// 處理用戶數(shù)據(jù)
processRecord((HoodieRecord<?>) value, out);
}
}
processRecord方法讀取狀態(tài)中的索引。如果record的partition path沒有發(fā)生變化,數(shù)據(jù)還在原先索引指向的位置,否則需要分配新的位置,更新索引。數(shù)據(jù)的位置和partition, record key有關(guān)。
private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
final HoodieKey hoodieKey = record.getKey();
// 獲取key和partition path
final String recordKey = hoodieKey.getRecordKey();
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
// 從狀態(tài)中獲取上次record位置
HoodieRecordGlobalLocation oldLoc = indexState.value();
// upsert,upsert_prepped或者delete的時候isChangingRecords為true
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
// 如果partition path發(fā)生了變化
// record的partition字段值發(fā)生變化會導(dǎo)致partition path發(fā)生變化
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
// 如果開啟了全局索引,意思如果是新舊數(shù)據(jù)的partition path不同,是否更新舊數(shù)據(jù)的partition path
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
// 生成一個刪除類型的數(shù)據(jù),指向舊的partition path
HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.unseal();
// 設(shè)置instant time為U
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
// 獲取新數(shù)據(jù)的location
location = getNewRecordLocation(partitionPath);
} else {
// 如果partition path沒有發(fā)生變化
location = oldLoc.toLocal("U");
// 為update類型record創(chuàng)建或加入bucket
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
// 新增數(shù)據(jù),創(chuàng)建新的location
location = getNewRecordLocation(partitionPath);
}
// always refresh the index
if (isChangingRecords) {
// 如果數(shù)據(jù)更新,需要緊接著更新index狀態(tài)變量
updateIndexState(partitionPath, location);
}
// 配置record的location
record.unseal();
record.setCurrentLocation(location);
record.seal();
out.collect((O) record);
}
這段方法中BucketAssigner更詳細(xì)的分析可以參考Hudi 源碼之?dāng)?shù)據(jù)寫入邏輯。
加載索引的方式
Pipelines::bootstrap啟動方法流式啟動調(diào)用的是streamBootstrap。該方法創(chuàng)建了BootstrapOperator。
BootstrapOperator在啟動初始化狀態(tài)量的時候調(diào)用initializeState,從Hudi表加載索引。
@Override
public void initializeState(StateInitializationContext context) throws Exception {
ListStateDescriptor<String> instantStateDescriptor = new ListStateDescriptor<>(
"instantStateDescriptor",
Types.STRING
);
instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);
if (context.isRestored()) {
Iterator<String> instantIterator = instantState.get().iterator();
if (instantIterator.hasNext()) {
lastInstantTime = instantIterator.next();
}
}
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
preLoadIndexRecords();
}
繼續(xù)分析preLoadIndexRecords方法。該方法判斷需要加載哪些分區(qū)的索引。代碼如下:
protected void preLoadIndexRecords() throws Exception {
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
// 遍歷所有的分區(qū)
for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
// 如果分區(qū)名稱匹配正則(對應(yīng)index.partition.regex配置項(xiàng)),加載該分區(qū)的索引
if (pattern.matcher(partitionPath).matches()) {
loadRecords(partitionPath);
}
}
LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
// wait for the other bootstrap tasks finish bootstrapping.
waitForBootstrapReady(getRuntimeContext().getIndexOfThisSubtask());
}
loadRecords方法讀取所有分區(qū)下的record(包含base file和log),包裝為IndexRecord發(fā)往下游。Flink下游算子接收到IndexRecord會更新狀態(tài)變量。
protected void loadRecords(String partitionPath) throws Exception {
long start = System.currentTimeMillis();
// 獲取并行度
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
// 最大并行度
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
// 當(dāng)前作業(yè)id
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
// 獲取上次snapshot之后的所有commit組成的timeline
if (!StringUtils.isNullOrEmpty(lastInstantTime)) {
commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime);
}
// 找到最近的已完成的commit instant
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();
// 如果存在
if (latestCommitTime.isPresent()) {
// 根據(jù)不同的文件類型,獲取不同的文件讀取工具
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
// 讀取schema
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
// 獲取latestCommitTime之前的instant,如果有還沒有完成的compaction,將這個的instant和前一個合并(將這兩個file slice的log文件視為一個file slice的)后返回
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.collect(toList());
for (FileSlice fileSlice : fileSlices) {
// 如果這個fileSlice不歸該任務(wù)處理,跳過
if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
continue;
}
LOG.info("Load records from {}.", fileSlice);
// load parquet records
fileSlice.getBaseFile().ifPresent(baseFile -> {
// filter out crushed files
// 如果base file為空或受損,跳過
if (!isValidFile(baseFile.getFileStatus())) {
return;
}
try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
// 逐個讀取base file中保存的record,包裝為IndexRecord類型發(fā)往下游
iterator.forEachRemaining(hoodieKey -> {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
});
}
});
// load avro log records
// 獲取所有的log file路徑
List<String> logPaths = fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
// filter out crushed files
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {
// 讀取出這些log文件中的數(shù)據(jù),包裝為IndexRecord類型發(fā)往下游
for (String recordKey : scanner.getRecords().keySet()) {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
} finally {
scanner.close();
}
}
}
long cost = System.currentTimeMillis() - start;
LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
this.getClass().getSimpleName(), taskID, partitionPath, cost);
}
參考文獻(xiàn)
Hudi Bucket Index 在字節(jié)跳動的設(shè)計與實(shí)踐 - 知乎 (zhihu.com)