What is Clustering
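To get straight to the point: clustering serves two main purposes. It merges small data files, and it re-sorts data.
When data is written to a Hudi table, many small files may be produced, because the write path favors write efficiency and storage utilization. Hudi's clustering mechanism periodically merges these small files into larger ones in the background. This reduces storage fragmentation and metadata management overhead, and it improves query performance.
Clustering can also reorganize data and sort it by user-specified columns. This speeds up related queries such as range scans or JOINs, because the query engine can process pre-sorted data more efficiently.
This article first covers clustering configuration and operation. It then analyzes the clustering source code in two parts: creating the clustering plan, and executing clustering according to that plan.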
Clustering partition filter strategies
The partition filter strategy selects the partitions to cluster according to hoodie.clustering.plan.partition.filter.mode. The available options are:
- NONE: no filtering. All partition paths are returned.
- RECENT_DAYS: sort partition paths in descending order and skip hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions partitions. Then return the next hoodie.clustering.plan.strategy.daybased.lookback.partitions partitions. If partition paths are dates, this selects the data of the most recent N days.
- SELECTED_PARTITIONS: return the partitions between hoodie.clustering.plan.strategy.cluster.begin.partition and hoodie.clustering.plan.strategy.cluster.end.partition.
- DAY_ROLLING: cluster a subset of the partitions on each run. A partition is clustered if its index modulo 24 equals the current hour at scheduling time.
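The Java sketch below shows how the four modes select partitions, as described above. It is not Hudi's actual filter implementation. The class and method names are hypothetical, and SELECTED_PARTITIONS is assumed to include both the begin and end partitions. DAY_ROLLING takes the hour as a parameter instead of reading the clock, so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative sketch of the partition filter modes; not Hudi's implementation, names are hypothetical.
final class PartitionFilterSketch {

  // RECENT_DAYS: sort descending, skip the newest `skipFromLatest` partitions, keep the next `lookback`.
  static List<String> recentDays(List<String> partitions, int skipFromLatest, int lookback) {
    return partitions.stream()
        .sorted(Comparator.reverseOrder())
        .skip(skipFromLatest)
        .limit(lookback)
        .collect(Collectors.toList());
  }

  // SELECTED_PARTITIONS: keep partitions in [begin, end]; inclusive bounds are an assumption here.
  static List<String> selectedPartitions(List<String> partitions, String begin, String end) {
    return partitions.stream()
        .filter(p -> p.compareTo(begin) >= 0 && p.compareTo(end) <= 0)
        .collect(Collectors.toList());
  }

  // DAY_ROLLING: keep partitions (in the given order) whose index modulo 24 equals the scheduling hour.
  static List<String> dayRolling(List<String> partitions, int currentHour) {
    List<String> selected = new ArrayList<>();
    for (int i = 0; i < partitions.size(); i++) {
      if (i % 24 == currentHour) {
        selected.add(partitions.get(i));
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<String> parts = List.of("2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04");
    System.out.println(recentDays(parts, 1, 2)); // [2024-01-03, 2024-01-02]
    System.out.println(selectedPartitions(parts, "2024-01-02", "2024-01-03")); // [2024-01-02, 2024-01-03]
    System.out.println(dayRolling(parts, 1)); // [2024-01-02]
  }
}
```

RECENT_DAYS relies on string ordering. It therefore selects the most recent days only when partition paths sort chronologically, for example yyyy-MM-dd.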
Configuration
Flink configuration
- clustering.schedule.enabled: whether to schedule clustering. Default: false.
- clustering.async.enabled: whether to execute clustering asynchronously. Default: false.
- clustering.delta_commits: the number of commits after which clustering is triggered. Default: 4.
- clustering.tasks: clustering parallelism. Defaults to the write parallelism.
- clustering.plan.strategy.daybased.lookback.partitions: for the RECENT_DAYS strategy, how many partitions take part in clustering. Default: 2.
- clustering.plan.strategy.daybased.skipfromlatest.partitions: for the RECENT_DAYS strategy, how many of the most recent partitions to skip. The partitions after them take part in clustering. Default: 0.
- clustering.plan.strategy.cluster.begin.partition: for the SELECTED_PARTITIONS strategy, the first partition to cluster. No default.
- clustering.plan.strategy.cluster.end.partition: for the SELECTED_PARTITIONS strategy, the last partition to cluster. No default.
- clustering.plan.strategy.partition.regex.pattern: partitions matching this regular expression take part in clustering. No default.
- clustering.plan.strategy.partition.selected: explicitly specifies the partitions to cluster. No default.
- clustering.plan.strategy.class: the clustering plan strategy class. Default: FlinkSizeBasedClusteringPlanStrategy, which selects the partitions of the last N days and picks the smaller file slices for clustering.
- clustering.plan.partition.filter.mode: the partition filter strategy. Default: NONE.
- clustering.plan.strategy.target.file.max.bytes: the upper size limit of the files each clustering group (roughly one unit of parallelism) produces. Default: 1GB.
- clustering.plan.strategy.small.file.limit: files smaller than this size are clustering candidates. Default: 600MB.
- clustering.plan.strategy.sort.columns: the clustering sort columns, comma-separated. No default.
- clustering.plan.strategy.max.num.groups: the maximum number of clustering groups created during planning, which corresponds to the parallelism. Default: 30.
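In Flink SQL, these options are usually passed in the table's WITH clause. The sketch below assembles such a DDL string in Java only to show where the keys go. The helper is hypothetical, and the schema and path are placeholders. The 'connector', 'path', 'table.type' and 'write.operation' values are assumptions; check them against your Hudi version.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders Flink SQL DDL for a Hudi table with clustering options.
final class FlinkClusteringDdlSketch {

  static String createTableDdl(String tableName, String path, Map<String, String> clusteringOptions) {
    // LinkedHashMap keeps insertion order, so the rendered WITH clause is stable
    Map<String, String> options = new LinkedHashMap<>();
    options.put("connector", "hudi");
    options.put("path", path);
    options.put("table.type", "COPY_ON_WRITE");
    // Assumption: Flink clustering is used with insert (append) writes; verify for your version
    options.put("write.operation", "insert");
    options.putAll(clusteringOptions);
    String with = options.entrySet().stream()
        .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
        .collect(Collectors.joining(",\n"));
    // Placeholder schema; replace with the real table columns
    return "CREATE TABLE " + tableName + " (\n"
        + "  id INT PRIMARY KEY NOT ENFORCED,\n"
        + "  name STRING,\n"
        + "  ts TIMESTAMP(3),\n"
        + "  dt STRING\n"
        + ") PARTITIONED BY (dt) WITH (\n"
        + with + "\n)";
  }

  public static void main(String[] args) {
    Map<String, String> clustering = new LinkedHashMap<>();
    clustering.put("clustering.schedule.enabled", "true");
    clustering.put("clustering.async.enabled", "true");
    clustering.put("clustering.delta_commits", "4");
    clustering.put("clustering.plan.partition.filter.mode", "RECENT_DAYS");
    clustering.put("clustering.plan.strategy.daybased.lookback.partitions", "2");
    clustering.put("clustering.plan.strategy.sort.columns", "name");
    System.out.println(createTableDdl("hudi_table", "hdfs:///path/to/hudi_table", clustering));
  }
}
```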
Spark configuration
- hoodie.clustering.plan.strategy.daybased.lookback.partitions: for the RECENT_DAYS strategy, how many partitions take part in clustering. Default: 2.
- hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions: for the RECENT_DAYS strategy, how many of the most recent partitions to skip. The partitions after them take part in clustering. Default: 0.
- hoodie.clustering.plan.strategy.cluster.begin.partition: for the SELECTED_PARTITIONS strategy, the first partition to cluster. No default.
- hoodie.clustering.plan.strategy.cluster.end.partition: for the SELECTED_PARTITIONS strategy, the last partition to cluster. No default.
- hoodie.clustering.plan.strategy.small.file.limit: files smaller than this size are clustering candidates. Default: 300MB.
- hoodie.clustering.plan.strategy.partition.regex.pattern: partitions matching this regular expression take part in clustering. No default.
- hoodie.clustering.plan.strategy.partition.selected: explicitly specifies the partitions to cluster. No default.
- hoodie.clustering.plan.strategy.class: the clustering plan strategy. Default: org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy. It finds small files (smaller than hoodie.clustering.plan.strategy.small.file.limit), and those files take part in clustering.
- hoodie.clustering.execution.strategy.class: the clustering execution strategy. Default: org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy. It sorts by the specified columns while meeting the configured target file size.
- hoodie.clustering.inline: whether to enable inline clustering. Default: false.
- hoodie.clustering.inline.max.commits: the number of commits after which inline clustering is triggered. This controls the clustering frequency. Default: 4.
- hoodie.clustering.async.max.commits: controls the frequency of async clustering in the same way. Default: 4.
- hoodie.clustering.max.parallelism: the maximum clustering parallelism. Default: 15.
- hoodie.clustering.group.read.parallelism: the parallelism Spark uses to read data from a clustering group. Default: 20.
- hoodie.clustering.plan.partition.filter.mode: the partition filter strategy. Default: NONE.
- hoodie.clustering.plan.strategy.max.bytes.per.group: the maximum amount of data in each clustering group. Default: 2GB.
- hoodie.clustering.plan.strategy.max.num.groups: the maximum number of clustering groups. The maximum amount of data processed by one clustering run is hoodie.clustering.plan.strategy.max.bytes.per.group × hoodie.clustering.plan.strategy.max.num.groups.
- hoodie.clustering.plan.strategy.target.file.max.bytes: the target maximum size of each output file. Each clustering group produces the group's total size divided by this value, rounded up, in file groups. That is at most about hoodie.clustering.plan.strategy.max.bytes.per.group / hoodie.clustering.plan.strategy.target.file.max.bytes file groups.
- hoodie.clustering.plan.strategy.single.group.clustering.enabled: whether a clustering plan may be generated in which only one file group takes part. Default: true.
- hoodie.clustering.plan.strategy.sort.columns: the clustering sort columns, comma-separated. No default.
- hoodie.clustering.updates.strategy: the update strategy. Default: org.apache.hudi.client.clustering.update.strategy.SparkRejectUpdateStrategy, which rejects updates during clustering. Set it to org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy to allow updates.
- hoodie.clustering.schedule.inline: whether to enable inline scheduling of clustering. Default: false.
- hoodie.clustering.async.enabled: whether to enable async clustering. Default: false.
- hoodie.layout.optimize.strategy: the layout strategy: LINEAR, ZORDER, or HILBERT (Hilbert curve). Default: LINEAR.
- hoodie.layout.optimize.curve.build.method: DIRECT or SAMPLE (see SpatialCurveCompositionStrategyType). SAMPLE distributes the sorted data better than DIRECT but runs more slowly. Default: DIRECT.
- hoodie.layout.optimize.build.curve.sample.size: the sample size used by the SAMPLE method. Default: 200000.
- hoodie.layout.optimize.data.skipping.enable: whether to collect statistics after layout optimization to enable data skipping. Default: true.
- hoodie.clustering.rollback.pending.replacecommit.on.conflict: default false. If updates are allowed on file groups pending clustering, set this to roll back failed or pending clustering instants. A pending clustering is rolled back only when an insert or update conflicts with a file group under pending clustering. Be careful with this setting, especially when clustering runs very frequently. In rare cases it can cause a race condition, for example when the clustering completes after its instant has been fetched but before the rollback finishes.
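A little arithmetic makes the sizing relationships above concrete. The sketch assumes max.bytes.per.group = 2GB, the default listed above. It uses 30 groups and a 1GB target file size as example values. The class and method names are hypothetical.

```java
// Sizing arithmetic for one clustering run (hypothetical helper, example values).
final class ClusteringSizingSketch {
  static final long MB = 1024L * 1024L;
  static final long GB = 1024L * MB;

  // Upper bound on the data one clustering run rewrites: max.bytes.per.group * max.num.groups
  static long maxBytesPerRun(long maxBytesPerGroup, int maxNumGroups) {
    return maxBytesPerGroup * maxNumGroups;
  }

  // Output file groups for one clustering group: ceil(groupBytes / target.file.max.bytes)
  static int outputFileGroups(long groupBytes, long targetFileMaxBytes) {
    return (int) ((groupBytes + targetFileMaxBytes - 1) / targetFileMaxBytes);
  }

  public static void main(String[] args) {
    long maxBytesPerGroup = 2 * GB;   // default listed above
    int maxNumGroups = 30;            // example value
    long targetFileMaxBytes = 1 * GB; // example value
    System.out.println(maxBytesPerRun(maxBytesPerGroup, maxNumGroups) / GB + " GB"); // 60 GB
    System.out.println(outputFileGroups(maxBytesPerGroup, targetFileMaxBytes));      // 2
    System.out.println(outputFileGroups(1500 * MB, targetFileMaxBytes));             // 2
  }
}
```

With these values, one run rewrites at most 60GB, and a full 2GB group is written as 2 file groups.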
Offline triggering
Using Spark
Submit the job as follows:
```shell
spark-submit \
  --class org.apache.hudi.utilities.HoodieClusteringJob \
  /path/to/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.15.0.jar \
  --props /path/to/config/clusteringjob.properties \
  --mode scheduleAndExecute \
  --base-path /path/to/hudi_table/basePath \
  --table-name hudi_table_schedule_clustering \
  --spark-memory 1g
```
Because clustering has many options, you can put them in the /path/to/config/clusteringjob.properties file. For example:
```properties
hoodie.clustering.async.enabled=true
hoodie.clustering.async.max.commits=4
# 1GB
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
# 600MB
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=column1,column2
```
HoodieClusteringJob takes the following parameters:
| Parameter | Required | Default | Notes |
|---|---|---|---|
| --base-path | Yes | - | Root path of the Hudi table |
| --table-name | Yes | - | Table name |
| --instant-time | No | - | Only used in execute mode. The instant time of the clustering to execute. If not specified, the earliest scheduled clustering is executed. Ignored in scheduleAndExecute mode. |
| --parallelism | No | 1 | Clustering parallelism |
| --spark-master | No | - | Spark master |
| --spark-memory | No | - | Spark memory |
| --retry | No | 0 | Number of retries |
| --skip-clean | No | true | Whether to skip clean after clustering finishes |
| --retry-last-failed-clustering-job | No | false | Only used in scheduleAndExecute mode. Whether to retry the most recent failed clustering job |
| --mode | No | - | schedule: schedule only. execute: execute only. scheduleAndExecute: schedule and execute |
| --help | No | false | Print help information |
| --job-max-processing-time-ms | No | 0 | Only takes effect with --retry-last-failed-clustering-job in scheduleAndExecute mode. If a clustering job has not finished within this time, Hudi treats it as failed and restarts it |
| --props | No | - | Path to a properties file containing the clustering configuration |
| --hoodie-conf | No | - | Additional Hudi configuration |
Using Flink
Flink's HoodieFlinkClusteringJob performs archive and clean operations in addition to clustering.
Submit the job as follows:
```shell
./bin/flink run \
  -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob \
  lib/hudi-flink1.17-bundle-0.15.0.jar \
  --path hdfs://xxx:8020/table
```
Parameters:
| Parameter | Required | Default | Notes |
|---|---|---|---|
| --path | Yes | - | Root path of the Hudi table |
| --clustering-delta-commits | No | 1 | Number of commits after which clustering is triggered. Controls the clustering frequency |
| --clustering-tasks | No | -1 | Parallelism of the clustering tasks |
| --clean-policy | No | KEEP_LATEST_COMMITS | Clean policy: KEEP_LATEST_COMMITS, KEEP_LATEST_FILE_VERSIONS, or KEEP_LATEST_BY_HOURS |
| --clean-retain-commits | No | 10 | Number of latest commits protected from cleaning |
| --clean-retain-hours | No | 24 | Commits from the latest n hours are protected from cleaning |
| --clean-retain-file-versions | No | 5 | Number of latest file versions protected from cleaning |
| --archive-min-commits | No | 20 | Minimum number of commits to keep before archiving |
| --archive-max-commits | No | 30 | Maximum number of commits to keep before archiving |
| --schedule | No | false | Whether to schedule a clustering plan |
| --instant-time | No | - | Clustering instant time |
| --clean-async-enabled | No | false | Whether to enable async clean |
| --plan-strategy-class | No | FlinkSizeBasedClusteringPlanStrategy | Clustering plan strategy class |
| --plan-partition-filter-mode | No | NONE | Partition filter mode |
| --seq | No | FIFO | Execution order of clustering plans. LIFO: start from the latest plan. FIFO: start from the earliest plan |
| --target-file-max-bytes | No | 1GB | Maximum target file size |
| --small-file-limit | No | 600 (MB) | Files smaller than this size take part in clustering |
| --skip-from-latest-partitions | No | 0 | Number of most recent partitions that clustering skips |
| --sort-columns | No | - | Clustering sort columns, comma-separated |
| --sort-memory | No | 128 (MB) | Sort memory size |
| --max-num-groups | No | 30 | Maximum number of clustering groups |
| --target-partitions | No | 2 | Number of partitions taking part in clustering |
| --cluster-begin-partition | No | - | First partition to cluster |
| --cluster-end-partition | No | - | Last partition to cluster |
| --partition-regex-pattern | No | - | Partitions matching this regular expression take part in clustering |
| --partition-selected | No | - | Explicitly selected partitions to cluster |
| --service | No | false | Whether to enable service mode, in which the job runs continuously |
| --min-clustering-interval-seconds | No | 600 (seconds) | Minimum interval between runs of the async clustering service |
| --hoodie-conf | No | - | Additional Hudi configuration |
| --props | No | - | Path to a file containing clustering and other configuration |
Creating the clustering plan
The plan is created in the execute method of ClusteringPlanActionExecutor:
```java
@Override
public Option<HoodieClusteringPlan> execute() {
  // Create the clustering plan
  Option<HoodieClusteringPlan> planOption = createClusteringPlan();
  // Continue only if a plan was created (there may be no file slices that need clustering)
  if (planOption.isPresent()) {
    // Create the clustering instant. Its action type is replacecommit:
    // the data files written by clustering replace the previous ones
    HoodieInstant clusteringInstant =
        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
    try {
      HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
          .setOperationType(WriteOperationType.CLUSTER.name())
          .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
          .setClusteringPlan(planOption.get())
          .build();
      // Save it to the timeline as a pending replace commit
      table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
          TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
    } catch (IOException ioe) {
      throw new HoodieIOException("Exception scheduling clustering", ioe);
    }
  }
  return planOption;
}
```
This method creates the clustering plan and then a pending replace commit. When clustering completes, the newly written data files replace the original ones, so the commit type is replace.
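On the timeline, a replace commit moves through three states: requested, then inflight, then completed. The plan executor above writes the requested instant. The execution phase later moves it to inflight and completes it. The sketch below models only that lifecycle. It is not Hudi's HoodieInstant API, and the file-name suffixes are an assumption based on the 0.x timeline layout under .hoodie.

```java
import java.util.EnumSet;
import java.util.Map;

// Conceptual model of a replacecommit instant's lifecycle; not Hudi's API (names are hypothetical).
final class ReplaceCommitLifecycleSketch {

  enum State { REQUESTED, INFLIGHT, COMPLETED }

  // Allowed forward transitions: REQUESTED -> INFLIGHT -> COMPLETED
  private static final Map<State, EnumSet<State>> ALLOWED = Map.of(
      State.REQUESTED, EnumSet.of(State.INFLIGHT),
      State.INFLIGHT, EnumSet.of(State.COMPLETED),
      State.COMPLETED, EnumSet.noneOf(State.class));

  static boolean canTransition(State from, State to) {
    return ALLOWED.get(from).contains(to);
  }

  // Timeline file name for an instant in the given state (assumed 0.x naming)
  static String fileName(String instantTime, State state) {
    switch (state) {
      case REQUESTED: return instantTime + ".replacecommit.requested";
      case INFLIGHT:  return instantTime + ".replacecommit.inflight";
      default:        return instantTime + ".replacecommit";
    }
  }

  public static void main(String[] args) {
    String t = "20240101120000000";
    for (State s : State.values()) {
      System.out.println(fileName(t, s));
    }
    System.out.println(canTransition(State.REQUESTED, State.COMPLETED)); // false
  }
}
```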
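Next is the createClusteringPlan method. It first checks whether the conditions for clustering are met. It then loads the configured plan strategy class and creates the clustering plan.
Clustering is not necessarily scheduled every time scheduling is attempted. For efficiency, at least N commits must have completed since the last clustering before the next one is scheduled. N is set by hoodie.clustering.inline.max.commits for inline clustering and by hoodie.clustering.async.max.commits for async clustering. If the conditions are met, the strategy class configured by hoodie.clustering.plan.strategy.class generates the plan.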
The code is as follows:
```java
protected Option<HoodieClusteringPlan> createClusteringPlan() {
  LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
  // The instant of the last clustering
  Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();
  // Number of completed commits since the last clustering
  int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
      .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
      .countInstants();
  // hoodie.clustering.inline.max.commits (default 4): the minimum number of commits since
  // the last clustering before the next clustering can be scheduled.
  // This check handles inline clustering.
  if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
    LOG.warn("Not scheduling inline clustering as only " + commitsSinceLastClustering
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
        + config.getInlineClusterMaxCommits());
    return Option.empty();
  }
  // Same check for async clustering (and inline scheduling),
  // using hoodie.clustering.async.max.commits (default 4)
  if ((config.isAsyncClusteringEnabled() || config.scheduleInlineClustering()) && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
    LOG.warn("Not scheduling async clustering as only " + commitsSinceLastClustering
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
        + config.getAsyncClusterMaxCommits());
    return Option.empty();
  }
  LOG.info("Generating clustering plan for table " + config.getBasePath());
  // Load the plan strategy class configured by hoodie.clustering.plan.strategy.class
  // (SparkSizeBasedClusteringPlanStrategy by default)
  ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(
      ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
      new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
  // Generate the clustering plan
  return strategy.generateClusteringPlan();
}
```
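Without Hudi types, the gate in createClusteringPlan reduces to two steps. Count the completed commits newer than the last clustering instant, then compare that count with the max-commits setting. The minimal sketch below assumes instant times are fixed-width timestamp strings, so string comparison matches time order. The names are hypothetical.

```java
import java.util.List;

// Simplified model of the commit-count gate in createClusteringPlan (hypothetical names).
final class ClusteringScheduleGateSketch {

  // Counts completed commit times strictly after the last clustering instant.
  // A null lastClusteringTime means "no clustering yet" and falls back to "0", as in the Hudi code.
  static int commitsSinceLastClustering(List<String> completedCommitTimes, String lastClusteringTime) {
    String since = lastClusteringTime == null ? "0" : lastClusteringTime;
    int count = 0;
    for (String t : completedCommitTimes) {
      // Fixed-width timestamps: lexicographic order equals time order
      if (t.compareTo(since) > 0) {
        count++;
      }
    }
    return count;
  }

  // Hudi skips scheduling while maxCommits > commitsSince, i.e. it schedules once commitsSince >= maxCommits
  static boolean shouldSchedule(List<String> completedCommitTimes, String lastClusteringTime, int maxCommits) {
    return commitsSinceLastClustering(completedCommitTimes, lastClusteringTime) >= maxCommits;
  }

  public static void main(String[] args) {
    List<String> commits = List.of("20240101000000", "20240101010000", "20240101020000",
        "20240101030000", "20240101040000", "20240101050000");
    // Last clustering at 01:00 leaves 4 newer commits, enough for max.commits = 4
    System.out.println(shouldSchedule(commits, "20240101010000", 4)); // true
    System.out.println(shouldSchedule(commits, "20240101020000", 4)); // false
  }
}
```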
We now focus on the default strategy, SparkSizeBasedClusteringPlanStrategy, which decides from file size whether a file takes part in clustering. The steps of plan generation are as follows.
The generateClusteringPlan method is defined in PartitionAwareClusteringPlanStrategy, the parent class of SparkSizeBasedClusteringPlanStrategy. It proceeds in five steps:
1. It filters the partition paths using hoodie.clustering.plan.strategy.partition.selected, hoodie.clustering.plan.strategy.partition.regex.pattern, and hoodie.clustering.plan.partition.filter.mode.
2. It collects the file slices in those partitions.
3. From these slices it picks the small files, meaning files smaller than hoodie.clustering.plan.strategy.small.file.limit.
4. It divides them into groups according to the clustering group size (hoodie.clustering.plan.strategy.max.bytes.per.group).
5. It caps the number of groups at hoodie.clustering.plan.strategy.max.num.groups.

This step implements the small-file merging function.
The code is as follows:
```java
@Override
public Option<HoodieClusteringPlan> generateClusteringPlan() {
  if (!checkPrecondition()) {
    return Option.empty();
  }
  // Meta client, used to access table metadata
  HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
  LOG.info("Scheduling clustering for " + metaClient.getBasePath());
  // Write config
  HoodieWriteConfig config = getWriteConfig();
  // hoodie.clustering.plan.strategy.partition.selected:
  // the partitions explicitly selected for clustering
  String partitionSelected = config.getClusteringPartitionSelected();
  LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
  List<String> partitionPaths;
  // No explicit selection
  if (StringUtils.isNullOrEmpty(partitionSelected)) {
    // get matched partitions if set
    // Partition paths matching hoodie.clustering.plan.strategy.partition.regex.pattern
    partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
    // filter the partition paths if needed to reduce list status
  } else {
    // An explicit partitionSelected takes precedence
    partitionPaths = Arrays.asList(partitionSelected.split(","));
  }
  // Filter the partitions to cluster using hoodie.clustering.plan.partition.filter.mode
  // (NONE, RECENT_DAYS, SELECTED_PARTITIONS or DAY_ROLLING)
  partitionPaths = filterPartitionPaths(partitionPaths);
  LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
  // Return no plan if every partition was filtered out
  if (partitionPaths.isEmpty()) {
    // In case no partitions could be picked, return no clustering plan
    return Option.empty();
  }
  // For each partition: exclude file groups already pending clustering, pick the small files
  // (threshold: hoodie.clustering.plan.strategy.small.file.limit), and map them to
  // HoodieClusteringGroups (the mapping is analyzed below)
  List<HoodieClusteringGroup> clusteringGroups = getEngineContext()
      .flatMap(
          partitionPaths,
          partitionPath -> {
            List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
            return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
          },
          partitionPaths.size())
      .stream()
      .limit(getWriteConfig().getClusteringMaxNumGroups())
      .collect(Collectors.toList());
  if (clusteringGroups.isEmpty()) {
    LOG.warn("No data available to cluster");
    return Option.empty();
  }
  // Build the clustering (execution) strategy
  HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
      .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
      .setStrategyParams(getStrategyParams())
      .build();
  // Build the clustering plan
  return Option.of(HoodieClusteringPlan.newBuilder()
      .setStrategy(strategy)
      .setInputGroups(clusteringGroups)
      .setExtraMetadata(getExtraMetadata())
      .setVersion(getPlanVersion())
      .setPreserveHoodieMetadata(true)
      .build());
}
```
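Two details of generateClusteringPlan are easy to miss. First, eligibility is decided per file group: pending file groups are excluded, and only small base files are kept. Second, max.num.groups is applied twice, once per partition and once over the concatenated result. The sketch below models both with plain collections. The size test (strictly smaller than small.file.limit) is an assumption about getFileSlicesEligibleForClustering, and all names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Plain-collection model of eligibility filtering and the two group caps (hypothetical names).
final class PlanGroupLimitsSketch {

  // Keeps file groups that are not pending and whose base file is smaller than smallFileLimit.
  // Input maps fileId -> base file size in bytes; the result is sorted for deterministic output.
  static List<String> eligibleFileIds(Map<String, Long> baseFileSizeById, Set<String> pendingFileIds, long smallFileLimit) {
    return baseFileSizeById.entrySet().stream()
        .filter(e -> !pendingFileIds.contains(e.getKey()))
        .filter(e -> e.getValue() < smallFileLimit)
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }

  // Applies max.num.groups per partition, then again over the concatenated groups.
  static <G> List<G> capGroups(List<List<G>> groupsPerPartition, int maxNumGroups) {
    return groupsPerPartition.stream()
        .flatMap(groups -> groups.stream().limit(maxNumGroups))
        .limit(maxNumGroups)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(eligibleFileIds(Map.of("f1", 100L, "f2", 900L, "f3", 50L), Set.of("f3"), 300L)); // [f1]
    System.out.println(capGroups(List.of(List.of("a", "b", "c"), List.of("d", "e")), 4)); // [a, b, c, d]
  }
}
```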
The filterPartitionPaths call above selects the required partitions according to hoodie.clustering.plan.partition.filter.mode. The options are:
- NONE: no filtering. All partition paths are returned.
- RECENT_DAYS: sort partition paths in descending order and skip hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions partitions. Then return the next hoodie.clustering.plan.strategy.daybased.lookback.partitions partitions. If partition paths are dates, this selects the data of the most recent N days.
- SELECTED_PARTITIONS: return the partitions between hoodie.clustering.plan.strategy.cluster.begin.partition and hoodie.clustering.plan.strategy.cluster.end.partition.
- DAY_ROLLING: cluster a subset of the partitions on each run. A partition is clustered if its index modulo 24 equals the current hour at scheduling time.
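buildClusteringGroupsForPartition sorts the selected file slices by size in descending order, because the comparator subtracts o1's size from o2's. It then merges the slices into clustering groups subject to the configured group size and group count limits.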
```java
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
  // Write config
  HoodieWriteConfig writeConfig = getWriteConfig();
  List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
  List<FileSlice> currentGroup = new ArrayList<>();
  // Sort fileSlices before dividing, which makes dividing more compact
  // Sorted by base file size in descending order (o2 - o1); a slice without a base file
  // is treated as having the max parquet file size
  List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
  sortedFileSlices.sort((o1, o2) -> (int)
      ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
          - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
  long totalSizeSoFar = 0;
  // Iterate over all file slices
  for (FileSlice currentSlice : sortedFileSlices) {
    // Size of the current slice; the max parquet file size if it has no base file
    long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
    // check if max size is reached and create new group, if needed.
    // Close the current group if adding this slice would exceed
    // hoodie.clustering.plan.strategy.max.bytes.per.group and the group is not empty
    if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
      // Number of output file groups:
      // ceil(totalSizeSoFar / hoodie.clustering.plan.strategy.target.file.max.bytes)
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
      LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
      // Save the group: its input slices and its number of output file groups
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
      // Reset currentGroup and totalSizeSoFar
      currentGroup = new ArrayList<>();
      totalSizeSoFar = 0;
      // if fileSliceGroups's size reach the max group, stop loop
      // (hoodie.clustering.plan.strategy.max.num.groups); the remaining slices are skipped this time
      if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) {
        LOG.info("Having generated the maximum number of groups : " + writeConfig.getClusteringMaxNumGroups());
        break;
      }
    }
    // Add to the current file-group
    currentGroup.add(currentSlice);
    // assume each file group size is ~= parquet.max.file.size
    totalSizeSoFar += currentSize;
  }
  if (!currentGroup.isEmpty()) {
    // Handle the last group. It is kept if it has more than one slice, or if
    // shouldClusteringSingleGroup() is true, which is the case when either of these is set:
    //   hoodie.clustering.plan.strategy.sort.columns
    //   hoodie.clustering.plan.strategy.single.group.clustering.enabled
    if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
    }
  }
  // Build and return the clustering groups
  return fileSliceGroups.stream().map(fileSliceGroup ->
      HoodieClusteringGroup.newBuilder()
          .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
          .setNumOutputFileGroups(fileSliceGroup.getRight())
          .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
          .build());
}
```
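The greedy grouping above can be replayed without Hudi types. The sketch takes plain file sizes (MB in the example) and mirrors the loop's rules:
- sort in descending order;
- close a group when the next slice would exceed max.bytes.per.group;
- stop at max.num.groups;
- keep a single-slice final group only if allowed.

It computes the output file group count by ceiling division. The names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Replays the grouping loop of buildClusteringGroupsForPartition on plain sizes (hypothetical names).
final class ClusteringGroupingSketch {

  // One planned clustering group: sizes of its input slices and its number of output file groups.
  static final class Group {
    final List<Long> inputSizes;
    final int numOutputFileGroups;

    Group(List<Long> inputSizes, int numOutputFileGroups) {
      this.inputSizes = inputSizes;
      this.numOutputFileGroups = numOutputFileGroups;
    }

    @Override
    public String toString() {
      return inputSizes + " -> " + numOutputFileGroups + " output file group(s)";
    }
  }

  // ceil(totalBytes / targetFileMaxBytes)
  static int numOutputFileGroups(long totalBytes, long targetFileMaxBytes) {
    return (int) Math.ceil(totalBytes / (double) targetFileMaxBytes);
  }

  static List<Group> buildGroups(List<Long> sliceSizes, long maxBytesPerGroup, long targetFileMaxBytes,
                                 int maxNumGroups, boolean allowSingleSliceGroup) {
    // Largest slices first, matching the (o2 - o1) comparator in the Hudi code
    List<Long> sorted = new ArrayList<>(sliceSizes);
    sorted.sort(Comparator.reverseOrder());
    List<Group> groups = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long total = 0;
    for (long size : sorted) {
      // Close the current group if this slice would push it past maxBytesPerGroup
      if (total + size > maxBytesPerGroup && !current.isEmpty()) {
        groups.add(new Group(current, numOutputFileGroups(total, targetFileMaxBytes)));
        current = new ArrayList<>();
        total = 0;
        if (groups.size() >= maxNumGroups) {
          break; // remaining slices are left for a later clustering
        }
      }
      current.add(size);
      total += size;
    }
    // Final group: kept if it has several slices, or if single-slice groups are allowed
    if (!current.isEmpty() && (current.size() > 1 || allowSingleSliceGroup)) {
      groups.add(new Group(current, numOutputFileGroups(total, targetFileMaxBytes)));
    }
    return groups;
  }

  public static void main(String[] args) {
    // Sizes in MB; max.bytes.per.group = 400, target.file.max.bytes = 250
    List<Group> groups = buildGroups(List.of(100L, 300L, 200L, 50L), 400, 250, 30, false);
    groups.forEach(System.out::println);
    // [300] -> 2 output file group(s)
    // [200, 100, 50] -> 2 output file group(s)
  }
}
```

In the example, the first group contains a single 300MB slice. Only the final group is subject to the single-group check, so a group closed inside the loop is kept even if it holds one slice.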
This completes the analysis of clustering plan generation.
Executing clustering according to the plan
Clustering execution starts in BaseHoodieWriteClient::cluster, which runs a preWrite step before clustering:
```java
public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
  // Create the HoodieTable; implementations differ by engine (Spark/Flink) and table type (MOR/COW)
  HoodieTable table = createTable(config, context.getHadoopConf().get());
  // Pre-write steps, including:
  // - removing this instant from the set of inflight and requested instants
  // - starting the clean and archive services (if enabled)
  preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
  // Run clustering
  return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}
```
Next comes BaseHoodieTableServiceClient::cluster. It checks whether this clustering instant is already pending (inflight) and, if so, rolls it back first. It then sets up metrics, runs clustering, and returns the clustering result metadata.
```java
public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
  // Create the table, as in the previous method
  HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());
  HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
  HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
  // If this clustering instant is already pending (inflight), roll it back first
  if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
    table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
    table.getMetaClient().reloadActiveTimeline();
  }
  // Timer for the clustering duration metric
  clusteringTimer = metrics.getClusteringCtx();
  LOG.info("Starting clustering at " + clusteringInstant);
  // Call the table's cluster service
  HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
  // Convert the metadata to the engine-specific output type
  HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);
  // Validation has to be done after cloning. if not, it could result in referencing the write status twice which means clustering could get executed twice.
  // Checks that the clustering write statuses are not empty
  validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
  // Publish file creation metrics for clustering.
  if (config.isMetricsOn()) {
    clusteringMetadata.getWriteStats()
        .ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()
            .filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() != null)
            .map(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats().getTotalCreateTime())
            .forEach(metrics::updateClusteringFileCreationMetrics));
  }
  // TODO : Where is shouldComplete used ?
  // Complete the replace commit when shouldComplete is set
  if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
    completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
  }
  return clusteringMetadata;
}
```
Taking Spark as an example, let's look at the HoodieSparkCopyOnWriteTable::cluster logic for COW tables.
```java
public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,
                                                            String clusteringInstantTime) {
  return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
}
```
The work is delegated to SparkExecuteClusteringCommitActionExecutor. Its execute method calls BaseCommitActionExecutor::executeClustering.
```java
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
  return executeClustering(clusteringPlan);
}
```
BaseCommitActionExecutor::executeClustering uses reflection to load the execution strategy configured by hoodie.clustering.execution.strategy.class (SparkSortAndSizeExecutionStrategy by default). It then runs clustering.
```java
protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
  // The requested replacecommit instant of this clustering
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  // Mark instant as clustering inflight
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
  table.getMetaClient().reloadActiveTimeline();
  // Disable auto commit. Strategy is only expected to write data in new files.
  config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
  // Add the 5 Hudi metadata fields (_hoodie_commit_time etc.) to the schema
  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Load the execution strategy configured by hoodie.clustering.execution.strategy.class
  // (SparkSortAndSizeExecutionStrategy by default) and call its performClustering method
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
      (ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
          ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
      .performClustering(clusteringPlan, schema, instantTime);
  // Write statuses
  HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
  // Update the index with the new record locations
  HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
  // Persist the statuses so they are not recomputed
  statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));
  // triggers clustering.
  // collectAsList() materializes the statuses; store the write stats in writeMetadata
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
  // Record the file IDs (per partition path) replaced by this clustering
  writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
  // Commits only when auto commit is enabled; it was disabled above, so the
  // replacecommit is completed later (see completeClustering in the table service client)
  commitOnAutoCommit(writeMetadata);
  if (!writeMetadata.getCommitMetadata().isPresent()) {
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));
  }
  return writeMetadata;
}
```
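getPartitionToReplacedFileIds records which file groups the new files replace, keyed by partition path. The map is passed to CommitUtils.buildMetadata above and ends up in the replace commit metadata. If the map is derived from the file groups listed in the plan's input groups (an assumption), the computation is a group-by on partition path. The sketch below uses a hypothetical FileGroupId class in place of HoodieFileGroupId.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Group-by sketch of partition -> replaced file IDs (hypothetical names).
final class ReplacedFileIdsSketch {

  // Hypothetical stand-in for HoodieFileGroupId: (partition path, file ID).
  static final class FileGroupId {
    final String partitionPath;
    final String fileId;

    FileGroupId(String partitionPath, String fileId) {
      this.partitionPath = partitionPath;
      this.fileId = fileId;
    }
  }

  // Groups the file IDs of all input groups by partition path (TreeMap for stable ordering).
  static Map<String, List<String>> partitionToReplacedFileIds(List<List<FileGroupId>> inputGroups) {
    Map<String, List<String>> result = new TreeMap<>();
    for (List<FileGroupId> group : inputGroups) {
      for (FileGroupId id : group) {
        result.computeIfAbsent(id.partitionPath, k -> new ArrayList<>()).add(id.fileId);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<List<FileGroupId>> groups = List.of(
        List.of(new FileGroupId("dt=2024-01-01", "f1"), new FileGroupId("dt=2024-01-01", "f2")),
        List.of(new FileGroupId("dt=2024-01-02", "f3")));
    System.out.println(partitionToReplacedFileIds(groups)); // {dt=2024-01-01=[f1, f2], dt=2024-01-02=[f3]}
  }
}
```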
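The details of clustering execution live in the strategy class. Here we analyze performClustering of the default strategy, SparkSortAndSizeExecutionStrategy. The method is defined in its parent class, MultipleSparkJobExecutionStrategy. It uses a thread pool in which each input group (a clustering group from the plan) is handled by one task. The pool size never exceeds the configured maximum parallelism.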
```java
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
  JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
  // Whether to preserve Hudi metadata; defaults to true
  boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
  // Clustering runs on a dedicated thread pool. Pool size = number of input groups
  // (clustering groups in the plan), capped by hoodie.clustering.max.parallelism (default 15)
  ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
      Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
      new CustomizedThreadFactory("clustering-job-group", true));
  try {
    // execute clustering for each group async and collect WriteStatus
    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
            clusteringPlan.getInputGroups().stream()
                .map(inputGroup -> {
                  // If hoodie.datasource.write.row.writer.enable is true (the default here), use Spark's
                  // native Row type to avoid the cost of record conversion
                  if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)) {
                    return runClusteringForGroupAsyncAsRow(inputGroup,
                        clusteringPlan.getStrategy().getStrategyParams(),
                        shouldPreserveMetadata,
                        instantTime,
                        clusteringExecutorService);
                  }
                  return runClusteringForGroupAsync(inputGroup,
                      clusteringPlan.getStrategy().getStrategyParams(),
                      shouldPreserveMetadata,
                      instantTime,
                      clusteringExecutorService);
                })
                .collect(Collectors.toList()))
        .join()
        .stream();
    // Union the per-group results into a single RDD
    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
    return writeMetadata;
  } finally {
    clusteringExecutorService.shutdown();
  }
}
```
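The concurrency pattern in performClustering can be reproduced with the JDK alone:
- a fixed pool of min(number of input groups, max parallelism) threads;
- one async task per group;
- a join on all futures;
- pool shutdown in finally.

In this sketch, a caller-supplied function stands in for the per-group work that runClusteringForGroupAsync(AsRow) does. The names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only sketch of "one async task per input group on a bounded pool" (hypothetical names).
final class GroupExecutionSketch {

  // Runs clusterOneGroup for every input group on min(groups, maxParallelism) threads
  // and returns the results in input order.
  static <G, R> List<R> runAll(List<G> inputGroups, int maxParallelism, Function<G, R> clusterOneGroup) {
    if (inputGroups.isEmpty()) {
      return List.of(); // newFixedThreadPool(0) would throw IllegalArgumentException
    }
    ExecutorService pool = Executors.newFixedThreadPool(Math.min(inputGroups.size(), maxParallelism));
    try {
      List<CompletableFuture<R>> futures = inputGroups.stream()
          .map(group -> CompletableFuture.supplyAsync(() -> clusterOneGroup.apply(group), pool))
          .collect(Collectors.toList());
      // Wait for every group, comparable to FutureUtils.allOf(...).join() in the Hudi code
      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
      return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<Integer> groups = List.of(1, 2, 3);
    System.out.println(runAll(groups, 15, g -> "written-" + g)); // [written-1, written-2, written-3]
  }
}
```

Unlike the Hudi code, the sketch returns early for an empty group list, because Executors.newFixedThreadPool rejects a pool size of 0.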
Next we follow the default execution path, MultipleSparkJobExecutionStrategy::runClusteringForGroupAsyncAsRow. It does four things:
1. It reads all data to be clustered into a Spark Dataset.
2. It builds the reader schema.
3. It records which partition path each file ID belongs to.
4. It runs clustering.
```java
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
                                                                                  Map<String, String> strategyParams,
                                                                                  boolean shouldPreserveHoodieMetadata,
                                                                                  String instantTime,
                                                                                  ExecutorService clusteringExecutorService) {
  return CompletableFuture.supplyAsync(() -> {
    // Spark context
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    // Read all records of this clustering group into a Spark Dataset
    Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
    // Table schema with the metadata fields added
    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
    // Convert the group's file slices to HoodieFileGroupIds,
    // i.e. (partition path, file ID) pairs
    List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
        .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
        .collect(Collectors.toList());
    // Run clustering
    return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
        clusteringGroup.getExtraMetadata());
  }, clusteringExecutorService);
}
```
SparkSortAndSizeExecutionStrategy::performClusteringWithRecordsAsRow obtains a partitioner and uses it to re-sort the data. It then writes the data back to parquet files with bulk insert.
@Override
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
int numOutputGroups,
String instantTime, Map<String, String> strategyParams,
Schema schema,
List<HoodieFileGroupId> fileGroupIdList,
boolean shouldPreserveHoodieMetadata,
Map<String, String> extraMetadata) {
LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
// Build a new write config: bulk insert parallelism equals the number of output file groups
HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
.withBulkInsertParallelism(numOutputGroups)
.withProps(getWriteConfig().getProps()).build();
// Cap the parquet file size at the clustering target file size
// (config: hoodie.clustering.plan.strategy.target.file.max.bytes)
newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
// Get the partitioner
BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
// Repartition the data with the partitioner (this is where records are reordered)
Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
// Bulk insert the reordered data
return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
}
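Next we turn to clustering's other function: reordering data. The key pieces are therefore the partitioner and the way it reorders records. The partitioner is obtained in the parent class MultipleSparkJobExecutionStrategy via getRowPartitioner, which delegates to the generic getPartitioner method below with isRowPartitioner set to true: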
private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
Schema schema,
boolean isRowPartitioner) {
// Read the sort columns from the strategy params
// (config: hoodie.clustering.plan.strategy.sort.columns, comma-separated)
Option<String[]> orderByColumnsOpt =
Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
.map(listStr -> listStr.split(","));
return orderByColumnsOpt.map(orderByColumns -> {
// Read hoodie.layout.optimize.strategy: order by a Z-order curve, a Hilbert curve, or LINEAR (plain lexicographic) order
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
switch (layoutOptStrategy) {
case ZORDER:
case HILBERT:
return isRowPartitioner
? new RowSpatialCurveSortPartitioner(getWriteConfig())
: new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
case LINEAR:
return isRowPartitioner
? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
: new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
default:
throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
}
}).orElseGet(() -> isRowPartitioner
? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
: BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
}
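When Spark's native Row type is used, isRowPartitioner is true. The ZORDER and HILBERT strategies then use RowSpatialCurveSortPartitioner, while the LINEAR strategy uses RowCustomColumnsSortPartitioner. If no sort columns are configured at all, the default partitioner returned by BulkInsertInternalPartitionerWithRowsFactory is used instead.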
Next we look at how each of these two partitioners reorders the data.
First, RowSpatialCurveSortPartitioner::repartitionRecords:
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
return reorder(records, outputPartitions);
}
repartitionRecords simply delegates to reorder:
protected Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
// Nothing to do if no sort columns are configured
if (orderByColumns.length == 0) {
// No-op
return dataset;
}
List<String> orderedCols = Arrays.asList(orderByColumns);
// curveCompositionStrategyType defaults to DIRECT
switch (curveCompositionStrategyType) {
case DIRECT:
return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
case SAMPLE:
return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
default:
throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
}
}
Of the two SpatialCurveCompositionStrategyType values, SAMPLE produces a better-balanced data distribution than DIRECT but runs more slowly. DIRECT is the default.
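Why can sampling distribute data better? With DIRECT, raw column values are mapped straight to bytes, so skewed values (say, most values small and a few huge) occupy only a narrow part of the key space. A sampling approach can instead map each value to its bucket among boundaries drawn from a sample, which spreads rows evenly. The sketch below shows that general range-bucketing idea; it is an assumption about the principle, not Hudi's orderDataFrameBySamplingValues, and boundaries and bucketOf are hypothetical names.

```java
import java.util.Arrays;

public class SampleMappingSketch {
    // Pick numBuckets - 1 boundaries at evenly spaced quantiles of a sample
    static long[] boundaries(long[] sample, int numBuckets) {
        long[] sorted = sample.clone();
        Arrays.sort(sorted);
        long[] bounds = new long[numBuckets - 1];
        for (int i = 1; i < numBuckets; i++) {
            bounds[i - 1] = sorted[(int) ((long) i * sorted.length / numBuckets)];
        }
        return bounds;
    }

    // Bucket index = number of boundaries <= value (upper-bound binary search)
    static int bucketOf(long value, long[] bounds) {
        int lo = 0, hi = bounds.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (bounds[mid] <= value) lo = mid + 1; else hi = mid;
        }
        return lo; // in [0, bounds.length]
    }

    public static void main(String[] args) {
        long[] sample = {1, 2, 3, 4, 1000, 2000, 3000, 4000};
        long[] bounds = boundaries(sample, 4);
        System.out.println(Arrays.toString(bounds)); // [3, 1000, 3000]
        for (long v : new long[]{1, 3, 2000, 4000}) {
            System.out.println(v + " -> bucket " + bucketOf(v, bounds));
        }
    }
}
```

Values spread unevenly over 1..4000 land in four equally populated buckets 0..3, so every bit of the small bucket index carries information when the columns are later interleaved. The extra sampling pass over the data is a plausible reason for SAMPLE being slower.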
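We first analyze DIRECT, implemented in SpaceCurveSortingHelper::orderDataFrameByMappingValues. The method first checks that the configured sort columns are valid. With a single sort column it simply range-repartitions the data; otherwise it reorders the data by the sort columns along a Z-order or Hilbert curve.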
public static Dataset<Row> orderDataFrameByMappingValues(
Dataset<Row> df,
HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
List<String> orderByCols,
int targetPartitionCount
) {
// Map each column name to its StructField
Map<String, StructField> columnsMap =
Arrays.stream(df.schema().fields())
.collect(Collectors.toMap(StructField::name, Function.identity()));
// Keep only the sort columns that exist in the schema
List<String> checkCols =
orderByCols.stream()
.filter(columnsMap::containsKey)
.collect(Collectors.toList());
// If any sort column is missing, the configuration is wrong: log an error and return the data unchanged
if (orderByCols.size() != checkCols.size()) {
LOG.error(String.format("Trying to ordering over a column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols)));
return df;
}
// In case when there's just one column to be ordered by, we can skip space-curve
// ordering altogether (since it will match linear ordering anyway)
// i.e. with a single sort column, fall back to Spark's range repartitioning
if (orderByCols.size() == 1) {
String orderByColName = orderByCols.get(0);
LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName));
// TODO validate if we need Spark to re-partition
return df.repartitionByRange(targetPartitionCount, new Column(orderByColName));
}
// Total number of fields in the schema
int fieldNum = df.schema().fields().length;
// Map each sort column's index in the schema to its StructField
Map<Integer, StructField> fieldMap =
orderByCols.stream()
.collect(
Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), columnsMap::get));
JavaRDD<Row> sortedRDD;
// Sort the RDD according to the layout optimization strategy
switch (layoutOptStrategy) {
case ZORDER:
sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
break;
case HILBERT:
sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
break;
default:
throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));
}
// Compose new {@code StructType} for ordered RDDs
// (the original schema plus the appended sort-key column)
StructType newStructType = composeOrderedRDDStructType(df.schema());
// Create the DataFrame and drop the helper sort-key column, named "Index"
return df.sparkSession()
.createDataFrame(sortedRDD, newStructType)
.drop("Index");
}
First, Z-order sorting, implemented in SpaceCurveSortingHelper::createZCurveSortedRDD.
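The method maps the value of each sort column to exactly 8 bytes (truncating longer values and padding shorter ones). It then takes one bit from each column and concatenates them, then the next bit from each column, and so on; this step is called bit interleaving. The interleaved value is appended to the row as an extra field, and the rows are sorted by that field.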
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
return originRDD.map(row -> {
// Map each sort column value of the row to exactly 8 bytes
// (longer values are truncated, shorter ones padded)
byte[][] zBytes = fieldMap.entrySet().stream()
.map(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueTo8Bytes(row, index, field.dataType());
})
.toArray(byte[][]::new);
// Interleave received bytes to produce Z-curve ordinal
// e.g. with sort columns A and B: one bit of A, then one bit of B,
// then the next bit of A, the next bit of B, and so on
byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);
// Append zOrdinalBytes to the row as an extra field
return appendToRow(row, zOrdinalBytes);
})
// Sort by the appended field (zOrdinalBytes): it is the last field, so its index is exactly fieldNum
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
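Interleaving and the byte-array sort key can be illustrated with a small standalone sketch. interleave below is a hypothetical re-implementation for illustration, not Hudi's BinaryUtil.interleaving: it takes one bit from each column in turn, most significant bit first. Because Java's byte is signed, a byte[] sort key must be compared as unsigned bytes for the order to follow the bit pattern; the sketch uses Arrays.compareUnsigned (Java 9+), which is presumably the kind of ordering a wrapper such as ByteArraySorting has to provide.

```java
import java.util.Arrays;

public class ZOrderSketch {
    // Interleave the bits of equal-length byte arrays, most significant bit first:
    // bit 0 of column 0, bit 0 of column 1, ..., then bit 1 of column 0, and so on.
    static byte[] interleave(byte[][] columns, int bytesPerColumn) {
        byte[] out = new byte[columns.length * bytesPerColumn];
        int outBit = 0;
        for (int bit = 0; bit < bytesPerColumn * 8; bit++) {
            for (byte[] col : columns) {
                // Read this column's bit at position `bit`, counting from the most significant bit
                int b = (col[bit / 8] >> (7 - bit % 8)) & 1;
                // Write it to the next free output bit
                out[outBit / 8] |= (byte) (b << (7 - outBit % 8));
                outBit++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Column A = 11110000, column B = 00000000
        byte[] z = interleave(new byte[][]{{(byte) 0xF0}, {0x00}}, 1);
        System.out.println(Integer.toBinaryString(z[0] & 0xFF)); // 10101010
        System.out.println(Integer.toBinaryString(z[1] & 0xFF)); // 0

        // 0x80 must sort after 0x7F; a signed comparison gets this backwards
        System.out.println(Byte.compare((byte) 0x80, (byte) 0x7F) < 0);                           // true
        System.out.println(Arrays.compareUnsigned(new byte[]{(byte) 0x80}, new byte[]{0x7F}) > 0); // true
    }
}
```

With one byte per column, the point (A=1, B=0) gets a larger key than (A=0, B=1), because A contributes the more significant bit at every level.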
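The second case is the Hilbert curve, handled by SpaceCurveSortingHelper::createHilbertSortedRDD. The logic is essentially the same as for Z-order sorting, except that each sort column value is mapped to a long and the Z-curve is replaced by a Hilbert curve.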
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
// NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized
// only once per partition
return originRDD.mapPartitions(rows -> {
// Build a Hilbert curve with one dimension per sort column (63 bits per dimension)
HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return rows.hasNext();
}
@Override
public Row next() {
Row row = rows.next();
// Map each sort column value of the row to a long
long[] longs = fieldMap.entrySet().stream()
.mapToLong(entry -> {
int index = entry.getKey();
StructField field = entry.getValue();
return mapColumnValueToLong(row, index, field.dataType());
})
.toArray();
// Map N-dimensional coordinates into position on the Hilbert curve
// The position of these longs on the Hilbert curve becomes the sort key
byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
return appendToRow(row, hilbertCurvePosBytes);
}
};
})
.sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
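Hudi relies on a HilbertCurve library object configured with 63 bits per dimension and one dimension per sort column. To show what a Hilbert index is without that dependency, here is the classic two-dimensional xy2d algorithm. This is a swapped-in textbook technique, not the code Hudi calls: it maps a cell (x, y) of an n × n grid, with n a power of two, to its position along the curve.

```java
public class HilbertSketch {
    // Position of cell (x, y) along the Hilbert curve filling an n x n grid (n a power of two).
    // Classic 2-D xy2d algorithm, unlike the N-dimensional curve Hudi uses.
    static long xy2d(long n, long x, long y) {
        long d = 0;
        for (long s = n / 2; s > 0; s /= 2) {
            // Which quadrant of the current block the cell falls into
            long rx = (x & s) > 0 ? 1 : 0;
            long ry = (y & s) > 0 ? 1 : 0;
            // Skip the cells of all quadrants visited before this one
            d += s * s * ((3 * rx) ^ ry);
            // Rotate/flip so the next level is traversed in the canonical orientation
            if (ry == 0) {
                if (rx == 1) {
                    x = n - 1 - x;
                    y = n - 1 - y;
                }
                long t = x;
                x = y;
                y = t;
            }
        }
        return d;
    }

    public static void main(String[] args) {
        int n = 4;
        // Print curve positions as a grid, top row is y = n - 1
        for (int y = n - 1; y >= 0; y--) {
            StringBuilder row = new StringBuilder();
            for (int x = 0; x < n; x++) {
                row.append(xy2d(n, x, y)).append(' ');
            }
            System.out.println(row.toString().trim());
        }
    }
}
```

For n = 4 the printed grid is `5 6 9 10`, `4 7 8 11`, `3 2 13 12`, `0 1 14 15`. Consecutive positions are always neighbouring cells, whereas the Z-curve occasionally jumps across the space; this locality is why Hilbert ordering tends to keep nearby values in the same files more consistently.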
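Compared with the space-curve approaches, LINEAR ordering is much simpler. The code is in RowCustomColumnsSortPartitioner::repartitionRecords: it sorts the data by the configured columns with Spark's sort operator, then coalesces the result into the requested number of output partitions.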
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
return records
.sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))
.coalesce(outputSparkPartitions);
}
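The practical difference between LINEAR and a space curve shows up when queries filter on a column other than the first sort column. The toy simulation below is not Hudi code: it sorts the 64 points of an 8 × 8 grid either by (x, y) or by a 3-bit-per-column Z value, cuts the result into 8 "files" of 8 rows, and counts how many files contain rows matching x == 3 or y == 3. The grid, the file size, and the zValue helper are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LinearVsZOrderSketch {
    // Interleave the low 3 bits of x and y, x first: x2 y2 x1 y1 x0 y0
    static int zValue(int x, int y) {
        int z = 0;
        for (int bit = 2; bit >= 0; bit--) {
            z = (z << 1) | ((x >> bit) & 1);
            z = (z << 1) | ((y >> bit) & 1);
        }
        return z;
    }

    // All points of an 8 x 8 grid as {x, y}
    static List<int[]> grid() {
        List<int[]> pts = new ArrayList<>();
        for (int x = 0; x < 8; x++) {
            for (int y = 0; y < 8; y++) {
                pts.add(new int[]{x, y});
            }
        }
        return pts;
    }

    // LINEAR: lexicographic order by x, then y
    static List<int[]> sortedLinear() {
        List<int[]> pts = grid();
        pts.sort(Comparator.<int[]>comparingInt(p -> p[0]).thenComparingInt(p -> p[1]));
        return pts;
    }

    // ZORDER: order by the interleaved value
    static List<int[]> sortedZ() {
        List<int[]> pts = grid();
        pts.sort(Comparator.<int[]>comparingInt(p -> zValue(p[0], p[1])));
        return pts;
    }

    // Cut sorted rows into files of fileSize rows; count files holding a row with row[col] == value
    static int filesTouched(List<int[]> sorted, int fileSize, int col, int value) {
        Set<Integer> files = new HashSet<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (sorted.get(i)[col] == value) {
                files.add(i / fileSize);
            }
        }
        return files.size();
    }

    public static void main(String[] args) {
        System.out.println("y == 3: linear " + filesTouched(sortedLinear(), 8, 1, 3)
            + ", z-order " + filesTouched(sortedZ(), 8, 1, 3)); // y == 3: linear 8, z-order 4
        System.out.println("x == 3: linear " + filesTouched(sortedLinear(), 8, 0, 3)
            + ", z-order " + filesTouched(sortedZ(), 8, 0, 3)); // x == 3: linear 1, z-order 2
    }
}
```

LINEAR is ideal for a filter on x alone (1 file) but no help for y (all 8 files), while Z-order gives up a little on x (2 files) for a large gain on y (4 files). Which layout is better depends on which columns the queries filter on.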
This completes the analysis of the partitioner logic.