Hudi 源碼之Bucket Index

背景

對于Upsert操作,Hudi需要定位到數(shù)據(jù)所在的File Group。當(dāng)File Group很多的時候,定位File Group的過程會成為性能瓶頸。

Hudi 提供了索引的方式,保存了每個record key和他所屬的file id的對應(yīng)關(guān)系。然后將這些對應(yīng)關(guān)系保存到外部存儲系統(tǒng)(HBase, Flink狀態(tài)后端等)。這種方式需要引入外部系統(tǒng),運(yùn)維的復(fù)雜度較高且索引數(shù)據(jù)量較大。除此之外Hudi還提供了Bloom filter方式。每個parquet文件都對應(yīng)一個bloom filter。通過這個bloom filter可以很容易確定數(shù)據(jù)不在這個parquet文件。有助于在掃描parquet文件的時候快速跳過無關(guān)的文件。但是在確認(rèn)數(shù)據(jù)在某個parquet的時候,因bloom filter存在誤判的可能性,需要逐條比對數(shù)據(jù),存在較大的性能消耗。

在這個背景下提出了Hudi bucket Index。它是一種優(yōu)化措施,將每個partition中的file group分為N份,N為bucket個數(shù)。每個分區(qū)下的File group個數(shù)一旦確定不再會變化(除了Clustering的時候)。未啟用bucket index的情況下file group的file id使用UUID標(biāo)識。啟用了bucket index之后。每個file id的前8為被替換為bucket number(同一個partition中的不同bucket使用bucket number標(biāo)識)。通過數(shù)據(jù)的record key取hash運(yùn)算可以將數(shù)據(jù)映射到不同的bucket上。也就是說bucket index通過partition -> bucket number兩個層級來定位record所屬的file group。這兩級查找時間復(fù)雜度都是O(1),無需遍歷數(shù)據(jù)文件,極大的提高了查找的速度。

除此之外,在查詢的時候如果使用bucket字段作為查詢篩選條件,由于bucket字段相同的數(shù)據(jù)一定位于同一個bucket中,可以跳過其他的file group,減少掃描的數(shù)據(jù)量。

使用bucket index需要注意的是,每個partition的bucket數(shù)量一旦確定就無法更改。Hudi的小文件處理策略和大文件分塊不再有效。所以說使用前需要預(yù)估數(shù)據(jù)量。如果bucket數(shù)量過少,每個file group文件大小會過大,不利于并發(fā)處理。如果bucket數(shù)量過多,會遇到大量小文件問題,會增大分布式文件系統(tǒng)元數(shù)據(jù)負(fù)載,降低持續(xù)讀寫性能。

Bucket index配置項(xiàng)

  • index.type(Flink) / hoodie.index.type(Spark)。使用的索引類型。如果要使用bucket index,需要配置為BUCKET
  • hoodie.bucket.index.num.buckets。bucket個數(shù),默認(rèn)為256。在Flink中默認(rèn)為4。
  • hoodie.bucket.index.hash.field。按照哪個資源hash分桶。不配置默認(rèn)使用record key。

Bucket Index的原理

Pipelines

我們從構(gòu)建bucket寫入邏輯的BucketStreamWriteOperator所在的PipelineshoodieStreamWrite方法開始分析。它的代碼如下:

public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
    // 如果表的index類型是BUCKET。對應(yīng)配置項(xiàng)index.type
    if (OptionsResolver.isBucketIndexType(conf)) {
        // 使用Bucket類型的StreamWriter
        WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
        // 獲取bucket個數(shù),對應(yīng)配置項(xiàng)hoodie.bucket.index.num.buckets。在Flink中默認(rèn)為4
        int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
        // 獲取index key字段,對應(yīng)配置項(xiàng)hoodie.bucket.index.hash.field。如果沒有配置,使用record key
        String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
        // 使用bucket index分區(qū)器,根據(jù)record key,partition和flink channel數(shù)量分區(qū)
        BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
        return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
            .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
            .uid(opUID("bucket_write", conf))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    } else {
        // 其他數(shù)據(jù)類型使用StreamWriteOperator
        WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
        return dataStream
            // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
            // 先按照record key分區(qū)
            // 這里的bucket概念和bucket index中的bucket不同
            // 這里的bucket是根據(jù)file id和partition分組的概念,同時還考慮到了小文件聚合(將insert的數(shù)據(jù)優(yōu)先分配到小文件)
            // bucket作為整體flush到磁盤上
            .keyBy(HoodieRecord::getRecordKey)
            // 分配bucket
            .transform(
            "bucket_assigner",
            TypeInformation.of(HoodieRecord.class),
            new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
            .uid(opUID("bucket_assigner", conf))
            .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
            // shuffle by fileId(bucket id)
            // 確定好數(shù)據(jù)所屬的file id,分區(qū)寫入
            .keyBy(record -> record.getCurrentLocation().getFileId())
            .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
            .uid(opUID("stream_write", conf))
            .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
    }
}

