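Preface
Hudi on Flink supports running table services asynchronously. Table services are scheduled when a checkpoint completes, and they execute in a thread pool. The table services commonly used with Flink Hudi are compaction, clustering and clean. Their configuration options are:
- clustering.async.enabled: whether to enable asynchronous clustering. Disabled by default.
- compaction.async.enabled: whether to enable asynchronous compaction. Enabled by default.
- clean.async.enabled: whether to enable asynchronous clean. Enabled by default.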
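As a rough illustration of the three switches above, the sketch below resolves them from a plain map of user-supplied options and falls back to the defaults listed. `TableServiceOptions` and `isEnabled` are hypothetical helpers written for this article, not Hudi classes; only the option keys and their defaults come from the list above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (not a Hudi class): resolves the async table service switches. */
final class TableServiceOptions {
    // Option keys and default values as listed in the article.
    private static final Map<String, Boolean> DEFAULTS = new HashMap<>();

    static {
        DEFAULTS.put("clustering.async.enabled", false);
        DEFAULTS.put("compaction.async.enabled", true);
        DEFAULTS.put("clean.async.enabled", true);
    }

    /** Returns the user-supplied value if present, otherwise the listed default. */
    static boolean isEnabled(Map<String, String> userOptions, String key) {
        String value = userOptions.get(key);
        if (value != null) {
            return Boolean.parseBoolean(value);
        }
        Boolean defaultValue = DEFAULTS.get(key);
        if (defaultValue == null) {
            throw new IllegalArgumentException("Unknown option: " + key);
        }
        return defaultValue;
    }
}
```

With an empty map, only compaction and clean come back enabled; async clustering has to be switched on explicitly.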
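This article analyzes when Hudi table services are scheduled and executed in Flink. For the execution logic of the compaction, clustering and clean table services themselves, see:
- Hudi Compaction usage and source code analysis - Jianshu (jianshu.com)
- Hudi source code: Clustering - Jianshu (jianshu.com)
- Hudi source code: Cleaning service - Jianshu (jianshu.com)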
Scheduling
In Hudi Flink, table services are scheduled mainly in the following two methods:
- StreamWriteOperatorCoordinator::handleEndInputEvent: schedules table services in batch mode.
- StreamWriteOperatorCoordinator::notifyCheckpointComplete: schedules table services in streaming mode.
We analyze these two methods in turn.
The handleEndInputEvent method:
private void handleEndInputEvent(WriteMetadataEvent event) {
addEventToBuffer(event);
// if all events have been received
if (allEventsReceived()) {
// start to commit the instant.
// commitInstant returns true if the data was written successfully, meaning the commit succeeded
boolean committed = commitInstant(this.instant);
if (committed) {
// The executor thread inherits the classloader of the #handleEventFromOperator
// caller, which is a AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// sync Hive synchronously if it is enabled in batch mode.
syncHive();
// schedules the compaction or clustering if it is enabled in batch execution mode
scheduleTableServices(true);
}
}
}
The notifyCheckpointComplete method is the callback invoked when a checkpoint completes successfully.
@Override
public void notifyCheckpointComplete(long checkpointId) {
executor.execute(
() -> {
// The executor thread inherits the classloader of the #notifyCheckpointComplete
// caller, which is a AppClassLoader.
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
// for streaming mode, commits the ever received events anyway,
// the stream write task snapshot and flush the data buffer synchronously in sequence,
// so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
// commitInstant returns true if the data was written successfully, meaning the commit succeeded
final boolean committed = commitInstant(this.instant, checkpointId);
// schedules the compaction or clustering if it is enabled in stream execution mode
scheduleTableServices(committed);
if (committed) {
// start new instant.
startInstant();
// sync Hive if is enabled
syncHiveAsync();
}
}, "commits the instant %s", this.instant
);
}
The scheduleTableServices method:
private void scheduleTableServices(Boolean committed) {
// if compaction is on, schedule the compaction
// scheduleCompaction is true for a MOR table with compaction.schedule.enabled turned on (enabled by default)
if (tableState.scheduleCompaction) {
CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);
}
// if clustering is on, schedule the clustering
// scheduleClustering requires clustering.schedule.enabled to be on (disabled by default)
// for a bucket index table using consistent hashing, the write operation must be upsert
// otherwise (SIMPLE type) the write operation must be insert
if (tableState.scheduleClustering) {
ClusteringUtil.scheduleClustering(conf, writeClient, committed);
}
}
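The difference between the two entry points can be summarized in a small model. This is a heavily simplified sketch, assuming a coordinator reduced to two hooks and a list that records what was scheduled; `CoordinatorSketch` and its members are hypothetical names, and the real methods also commit instants and sync Hive.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of the coordinator's two scheduling hooks. */
final class CoordinatorSketch {
    /** Records every scheduling call, for inspection. */
    final List<String> scheduled = new ArrayList<>();
    private final boolean scheduleCompaction;
    private final boolean scheduleClustering;

    CoordinatorSketch(boolean scheduleCompaction, boolean scheduleClustering) {
        this.scheduleCompaction = scheduleCompaction;
        this.scheduleClustering = scheduleClustering;
    }

    /** Batch mode: scheduling happens only after the final commit succeeded. */
    void handleEndInput(boolean commitSucceeded) {
        if (commitSucceeded) {
            scheduleTableServices(true);
        }
    }

    /** Streaming mode: scheduling is attempted on every completed checkpoint. */
    void notifyCheckpointComplete(boolean commitSucceeded) {
        scheduleTableServices(commitSucceeded);
    }

    private void scheduleTableServices(boolean committed) {
        if (scheduleCompaction) {
            scheduled.add("compaction(committed=" + committed + ")");
        }
        if (scheduleClustering) {
            scheduled.add("clustering(committed=" + committed + ")");
        }
    }
}
```

In this model, a checkpoint that committed nothing still reaches the scheduling call in streaming mode, with `committed=false`, whereas batch mode skips scheduling entirely unless the commit succeeded.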
Executing
HoodieTableSink
Hudi Flink builds the asynchronous table service pipelines in HoodieTableSink::getSinkRuntimeProvider.
// ...
// Append mode
// if the sink runs in append mode
if (OptionsResolver.isAppendMode(conf)) {
// close compaction for append mode
conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
// write the data in append mode
DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
// if async clustering is needed, i.e.
// write.operation is insert and clustering.async.enabled is true
if (OptionsResolver.needsAsyncClustering(conf)) {
// run clustering
return Pipelines.cluster(conf, rowType, pipeline);
// if hoodie.cleaner.policy.failed.writes is set to lazy
} else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
// add clean function to rollback failed writes for lazy failed writes cleaning policy
return Pipelines.clean(conf, pipeline);
} else {
// otherwise do nothing
return Pipelines.dummySink(pipeline);
}
}
DataStream<Object> pipeline;
// bootstrap: load the index
final DataStream<HoodieRecord> hoodieRecordDataStream =
Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// write pipeline
// streaming write
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// compaction
// whether async compaction is needed,
// i.e. whether compaction.async.enabled is true (true by default)
if (OptionsResolver.needsAsyncCompaction(conf)) {
// use synchronous compaction for bounded source.
// a bounded source has a defined start and end
if (context.isBounded()) {
conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
}
// run compaction
return Pipelines.compact(conf, pipeline);
} else {
// if compaction is not configured, run clean
return Pipelines.clean(conf, pipeline);
}
// ...
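The branching above boils down to a small decision table. The sketch below restates it as a pure function, assuming the four OptionsResolver checks have already been evaluated into booleans; `SinkTailSelector` and `Tail` are hypothetical names used only for illustration.

```java
/** Hypothetical decision table mirroring the branches shown above; not Hudi code. */
final class SinkTailSelector {
    /** Which Pipelines method terminates the sink pipeline. */
    enum Tail { CLUSTER, CLEAN, DUMMY_SINK, COMPACT }

    static Tail select(boolean appendMode,
                       boolean needsAsyncClustering,
                       boolean lazyFailedWritesCleanPolicy,
                       boolean needsAsyncCompaction) {
        if (appendMode) {
            // Append mode never compacts; it clusters, cleans, or does nothing.
            if (needsAsyncClustering) {
                return Tail.CLUSTER;
            }
            if (lazyFailedWritesCleanPolicy) {
                return Tail.CLEAN;
            }
            return Tail.DUMMY_SINK;
        }
        // Non-append writes end with compaction when it is needed, otherwise with clean.
        return needsAsyncCompaction ? Tail.COMPACT : Tail.CLEAN;
    }
}
```

For example, `SinkTailSelector.select(false, false, false, true)` yields `COMPACT`, while an append-mode job with no clustering and no lazy cleaning policy ends in `DUMMY_SINK`.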
From the analysis above, the compact, clean and clustering table services in Flink are all created in Pipelines. Next we analyze the source code of Pipelines.
Pipelines
Pipelines creates dedicated data streams that periodically pick up the compaction and clustering plans and execute compact, clean and clustering. These streams are independent of the business data stream.
Pipelines::compact
This method starts the periodic compaction pipeline.
public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
// use CompactionPlanOperator to emit the compaction plan
DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.setMaxParallelism(1)
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
// use CompactOperator to execute the compaction plan
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new CompactOperator(conf))
// the parallelism is set by compaction.tasks
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
// CompactionCommitSink checks and commits the compaction instant
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
// the commit must run with parallelism 1
.setParallelism(1); // compaction commit should be singleton
compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
return compactionCommitEventDataStream;
}
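The keyBy on the file ID is what keeps two operations on the same file group from running on different subtasks at once. The sketch below shows the idea with a plain hash-modulo router. This is only a stand-in: Flink's real key-group assignment uses a different hashing scheme, and `KeyedRoutingSketch` is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of deterministic routing by file ID. */
final class KeyedRoutingSketch {
    /** Stand-in for Flink's key-group assignment (an assumption, not Flink's actual hashing). */
    static int subtaskFor(String fileId, int parallelism) {
        return Math.floorMod(fileId.hashCode(), parallelism);
    }

    /** Groups file IDs by the subtask that would process them. */
    static Map<Integer, List<String>> route(List<String> fileIds, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String fileId : fileIds) {
            bySubtask
                .computeIfAbsent(subtaskFor(fileId, parallelism), k -> new ArrayList<>())
                .add(fileId);
        }
        return bySubtask;
    }
}
```

Because the subtask depends only on the file ID, repeated events for the same file group always land in the same list, which is the property the comment in the pipeline relies on.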
CompactionPlanOperator::notifyCheckpointComplete checks, when a checkpoint completes, whether a compaction instant in the requested state exists. If so, it generates CompactionPlanEvents and sends them downstream.
StreamWriteOperatorCoordinator generates the compaction instants in the requested state; CompactionPlanOperator picks up these instants, reads the saved plan and sends it downstream.
@Override
public void notifyCheckpointComplete(long checkpointId) {
try {
table.getMetaClient().reloadActiveTimeline();
// There is no good way to infer when the compaction task for an instant crushed
// or is still undergoing. So we use a configured timeout threshold to control the rollback:
// {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
// when the earliest inflight instant has timed out, assumes it has failed
// already and just rolls it back.
// comment out: do we really need the timeout rollback ?
// CompactionUtil.rollbackEarliestCompaction(table, conf);
scheduleCompaction(table, checkpointId);
} catch (Throwable throwable) {
// make it fail-safe
LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);
}
}
The scheduleCompaction method reads the first compaction instant in the requested state, loads its compaction plan, and wraps each compactionOperation in that plan (the file group information the compaction touches) in a CompactionPlanEvent that is sent downstream.
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
// get the timeline containing all pending compactions
HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
// the first instant takes the highest priority.
// find the earliest requested compaction instant
Option<HoodieInstant> firstRequested = pendingCompactionTimeline
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
// record metrics
compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
// if there is no compaction instant in the requested state, there is nothing to schedule
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId);
return;
}
// get the timestamp of this requested compaction instant
String compactionInstantTime = firstRequested.get().getTimestamp();
// generate compaction plan
// should support configurable commit metadata
// load the compaction plan
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
// if no valid compaction plan was found
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// do nothing.
LOG.info("Empty compaction plan for instant " + compactionInstantTime);
} else {
// get this compaction instant in the requested state
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
// collect all compaction operations in the plan; each is wrapped in a CompactionPlanEvent and sent downstream
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
// delete the marker directory
WriteMarkersFactory
.get(table.getConfig().getMarkersType(), table, compactionInstantTime)
.deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
// wrap every operation in its own CompactionPlanEvent
// so that the compaction work can be spread evenly downstream,
// with each parallel subtask compacting a share of the file groups
for (CompactionOperation operation : operations) {
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
}
}
}
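Stripped of the Hudi types, scheduleCompaction does two things: it picks the earliest instant in the requested state, and it fans the plan out into one event per operation. The following is a minimal sketch of those two steps, assuming the pending timeline is already sorted by timestamp. `PlanFanOutSketch`, `Instant` and the string-based "events" are hypothetical simplifications.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical, simplified model of picking a plan and fanning it out. */
final class PlanFanOutSketch {
    enum State { REQUESTED, INFLIGHT }

    /** Minimal stand-in for a timeline instant. */
    static final class Instant {
        final String timestamp;
        final State state;

        Instant(String timestamp, State state) {
            this.timestamp = timestamp;
            this.state = state;
        }
    }

    /** Returns the earliest REQUESTED instant, assuming the list is sorted by timestamp. */
    static Optional<Instant> firstRequested(List<Instant> pendingTimeline) {
        return pendingTimeline.stream()
            .filter(instant -> instant.state == State.REQUESTED)
            .findFirst();
    }

    /** Emits one "event" per file group so downstream subtasks can share the work. */
    static List<String> toEvents(String instantTime, List<String> fileIds) {
        return fileIds.stream()
            .map(fileId -> instantTime + ":" + fileId)
            .collect(Collectors.toList());
    }
}
```

Inflight instants are skipped by the filter, so a plan that is already being executed is not emitted a second time.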
The processElement method of CompactOperator receives the CompactionPlanEvents generated above and executes the compaction.
@Override
public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
final CompactionPlanEvent event = record.getValue();
// get the inflight compaction instant time
final String instantTime = event.getCompactionInstantTime();
// get the compaction operation
final CompactionOperation compactionOperation = event.getOperation();
// for async compaction, run the compaction in a thread pool
// so that it does not block checkpointing
if (asyncCompaction) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
(errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.
// otherwise run the compaction synchronously
LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
}
}
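The asynchronous branch hands the work to an executor together with an error callback, so a failed compaction still emits an event downstream instead of throwing on the task thread. The sketch below imitates that pattern with the JDK only. `AsyncTaskSketch` is a hypothetical class and not Hudi's NonThrownExecutor, whose real API differs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical imitation of "run on a worker thread, report failures via a callback". */
final class AsyncTaskSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Runs the action on the worker thread; a failure goes to onError instead of propagating. */
    CompletableFuture<Void> execute(Callable<Void> action, Consumer<Throwable> onError) {
        return CompletableFuture.runAsync(() -> {
            try {
                action.call();
            } catch (Throwable t) {
                onError.accept(t);
            }
        }, executor);
    }

    /** Stops accepting new tasks; tasks already submitted still run. */
    void close() {
        executor.shutdown();
    }
}
```

The caller returns immediately after `execute`, which is why the asynchronous branch does not hold up checkpoint barriers; the returned future is only there so this sketch can be waited on.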
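The doCompaction method that follows indirectly calls the compact method of HoodieFlinkMergeOnReadTableCompactor, which was analyzed in the earlier article and is not repeated here.
Pipelines::cluster
This method starts the periodic clustering pipeline.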
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
// use ClusteringPlanOperator to emit the clustering plan
DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.setMaxParallelism(1) // plan generate must be singleton
.keyBy(plan ->
// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
// key by the file IDs of the ClusteringPlanEvent:
// clustering operations on the same file slice go to the same subtask, preventing concurrent modification
plan.getClusteringGroupInfo().getOperations()
.stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
// ClusteringOperator executes the clustering
new ClusteringOperator(conf, rowType))
// the parallelism of the clustering task is clustering.tasks
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
// if sorting is enabled, i.e. clustering.plan.strategy.sort.columns is not empty,
// set the managed memory for this step from write.sort.memory
if (OptionsResolver.sortClusteringEnabled(conf)) {
ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
// check and commit the clustering instant
DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // clustering commit should be singleton
clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
return clusteringCommitEventDataStream;
}
ClusteringPlanOperator schedules clustering in much the same way as the compaction scheduling described above.
private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
// get the pending clustering instants
List<HoodieInstant> pendingClusteringInstantTimes =
ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
// the first instant takes the highest priority.
// pick the earliest one in the requested state
Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
pendingClusteringInstantTimes.stream()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());
// record metrics
clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);
clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());
if (!firstRequested.isPresent()) {
// do nothing.
LOG.info("No clustering plan for checkpoint " + checkpointId);
return;
}
String clusteringInstantTime = firstRequested.get().getTimestamp();
// generate clustering plan
// should support configurable commit metadata
HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
// fetch the clustering plan generated earlier
Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
table.getMetaClient(), clusteringInstant);
// if no valid clustering plan was found, return immediately
if (!clusteringPlanOption.isPresent()) {
// do nothing.
LOG.info("No clustering plan scheduled");
return;
}
HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();
if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
|| (clusteringPlan.getInputGroups().isEmpty())) {
// do nothing.
LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
} else {
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
table.getMetaClient().reloadActiveTimeline();
// iterate over all input groups and wrap each one in a ClusteringPlanEvent
for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
output.collect(new StreamRecord<>(
new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())
));
}
}
}
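The clustering work for a group of file slices is wrapped in a HoodieClusteringGroup (the counterpart of compaction's CompactionOperation). Each HoodieClusteringGroup in the plan is wrapped in its own ClusteringPlanEvent so that the downstream subtasks can execute clustering in parallel.
The processElement method of ClusteringOperator executes the clustering plan. As with compaction, it runs either synchronously or asynchronously.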
@Override
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {
final ClusteringPlanEvent event = element.getValue();
final String instantTime = event.getClusteringInstantTime();
final List<ClusteringOperation> clusteringOperations = event.getClusteringGroupInfo().getOperations();
if (this.asyncClustering) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> doClustering(instantTime, clusteringOperations),
(errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), taskID)),
"Execute clustering for instant %s from task %d", instantTime, taskID);
} else {
// executes the clustering task synchronously for batch mode.
LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);
doClustering(instantTime, clusteringOperations);
}
}
The doClustering method is a pure Flink implementation of the clustering process.
private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
clusteringMetrics.startClustering();
// write the clustered data using bulk insert
BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
this.rowType, true);
Iterator<RowData> iterator;
// if the clustering operations involve log files, use readRecordsForGroupWithLogs,
// otherwise use readRecordsForGroupBaseFiles only
// both methods read the data of the file group and return it as an iterator
if (clusteringOperations.stream().anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()))) {
// if there are log files, we read all records into memory for a file group and apply updates.
iterator = readRecordsForGroupWithLogs(clusteringOperations, instantTime);
} else {
// We want to optimize reading records for case there are no log files.
iterator = readRecordsForGroupBaseFiles(clusteringOperations);
}
// if clustering.plan.strategy.sort.columns is configured,
// the records need to be sorted
if (this.sortClusteringEnabled) {
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
// sort with BinaryExternalSorter, whose sort code is
// generated from clustering.plan.strategy.sort.columns
BinaryExternalSorter sorter = initSorter();
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
sorter.write(binaryRowData);
}
// write the sorted data using bulk insert
BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
writerHelper.write(row);
}
sorter.close();
} else {
while (iterator.hasNext()) {
writerHelper.write(iterator.next());
}
}
List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
clusteringMetrics.endClustering();
collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));
writerHelper.close();
}
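The control flow of doClustering is: read all records of the group, optionally sort them, then write them out again. The sketch below reproduces that shape with plain collections, assuming an in-memory list in place of BinaryExternalSorter and a returned list in place of the bulk insert writer. `ClusteringWriteSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Hypothetical in-memory model of "read, optionally sort, rewrite". */
final class ClusteringWriteSketch {
    /**
     * Rewrites the records of one clustering group.
     * A null sortOrder means sorting is disabled and input order is kept.
     */
    static <T> List<T> rewrite(Iterator<T> records, Comparator<T> sortOrder) {
        List<T> written = new ArrayList<>();
        if (sortOrder != null) {
            // Buffer everything first; stands in for the external sorter (in memory only here).
            List<T> buffer = new ArrayList<>();
            while (records.hasNext()) {
                buffer.add(records.next());
            }
            buffer.sort(sortOrder);
            written.addAll(buffer);
        } else {
            // No sort columns configured: stream records straight to the writer.
            while (records.hasNext()) {
                written.add(records.next());
            }
        }
        return written;
    }
}
```

The unsorted path never needs to hold the whole group, which is one reason the real operator only reserves sort memory when sort columns are configured.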
Pipelines::clean
The clean method is fairly simple: it adds a sink of type CleanFunction to the data stream with a parallelism of 1. The code is shown below.
public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
cleanCommitDataStream.getTransformation().setMaxParallelism(1);
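return cleanCommitDataStream;
}
Next we analyze CleanFunction. If async clean is enabled, CleanFunction runs one asynchronous clean when it starts (open). snapshotState starts the async clean service when a checkpoint is taken, and notifyCheckpointComplete waits for the clean operation to finish when the checkpoint completes. The relevant code is shown below.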
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());
this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
String instantTime = writeClient.createNewInstantTime();
LOG.info(String.format("exec clean with instant time %s...", instantTime));
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
executor.execute(() -> {
this.isCleaning = true;
try {
this.writeClient.clean(instantTime);
} finally {
this.isCleaning = false;
}
}, "wait for cleaning finish");
}
}
@Override
public void notifyCheckpointComplete(long l) throws Exception {
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {
executor.execute(() -> {
try {
this.writeClient.waitForCleaningFinish();
} finally {
// ensure to switch the isCleaning flag
this.isCleaning = false;
}
}, "wait for cleaning finish");
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
try {
this.writeClient.startAsyncCleaning();
this.isCleaning = true;
} catch (Throwable throwable) {
// catch the exception to not affect the normal checkpointing
LOG.warn("Error while start async cleaning", throwable);
}
}
}
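The interplay of the isCleaning flag with the two checkpoint hooks can be modeled in a few lines. This is a simplified sketch: it waits for the clean inline, whereas the real notifyCheckpointComplete submits the wait to an executor, and the counter stands in for the actual clean call. `CleanLifecycleSketch` is a hypothetical class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the isCleaning flag across the checkpoint hooks. */
final class CleanLifecycleSketch {
    /** Counts how many clean runs were started; stands in for writeClient.clean(). */
    final AtomicInteger cleanRuns = new AtomicInteger();
    private final boolean asyncEnabled;
    private boolean isCleaning;
    private CompletableFuture<Void> pending;

    CleanLifecycleSketch(boolean asyncEnabled) {
        this.asyncEnabled = asyncEnabled;
    }

    boolean isCleaning() {
        return isCleaning;
    }

    /** Mirrors snapshotState: start an async clean unless one is already running. */
    void snapshotState() {
        if (asyncEnabled && !isCleaning) {
            pending = CompletableFuture.runAsync(cleanRuns::incrementAndGet);
            isCleaning = true;
        }
    }

    /** Mirrors notifyCheckpointComplete: wait for the running clean, then reset the flag. */
    void notifyCheckpointComplete() {
        if (asyncEnabled && isCleaning) {
            try {
                pending.join();
            } finally {
                isCleaning = false;
            }
        }
    }
}
```

A second snapshotState before the checkpoint completes is a no-op, so at most one clean is in flight per checkpoint cycle.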
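Besides Pipelines::clean using CleanFunction directly, note that ClusteringCommitSink and CompactionCommitSink both extend CleanFunction and neither overrides snapshotState or notifyCheckpointComplete. These two sinks therefore behave like CleanFunction and trigger a clean operation at checkpoint time.
Looking further at the doCommit method of these two sinks, both end with the following code: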
// Whether to clean up the old log file when compaction
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {
this.writeClient.clean();
}
This code runs a synchronous clean if async clean is not enabled and no clean is currently running.
Next we follow the this.writeClient.startAsyncCleaning() call down the stack; non-essential code along the way is omitted.
- this.writeClient.startAsyncCleaning() in CleanFunction
- tableServiceClient.startAsyncCleanerService(this); in HoodieFlinkWriteClient
- this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient); in BaseHoodieTableServiceClient
At this point it is clear that the clean table service is wrapped in AsyncCleanerService. The startAsyncCleaningIfEnabled method first checks whether async clean is configured and, if so, creates an async clean service, as shown below:
public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {
HoodieWriteConfig config = writeClient.getConfig();
// if either hoodie.clean.automatic or hoodie.clean.async is false, the service is not started
if (!config.isAutoClean() || !config.isAsyncClean()) {
LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");
return null;
}
// create and start the AsyncCleanerService
AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);
asyncCleanerService.start(null);
return asyncCleanerService;
}
Next comes asyncCleanerService.start, which is implemented in the start method of HoodieAsyncService.
Its startService call returns a Pair of the asynchronous service logic itself and the executor that runs it.
public void start(Function<Boolean, Boolean> onShutdownCallback) {
if (started) {
LOG.warn("The async service already started.");
return;
}
Pair<CompletableFuture, ExecutorService> res = startService();
future = res.getKey();
executor = res.getValue();
started = true;
shutdownCallback(onShutdownCallback);
}
The startService method is implemented in the subclass AsyncCleanerService, as shown below.
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
String instantTime = writeClient.createNewInstantTime();
LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));
// start the async clean operation in the thread pool
return Pair.of(CompletableFuture.supplyAsync(() -> {
writeClient.clean(instantTime);
return true;
}, executor), executor);
}
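After this call, the asynchronous clean logic is held in the future variable.
As analyzed above, notifyCheckpointComplete ends up in the waitForCompletion method. If the clean operation has not finished yet, it blocks here until it completes.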
public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {
if (asyncCleanerService != null) {
LOG.info("Waiting for async clean service to finish");
try {
asyncCleanerService.waitForShutdown();
} catch (Exception e) {
throw new HoodieException("Error waiting for async clean service to finish", e);
}
}
}
The waitForShutdown method of HoodieAsyncService waits synchronously for the clean to finish:
public void waitForShutdown() throws ExecutionException, InterruptedException {
if (future == null) {
return;
}
try {
future.get();
} catch (ExecutionException ex) {
LOG.error("Service shutdown with error", ex);
throw ex;
}
}
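The start and waitForShutdown pair reduces to "submit once, then block on the future". Here is a minimal sketch using only the JDK. `AsyncServiceSketch` is hypothetical, it uses join() rather than get() to avoid checked exceptions, and unlike the code above it also shuts its executor down after waiting so that a standalone program can exit.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical reduction of an async service: start once, then wait for completion. */
final class AsyncServiceSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private CompletableFuture<Boolean> future;
    private boolean started;

    /** Submits the work once; later calls are ignored, like the "already started" guard. */
    void start(Runnable work) {
        if (started) {
            return;
        }
        future = CompletableFuture.supplyAsync(() -> {
            work.run();
            return true;
        }, executor);
        started = true;
    }

    /** Blocks until the work finishes; returns false if the service was never started. */
    boolean waitForShutdown() {
        if (future == null) {
            return false;
        }
        try {
            return future.join();
        } finally {
            // Sketch-only addition: release the worker thread once the work is done.
            executor.shutdown();
        }
    }
}
```

If the work throws, join() rethrows it wrapped in a CompletionException, which plays the role of the ExecutionException handled in the original method.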
References
Compaction | Apache Hudi
Cleaning | Apache Hudi
Clustering | Apache Hudi