Hudi Source Code: Scheduling and Execution of Flink Table Services

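Preface

Hudi on Flink supports running table services asynchronously. Scheduling happens when a checkpoint completes, and execution runs in a thread pool. The table services commonly used with Flink Hudi are compaction, clustering, and clean. Their configuration options are:

  • clustering.async.enabled: whether to enable asynchronous clustering. Disabled by default.
  • compaction.async.enabled: whether to enable asynchronous compaction. Enabled by default.
  • clean.async.enabled: whether to enable asynchronous clean. Enabled by default.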
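
As a rough illustration of how these three switches and their defaults combine with user settings, here is a minimal sketch. It uses a plain Java map rather than Flink's Configuration, and the class and method names (TableServiceSwitches, isEnabled) are hypothetical, not part of Hudi.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Hudi: models the three switches with the
// defaults listed above, using a plain map in place of Flink's Configuration.
class TableServiceSwitches {
  static final Map<String, Boolean> DEFAULTS = new HashMap<>();

  static {
    DEFAULTS.put("clustering.async.enabled", false);
    DEFAULTS.put("compaction.async.enabled", true);
    DEFAULTS.put("clean.async.enabled", true);
  }

  // Returns the user-supplied value if present, otherwise the default.
  static boolean isEnabled(Map<String, String> userOptions, String key) {
    Boolean def = DEFAULTS.get(key);
    if (def == null) {
      throw new IllegalArgumentException("Unknown option: " + key);
    }
    String value = userOptions.get(key);
    return value != null ? Boolean.parseBoolean(value) : def;
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("clustering.async.enabled", "true");
    for (String key : DEFAULTS.keySet()) {
      System.out.println(key + " = " + isEnabled(options, key));
    }
  }
}
```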

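This article analyzes when Hudi table services are scheduled and executed in Flink. The execution logic of the compaction, clustering, and clean table services themselves is covered in the author's earlier articles.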

Scheduling

The scheduling of Hudi Flink table services lives mainly in the following two methods:

  • StreamWriteOperatorCoordinator::handleEndInputEvent: schedules table services in batch mode.
  • StreamWriteOperatorCoordinator::notifyCheckpointComplete: schedules table services in streaming mode.

We look at each method in turn.

The handleEndInputEvent method:

private void handleEndInputEvent(WriteMetadataEvent event) {
  addEventToBuffer(event);
  // proceed only once the events from all write tasks have been received
  if (allEventsReceived()) {
    // start to commit the instant.
    // commitInstant returns true if the data was written and the commit succeeded
    boolean committed = commitInstant(this.instant);
    if (committed) {
      // The executor thread inherits the classloader of the #handleEventFromOperator
      // caller, which is a AppClassLoader.
      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
      // sync Hive synchronously if it is enabled in batch mode.
      syncHive();
      // schedules the compaction or clustering if it is enabled in batch execution mode
      scheduleTableServices(true);
    }
  }
}

The notifyCheckpointComplete method is invoked as a callback when a checkpoint completes successfully.

@Override  
public void notifyCheckpointComplete(long checkpointId) {  
  executor.execute(  
      () -> {  
        // The executor thread inherits the classloader of the #notifyCheckpointComplete  
        // caller, which is a AppClassLoader.  
        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());  
        // for streaming mode, commits the ever received events anyway,  
        // the stream write task snapshot and flush the data buffer synchronously in sequence,        
        // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)        
        // commitInstant returns true if the data was written and the commit succeeded
        final boolean committed = commitInstant(this.instant, checkpointId);
        // schedules the compaction or clustering if it is enabled in stream execution mode
        scheduleTableServices(committed);

        if (committed) {
          // start new instant.
          startInstant();
          // sync Hive if is enabled
          syncHiveAsync();  
        }  
      }, "commits the instant %s", this.instant  
  );  
}

The scheduleTableServices method:

private void scheduleTableServices(Boolean committed) {  
  // if compaction is on, schedule the compaction  
  // scheduleCompaction is true for a MOR table with compaction.schedule.enabled on (enabled by default)
  if (tableState.scheduleCompaction) {  
    CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);  
  }  
  // if clustering is on, schedule the clustering  
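  // scheduleClustering requires clustering.schedule.enabled to be on (disabled by default)
  // for a bucket index table with consistent hashing, the write operation must be upsert
  // otherwise (SIMPLE type) the write operation must be insert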
  if (tableState.scheduleClustering) {  
    ClusteringUtil.scheduleClustering(conf, writeClient, committed);  
  }  
}

Executing

HoodieTableSink

Hudi Flink builds the asynchronous table service pipelines in HoodieTableSink::getSinkRuntimeProvider.

// ...
// Append mode
if (OptionsResolver.isAppendMode(conf)) {
  // close compaction for append mode
  conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
  // write the data in append mode
  DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
  // async clustering is needed when write.operation is insert and
  // clustering.async.enabled is true
  if (OptionsResolver.needsAsyncClustering(conf)) {
    return Pipelines.cluster(conf, rowType, pipeline);
  } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
    // hoodie.cleaner.policy.failed.writes is set to lazy:
    // add clean function to rollback failed writes for lazy failed writes cleaning policy
    return Pipelines.clean(conf, pipeline);
  } else {
    // otherwise do nothing
    return Pipelines.dummySink(pipeline);
  }
}

DataStream<Object> pipeline;
// bootstrap: load the index
final DataStream<HoodieRecord> hoodieRecordDataStream =
    Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
// write pipeline
pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
// compaction
// async compaction is needed when compaction.async.enabled is true (the default)
if (OptionsResolver.needsAsyncCompaction(conf)) {
  // use synchronous compaction for bounded source.
  if (context.isBounded()) {
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
  }
  return Pipelines.compact(conf, pipeline);
} else {
  // compaction is not configured, so only clean
  return Pipelines.clean(conf, pipeline);
}
// ...

From the analysis above, the compact, clean, and clustering table services in Flink are all created in Pipelines. Next we analyze the Pipelines source code.
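
The branch structure of getSinkRuntimeProvider can be condensed into a small decision function. The sketch below is only a model of the control flow shown in the listing: the class SinkPipelineChooser, the enum Tail, and the boolean parameters are hypothetical stand-ins for the OptionsResolver checks.

```java
// Hypothetical model of the branching in getSinkRuntimeProvider; not Hudi code.
class SinkPipelineChooser {
  // Which Pipelines method ends up building the tail of the job graph.
  enum Tail { CLUSTER, CLEAN, DUMMY_SINK, COMPACT }

  static Tail choose(boolean appendMode,
                     boolean needsAsyncClustering,
                     boolean lazyFailedWritesCleanPolicy,
                     boolean needsAsyncCompaction) {
    if (appendMode) {
      // Append mode never compacts; it clusters, cleans, or does nothing.
      if (needsAsyncClustering) {
        return Tail.CLUSTER;
      }
      return lazyFailedWritesCleanPolicy ? Tail.CLEAN : Tail.DUMMY_SINK;
    }
    // Upsert path: compaction if requested, otherwise only clean.
    return needsAsyncCompaction ? Tail.COMPACT : Tail.CLEAN;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, true, false, true));
    System.out.println(choose(false, false, false, true));
    System.out.println(choose(false, false, false, false));
  }
}
```

Note that in the original code the compact and cluster sinks also clean, because their commit sinks extend CleanFunction; the enum above only names the entry point that is called.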

Pipelines

Pipelines builds dedicated data streams that periodically dispatch the compaction and clustering plans and execute compact, clean, and clustering. These streams are independent of the job's business data flow.

Pipelines::compact

This method starts the periodic compaction pipeline.

public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
  // CompactionPlanOperator emits the compaction plan downstream
  DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
          TypeInformation.of(CompactionPlanEvent.class),
          new CompactionPlanOperator(conf))
      .setParallelism(1) // plan generate must be singleton
      .setMaxParallelism(1)
      // make the distribution strategy deterministic to avoid concurrent modifications
      // on the same bucket files
      .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
      // CompactOperator executes the compaction plan
      .transform("compact_task",
          TypeInformation.of(CompactionCommitEvent.class),
          new CompactOperator(conf))
      // the parallelism comes from compaction.tasks
      .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
      // CompactionCommitSink checks and commits the compaction instant
      .addSink(new CompactionCommitSink(conf))
      .name("compact_commit")
      .setParallelism(1); // compaction commit should be singleton
  compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
  return compactionCommitEventDataStream;
}

CompactionPlanOperator::notifyCheckpointComplete checks, on each completed checkpoint, whether a compaction instant in the requested state exists. If one does, it generates CompactionPlanEvents and sends them downstream.
StreamWriteOperatorCoordinator generates the requested compaction instants; CompactionPlanOperator picks those instants up, reads the saved plan, and sends it downstream.

@Override  
public void notifyCheckpointComplete(long checkpointId) {  
  try {  
    table.getMetaClient().reloadActiveTimeline();  
    // There is no good way to infer when the compaction task for an instant crushed
    // or is still undergoing. So we use a configured timeout threshold to control the rollback:
    // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
    // when the earliest inflight instant has timed out, assumes it has failed
    // already and just rolls it back.

    // comment out: do we really need the timeout rollback ?
    // CompactionUtil.rollbackEarliestCompaction(table, conf);
    scheduleCompaction(table, checkpointId);  
  } catch (Throwable throwable) {  
    // make it fail-safe  
    LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);  
  }  
}

The scheduleCompaction method reads the first compaction instant in the requested state, loads its compaction plan, and wraps each compactionOperation in that plan (the file group information a compaction touches) in a CompactionPlanEvent that is sent downstream.

private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
  // get the timeline that contains all pending compactions
  HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();

  // the first instant takes the highest priority.
  // find the earliest requested compaction instant
  Option<HoodieInstant> firstRequested = pendingCompactionTimeline
      .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
  // record metrics
  compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
  compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
  // no requested compaction instant means there is nothing to dispatch
  if (!firstRequested.isPresent()) {
    // do nothing.
    LOG.info("No compaction plan for checkpoint " + checkpointId);
    return;
  }
  // get the instant time of this requested compaction instant
  String compactionInstantTime = firstRequested.get().getTimestamp();

  // generate compaction plan
  // should support configurable commit metadata
  // load the compaction plan
  HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
      table.getMetaClient(), compactionInstantTime);

  // no valid compaction plan was found
  if (compactionPlan == null || (compactionPlan.getOperations() == null)
      || (compactionPlan.getOperations().isEmpty())) {
    // do nothing.
    LOG.info("Empty compaction plan for instant " + compactionInstantTime);
  } else {
    // get the requested compaction instant
    HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
    // Mark instant as compaction inflight
    table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
    table.getMetaClient().reloadActiveTimeline();
    // convert every compaction operation in the plan
    List<CompactionOperation> operations = compactionPlan.getOperations().stream()
        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
    LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
    // delete the marker directory
    WriteMarkersFactory
        .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
        .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
    // wrap each operation in its own CompactionPlanEvent so that the downstream
    // subtasks share the work, each compacting a subset of the file groups
    for (CompactionOperation operation : operations) {
      output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
    }
  }
}
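
The core of this method, picking the earliest requested instant, flipping it to inflight, and fanning out one event per operation, can be sketched without any Hudi classes. Everything below is a simplified stand-in: Instant, State, and the string events are hypothetical, and the input list is assumed to be ordered by timestamp as a timeline would be.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, dependency-free model of the dispatch logic; not Hudi code.
class PlanFanOutSketch {
  enum State { REQUESTED, INFLIGHT }

  static final class Instant {
    final String timestamp;
    final List<String> fileIds; // stands in for the operations of the plan
    State state;

    Instant(String timestamp, State state, List<String> fileIds) {
      this.timestamp = timestamp;
      this.state = state;
      this.fileIds = fileIds;
    }
  }

  // Emits one "instant:fileId" event per operation of the earliest requested instant.
  // Assumes `pending` is sorted by timestamp.
  static List<String> dispatch(List<Instant> pending) {
    List<String> events = new ArrayList<>();
    Optional<Instant> firstRequested =
        pending.stream().filter(i -> i.state == State.REQUESTED).findFirst();
    if (!firstRequested.isPresent() || firstRequested.get().fileIds.isEmpty()) {
      return events; // nothing to dispatch for this checkpoint
    }
    Instant instant = firstRequested.get();
    instant.state = State.INFLIGHT; // so the next checkpoint skips it
    for (String fileId : instant.fileIds) {
      events.add(instant.timestamp + ":" + fileId);
    }
    return events;
  }

  public static void main(String[] args) {
    List<Instant> pending = new ArrayList<>();
    pending.add(new Instant("001", State.INFLIGHT, java.util.Arrays.asList("a")));
    pending.add(new Instant("002", State.REQUESTED, java.util.Arrays.asList("f1", "f2")));
    System.out.println(dispatch(pending));
  }
}
```

Because only one instant is dispatched per call, a backlog of requested instants is drained one checkpoint at a time.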

The processElement method of CompactOperator receives the CompactionPlanEvent generated above and runs the compaction task.

@Override  
public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {  
  final CompactionPlanEvent event = record.getValue();  
  // get the instant time of the inflight compaction
  final String instantTime = event.getCompactionInstantTime();
  // get the compaction operation
  final CompactionOperation compactionOperation = event.getOperation();
  // in async mode the compaction runs in a thread pool,
  // so it does not hold up checkpointing
  if (asyncCompaction) {  
    // executes the compaction task asynchronously to not block the checkpoint barrier propagate.  
    executor.execute(  
        () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),  
        (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),  
        "Execute compaction for instant %s from task %d", instantTime, taskID);  
  } else {  
    // executes the compaction task synchronously for batch mode.  
    // otherwise run the compaction synchronously
    LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);  
    doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());  
  }  
}
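
The async and sync branches follow a common pattern: either hand the work to an executor together with an error hook that still emits an event, or run it inline and let failures propagate. A minimal sketch of that pattern with a plain ExecutorService is shown below. AsyncOrSyncRunner and its string events are hypothetical; Hudi's own NonThrownExecutor is not reproduced here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the async/sync split; not Hudi code.
class AsyncOrSyncRunner {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final boolean async;
  // Stands in for the downstream collector.
  final List<String> collected = new CopyOnWriteArrayList<>();

  AsyncOrSyncRunner(boolean async) {
    this.async = async;
  }

  // Stands in for doCompaction: emits a success event or throws.
  private void doWork(String instantTime, String fileId, boolean fail) {
    if (fail) {
      throw new IllegalStateException("compaction failed for " + fileId);
    }
    collected.add("ok:" + instantTime + ":" + fileId);
  }

  void process(String instantTime, String fileId, boolean fail) {
    if (async) {
      // Runs off the caller's thread so the caller is never blocked.
      executor.execute(() -> {
        try {
          doWork(instantTime, fileId, fail);
        } catch (Throwable t) {
          // Error hook: still emit an event so the commit side learns of the failure.
          collected.add("failed:" + instantTime + ":" + fileId);
        }
      });
    } else {
      // Synchronous mode: failures propagate to the caller.
      doWork(instantTime, fileId, fail);
    }
  }

  // Waits for queued tasks to finish before returning.
  void close() {
    executor.shutdown();
    try {
      executor.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    AsyncOrSyncRunner runner = new AsyncOrSyncRunner(true);
    runner.process("001", "f1", false);
    runner.process("001", "f2", true);
    runner.close();
    System.out.println(runner.collected);
  }
}
```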

The doCompaction method that follows indirectly calls the compact method of HoodieFlinkMergeOnReadTableCompactor. Earlier articles have analyzed it, so it is not repeated here.

Pipelines::cluster

This method starts the periodic clustering pipeline.

public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {  
  // ClusteringPlanOperator emits the clustering plan downstream
  DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
          TypeInformation.of(ClusteringPlanEvent.class),
          new ClusteringPlanOperator(conf))
      .setParallelism(1) // plan generate must be singleton
      .setMaxParallelism(1) // plan generate must be singleton
      .keyBy(plan ->
          // make the distribution strategy deterministic to avoid concurrent modifications
          // on the same bucket files
          // key by the concatenated file IDs of the ClusteringPlanEvent, so that clustering
          // operations on the same files always go to the same subtask
          plan.getClusteringGroupInfo().getOperations()
              .stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
      .transform("clustering_task",
          TypeInformation.of(ClusteringCommitEvent.class),
          // ClusteringOperator executes the clustering
          new ClusteringOperator(conf, rowType))
      // the parallelism of the clustering tasks comes from clustering.tasks
      .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
  // if sorting is enabled, i.e. clustering.plan.strategy.sort.columns is not empty,
  // set the managed memory for this step from write.sort.memory
  if (OptionsResolver.sortClusteringEnabled(conf)) {
    ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
  }
  // check and commit the clustering instant
  DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))  
      .name("clustering_commit")  
      .setParallelism(1); // clustering commit should be singleton  
  clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);  
  return clusteringCommitEventDataStream;  
}
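
The keyBy step is what makes the routing deterministic: the key is the concatenation of the file IDs in a clustering group, so the same group always maps to the same subtask. The sketch below illustrates the idea only. ClusteringKeySketch is hypothetical, and the modulo of hashCode is a simplified stand-in for Flink's real key-group assignment, which works differently.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of deterministic key routing; not Hudi or Flink code.
class ClusteringKeySketch {
  // Mirrors the key selector above: join all file IDs without a separator.
  static String key(List<String> fileIds) {
    return fileIds.stream().collect(Collectors.joining());
  }

  // Simplified stand-in for key-to-subtask assignment (assumption: Flink itself
  // hashes keys into key groups; this modulo is only for illustration).
  static int subtaskFor(String key, int parallelism) {
    return Math.floorMod(key.hashCode(), parallelism);
  }

  public static void main(String[] args) {
    String k = key(Arrays.asList("file-1", "file-2"));
    System.out.println(k + " -> subtask " + subtaskFor(k, 4));
  }
}
```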

The way ClusteringPlanOperator schedules clustering is very similar to the compaction scheduling above.

private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {  
  // get the pending clustering instants
  List<HoodieInstant> pendingClusteringInstantTimes =
      ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
  // the first instant takes the highest priority.
  // pick the earliest one in REQUESTED state
  Option<HoodieInstant> firstRequested = Option.fromJavaOptional(  
      pendingClusteringInstantTimes.stream()  
          .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());  
  
  // record metrics  
  clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);  
  clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());  
  
  if (!firstRequested.isPresent()) {  
    // do nothing.  
    LOG.info("No clustering plan for checkpoint " + checkpointId);  
    return;  
  }  
  
  String clusteringInstantTime = firstRequested.get().getTimestamp();  
  
  // generate clustering plan
  // should support configurable commit metadata
  HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
  // fetch the clustering plan generated earlier
  Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
      table.getMetaClient(), clusteringInstant);

  // if no valid clustering plan was found, return directly
  if (!clusteringPlanOption.isPresent()) {  
    // do nothing.  
    LOG.info("No clustering plan scheduled");  
    return;  
  }  
  
  HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();  
  
  if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)  
      || (clusteringPlan.getInputGroups().isEmpty())) {  
    // do nothing.  
    LOG.info("Empty clustering plan for instant " + clusteringInstantTime);  
  } else {  
    // Mark instant as clustering inflight  
    table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());  
    table.getMetaClient().reloadActiveTimeline();  
    // iterate over all input groups and wrap each one in a ClusteringPlanEvent
    for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {  
      LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());  
      output.collect(new StreamRecord<>(  
          new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())  
      ));  
    }  
  }  
}

Clustering wraps the operation on a group of file slices in a HoodieClusteringGroup (the counterpart of compaction's CompactionOperation). Each HoodieClusteringGroup in the plan is wrapped in its own ClusteringPlanEvent so that downstream subtasks can run clustering in parallel.

The processElement method of ClusteringOperator executes the clustering plan. As with compaction, it can run either synchronously or asynchronously.

@Override  
public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {  
  final ClusteringPlanEvent event = element.getValue();  
  final String instantTime = event.getClusteringInstantTime();  
  final List<ClusteringOperation> clusteringOperations = event.getClusteringGroupInfo().getOperations();  
  if (this.asyncClustering) {  
    // executes the compaction task asynchronously to not block the checkpoint barrier propagate.  
    executor.execute(  
        () -> doClustering(instantTime, clusteringOperations),  
        (errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), taskID)),  
        "Execute clustering for instant %s from task %d", instantTime, taskID);  
  } else {  
    // executes the clustering task synchronously for batch mode.  
    LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);  
    doClustering(instantTime, clusteringOperations);  
  }  
}

The doClustering method is a pure Flink implementation of the clustering process.

private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {  
  clusteringMetrics.startClustering();  
  // write the clustered data with bulk insert
  BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,  
      instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),  
      this.rowType, true);  
  
  Iterator<RowData> iterator;  
  // if the clustering touches log files, use readRecordsForGroupWithLogs,
  // otherwise readRecordsForGroupBaseFiles is enough;
  // both read the records of the file group and return them as an iterator
  if (clusteringOperations.stream().anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()))) {  
    // if there are log files, we read all records into memory for a file group and apply updates.  
    iterator = readRecordsForGroupWithLogs(clusteringOperations, instantTime);  
  } else {  
    // We want to optimize reading records for case there are no log files.  
    iterator = readRecordsForGroupBaseFiles(clusteringOperations);  
  }  
  // clustering.plan.strategy.sort.columns is configured, so the records must be sorted
  if (this.sortClusteringEnabled) {
    RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
    // sort with BinaryExternalSorter, whose sort code is generated
    // from clustering.plan.strategy.sort.columns
    BinaryExternalSorter sorter = initSorter();  
    while (iterator.hasNext()) {  
      RowData rowData = iterator.next();  
      BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();  
      sorter.write(binaryRowData);  
    }  
    // write the sorted records with bulk insert
    BinaryRowData row = binarySerializer.createInstance();  
    while ((row = sorter.getIterator().next(row)) != null) {  
      writerHelper.write(row);  
    }  
    sorter.close();  
  } else {  
    while (iterator.hasNext()) {  
      writerHelper.write(iterator.next());  
    }  
  }  
  
  List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);  
  clusteringMetrics.endClustering();  
  collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));  
  writerHelper.close();  
}

Pipelines::clean

The clean method is fairly simple. It adds a sink of type CleanFunction to the data stream, with a parallelism of 1. The code is shown below.

public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {  
  DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))  
      .setParallelism(1)  
      .name("clean_commits");  
  cleanCommitDataStream.getTransformation().setMaxParallelism(1);  
  return cleanCommitDataStream;
}

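We continue with CleanFunction. When asynchronous clean is enabled, CleanFunction runs one clean asynchronously at startup (open). When a checkpoint is taken, snapshotState starts the asynchronous clean service. When the checkpoint completes, notifyCheckpointComplete waits for the clean operation to finish. The relevant code is shown below.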

@Override  
public void open(Configuration parameters) throws Exception {  
  super.open(parameters);  
  this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());  
  this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();  
  String instantTime = writeClient.createNewInstantTime();  
  LOG.info(String.format("exec clean with instant time %s...", instantTime));  
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {  
    executor.execute(() -> {  
      this.isCleaning = true;  
      try {  
        this.writeClient.clean(instantTime);  
      } finally {  
        this.isCleaning = false;  
      }  
    }, "wait for cleaning finish");  
  }  
}  
  
@Override  
public void notifyCheckpointComplete(long l) throws Exception {  
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {  
    executor.execute(() -> {  
      try {  
        this.writeClient.waitForCleaningFinish();  
      } finally {  
        // ensure to switch the isCleaning flag  
        this.isCleaning = false;  
      }  
    }, "wait for cleaning finish");  
  }  
}  
  
@Override  
public void snapshotState(FunctionSnapshotContext context) throws Exception {  
  if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {  
    try {  
      this.writeClient.startAsyncCleaning();  
      this.isCleaning = true;  
    } catch (Throwable throwable) {  
      // catch the exception to not affect the normal checkpointing  
      LOG.warn("Error while start async cleaning", throwable);  
    }  
  }  
}
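
The interplay of snapshotState, notifyCheckpointComplete, and the isCleaning flag amounts to a small state machine: start at most one clean per checkpoint cycle, then wait for it and reset the flag. A minimal sketch under simplifying assumptions follows. CheckpointDrivenCleaner is hypothetical, the clean itself is replaced by a counter, and the wait is done inline rather than on a separate executor as in the original.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the checkpoint-driven clean lifecycle; not Hudi code.
class CheckpointDrivenCleaner {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicInteger cleanRuns = new AtomicInteger();
  private CompletableFuture<Boolean> future;
  private boolean isCleaning;

  // Called when a checkpoint is taken: start a clean unless one is already running.
  void snapshotState() {
    if (!isCleaning) {
      future = CompletableFuture.supplyAsync(() -> {
        cleanRuns.incrementAndGet(); // stands in for writeClient.clean(...)
        return true;
      }, executor);
      isCleaning = true;
    }
  }

  // Called when the checkpoint completes: wait for the running clean, then reset the flag.
  void notifyCheckpointComplete() {
    if (isCleaning) {
      try {
        future.join();
      } finally {
        isCleaning = false; // always reset, even if the clean failed
      }
    }
  }

  int getCleanRuns() {
    return cleanRuns.get();
  }

  boolean isCleaning() {
    return isCleaning;
  }

  void close() {
    executor.shutdown();
  }

  public static void main(String[] args) {
    CheckpointDrivenCleaner cleaner = new CheckpointDrivenCleaner();
    cleaner.snapshotState();
    cleaner.notifyCheckpointComplete();
    System.out.println("clean runs: " + cleaner.getCleanRuns());
    cleaner.close();
  }
}
```

The flag guarantees that two consecutive snapshotState calls without a completed checkpoint in between start only one clean.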

Besides Pipelines::clean, which uses CleanFunction directly, ClusteringCommitSink and CompactionCommitSink both extend CleanFunction, and neither overrides snapshotState or notifyCheckpointComplete. These two sinks therefore behave like CleanFunction and trigger a clean at checkpoint time.
Looking further at the doCommit method of the two sinks, both end with the following code:

// Whether to clean up the old log file when compaction  
if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {  
  this.writeClient.clean();  
}

This code means that if asynchronous clean is not enabled and no clean is currently running, a synchronous clean is executed.
Next we follow the this.writeClient.startAsyncCleaning() call all the way down. Non-essential code along the way is omitted.

  • this.writeClient.startAsyncCleaning() in CleanFunction
  • tableServiceClient.startAsyncCleanerService(this); in HoodieFlinkWriteClient
  • this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient); in BaseHoodieTableServiceClient

At this point it is clear that the clean table service is wrapped in AsyncCleanerService. The startAsyncCleaningIfEnabled method first checks whether asynchronous clean is configured and, if so, creates an asynchronous clean service, as shown below:

public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {  
  HoodieWriteConfig config = writeClient.getConfig();  
  // if either hoodie.clean.automatic or hoodie.clean.async is false, the service is not started
  if (!config.isAutoClean() || !config.isAsyncClean()) {  
    LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");  
    return null;  
  }  
  // create and start the AsyncCleanerService
  AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);  
  asyncCleanerService.start(null);  
  return asyncCleanerService;  
}

Next comes asyncCleanerService.start, which is the start method of HoodieAsyncService.
The startService method it calls returns a Pair of the asynchronous service logic itself and the executor that runs it.

public void start(Function<Boolean, Boolean> onShutdownCallback) {  
  if (started) {  
    LOG.warn("The async service already started.");  
    return;  
  }  
  Pair<CompletableFuture, ExecutorService> res = startService();  
  future = res.getKey();  
  executor = res.getValue();  
  started = true;  
  shutdownCallback(onShutdownCallback);  
}

The startService method is implemented in the subclass AsyncCleanerService, as shown below.

@Override  
protected Pair<CompletableFuture, ExecutorService> startService() {  
  String instantTime = writeClient.createNewInstantTime();  
  LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));  
  // start the asynchronous clean in the thread pool
  return Pair.of(CompletableFuture.supplyAsync(() -> {  
    writeClient.clean(instantTime);  
    return true;  
  }, executor), executor);  
}

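After this call, the asynchronous clean logic is held in the future field.
As analyzed above, notifyCheckpointComplete ends up calling waitForCompletion (through waitForCleaningFinish). If the clean has not finished yet, the call blocks until it does.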

public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {  
  if (asyncCleanerService != null) {  
    LOG.info("Waiting for async clean service to finish");  
    try {  
      asyncCleanerService.waitForShutdown();  
    } catch (Exception e) {  
      throw new HoodieException("Error waiting for async clean service to finish", e);  
    }  
  }  
}

The waitForShutdown method of HoodieAsyncService waits synchronously for the clean to finish:

public void waitForShutdown() throws ExecutionException, InterruptedException {  
  if (future == null) {  
    return;  
  }  
  try {  
    future.get();  
  } catch (ExecutionException ex) {  
    LOG.error("Service shutdown with error", ex);  
    throw ex;  
  }  
}
