Flink Source Code Reading: The Checkpoint Execution Process

Preface

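For Flink, the role and importance of checkpoints need no further explanation. An earlier post covered in detail how checkpoints are triggered periodically; readers who are not familiar with the overall checkpoint flow can refer to it.
In this post we go through the source code together to see how a checkpoint is actually executed.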

The checkpoint process

The starting point

As we know, periodic checkpoints are initiated by a component in the JobManager called CheckpointCoordinator. The work is done in CheckpointCoordinator.triggerCheckpoint. The method is long; in summary it does the following:

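  1. Pre-checks, including:
  • whether the checkpoint must be forced
  • whether the number of concurrent checkpoints currently queued exceeds the threshold
  • whether too little time has passed since the last successful checkpoint
    If these checks fail, the checkpoint is not performed.
  2. Check that every task that needs to be triggered is in the RUNNING state; otherwise give up. I ran into this pitfall before; see the earlier post "A case of Flink not performing checkpoints".
  3. Check that every task that needs to acknowledge the checkpoint is in the RUNNING state; otherwise give up.
    Once all of the checks above pass, the checkpoint can go ahead.
  4. Generate a unique, monotonically increasing checkpointID.
  5. Initialize the CheckpointStorageLocation, the path where this checkpoint's snapshot is stored; it differs between backends.
  6. Create a PendingCheckpoint, which represents a checkpoint in an intermediate state, and store it in a checkpointId -> PendingCheckpoint map.
  7. Register a scheduled task that cancels this checkpoint once it times out and triggers a new one.
  8. Call Execution.triggerCheckpoint() to send a checkpoint request to every task that needs to be triggered: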
for (Execution execution: executions) {
                if (props.isSynchronous()) {
                    execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
                } else {
                    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
                }
            }
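
To make the pre-checks and bookkeeping in the list above more concrete, here is a minimal sketch. It is not Flink's CheckpointCoordinator: the class CheckpointPrecheckSketch, its fields, and the use of -1 as a "request dropped" marker are all assumptions made for illustration, and the PendingCheckpoint is reduced to a placeholder string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the coordinator's pre-checks; not Flink's actual class.
class CheckpointPrecheckSketch {
    private final int maxConcurrent;      // assumed threshold for in-flight checkpoints
    private final long minPauseMillis;    // assumed minimum pause after the last completed checkpoint
    private final AtomicLong idCounter = new AtomicLong(1);
    // checkpointId -> placeholder for a PendingCheckpoint
    private final Map<Long, String> pending = new HashMap<>();
    private long lastCompletedMillis = Long.MIN_VALUE;

    CheckpointPrecheckSketch(int maxConcurrent, long minPauseMillis) {
        this.maxConcurrent = maxConcurrent;
        this.minPauseMillis = minPauseMillis;
    }

    /** Returns the new checkpoint ID, or -1 if the trigger request is dropped. */
    long tryTrigger(long nowMillis, boolean allTasksRunning) {
        if (pending.size() >= maxConcurrent) {
            return -1; // too many checkpoints in flight
        }
        if (lastCompletedMillis != Long.MIN_VALUE
                && nowMillis - lastCompletedMillis < minPauseMillis) {
            return -1; // too soon after the last completed checkpoint
        }
        if (!allTasksRunning) {
            return -1; // some task to trigger or to ack is not running
        }
        long id = idCounter.getAndIncrement(); // unique, increasing ID
        pending.put(id, "pending@" + nowMillis);
        return id;
    }

    /** Marks a pending checkpoint as completed and remembers when that happened. */
    void complete(long checkpointId, long nowMillis) {
        if (pending.remove(checkpointId) != null) {
            lastCompletedMillis = nowMillis;
        }
    }
}
```

The ID is only consumed after every check has passed, so a dropped request does not leave a gap in the sequence in this sketch.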

In the end this goes through an RPC call to TaskExecutorGateway.triggerCheckpoint, that is, a request to run TaskExecutor.triggerCheckpoint(). Because one TaskExecutor may be running several Tasks, it has to look up the Task that matches the ExecutionAttemptID of the checkpoint request and then call Task.triggerCheckpointBarrier() on it:

private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

@Override
    public CompletableFuture<Acknowledge> triggerCheckpoint(
            ExecutionAttemptID executionAttemptID,
            long checkpointId,
            long checkpointTimestamp,
            CheckpointOptions checkpointOptions,
            boolean advanceToEndOfEventTime) {
        log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        final Task task = taskSlotTable.getTask(executionAttemptID);

        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions, advanceToEndOfEventTime);

            return CompletableFuture.completedFuture(Acknowledge.get());
        } else {
            final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

            log.debug(message);
            return FutureUtils.completedExceptionally(new CheckpointException(message, CheckpointFailureReason.TASK_CHECKPOINT_FAILURE));
        }
    }

The actual checkpoint logic of a Task is encapsulated in AbstractInvokable.triggerCheckpointAsync(...):

public void triggerCheckpointBarrier(
            final long checkpointID,
            final long checkpointTimestamp,
            final CheckpointOptions checkpointOptions,
            final boolean advanceToEndOfEventTime) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {
            try {
                invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, advanceToEndOfEventTime);
            }
            catch (RejectedExecutionException ex) {
                // This may happen if the mailbox is closed. It means that the task is shutting down, so we just ignore it.
                LOG.debug(
                    "Triggering checkpoint {} for {} ({}) was rejected by the mailbox",
                    checkpointID, taskNameWithSubtask, executionId);
            }
            catch (Throwable t) {
                if (getExecutionState() == ExecutionState.RUNNING) {
                    failExternally(new Exception(
                        "Error while triggering checkpoint " + checkpointID + " for " +
                            taskNameWithSubtask, t));
                } else {
                    LOG.debug("Encountered error while triggering checkpoint {} for " +
                        "{} ({}) while being not in state running.", checkpointID,
                        taskNameWithSubtask, executionId, t);
                }
            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointException("Task name with subtask : " + taskNameWithSubtask, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
        }
    }

triggerCheckpointAsync is overridden by both SourceStreamTask and the regular StreamTask; the main logic lives in StreamTask:

private boolean performCheckpoint(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics,
            boolean advanceToEndOfTime) throws Exception {

        LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

        final long checkpointId = checkpointMetaData.getCheckpointId();

        if (isRunning) {
            actionExecutor.runThrowing(() -> {

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    setSynchronousSavepointId(checkpointId);

                    if (advanceToEndOfTime) {
                        advanceToEndOfEventTime();
                    }
                }

                // All of the following steps happen as an atomic step from the perspective of barriers and
                // records/watermarks/timers/callbacks.
                // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
                // checkpoint alignments

                // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
                //           The pre-barrier work should be nothing or minimal in the common case.
                operatorChain.prepareSnapshotPreBarrier(checkpointId);

                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastCheckpointBarrier(
                        checkpointId,
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                // Step (3): Take the state snapshot. This should be largely asynchronous, to not
                //           impact progress of the streaming topology
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);

            });

            return true;
        } else {
            actionExecutor.runThrowing(() -> {
                // we cannot perform our checkpoint - let the downstream operators know that they
                // should not wait for any input from this operator

                // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
                // yet be created
                final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
                recordWriter.broadcastEvent(message);
            });

            return false;
        }
    }

It does three main things: 1) prepares for the checkpoint, which usually involves little work; 2) emits the CheckpointBarrier; 3) stores the checkpoint snapshot.
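
The ordering of these three steps can be illustrated with a small toy model. Everything in it is hypothetical (BarrierOrderSketch, ToyOperator, outputs modelled as plain lists); it only shows that the barrier reaches every output before any operator snapshot is taken.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the three steps; none of these types are Flink classes.
class BarrierOrderSketch {
    /** Hypothetical stand-in for an operator; only the snapshot hook is modelled. */
    interface ToyOperator {
        String snapshot(long checkpointId);
    }

    /** Runs the three steps in order and returns a trace of what happened. */
    static List<String> performCheckpoint(
            long checkpointId, List<ToyOperator> chain, List<List<String>> outputs) {
        List<String> trace = new ArrayList<>();
        // Step 1: pre-barrier preparation (a no-op in this sketch).
        trace.add("prepare");
        // Step 2: emit the barrier on every output before any state is snapshotted.
        for (List<String> output : outputs) {
            output.add("barrier-" + checkpointId);
        }
        trace.add("barrier");
        // Step 3: snapshot each operator of the chain.
        for (ToyOperator op : chain) {
            trace.add("snapshot:" + op.snapshot(checkpointId));
        }
        return trace;
    }
}
```

Emitting the barrier first matches the comment in the Flink code above: downstream tasks can begin their own alignment while this task is still snapshotting.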

Broadcasting the barrier

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }

Taking the snapshot

private void checkpointState(
            CheckpointMetaData checkpointMetaData,
            CheckpointOptions checkpointOptions,
            CheckpointMetrics checkpointMetrics) throws Exception {
        // storage location and metadata of the checkpoint
        CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
                checkpointMetaData.getCheckpointId(),
                checkpointOptions.getTargetLocation());
        // wrap the checkpoint procedure in a CheckpointingOperation object
        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

        checkpointingOperation.executeCheckpointing();
    }

Each operator's snapshot is abstracted as OperatorSnapshotFutures, which holds the snapshot results for operator state and keyed state:

public class OperatorSnapshotFutures {

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;

    @Nonnull
    private RunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
    }

Because a StreamTask may contain several operators, a Map is used internally to maintain the OperatorID -> OperatorSnapshotFutures mapping.

        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

The snapshot process has a synchronous part and an asynchronous part:

public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            try {
                // synchronous part
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                        checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
                // asynchronous part
                // a checkpoint can be configured to run synchronously or asynchronously
                // in the synchronous case, all runnable futures are in fact already completed at this point
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                    owner,
                    operatorSnapshotsInProgress,
                    checkpointMetaData,
                    checkpointMetrics,
                    startAsyncPartNano);

                owner.cancelables.registerCloseable(asyncCheckpointRunnable);
                owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }
            } catch (Exception ex) {
                // Cleanup to release resources
                for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotResult) {
                        try {
                            operatorSnapshotResult.cancel();
                        } catch (Exception e) {
                            LOG.warn("Could not properly cancel an operator snapshot result.", e);
                        }
                    }
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    // in the case of a synchronous checkpoint, we always rethrow the exception,
                    // so that the task fails.
                    // this is because the intention is always to stop the job after this checkpointing
                    // operation, and without the failure, the task would go back to normal execution.
                    throw ex;
                } else {
                    owner.getEnvironment().declineCheckpoint(checkpointMetaData.getCheckpointId(), ex);
                }
            }
        }

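In the synchronous phase, StreamOperator.snapshotState is called on each operator in turn, and the result is a runnable future. Depending on whether the checkpoint is configured to run synchronously or asynchronously, this future may already be completed or may still be pending: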

private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                // the synchronous part calls the operator's snapshotState; the returned OperatorSnapshotFutures may or may not be completed yet
                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions,
                        storageLocation);
                operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

The details are in AbstractStreamOperator#snapshotState:

public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
            CheckpointStreamFactory factory) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

        StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables());

        try {
            // snapshot the state, both keyed state and operator state
            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            // write the operator state snapshot
            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            // write the keyed state snapshot
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + ".";

            if (!getContainingTask().isCanceled()) {
                LOG.info(snapshotFailMessage, snapshotException);
            }
            try {
                snapshotContext.closeExceptionally();
            } catch (IOException e) {
                snapshotException.addSuppressed(e);
            }
            throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
        }

        return snapshotInProgress;
    }

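As we know, state is divided into raw state and managed state (state managed by Flink). In the code below, timers are written as raw keyed state when the keyed state backend requires legacy synchronous timer snapshots, so they also end up in the snapshot.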

/**
     * Stream operators with state, which want to participate in a snapshot need to override this hook method.
     *
     * @param context context that provides information and means required for taking a snapshot
     */
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
        //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
        // all timers are written as raw keyed state
        if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
            ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {

            KeyedStateCheckpointOutputStream out;

            try {
                out = context.getRawKeyedOperatorStateOutput();
            } catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " +
                    getOperatorName() + '.', exception);
            }

            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                for (int keyGroupIdx : allKeyGroups) {
                    out.startNewKeyGroup(keyGroupIdx);

                    timeServiceManager.snapshotStateForKeyGroup(
                        new DataOutputViewStreamWrapper(out), keyGroupIdx);
                }
            } catch (Exception exception) {
                throw new Exception("Could not write timer service of " + getOperatorName() +
                    " to checkpoint state stream.", exception);
            } finally {
                try {
                    out.close();
                } catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This " +
                        "might have prevented deleting some state data.", getOperatorName(), closeException);
                }
            }
        }
    }
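
The loop over key groups in the method above can be pictured with the following sketch. It is an assumption-laden toy, not Flink's KeyedStateCheckpointOutputStream: KeyGroupTimerSketch and writeTimers are hypothetical names, timers are plain long values, and the per-group format (a count followed by the timestamps) is made up for illustration. The idea it shows is that each key group starts at a recorded offset in one shared stream.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of writing timers group by group into one stream.
class KeyGroupTimerSketch {
    /**
     * Writes the timers of key groups startKeyGroup..endKeyGroup (inclusive) and
     * returns the start offset of each key group, relative to where writing began.
     */
    static long[] writeTimers(
            int startKeyGroup,
            int endKeyGroup,
            Map<Integer, List<Long>> timersByKeyGroup,
            ByteArrayOutputStream sink) {
        long[] offsets = new long[endKeyGroup - startKeyGroup + 1];
        DataOutputStream out = new DataOutputStream(sink);
        try {
            for (int keyGroup = startKeyGroup; keyGroup <= endKeyGroup; keyGroup++) {
                // Comparable in spirit to startNewKeyGroup: remember where this group begins.
                offsets[keyGroup - startKeyGroup] = out.size();
                List<Long> timers = timersByKeyGroup.getOrDefault(keyGroup, List.of());
                out.writeInt(timers.size());
                for (long timer : timers) {
                    out.writeLong(timer);
                }
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }
}
```

With per-group offsets, a reader can seek directly to the key groups it owns, which is what makes redistributing keyed state across a different parallelism practical.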

The snapshotState shown so far belongs to AbstractStreamOperator. Its subclass AbstractUdfStreamOperator overrides it:

public void snapshotState(StateSnapshotContext context) throws Exception {
        // call the parent method first, which writes the timers
        super.snapshotState(context);
        StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
    }

public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }


private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {
        // if the user function implements CheckpointedFunction, call the UDF's snapshotState method to take the snapshot
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }
        // if the user function implements ListCheckpointed
        if (userFunction instanceof ListCheckpointed) {
            // first call snapshotState to obtain the current state
            @SuppressWarnings("unchecked")
            List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
                    snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
            // obtain the list state handle from the state backend
            ListState<Serializable> listState = backend.
                    getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
            // clear it
            listState.clear();
            // write the current state into the state backend
            if (null != partitionableState) {
                try {
                    for (Serializable statePartition : partitionableState) {
                        listState.add(statePartition);
                    }
                } catch (Exception e) {
                    listState.clear();

                    throw new Exception("Could not write partitionable state to operator " +
                        "state backend.", e);
                }
            }

            return true;
        }

        return false;
    }
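
The unwrap-and-retry loop in snapshotFunctionState can be sketched in isolation. The types below (ToyFunction, Checkpointed, Wrapping) are hypothetical stand-ins for Function, CheckpointedFunction, and WrappingFunction, and returning null for "nothing to snapshot" is a simplification chosen for this sketch.

```java
// Hypothetical stand-ins for the Flink interfaces; only the unwrapping logic is modelled.
class UnwrapSketch {
    interface ToyFunction {}

    /** Stand-in for CheckpointedFunction. */
    interface Checkpointed extends ToyFunction {
        String snapshotState();
    }

    /** Stand-in for WrappingFunction. */
    static class Wrapping implements ToyFunction {
        final ToyFunction inner;

        Wrapping(ToyFunction inner) {
            this.inner = inner;
        }
    }

    /** Returns the snapshot of the first checkpointed function found, or null if there is none. */
    static String snapshotFunctionState(ToyFunction function) {
        while (true) {
            if (function instanceof Checkpointed) {
                return ((Checkpointed) function).snapshotState();
            }
            if (function instanceof Wrapping) {
                // Peel off one wrapper and try again with the inner function.
                function = ((Wrapping) function).inner;
            } else {
                return null;
            }
        }
    }
}
```

The loop stops at the first function that can be snapshotted, so in this sketch a checkpointed wrapper takes precedence over whatever it wraps.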

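At this point we know how the checkpoint process ends up calling the snapshot method we implement ourselves. Next, let's look at how state managed by Flink is written to the snapshot.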

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            
            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

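First, operator state. DefaultOperatorStateBackend delegates the actual work to DefaultOperatorStateBackendSnapshotStrategy. It first makes deep copies of all currently registered operator state (both list state and broadcast state), and then wraps the actual write in an asynchronous FutureTask. The main tasks of this FutureTask are: 1) open the output stream; 2) write the state metadata; 3) write the state; 4) close the output stream and obtain the state handle. If asynchronous checkpointing is not enabled, the FutureTask is executed immediately, in the synchronous phase.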

public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        final long checkpointId,
        final long timestamp,
        @Nonnull final CheckpointStreamFactory streamFactory,
        @Nonnull final CheckpointOptions checkpointOptions) throws IOException {

        if (registeredOperatorStates.isEmpty() && registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }

        final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
        final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());

        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(userClassLoader);
        try {
            // eagerly create deep copies of the list and the broadcast states (if any)
            // in the synchronous phase, so that we can use them in the async writing.
            // deep-copy all registered list states and broadcast states
            if (!registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }

            if (!registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }
        // wrap the main write operation in an asynchronous FutureTask
        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =
            new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>() {

                @Override
                protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                    // create the state output stream
                    CheckpointStreamFactory.CheckpointStateOutputStream localOut =
                        streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                    snapshotCloseableRegistry.registerCloseable(localOut);
                    // collect the metadata
                    // get the registered operator state infos ...
                    List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =
                        new ArrayList<>(registeredOperatorStatesDeepCopies.size());

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {
                        operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }
                    // collect the broadcast state metadata (all metadata is written to the stream below)
                    // ... get the registered broadcast operator state infos ...
                    List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =
                        new ArrayList<>(registeredBroadcastStatesDeepCopies.size());

                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {
                        broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                    }
        
                    // ... write them all in the checkpoint stream ...
                    DataOutputView dov = new DataOutputViewStreamWrapper(localOut);

                    OperatorBackendSerializationProxy backendSerializationProxy =
                        new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);

                    backendSerializationProxy.write(dov);

                    // ... and then go for the states ...
                    // write the state
                    // we put BOTH normal and broadcast state metadata here
                    int initialMapCapacity =
                        registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                    final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                        new HashMap<>(initialMapCapacity);

                    for (Map.Entry<String, PartitionableListState<?>> entry :
                        registeredOperatorStatesDeepCopies.entrySet()) {

                        PartitionableListState<?> value = entry.getValue();
                        long[] partitionOffsets = value.write(localOut);
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and the broadcast states themselves ...
                    for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry :
                        registeredBroadcastStatesDeepCopies.entrySet()) {

                        BackendWritableBroadcastState<?, ?> value = entry.getValue();
                        long[] partitionOffsets = {value.write(localOut)};
                        OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                        writtenStatesMetaData.put(
                            entry.getKey(),
                            new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                    }

                    // ... and, finally, create the state handle.
                    OperatorStateHandle retValue = null;

                    if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
                        // close the output stream and obtain the state handle, which can later be used to read the state
                        StreamStateHandle stateHandle = localOut.closeAndGetHandle();

                        if (stateHandle != null) {
                            retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                        }

                        return SnapshotResult.of(retValue);
                    } else {
                        throw new IOException("Stream was already unregistered.");
                    }
                }

                @Override
                protected void cleanupProvidedResources() {
                    // nothing to do
                }

                @Override
                protected void logAsyncSnapshotComplete(long startTime) {
                    if (asynchronousSnapshots) {
                        logAsyncCompleted(streamFactory, startTime);
                    }
                }
            };

        final FutureTask<SnapshotResult<OperatorStateHandle>> task =
            snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);
        // if checkpointing is not asynchronous, run the FutureTask right here, i.e. the state is written in the synchronous phase
        if (!asynchronousSnapshots) {
            task.run();
        }

        return task;
    }
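
The "copy in the synchronous phase, write in a FutureTask" pattern used above can be reduced to a few lines. This is a sketch under stated assumptions: CopyThenWriteSketch is a hypothetical toy backend, its state is a list of strings, and joining the strings stands in for writing to a checkpoint stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;

// Hypothetical toy backend; it only mirrors the "copy now, write later" shape.
class CopyThenWriteSketch {
    private final List<String> liveState = new ArrayList<>();
    private final boolean asynchronousSnapshots;

    CopyThenWriteSketch(boolean asynchronousSnapshots) {
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    void add(String value) {
        liveState.add(value);
    }

    FutureTask<String> snapshot() {
        // Synchronous phase: copy the state so later updates cannot leak into the snapshot.
        // Strings are immutable, so copying the list is enough here.
        final List<String> copy = new ArrayList<>(liveState);
        // The "write" is deferred into a FutureTask; joining strings stands in for stream I/O.
        FutureTask<String> task = new FutureTask<>(() -> String.join(",", copy));
        if (!asynchronousSnapshots) {
            task.run(); // synchronous mode: the write happens before snapshot() returns
        }
        return task;
    }

    /** Returns the task's result, wrapping checked exceptions for brevity. */
    static String result(FutureTask<String> task) {
        try {
            return task.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In asynchronous mode the caller decides when and on which thread to run the returned task; because the copy was taken up front, updates made in between do not appear in the result.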

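Writing keyed state follows broadly the same flow as operator state. However, keyed state has several storage implementations, including heap-based and RocksDB-based ones, and the RocksDB-based implementation additionally supports incremental checkpoints, so it is somewhat more complex than operator state.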

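This concludes the first phase of the snapshot operation, the synchronous phase. The asynchronous phase is encapsulated in AsyncCheckpointRunnable, whose main steps are: 1) run the FutureTasks created in the synchronous phase; 2) once they complete, send an Ack to the CheckpointCoordinator.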

protected static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates =
                    new TaskStateSnapshot(operatorSnapshotsInProgress.size());

                // complete the state write of each operator
                // for a synchronous checkpoint, the state has already been written by this point
                // for an asynchronous checkpoint, the state is only written here
                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                    // finalize the async part of all by executing all snapshot runnables
                    OperatorSnapshotFinalizer finalizedSnapshots =
                        new OperatorSnapshotFinalizer(snapshotInProgress);

                    jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getJobManagerOwnedState());

                    localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                        operatorID,
                        finalizedSnapshots.getTaskLocalState());
                }

                final long asyncEndNanos = System.nanoTime();
                final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000L;

                checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,
                    CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    // report that the snapshot has completed
                    reportCompletedSnapshotStates(
                        jobManagerTaskOperatorSubtaskStates,
                        localTaskOperatorSubtaskStates,
                        asyncDurationMillis);

                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                        owner.getName(),
                        checkpointMetaData.getCheckpointId());
                }
            } catch (Exception e) {
                handleExecutionException(e);
            } finally {
                owner.cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }

        private void reportCompletedSnapshotStates(
                TaskStateSnapshot acknowledgedTaskStateSnapshot,
                TaskStateSnapshot localTaskStateSnapshot,
                long asyncDurationMillis) {
            TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
            boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
            boolean hasLocalState = localTaskStateSnapshot.hasState();
            // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
            // to stateless tasks on restore. This enables simple job modifications that only concern
            // stateless without the need to assign them uids to match their (always empty) states.
            taskStateManager.reportTaskStateSnapshots(
                checkpointMetaData,
                checkpointMetrics,
                hasAckState ? acknowledgedTaskStateSnapshot : null,
                hasLocalState ? localTaskStateSnapshot : null);
        }
}

public class TaskStateManagerImpl implements TaskStateManager {
    @Override
    public void reportTaskStateSnapshots(
        @Nonnull CheckpointMetaData checkpointMetaData,
        @Nonnull CheckpointMetrics checkpointMetrics,
        @Nullable TaskStateSnapshot acknowledgedState,
        @Nullable TaskStateSnapshot localState) {

        long checkpointId = checkpointMetaData.getCheckpointId();

        localStateStore.storeLocalState(checkpointId, localState);

        // send the Ack to the CheckpointCoordinator
        checkpointResponder.acknowledgeCheckpoint(
            jobId,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            acknowledgedState);
    }
}
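
To make the split between the two phases easier to see, here is a minimal sketch in plain Java. It is not Flink code: the class TwoPhaseSnapshotSketch, the methods snapshot and asyncPart, and the string "state handles" are all hypothetical names made up for illustration. It only shows the pattern from the excerpts above: the synchronous phase creates a FutureTask and runs it immediately unless the snapshot is asynchronous, and the asynchronous phase finishes every task and then sends one Ack.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.FutureTask;
import java.util.function.BiConsumer;

/** Hypothetical sketch of the two snapshot phases; none of these names are Flink classes. */
class TwoPhaseSnapshotSketch {

    /** Synchronous phase: wrap the state write in a FutureTask and run it now only if not async. */
    static FutureTask<String> snapshot(String operatorId, boolean asynchronousSnapshots) {
        // The callable stands in for "write the state and return a state handle".
        FutureTask<String> task = new FutureTask<>(() -> "handle-of-" + operatorId);
        if (!asynchronousSnapshots) {
            task.run();
        }
        return task;
    }

    /** Asynchronous phase: finish every pending FutureTask, then send a single Ack. */
    static Runnable asyncPart(
            long checkpointId,
            Map<String, FutureTask<String>> snapshotsInProgress,
            BiConsumer<Long, Map<String, String>> ackSender) {
        return () -> {
            Map<String, String> handles = new LinkedHashMap<>();
            try {
                for (Map.Entry<String, FutureTask<String>> entry : snapshotsInProgress.entrySet()) {
                    FutureTask<String> task = entry.getValue();
                    task.run(); // does nothing if the task already ran in the synchronous phase
                    handles.put(entry.getKey(), task.get());
                }
            } catch (Exception e) {
                throw new IllegalStateException("Snapshot failed", e);
            }
            ackSender.accept(checkpointId, handles);
        };
    }

    public static void main(String[] args) {
        Map<String, FutureTask<String>> inProgress = new LinkedHashMap<>();
        inProgress.put("map", snapshot("map", false));
        inProgress.put("sink", snapshot("sink", true));
        asyncPart(1L, inProgress,
            (id, handles) -> System.out.println("Ack for checkpoint " + id + ": " + handles)).run();
    }
}
```

Because FutureTask.run() has no effect on a task that has already completed, the asynchronous part can treat both kinds of snapshot the same way, which is the same trick the real code relies on.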

Checkpoint acknowledgement

A Task responds to a checkpoint through the CheckpointResponder interface:

public interface CheckpointResponder {

    /**
     * Acknowledges the given checkpoint.
     */
    void acknowledgeCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        CheckpointMetrics checkpointMetrics,
        TaskStateSnapshot subtaskState);

    /**
     * Declines the given checkpoint.
     */
    void declineCheckpoint(
        JobID jobID,
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        Throwable cause);
}

RpcCheckpointResponder, the concrete implementation of CheckpointResponder, notifies the CheckpointCoordinatorGateway through RPC calls, that is, it notifies the JobMaster. The JobMaster then calls CheckpointCoordinator.receiveAcknowledgeMessage() or CheckpointCoordinator.receiveDeclineMessage() to handle the message.

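Completing the acknowledgement

After a Task finishes its checkpoint, the CheckpointCoordinator receives the Ack. The Ack is handled roughly as follows:

  • Look up the PendingCheckpoint for the Ack's checkpointID in Map<Long, PendingCheckpoint> pendingCheckpoints
  • If a matching PendingCheckpoint exists
    • If this PendingCheckpoint has not been discarded, call PendingCheckpoint.acknowledgeTask to process the Ack and act on the result:
      • SUCCESS: check whether all required Acks have been received; if so, call completePendingCheckpoint to complete this checkpoint
      • DUPLICATE: the Ack message was received more than once and is simply ignored
      • UNKNOWN: the Ack message is unknown; clean up the state handles carried by the reported Ack
      • DISCARDED: the checkpoint has already been discarded; clean up the state handles carried by the reported Ack
    • If this PendingCheckpoint has already been discarded, throw an exception
  • If no matching PendingCheckpoint exists, clean up the state handles carried by the reported Ack

The corresponding code: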
class CheckpointCoordinator {
    public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
        if (shutdown || message == null) {
            return false;
        }

        if (!job.equals(message.getJob())) {
            LOG.error("Received wrong AcknowledgeCheckpoint message for job {}: {}", job, message);
            return false;
        }

        final long checkpointId = message.getCheckpointId();

        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            if (shutdown) {
                return false;
            }

            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {

                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    case SUCCESS:
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());

                        if (checkpoint.isFullyAcknowledged()) {
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    case DUPLICATE:
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                        break;
                    case UNKNOWN:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                        break;
                    case DISCARDED:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                            "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle to avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
            }
            else if (checkpoint != null) {
                // this should not happen
                throw new IllegalStateException(
                        "Received message for discarded but non-removed checkpoint " + checkpointId);
            }
            else {
                boolean wasPendingCheckpoint;
                // message is for an unknown checkpoint, or comes too late (checkpoint disposed)
                if (recentPendingCheckpoints.contains(checkpointId)) {
                    wasPendingCheckpoint = true;
                    LOG.warn("Received late message for now expired checkpoint attempt {} from " +
                        "{} of job {}.", checkpointId, message.getTaskExecutionId(), message.getJob());
                }
                else {
                    LOG.debug("Received message for an unknown checkpoint {} from {} of job {}.",
                        checkpointId, message.getTaskExecutionId(), message.getJob());
                    wasPendingCheckpoint = false;
                }

                // try to discard the state so that we don't have lingering state lying around
                discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                return wasPendingCheckpoint;
            }
        }
    }
}

How does a checkpoint that has been triggered but has not yet completed, that is, a PendingCheckpoint, process Ack messages? Internally, PendingCheckpoint maintains two Maps:

  • Map<OperatorID, OperatorState> operatorStates: the state handles of the operators whose Ack has been received
  • Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks: the Tasks that must Ack but have not done so yet

Each time an Ack message arrives, PendingCheckpoint removes the corresponding Task from notYetAcknowledgedTasks and stores the state handles carried by the Ack. Once notYetAcknowledgedTasks is empty, all Ack messages have been received.
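
The bookkeeping described above can be sketched in a few lines of Java. This is a simplified illustration rather than the real PendingCheckpoint: the class name PendingCheckpointSketch is hypothetical, tasks are identified by plain strings instead of ExecutionAttemptID, state handles are plain strings keyed by task rather than by OperatorID, and the extra acknowledgedTasks set is an assumption used here to tell a duplicate Ack from an unknown one.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified model of how a pending checkpoint tracks Acks. */
class PendingCheckpointSketch {

    enum TaskAcknowledgeResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    private final Set<String> notYetAcknowledgedTasks;
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private final Map<String, String> stateHandles = new HashMap<>();
    private boolean discarded;

    PendingCheckpointSketch(Set<String> tasksToAcknowledge) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToAcknowledge);
    }

    TaskAcknowledgeResult acknowledgeTask(String attemptId, String stateHandle) {
        if (discarded) {
            return TaskAcknowledgeResult.DISCARDED;
        }
        if (!notYetAcknowledgedTasks.remove(attemptId)) {
            // Either this task already acknowledged, or it never belonged to this checkpoint.
            return acknowledgedTasks.contains(attemptId)
                ? TaskAcknowledgeResult.DUPLICATE
                : TaskAcknowledgeResult.UNKNOWN;
        }
        acknowledgedTasks.add(attemptId);
        if (stateHandle != null) {
            stateHandles.put(attemptId, stateHandle); // stateless tasks report null
        }
        return TaskAcknowledgeResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }

    void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        PendingCheckpointSketch pending =
            new PendingCheckpointSketch(new HashSet<>(Arrays.asList("task-1", "task-2")));
        System.out.println(pending.acknowledgeTask("task-1", "handle-1"));
        System.out.println(pending.acknowledgeTask("task-1", "handle-1"));
        System.out.println(pending.acknowledgeTask("task-2", null));
        System.out.println(pending.isFullyAcknowledged());
    }
}
```

The four enum values line up with the four branches of the switch statement in receiveAcknowledgeMessage shown earlier.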

Once the PendingCheckpoint has confirmed that every Ack message has arrived, the checkpoint can be completed. This involves:

  • Calling PendingCheckpoint.finalizeCheckpoint() to turn the PendingCheckpoint into a CompletedCheckpoint
    • Obtain a CheckpointMetadataOutputStream and use it to write all state handle information to the storage system
    • Create a CompletedCheckpoint object
  • Saving the CompletedCheckpoint in the CompletedCheckpointStore
    • CompletedCheckpointStore has two implementations, StandaloneCompletedCheckpointStore and ZooKeeperCompletedCheckpointStore
    • StandaloneCompletedCheckpointStore simply keeps the CompletedCheckpoint objects in an in-memory array
    • ZooKeeperCompletedCheckpointStore provides the high-availability implementation: it first writes the CompletedCheckpoint through a RetrievableStateStorageHelper (usually to a file system) and then stores the file handle in ZK
    • The number of retained CompletedCheckpoint objects is bounded, so older snapshots are deleted
  • Removing the PendingCheckpoints that have been overtaken: because CheckpointIDs increase monotonically, every PendingCheckpoint whose CheckpointID is smaller than the one just completed can be dropped
  • Calling Execution.notifyCheckpointComplete() on each Execution in turn to notify all Tasks that the current checkpoint has completed
    • The corresponding Task is informed through an RPC call to TaskExecutor.confirmCheckpoint()
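
As a rough illustration of the completion steps above, the following sketch models a bounded store of completed checkpoints together with the removal of overtaken pending checkpoints. Everything in it is hypothetical (the class CheckpointCompletionSketch, its methods, and the use of bare checkpoint IDs in place of real checkpoint objects). It also drops every older pending checkpoint unconditionally, whereas the real coordinator may apply further conditions that are ignored here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch: bounded retention plus dropping of overtaken pending checkpoints. */
class CheckpointCompletionSketch {

    private final int maxRetained;
    private final ArrayDeque<Long> completed = new ArrayDeque<>();
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private final List<Long> notified = new ArrayList<>();

    CheckpointCompletionSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void trigger(long checkpointId) {
        pending.put(checkpointId, "pending-" + checkpointId);
    }

    void complete(long checkpointId) {
        if (pending.remove(checkpointId) == null) {
            throw new IllegalStateException("Unknown pending checkpoint " + checkpointId);
        }
        // Keep only the newest maxRetained completed checkpoints.
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            completed.removeFirst();
        }
        // IDs are increasing, so every pending checkpoint with a smaller ID has been overtaken.
        pending.headMap(checkpointId, false).clear();
        // Stand-in for notifying all tasks that this checkpoint is complete.
        notified.add(checkpointId);
    }

    List<Long> completedIds() { return new ArrayList<>(completed); }

    List<Long> pendingIds() { return new ArrayList<>(pending.keySet()); }

    List<Long> notifiedIds() { return new ArrayList<>(notified); }

    public static void main(String[] args) {
        CheckpointCompletionSketch coordinator = new CheckpointCompletionSketch(2);
        coordinator.trigger(1);
        coordinator.trigger(2);
        coordinator.trigger(3);
        coordinator.complete(2);
        System.out.println("completed=" + coordinator.completedIds()
            + " pending=" + coordinator.pendingIds());
    }
}
```

Using a sorted map for the pending checkpoints makes the "drop everything older" step a single headMap call; this is a design choice of the sketch, not a statement about how Flink stores them.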

After a Task receives the notifyCheckpointComplete confirmation, it carries out any follow-up work, for example the two-phase commit of the Kafka producer.
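
To give a feel for what such follow-up work can look like, here is a toy model of a two-phase commit driven by checkpoint notifications. It is only a sketch under stated assumptions: the class TwoPhaseCommitSketch and its methods are hypothetical, records are plain strings, and "committing" just moves records into a list. It does not reproduce the Kafka producer's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical toy sink: pre-commit on snapshot, commit when the checkpoint is confirmed. */
class TwoPhaseCommitSketch {

    private final List<String> currentTransaction = new ArrayList<>();
    private final TreeMap<Long, List<String>> preCommitted = new TreeMap<>();
    private final List<String> committed = new ArrayList<>();

    void write(String record) {
        currentTransaction.add(record);
    }

    /** Phase 1: when the snapshot is taken, close the open transaction and keep it pending. */
    void snapshotState(long checkpointId) {
        preCommitted.put(checkpointId, new ArrayList<>(currentTransaction));
        currentTransaction.clear();
    }

    /** Phase 2: once the checkpoint is confirmed, commit every transaction up to that ID. */
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> ready = preCommitted.headMap(checkpointId, true);
        for (List<String> transaction : ready.values()) {
            committed.addAll(transaction);
        }
        ready.clear(); // the view is backed by preCommitted, so this removes the committed entries
    }

    List<String> committedRecords() { return new ArrayList<>(committed); }

    int pendingTransactions() { return preCommitted.size(); }

    public static void main(String[] args) {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("a");
        sink.write("b");
        sink.snapshotState(1);
        sink.write("c");
        sink.notifyCheckpointComplete(1);
        System.out.println("committed=" + sink.committedRecords());
    }
}
```

Records written after the snapshot stay invisible until a later checkpoint is confirmed, which is why the commit step is tied to the completion notification rather than to the snapshot itself.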

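Summary

This article walked through how a checkpoint takes its snapshot, including broadcasting the barrier, taking the snapshot, and the Ack process after the checkpoint completes.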