BucketStreamWriteOperator代碼較少,在構(gòu)造函數(shù)中創(chuàng)建出了BucketStreamWriteFunction。我們接下來分析它。

BucketStreamWriteFunction

processElement方法中,Hudi根據(jù)record的key計算出record對應(yīng)的bucket number。結(jié)合record所在的partition可以很快的確定數(shù)據(jù)所在的file group。

@Override
public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
    HoodieRecord<?> record = (HoodieRecord<?>) i;
    // 獲取key
    final HoodieKey hoodieKey = record.getKey();
    // 獲取partition path
    final String partition = hoodieKey.getPartitionPath();
    final HoodieRecordLocation location;

    // 在同一個partition中,從Hudi表中讀取bucket number和file id的對應(yīng)關(guān)系,放入索引中(bucketIndex)
    // 后面分析
    bootstrapIndexIfNeed(partition);
    // 從索引讀取該分區(qū)中bucket number和file id的對應(yīng)關(guān)系
    Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
    // 獲取record對應(yīng)的bucket number
    // 代碼為getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets
    // indexKeyField為配置項(xiàng)hoodie.bucket.index.hash.field的值,如果沒有配置,使用record key
    final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
    // 組裝bucket id
    final String bucketId = partition + "/" + bucketNum;

    // incBucketIndex是新增數(shù)據(jù)的bucketIndex緩存
    if (incBucketIndex.contains(bucketId)) {
        // 如果是新增數(shù)據(jù)
        // 根據(jù)bucket number找到對應(yīng)的file group的id
        location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
    } else if (bucketToFileId.containsKey(bucketNum)) {
        // 如果索引中有,說明是修改的數(shù)據(jù)
        location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
    } else {
        // 如果任何索引中都沒有,該bucket number還沒有對應(yīng)的file group,需要創(chuàng)建一個
        // 生成新的file id,替換前8位為bucket number
        String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
        location = new HoodieRecordLocation("I", newFileId);
        // 加入索引緩存中
        bucketToFileId.put(bucketNum, newFileId);
        incBucketIndex.add(bucketId);
    }
    record.unseal();
    // 設(shè)置數(shù)據(jù)的location
    record.setCurrentLocation(location);
    record.seal();
    // 將數(shù)據(jù)緩存起來
    bufferRecord(record);
}

bootstrapIndexIfNeed方法在指定的partition中,從Hudi表中讀取bucket number和file id的對應(yīng)關(guān)系放入索引。代碼如下:

private void bootstrapIndexIfNeed(String partition) {
    // 如果是insert overwrite,跳過
    if (OptionsResolver.isInsertOverwrite(config)) {
        // skips the index loading for insert overwrite operation.
        return;
    }
    // 如果partition已被索引,返回
    if (bucketIndex.containsKey(partition)) {
        return;
    }
    LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
                           this.metaClient.getBasePath() + "/" + partition));

    // Load existing fileID belongs to this task
    // 索引的數(shù)據(jù)類型為map,key為bucket number
    // valu為file id
    Map<Integer, String> bucketToFileIDMap = new HashMap<>();
    // 遍歷partition中的所有file slice
    this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
        // 獲取file id
        String fileId = fileSlice.getFileId();
        // file id的前8位是bucket number,獲取它
        int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
        // 檢查這個bucket是否歸本task處理
        // 每個task只緩存自己需要處理的bucket的索引
        if (isBucketToLoad(bucketNumber, partition)) {
            LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
            // Validate that one bucketId has only ONE fileId
            // 檢查一個bucket number只對應(yīng)一個file id
            if (bucketToFileIDMap.containsKey(bucketNumber)) {
                throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found "
                                                         + "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
            } else {
                LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, partition));
                // 對應(yīng)關(guān)系加入緩存中
                bucketToFileIDMap.put(bucketNumber, fileId);
            }
        }
    });
    // 加入緩存
    bucketIndex.put(partition, bucketToFileIDMap);
}

Flink Hudi默認(rèn)的state索引

作為對比,我們再去分析下Flink state索引的實(shí)現(xiàn)方式。Flink state索引保存了record key和file id的對應(yīng)關(guān)系,保存在Flink的狀態(tài)后端中。

接下來我們分別分析使用索引和加載索引的方式。

使用索引方式

按照前面Pipelines的分析,在數(shù)據(jù)流向BucketAssignFunctionprocessElement方法之前已經(jīng)按照record key分區(qū)。所以索引和record key是一一對應(yīng)關(guān)系。

@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
    if (value instanceof IndexRecord) {
        // 如果讀進(jìn)來的數(shù)據(jù)是IndexRecord類型,說明處于加載索引的階段
        IndexRecord<?> indexRecord = (IndexRecord<?>) value;
        // 更新保存的索引狀態(tài)
        this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
    } else {
        // 處理用戶數(shù)據(jù)
        processRecord((HoodieRecord<?>) value, out);
    }
}

processRecord方法讀取狀態(tài)中的索引。如果record的partition path沒有發(fā)生變化,數(shù)據(jù)還在原先索引指向的位置,否則需要分配新的位置,更新索引。數(shù)據(jù)的位置和partition, record key有關(guān)。

private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
    // 1. put the record into the BucketAssigner;
    // 2. look up the state for location, if the record has a location, just send it out;
    // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
    final HoodieKey hoodieKey = record.getKey();
    // 獲取key和partition path
    final String recordKey = hoodieKey.getRecordKey();
    final String partitionPath = hoodieKey.getPartitionPath();
    final HoodieRecordLocation location;

    // Only changing records need looking up the index for the location,
    // append only records are always recognized as INSERT.
    // 從狀態(tài)中獲取上次record位置
    HoodieRecordGlobalLocation oldLoc = indexState.value();
    // upsert,upsert_prepped或者delete的時候isChangingRecords為true
    if (isChangingRecords && oldLoc != null) {
        // Set up the instant time as "U" to mark the bucket as an update bucket.
        // 如果partition path發(fā)生了變化
        // record的partition字段值發(fā)生變化會導(dǎo)致partition path發(fā)生變化
        if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
            // 如果開啟了全局索引,意思如果是新舊數(shù)據(jù)的partition path不同,是否更新舊數(shù)據(jù)的partition path
            if (globalIndex) {
                // if partition path changes, emit a delete record for old partition path,
                // then update the index state using location with new partition path.
                // 生成一個刪除類型的數(shù)據(jù),指向舊的partition path
                HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
                                                                      payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));

                deleteRecord.unseal();
                // 設(shè)置instant time為U
                deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
                deleteRecord.seal();

                out.collect((O) deleteRecord);
            }
            // 獲取新數(shù)據(jù)的location
            location = getNewRecordLocation(partitionPath);
        } else {
            // 如果partition path沒有發(fā)生變化
            location = oldLoc.toLocal("U");
            // 為update類型record創(chuàng)建或加入bucket
            this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
        }
    } else {
        // 新增數(shù)據(jù),創(chuàng)建新的location
        location = getNewRecordLocation(partitionPath);
    }
    // always refresh the index
    if (isChangingRecords) {
        // 如果數(shù)據(jù)更新,需要緊接著更新index狀態(tài)變量
        updateIndexState(partitionPath, location);
    }

    // 配置record的location
    record.unseal();
    record.setCurrentLocation(location);
    record.seal();

    out.collect((O) record);
}

這段方法中BucketAssigner更詳細(xì)的分析可以參考Hudi 源碼之?dāng)?shù)據(jù)寫入邏輯。

加載索引的方式

Pipelines::bootstrap啟動方法流式啟動調(diào)用的是streamBootstrap。該方法創(chuàng)建了BootstrapOperator。

BootstrapOperator在啟動初始化狀態(tài)量的時候調(diào)用initializeState,從Hudi表加載索引。

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    ListStateDescriptor<String> instantStateDescriptor = new ListStateDescriptor<>(
        "instantStateDescriptor",
        Types.STRING
    );
    instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);

    if (context.isRestored()) {
        Iterator<String> instantIterator = instantState.get().iterator();
        if (instantIterator.hasNext()) {
            lastInstantTime = instantIterator.next();
        }
    }

    this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
    this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
    this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
    this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
    this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();

    preLoadIndexRecords();
}

繼續(xù)分析preLoadIndexRecords方法。該方法判斷需要加載哪些分區(qū)的索引。代碼如下:

protected void preLoadIndexRecords() throws Exception {
    String basePath = hoodieTable.getMetaClient().getBasePath();
    int taskID = getRuntimeContext().getIndexOfThisSubtask();
    LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
    // 遍歷所有的分區(qū)
    for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
        // 如果分區(qū)名稱匹配正則(對應(yīng)index.partition.regex配置項(xiàng)),加載該分區(qū)的索引
        if (pattern.matcher(partitionPath).matches()) {
            loadRecords(partitionPath);
        }
    }

    LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());

    // wait for the other bootstrap tasks finish bootstrapping.
    waitForBootstrapReady(getRuntimeContext().getIndexOfThisSubtask());
}

loadRecords方法讀取所有分區(qū)下的record(包含base file和log),包裝為IndexRecord發(fā)往下游。Flink下游算子接收到IndexRecord會更新狀態(tài)變量。

protected void loadRecords(String partitionPath) throws Exception {
    long start = System.currentTimeMillis();

    // 獲取并行度
    final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
    // 最大并行度
    final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    // 當(dāng)前作業(yè)id
    final int taskID = getRuntimeContext().getIndexOfThisSubtask();

    HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
    // 獲取上次snapshot之后的所有commit組成的timeline
    if (!StringUtils.isNullOrEmpty(lastInstantTime)) {
        commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime);
    }
    // 找到最近的已完成的commit instant
    Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();

    // 如果存在
    if (latestCommitTime.isPresent()) {
        // 根據(jù)不同的文件類型,獲取不同的文件讀取工具
        BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
        // 讀取schema
        Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();

        // 獲取latestCommitTime之前的instant,如果有還沒有完成的compaction,將這個的instant和前一個合并(將這兩個file slice的log文件視為一個file slice的)后返回
        List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
            .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
            .collect(toList());

        for (FileSlice fileSlice : fileSlices) {
            // 如果這個fileSlice不歸該任務(wù)處理,跳過
            if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
                continue;
            }
            LOG.info("Load records from {}.", fileSlice);

            // load parquet records
            fileSlice.getBaseFile().ifPresent(baseFile -> {
                // filter out crushed files
                // 如果base file為空或受損,跳過
                if (!isValidFile(baseFile.getFileStatus())) {
                    return;
                }
                try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
                    // 逐個讀取base file中保存的record,包裝為IndexRecord類型發(fā)往下游
                    iterator.forEachRemaining(hoodieKey -> {
                        output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
                    });
                }
            });

            // load avro log records
            // 獲取所有的log file路徑
            List<String> logPaths = fileSlice.getLogFiles()
                .sorted(HoodieLogFile.getLogFileComparator())
                // filter out crushed files
                .filter(logFile -> isValidFile(logFile.getFileStatus()))
                .map(logFile -> logFile.getPath().toString())
                .collect(toList());
            HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
                                                                          writeConfig, hadoopConf);

            try {
                // 讀取出這些log文件中的數(shù)據(jù),包裝為IndexRecord類型發(fā)往下游
                for (String recordKey : scanner.getRecords().keySet()) {
                    output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
                }
            } catch (Exception e) {
                throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
            } finally {
                scanner.close();
            }
        }
    }

    long cost = System.currentTimeMillis() - start;
    LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
             this.getClass().getSimpleName(), taskID, partitionPath, cost);
}

參考文獻(xiàn)

Hudi Bucket Index 在字節(jié)跳動的設(shè)計與實(shí)踐 - 知乎 (zhihu.com)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容