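Understanding the Full Checkpoint Process in One Article

Preface

Previously we covered 一文搞懂 Flink 处理 Barrier 全过程 (how Flink processes barriers end to end) and 一文搞定 Flink Checkpoint Barrier 全流程 (the full checkpoint barrier flow), both of which are largely about checkpoints. This time we look concretely at how a checkpoint actually happens.

Main Text

The checkpoint-related starting point is buildGraph: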

@Deprecated
public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        SlotProvider slotProvider,
        ClassLoader classLoader,
        CheckpointRecoveryFactory recoveryFactory,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        MetricGroup metrics,
        int parallelismForAutoMax,
        BlobWriter blobWriter,
        Time allocationTimeout,
        Logger log)
        throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    final FailoverStrategy.Factory failoverStrategy =
            FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);

    final JobInformation jobInformation = new JobInformation(
            jobId,
            jobName,
            jobGraph.getSerializedExecutionConfig(),
            jobGraph.getJobConfiguration(),
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths());

    // create a new execution graph, if none exists so far
    final ExecutionGraph executionGraph;
    try {
        executionGraph = (prior != null) ? prior :
                new ExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        restartStrategy,
                        failoverStrategy,
                        slotProvider,
                        classLoader,
                        blobWriter,
                        allocationTimeout);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    ......

    // configure the state checkpointing
    JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
    if (snapshotSettings != null) {
        // Decide which operator chains trigger the checkpoint, which ack it, and which confirm it.
        // vertices that trigger the checkpoint
        List<ExecutionJobVertex> triggerVertices =
                idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        // vertices that ack the checkpoint
        List<ExecutionJobVertex> ackVertices =
                idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        // vertices that confirm the checkpoint
        List<ExecutionJobVertex> confirmVertices =
                idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        CompletedCheckpointStore completedCheckpoints;
        CheckpointIDCounter checkpointIdCounter;
        try {
            int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
                    CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

            if (maxNumberOfCheckpointsToRetain <= 0) {
                // warning and use 1 as the default value if the setting in
                // state.checkpoints.max-retained-checkpoints is not greater than 0.
                log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                        maxNumberOfCheckpointsToRetain,
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

                maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
            }

            // With HA this connects to ZooKeeper. maxNumberOfCheckpointsToRetain is the number
            // of completed checkpoints to keep; the default is 1.
            completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
            checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
        }
        catch (Exception e) {
            throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
        }

        // Maximum number of remembered checkpoints; the default is 10
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        // used by the web UI to show the checkpoint ack status
        CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
                historySize,
                ackVertices,
                snapshotSettings.getCheckpointCoordinatorConfiguration(),
                metrics);

        // The default directory for externalized checkpoints
        String externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);

        ......

        final StateBackend rootBackend;
        try {
            // the state backend is resolved while the ExecutionGraph is being built
            rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
                    applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        }
        catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
        }

        ......

        final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();

        // Combines the checkpoint config with triggerVertices, ackVertices, confirmVertices,
        // the state backend and checkpointStatsTracker to create the CheckpointCoordinator.
        executionGraph.enableCheckpointing(
                chkConfig.getCheckpointInterval(),
                chkConfig.getCheckpointTimeout(),
                chkConfig.getMinPauseBetweenCheckpoints(),
                chkConfig.getMaxConcurrentCheckpoints(),
                chkConfig.getCheckpointRetentionPolicy(),
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpoints,
                rootBackend,
                checkpointStatsTracker);
    }

    ......

    return executionGraph;
}

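While building the graph, three sets of vertices are determined: triggerVertices (used to trigger the checkpoint; these are the trigger tasks mentioned below and are usually the source operator chains), ackVertices (used to receive the reports that a checkpoint has finished; these are the ackTasks mentioned below, and every operator chain that takes part in the checkpoint belongs to this set), and confirmVertices (used to confirm that the checkpoint is complete; every operator chain that takes part in the checkpoint must confirm, which amounts to the checkpoint's two-phase commit).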
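
To make the three roles concrete, here is a minimal toy sketch of the classification. It is not Flink code: the VertexRoles and Vertex classes are hypothetical, and it assumes the common case described above, where only source vertices trigger while every vertex acks and confirms.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the trigger / ack / confirm roles. Hypothetical, not Flink's classes. */
class VertexRoles {

    /** Hypothetical stand-in for one job vertex (an operator chain). */
    static final class Vertex {
        final String name;
        final boolean isSource;

        Vertex(String name, boolean isSource) {
            this.name = name;
            this.isSource = isSource;
        }
    }

    final List<String> trigger = new ArrayList<>();
    final List<String> ack = new ArrayList<>();
    final List<String> confirm = new ArrayList<>();

    /** Assigns each vertex to its roles, in the order the vertices are given. */
    static VertexRoles classify(List<Vertex> vertices) {
        VertexRoles roles = new VertexRoles();
        for (Vertex v : vertices) {
            if (v.isSource) {
                roles.trigger.add(v.name); // assumption: only sources trigger
            }
            roles.ack.add(v.name);         // every vertex reports its snapshot
            roles.confirm.add(v.name);     // every vertex is told about completion
        }
        return roles;
    }
}
```

For a job made of the chains source->flatMap and filter->sink, only the first chain would land in trigger, while both would appear in ack and confirm.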
When Flink submits the job, CheckpointCoordinator.startCheckpointScheduler is started:

// Flink calls this method when it starts the job
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        long initialDelay = ThreadLocalRandom.current().nextLong(
                minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
        // the periodic timer task
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
    }
}

A periodic timer task runs ScheduledTrigger:

// triggers a checkpoint
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint for job {}.", job, e);
        }
    }
}
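
The scheduling pattern itself can be tried out with nothing but the JDK. The sketch below is an illustration under assumptions, not the coordinator's code: PeriodicTriggerSketch, initialDelay and runTriggers are hypothetical names, and a counter stands in for triggerCheckpoint. It also shows why the try/catch inside the Runnable matters: with scheduleAtFixedRate, an exception that escapes one run suppresses all later runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a randomly delayed, fixed-rate trigger. */
class PeriodicTriggerSketch {

    /** Random delay in [minPauseMillis, baseIntervalMillis]; needs minPause <= baseInterval. */
    static long initialDelay(long minPauseMillis, long baseIntervalMillis) {
        return ThreadLocalRandom.current().nextLong(minPauseMillis, baseIntervalMillis + 1L);
    }

    /** Fires a stand-in trigger until it ran at least `times` times; returns the count, or -1 on timeout. */
    static int runTriggers(int times, long intervalMillis) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger fired = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        Runnable trigger = () -> {
            try {
                fired.incrementAndGet(); // stand-in for triggerCheckpoint(...)
                done.countDown();
            } catch (Exception e) {
                // Swallow and log in real code: an escaping exception would cancel the schedule.
            }
        };
        ScheduledFuture<?> handle = timer.scheduleAtFixedRate(
                trigger, initialDelay(0L, intervalMillis), intervalMillis, TimeUnit.MILLISECONDS);
        try {
            return done.await(5, TimeUnit.SECONDS) ? fired.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            handle.cancel(false);
            timer.shutdownNow();
        }
    }
}
```

The random initial delay spreads the first checkpoint of different jobs over time instead of having them all fire at the same instant.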

This is where the checkpoint trigger actually starts:

@VisibleForTesting
// triggers a checkpoint
public CheckpointTriggerResult triggerCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic) {

    ......

    // check if all tasks that we need to trigger are running.
    // if not, abort the checkpoint
    Execution[] executions = new Execution[tasksToTrigger.length];
    for (int i = 0; i < tasksToTrigger.length; i++) {
        Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
        if (ee == null) {
            LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        } else if (ee.getState() == ExecutionState.RUNNING) {
            executions[i] = ee;
        } else {
            LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    // next, check if all tasks that need to acknowledge the checkpoint are running.
    // if not, abort the checkpoint
    Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

    for (ExecutionVertex ev : tasksToWaitFor) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ackTasks.put(ee.getAttemptId(), ev);
        } else {
            LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex(),
                    job);
            return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
        }
    }

    ......

    // Start a checkpoint that has not been confirmed yet. Once every task has acknowledged
    // this checkpoint, the object is converted into a CompletedCheckpoint.
    final PendingCheckpoint checkpoint = new PendingCheckpoint(
            job,
            checkpointID,
            timestamp,
            ackTasks, // the tasks that must ack the checkpoint
            props,
            checkpointStorageLocation,
            executor);

    if (statsTracker != null) {
        PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                checkpointID,
                timestamp,
                props);

        checkpoint.setStatsCallback(callback);
    }

    // schedule the timer that will clean up the expired checkpoints
    final Runnable canceller = () -> {
        synchronized (lock) {
            // only do the work if the checkpoint is not discarded anyways
            // note that checkpoint completion discards the pending checkpoint object
            if (!checkpoint.isDiscarded()) {
                LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);

                checkpoint.abortExpired();
                pendingCheckpoints.remove(checkpointID);
                rememberRecentCheckpointId(checkpointID);

                triggerQueuedRequests();
            }
        }
    };

    try {
        // re-acquire the coordinator-wide lock
        synchronized (lock) {
            ......
        // end of lock scope

        final CheckpointOptions checkpointOptions = new CheckpointOptions(
                props.getCheckpointType(),
                checkpointStorageLocation.getLocationReference());

        // send the messages to the tasks that trigger their checkpoint
        for (Execution execution: executions) {
            // trigger task (an operator chain, fixed when the ExecutionGraph was built)
            // calls TaskExecutor.triggerCheckpoint, which ends up in task.triggerCheckpointBarrier
            // source -> flatMap
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }

        numUnsuccessfulCheckpointsTriggers.set(0);
        return new CheckpointTriggerResult(checkpoint);
    }
    ......

    } // end trigger lock
}

Here the trigger tasks kick off the checkpoint. Following the call leads to TaskExecutor.triggerCheckpoint:

@Override
// the task of the trigger operator chain triggers the checkpoint
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }
}

That brings us to the triggerCheckpointBarrier method:

// the trigger operator chain triggers the checkpoint, which ends up in triggerCheckpointBarrier
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    // In practice this is a StreamTask. Task delegates the checkpoint to a more specific class,
    // and StreamTask delegates further in turn, all the way down to user code.
    // source -> flatMap
    // the invokable is effectively the operator chain
    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        // build a local closure
        final String taskName = taskNameWithSubtask;
        final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    // the invokable is in fact the StreamTask (see the delegation note above)
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                                        "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    } else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}

The invokable then calls triggerCheckpoint. Because the trigger tasks are source operator chains, execution enters SourceStreamTask:

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
    if (!externallyInducedCheckpoints) {
        return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
    }
    else {
        // we do not trigger checkpoints here, we simply state whether we can trigger them
        synchronized (getCheckpointLock()) {
            return isRunning();
        }
    }
}

Tracing further leads to StreamTask.performCheckpoint:

// The trigger operator chain calls all the way down to here, where the barrier first appears
// (it originates from the periodic checkpoint timer task).
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            // Note: from here on, barriers start to appear along the whole execution path.
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            /*
             Under backpressure this call can block the source chain's checkpoint, because it has to
             request a buffer to send the barrier downstream. A downstream operator that receives
             this barrier triggers its own checkpoint.
             */
            operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // Run the checkpoint. A source task chain (trigger task) gets here directly through
            // triggerCheckpoint, whereas a non-source task chain gets here through processBarrier.
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
                try {
                    // another kind of event, similar to a barrier
                    recordWriter.broadcastEvent(message);
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                            new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                            exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}
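
The point of holding the lock across the barrier broadcast and the snapshot can be shown with a toy source. This is only a sketch under assumptions: ToySource, its string "records" and its offset state are hypothetical, and the pre-barrier step is left out. Because records and checkpoints are serialized by the same lock, the snapshot always matches exactly the records emitted before the barrier.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy source: emits records and checkpoints under one lock. */
class ToySource {
    private final Object lock = new Object();
    private long nextOffset = 0;
    private boolean running = true;

    final List<String> output = new ArrayList<>();     // what downstream would receive
    final Map<Long, Long> snapshots = new HashMap<>(); // checkpointId -> offset at barrier time

    void emitNext() {
        synchronized (lock) {
            output.add("record-" + nextOffset);
            nextOffset++;
        }
    }

    void stop() {
        synchronized (lock) {
            running = false;
        }
    }

    boolean performCheckpoint(long checkpointId) {
        synchronized (lock) {
            if (running) {
                output.add("barrier-" + checkpointId);   // step 2: barrier goes downstream first
                snapshots.put(checkpointId, nextOffset); // step 3: snapshot the current state
                return true;
            } else {
                output.add("cancel-" + checkpointId);    // tell downstream not to wait for us
                return false;
            }
        }
    }
}
```

Emitting three records, checkpointing with id 1, then emitting two more leaves the barrier between record-2 and record-3 in the output, and the snapshot for checkpoint 1 holds offset 3.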

Apart from the barrier appearing for the first time and being broadcast, the most important part is checkpointState:

private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getTargetLocation());

    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

    // run the checkpoint
    checkpointingOperation.executeCheckpointing();
}

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // Entry point for calling snapshotState on each StreamOperator.
        // First the source operator chain (flatMap -> source), then the sink operator chain (sink -> filter).
        for (StreamOperator<?> op : allOperators) {
            // Snapshot each operator and store the result in operatorSnapshotsInProgress
            // (what is stored is a reference to the asynchronous checkpoint work); the local
            // checkpoint store and the JobManager ack happen afterwards.
            // Catching a barrier is really part of processing input data, which corresponds to
            // StreamInputProcessor.processInput().
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                    checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // Once the operators have saved their checkpoint data, an AsyncCheckpointRunnable is started
        // to report that the checkpoint is done; the details are in reportCompletedSnapshotStates.
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        // Registers a Runnable that, after the checkpoint has run, sends the ack to the JobManager.
        // This is part of the fault-tolerance two-phase commit and ends in JobMaster.acknowledgeCheckpoint.
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
        // Cleanup to release resources
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (null != operatorSnapshotResult) {
                try {
                    operatorSnapshotResult.cancel();
                } catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
        }

        owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
    }
}

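The split into a synchronous part and an asynchronous part can be imitated with plain JDK futures. The following is a rough sketch under assumptions, not the real CheckpointingOperation: SyncAsyncSnapshotSketch and both method names are hypothetical, operator state is reduced to one integer per operator, and the returned string stands in for the ack sent to the JobManager.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

/** Hypothetical sketch: capture state synchronously, finish and ack asynchronously. */
class SyncAsyncSnapshotSketch {

    /** Sync part: copy each operator's state and wrap the remaining work in a not-yet-run future. */
    static Map<String, FutureTask<String>> syncPart(Map<String, Integer> operatorState) {
        Map<String, FutureTask<String>> inProgress = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : operatorState.entrySet()) {
            final String name = e.getKey();
            final int copy = e.getValue(); // state as it was when the barrier passed
            inProgress.put(name, new FutureTask<>(() -> name + "=" + copy));
        }
        return inProgress;
    }

    /** Async part: run the futures on a pool thread, then build the stand-in ack message. */
    static String asyncPartAndAck(Map<String, FutureTask<String>> inProgress) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(() -> {
                StringBuilder ack = new StringBuilder("ack:");
                for (FutureTask<String> snapshot : inProgress.values()) {
                    snapshot.run(); // the slow part (e.g. writing state) happens off the task thread
                    ack.append(' ').append(snapshot.get());
                }
                return ack.toString();
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

State that changes after syncPart returns does not leak into the snapshot, which is why the task can go back to processing records while the asynchronous part is still running.
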
JobMaster.acknowledgeCheckpoint eventually calls CheckpointCoordinator.receiveAcknowledgeMessage. If the acknowledgement is a SUCCESS and every task has acknowledged, it goes on to complete the pending checkpoint:

/**
 * Try to complete the given pending checkpoint.
 *
 * <p>Important: This method should only be called in the checkpoint lock scope.
 *
 * @param pendingCheckpoint to complete
 * @throws CheckpointException if the completion failed
 */
/*
 Convert the PendingCheckpoint into a CompletedCheckpoint.
 Add the CompletedCheckpoint to the set of completed checkpoints and remove it from the pending set.
 Send another RPC to every operator to notify it that the checkpoint is complete.
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.abortError(e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
        }

        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    // these are the confirm tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            // notify the operators, layer by layer, so they can react to the completed checkpoint
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
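
The bookkeeping that decides when a pending checkpoint may be completed can be pictured with a small ack tracker. This is a simplified sketch under assumptions: ToyPendingCheckpoint and its AckResult enum are hypothetical, tasks are identified by plain strings, and no state handles, timeouts or discarding are modelled.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical toy version of a pending checkpoint that only tracks acknowledgements. */
class ToyPendingCheckpoint {

    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();

    ToyPendingCheckpoint(Set<String> tasksToAck) {
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    /** Records one task's ack and reports whether it was expected, repeated, or unknown. */
    AckResult acknowledgeTask(String taskId) {
        if (notYetAcknowledged.remove(taskId)) {
            acknowledged.add(taskId);
            return AckResult.SUCCESS;
        }
        return acknowledged.contains(taskId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    /** True once every expected task has acknowledged; only then may the checkpoint complete. */
    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }
}
```

With two expected tasks, the first SUCCESS leaves the checkpoint pending, and only the second one makes isFullyAcknowledged return true, which is the moment a coordinator would move on to completion and notification.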

The confirm tasks confirm layer by layer. How exactly is that done? Follow the call to Task.notifyCheckpointComplete:

@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    // the operator chain is notified; this calls StreamTask.notifyCheckpointComplete
                    invokable.notifyCheckpointComplete(checkpointID);
                    // then the taskStateManager is notified as well
                    taskStateManager.notifyCheckpointComplete(checkpointID);
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        // fail task if checkpoint confirmation failed.
                        failExternally(new RuntimeException(
                                "Error while confirming checkpoint",
                                t));
                    }
                }
            }
        };
        executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
                taskNameWithSubtask);
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}

Going one level further down, take Kafka as an example. For the details, see
Flink如何保存Offset (How Flink Stores Offsets)
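
As a rough picture of what "confirming" can mean for a Kafka-style source, the sketch below remembers the offset for each checkpoint at snapshot time and commits it only when that checkpoint is reported complete. It is an illustration under assumptions: ToyOffsetCommittingSource and all of its methods are hypothetical, and "committing" just updates a field instead of talking to a broker.

```java
import java.util.TreeMap;

/** Hypothetical toy source that commits offsets only for completed checkpoints. */
class ToyOffsetCommittingSource {
    private long currentOffset;
    private long committedOffset = -1L;
    // checkpointId -> offset captured when that checkpoint was snapshotted
    private final TreeMap<Long, Long> pendingOffsets = new TreeMap<>();

    void advanceTo(long offset) {
        currentOffset = offset;
    }

    /** Phase one: remember the offset that belongs to this checkpoint. */
    void snapshotState(long checkpointId) {
        pendingOffsets.put(checkpointId, currentOffset);
    }

    /** Phase two: the checkpoint is globally complete, so its offset is safe to commit. */
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = pendingOffsets.get(checkpointId);
        if (offset == null) {
            return; // unknown, or already subsumed by a later checkpoint
        }
        committedOffset = offset;
        // Drop this checkpoint and every older one; they can no longer be committed.
        pendingOffsets.headMap(checkpointId, true).clear();
    }

    long committedOffset() {
        return committedOffset;
    }

    int pendingCount() {
        return pendingOffsets.size();
    }
}
```

If checkpoints 1, 2 and 3 are snapshotted at offsets 10, 20 and 30 and checkpoint 2 completes first, offset 20 is committed, checkpoint 1 is dropped, and only checkpoint 3 stays pending.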

At this point the checkpoint of the source task (trigger task) is complete.

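As mentioned above, this is where the barrier first appears in the whole flow, and a barrier can be seen as a special kind of message. What happens after it has been broadcast downstream? For the details, see
一文搞懂 Flink 处理 Barrier 全过程 (how Flink processes barriers end to end)
From that article we know that once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint(receivedBarrier) is called, which in turn ends up in StreamTask.performCheckpoint. That completes the checkpoint for the non-source operator chains, and the cycle repeats.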
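
The counting condition can be expressed as a tiny tracker. This is a deliberately simplified sketch under assumptions: ToyBarrierTracker is a hypothetical class that handles a single checkpoint at a time and ignores checkpoint IDs, channel blocking and cancellation; only the field names are borrowed from the condition quoted above.

```java
/** Hypothetical toy tracker for the "all barriers received" condition on one task. */
class ToyBarrierTracker {
    private final int totalNumberOfInputChannels;
    private int numBarriersReceived;
    private int numClosedChannels;

    ToyBarrierTracker(int totalNumberOfInputChannels) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
    }

    /** A finished input channel will never deliver a barrier, so it counts as done. */
    void onChannelClosed() {
        numClosedChannels++;
    }

    /** Returns true when this barrier was the last one missing, i.e. the checkpoint may start. */
    boolean onBarrier() {
        numBarriersReceived++;
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            numBarriersReceived = 0; // reset for the next checkpoint
            return true;
        }
        return false;
    }
}
```

With three input channels, the first two barriers return false and the third returns true; if one channel has already closed, the second barrier is enough.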

一文搞懂 checkpoint 全过程

前言

前面我们讲解了 一文搞懂 Flink 处理 Barrier 全过程 和 一文搞定 Flink Checkpoint Barrier 全流程 基本上都是跟 checkpoint 相关。这次我们就具体看一下 checkpoint 是如何发生的。

正文

跟 checkpoint 相关的起点在 buildGraph

@Deprecatedpublic static ExecutionGraph buildGraph(@Nullable ExecutionGraph prior,JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,SlotProvider slotProvider,ClassLoader classLoader,CheckpointRecoveryFactory recoveryFactory,Time rpcTimeout,RestartStrategy restartStrategy,MetricGroup metrics,int parallelismForAutoMax,BlobWriter blobWriter,Time allocationTimeout,Logger log)throws JobExecutionException, JobException {checkNotNull(jobGraph, "job graph cannot be null");final String jobName = jobGraph.getName();final JobID jobId = jobGraph.getJobID();final FailoverStrategy.Factory failoverStrategy =FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);final JobInformation jobInformation = new JobInformation(jobId,jobName,jobGraph.getSerializedExecutionConfig(),jobGraph.getJobConfiguration(),jobGraph.getUserJarBlobKeys(),jobGraph.getClasspaths());// create a new execution graph, if none exists so farfinal ExecutionGraph executionGraph;try {executionGraph = (prior != null) ? 
prior :new ExecutionGraph(jobInformation,futureExecutor,ioExecutor,rpcTimeout,restartStrategy,failoverStrategy,slotProvider,classLoader,blobWriter,allocationTimeout);} catch (IOException e) {throw new JobException("Could not create the ExecutionGraph.", e);}......// configure the state checkpointingJobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();if (snapshotSettings != null) {// 确定哪些 operator chain trigger checkpoint ,哪些 operator chain ack ,哪些 operator chain confirm// 用来 trigger checkpointList<ExecutionJobVertex> triggerVertices =idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);//用来 ack checkpointList<ExecutionJobVertex> ackVertices =idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);//用来 confirm checkpoint List<ExecutionJobVertex> confirmVertices =idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);CompletedCheckpointStore completedCheckpoints;CheckpointIDCounter checkpointIdCounter;try {int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);if (maxNumberOfCheckpointsToRetain <= 0) {// warning and use 1 as the default value if the setting in// state.checkpoints.max-retained-checkpoints is not greater than 0.log.warn("The setting for '{} : {}' is invalid. 
Using default value of {}",CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),maxNumberOfCheckpointsToRetain,CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();}// HA 会连接 zookeeper maxNumberOfCheckpointsToRetain 保持多少个 checkpoint 默认是一个completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);}catch (Exception e) {throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);}// Maximum number of remembered checkpoints 默认是 10 个int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);// 用户 web 界面显示 checkpoint ack 情况CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(historySize,ackVertices,snapshotSettings.getCheckpointCoordinatorConfiguration(),metrics);// The default directory for externalized checkpointsString externalizedCheckpointsDir = jobManagerConfig.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);......final StateBackend rootBackend;try {// 在 builder executionGraph 确定 state backendrootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(applicationConfiguredBackend, jobManagerConfig, classLoader, log);}catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);}
......final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();// 结合 checkpoint config,还有// triggerVertices、ackVertices、confirmVertices、state backend、checkpointStatsTracker// 会创建 CheckpointCoordinator 对象executionGraph.enableCheckpointing(chkConfig.getCheckpointInterval(),chkConfig.getCheckpointTimeout(),chkConfig.getMinPauseBetweenCheckpoints(),chkConfig.getMaxConcurrentCheckpoints(),chkConfig.getCheckpointRetentionPolicy(),triggerVertices,ackVertices,confirmVertices,hooks,checkpointIdCounter,completedCheckpoints,rootBackend,checkpointStatsTracker);}......return executionGraph;}

在 build graph 时确定了 triggerVertices ( 用来触发 chekcpoint,也是下面提到的 trigger tasks 往往是 source task operator chains ),ackVertices ( 用来接收 checkpoint 已经完成的报告,也是下面要提到的 ackTasks , 每个需要做 checkpoint 的 operator chain 都会属于它 )以及 confirmVertices ( 用来确认 checkpoint 已经完成, 每个需要做 checkpoint 的 operator chain 都需要 confirm ,这也算是 checkpoint 的二阶段提交了 )。
当 flink 提交 job 时,会启动 CheckpointCoordinator.startCheckpointScheduler 方法

// flink 在启动 job 时,会启 动这个方法public void startCheckpointScheduler() {synchronized (lock) {if (shutdown) {throw new IllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();periodicScheduling = true;long initialDelay = ThreadLocalRandom.current().nextLong(minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);// 定时任务currentPeriodicTrigger = timer.scheduleAtFixedRate(new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);}}

通过一个定时任务来执行 ScheduledTrigger

//触发 checkpointprivate final class ScheduledTrigger implements Runnable {@Overridepublic void run() {try {triggerCheckpoint(System.currentTimeMillis(), true);}catch (Exception e) {LOG.error("Exception while triggering checkpoint for job {}.", job, e);}}}

开始执行 trigger checkpoint

@VisibleForTesting//触发 checkpointpublic CheckpointTriggerResult triggerCheckpoint(long timestamp,CheckpointProperties props,@Nullable String externalSavepointLocation,boolean isPeriodic) {......// check if all tasks that we need to trigger are running.// if not, abort the checkpointExecution[] executions = new Execution[tasksToTrigger.length];for (int i = 0; i < tasksToTrigger.length; i++) {Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();if (ee == null) {LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",tasksToTrigger[i].getTaskNameWithSubtaskIndex(),job);return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);} else if (ee.getState() == ExecutionState.RUNNING) {executions[i] = ee;} else {LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",tasksToTrigger[i].getTaskNameWithSubtaskIndex(),job,ExecutionState.RUNNING,ee.getState());return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// next, check if all tasks that need to acknowledge the checkpoint are running.// if not, abort the checkpointMap<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);for (ExecutionVertex ev : tasksToWaitFor) {Execution ee = ev.getCurrentExecutionAttempt();if (ee != null) {ackTasks.put(ee.getAttemptId(), ev);} else {LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. 
Aborting checkpoint.",ev.getTaskNameWithSubtaskIndex(),job);return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}......//启动一个checkpoint,但还没有被确认,待所有 task 都确认了本次 checkpoint,那么这个 checkpoint 对象将转化为一个 CompleteCheckpointfinal PendingCheckpoint checkpoint = new PendingCheckpoint(job,checkpointID,timestamp,ackTasks, // 需要 ack checkpoint 的 tasksprops,checkpointStorageLocation,executor);if (statsTracker != null) {PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(checkpointID,timestamp,props);checkpoint.setStatsCallback(callback);}// schedule the timer that will clean up the expired checkpointsfinal Runnable canceller = () -> {synchronized (lock) {// only do the work if the checkpoint is not discarded anyways// note that checkpoint completion discards the pending checkpoint objectif (!checkpoint.isDiscarded()) {LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);checkpoint.abortExpired();pendingCheckpoints.remove(checkpointID);rememberRecentCheckpointId(checkpointID);triggerQueuedRequests();}}};try {// re-acquire the coordinator-wide locksynchronized (lock) {......// end of lock scopefinal CheckpointOptions checkpointOptions = new CheckpointOptions(props.getCheckpointType(),checkpointStorageLocation.getLocationReference());// send the messages to the tasks that trigger their checkpointfor (Execution execution: executions) {//trigger task (operator chain,在产生 ExecutionGraph 是确定的 )// 调用 TaskExecutor.triggerCheckpoint 最终调用 task.triggerCheckpointBarrier// source ->flatMapexecution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}numUnsuccessfulCheckpointsTriggers.set(0);return new CheckpointTriggerResult(checkpoint);}......} // end trigger lock}

Here the trigger tasks are asked to trigger the checkpoint. Following the call leads to TaskExecutor.triggerCheckpoint:

@Override
// ask the trigger operator-chain task to trigger the checkpoint
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);

    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';

        log.debug(message);
        return FutureUtils.completedExceptionally(new CheckpointException(message));
    }
}

Next we arrive at the triggerCheckpointBarrier method:

// the trigger operator chain triggers the checkpoint, which ends up in triggerCheckpointBarrier
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    // invokable is in fact a StreamTask: the Task class delegates the checkpoint to a more specific class,
    // and StreamTask in turn delegates further, all the way down to the user code
    // source -> flatMap
    // invokable is effectively the operator chain
    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        // build a local closure
        final String taskName = taskNameWithSubtask;
        final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // set safety net from the task's context for checkpointing thread
                LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                try {
                    // invokable is in fact a StreamTask (see the delegation note at the top of this method)
                    boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                    if (!success) {
                        checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                    }
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                    } else {
                        LOG.debug("Encountered error while triggering checkpoint {} for " +
                                        "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                    }
                } finally {
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                }
            }
        };
        executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
    } else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}

The invokable then calls triggerCheckpoint. Because the trigger tasks are always source operator chains, the call lands in SourceStreamTask:

@Override
public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
    if (!externallyInducedCheckpoints) {
        return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
    }
    else {
        // we do not trigger checkpoints here, we simply state whether we can trigger them
        synchronized (getCheckpointLock()) {
            return isRunning();
        }
    }
}

Tracing further leads to StreamTask:

// the trigger operator chain's call path arrives here, and this is where the barrier first appears
// (it actually originates from the periodic checkpoint timer task)
private boolean performCheckpoint(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    LOG.debug("Starting checkpoint ({}) {} on task {}",
            checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());

    synchronized (lock) {
        if (isRunning) {
            // we can do a checkpoint

            // All of the following steps happen as an atomic step from the perspective of barriers and
            // records/watermarks/timers/callbacks.
            // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
            // checkpoint alignments

            // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
            //           The pre-barrier work should be nothing or minimal in the common case.
            // note: from this point on, barriers start to appear along the whole execution pipeline
            operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());

            // Step (2): Send the checkpoint barrier downstream
            /*
             under backpressure the source chain's checkpoint blocks here, because a buffer has to be
             requested in order to send the barrier downstream; a downstream operator triggers its own
             checkpoint once it receives this barrier
             */
            operatorChain.broadcastCheckpointBarrier(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetaData.getTimestamp(),
                    checkpointOptions);

            // Step (3): Take the state snapshot. This should be largely asynchronous, to not
            //           impact progress of the streaming topology
            // run the checkpoint: a source task chain (trigger task) triggers it directly via triggerCheckpoint,
            // whereas a non-source task chain triggers it via processBarrier
            checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
            return true;
        }
        else {
            // we cannot perform our checkpoint - let the downstream operators know that they
            // should not wait for any input from this operator

            // we cannot broadcast the cancellation markers on the 'operator chain', because it may not
            // yet be created
            final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;

            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : recordWriters) {
                try {
                    // another kind of message, similar to a barrier
                    recordWriter.broadcastEvent(message);
                } catch (Exception e) {
                    exception = ExceptionUtils.firstOrSuppressed(
                            new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
                            exception);
                }
            }

            if (exception != null) {
                throw exception;
            }

            return false;
        }
    }
}

Besides creating the barrier for the first time and broadcasting it, the most important step here is checkpointState:

private void checkpointState(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
            checkpointMetaData.getCheckpointId(),
            checkpointOptions.getTargetLocation());

    CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
            this,
            checkpointMetaData,
            checkpointOptions,
            storage,
            checkpointMetrics);

    // run the checkpoint
    checkpointingOperation.executeCheckpointing();
}

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // entry point that calls snapshotState on every StreamOperator
        // first the source operators (flatMap -> source), then the sink operators (sink -> filter)
        for (StreamOperator<?> op : allOperators) {
            // take a snapshotInProgress for each operator and store it in operatorSnapshotsInProgress
            // (what is stored is a reference to the asynchronous checkpoint), then do the local
            // checkpoint store and the JobManager ack
            // capturing a barrier is really part of processing input data, which corresponds to
            // StreamInputProcessor.processInput()
            checkpointStreamOperator(op);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                    checkpointMetaData.getCheckpointId(), owner.getName());
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
        // once the operators have saved their checkpoint data, an asynchronous AsyncCheckpointRunnable is started
        // to report that the checkpoint is done; the logic lives in reportCompletedSnapshotStates
        AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
                owner,
                operatorSnapshotsInProgress,
                checkpointMetaData,
                checkpointMetrics,
                startAsyncPartNano);

        owner.cancelables.registerCloseable(asyncCheckpointRunnable);
        // a Runnable is submitted here; after the checkpoint finishes it sends the CompletedCheckPoint (ack)
        // message to the JobManager
        // this is also part of the fault-tolerance two-phase commit; it finally calls JobMaster.acknowledgeCheckpoint
        owner.asyncOperationsThreadPool.execute(asyncCheckpointRunnable);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
        }
    } catch (Exception ex) {
        // Cleanup to release resources
        for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
            if (null != operatorSnapshotResult) {
                try {
                    operatorSnapshotResult.cancel();
                } catch (Exception e) {
                    LOG.warn("Could not properly cancel an operator snapshot result.", e);
                }
            }
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                    owner.getName(), checkpointMetaData.getCheckpointId(),
                    checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                    checkpointMetrics.getSyncDurationMillis());
        }

        owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
    }
}

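The split between the synchronous and asynchronous parts of executeCheckpointing can be illustrated with plain java.util.concurrent types. This is only a sketch under assumptions: `SyncAsyncSnapshotSketch`, `snapshotSync`, and `finishAsync` are hypothetical names, and a string such as `source@7` stands in for a real state handle. The synchronous part only creates futures that have not run yet; a pool thread later runs them and collects the results, which is the point where the real task would send its ack.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/** Hypothetical illustration of a sync/async snapshot split; not Flink code. */
final class SyncAsyncSnapshotSketch {

    /** Synchronous part: wrap each operator's expensive write in a task that has not run yet. */
    static List<FutureTask<String>> snapshotSync(List<String> operatorNames, long checkpointId) {
        List<FutureTask<String>> inProgress = new ArrayList<>();
        for (String name : operatorNames) {
            inProgress.add(new FutureTask<String>(() -> name + "@" + checkpointId));
        }
        return inProgress;
    }

    /** Asynchronous part: a pool thread runs the pending tasks and gathers their results. */
    static List<String> finishAsync(List<FutureTask<String>> inProgress) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<List<String>> ack = pool.submit(() -> {
                List<String> handles = new ArrayList<>();
                for (FutureTask<String> task : inProgress) {
                    task.run();              // perform the deferred write
                    handles.add(task.get()); // collect the resulting handle
                }
                return handles;              // the real task would acknowledge here
            });
            return ack.get();
        } catch (Exception e) {
            throw new IllegalStateException("asynchronous part failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping the synchronous part this small is what lets the task thread return to record processing quickly while the writes finish in the background.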
JobMaster.acknowledgeCheckpoint eventually calls CheckpointCoordinator.receiveAcknowledgeMessage. If the acknowledgement is recorded as SUCCESS, the coordinator goes on to complete the pending checkpoint:

/**
 * Try to complete the given pending checkpoint.
 *
 * <p>Important: This method should only be called in the checkpoint lock scope.
 *
 * @param pendingCheckpoint to complete
 * @throws CheckpointException if the completion failed
 */
/*
 1. convert the PendingCheckpoint into a CompletedCheckpoint
 2. add the CompletedCheckpoint to the set of completed checkpoints and remove it from the set of pending ones
 3. send another RPC to every operator to notify it that the checkpoint has completed
 */
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException {
    final long checkpointId = pendingCheckpoint.getCheckpointId();
    final CompletedCheckpoint completedCheckpoint;

    // As a first step to complete the checkpoint, we register its state with the registry
    Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
    sharedStateRegistry.registerAll(operatorStates.values());

    try {
        try {
            completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();
        }
        catch (Exception e1) {
            // abort the current pending checkpoint if we fails to finalize the pending checkpoint.
            if (!pendingCheckpoint.isDiscarded()) {
                pendingCheckpoint.abortError(e1);
            }

            throw new CheckpointException("Could not finalize the pending checkpoint " + checkpointId + '.', e1);
        }

        // the pending checkpoint must be discarded after the finalization
        Preconditions.checkState(pendingCheckpoint.isDiscarded() && completedCheckpoint != null);

        try {
            completedCheckpointStore.addCheckpoint(completedCheckpoint);
        } catch (Exception exception) {
            // we failed to store the completed checkpoint. Let's clean up
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        completedCheckpoint.discardOnFailedStoring();
                    } catch (Throwable t) {
                        LOG.warn("Could not properly discard completed checkpoint {}.", completedCheckpoint.getCheckpointID(), t);
                    }
                }
            });

            throw new CheckpointException("Could not complete the pending checkpoint " + checkpointId + '.', exception);
        }
    } finally {
        pendingCheckpoints.remove(checkpointId);

        triggerQueuedRequests();
    }

    rememberRecentCheckpointId(checkpointId);

    // drop those pending checkpoints that are at prior to the completed one
    dropSubsumedCheckpoints(checkpointId);

    // record the time when this was completed, to calculate
    // the 'min delay between checkpoints'
    lastCheckpointCompletionNanos = System.nanoTime();

    LOG.info("Completed checkpoint {} for job {} ({} bytes in {} ms).", checkpointId, job,
            completedCheckpoint.getStateSize(), completedCheckpoint.getDuration());

    if (LOG.isDebugEnabled()) {
        StringBuilder builder = new StringBuilder();
        builder.append("Checkpoint state: ");
        for (OperatorState state : completedCheckpoint.getOperatorStates().values()) {
            builder.append(state);
            builder.append(", ");
        }
        // Remove last two chars ", "
        builder.setLength(builder.length() - 2);

        LOG.debug(builder.toString());
    }

    // send the "notify complete" call to all vertices
    final long timestamp = completedCheckpoint.getTimestamp();

    // i.e. the confirm tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            // notify the corresponding operators, layer by layer, so they can react to the completed checkpoint
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }
}
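The bookkeeping that turns a pending checkpoint into a completed one can be pictured as a set of tasks that still owe an ack. The sketch below is an assumption-laden simplification: `PendingCheckpointSketch` and its methods are hypothetical, task attempts are plain strings, and no state handles are collected. It only shows why the checkpoint can complete exactly once, when the last expected ack arrives.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of ack tracking for one checkpoint; not Flink's PendingCheckpoint. */
final class PendingCheckpointSketch {

    private final long checkpointId;
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        // Copy defensively so the caller's set is never modified.
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    /** Records an ack; returns false for duplicate acks or acks from unknown tasks. */
    boolean acknowledge(String taskAttemptId) {
        return notYetAcknowledged.remove(taskAttemptId);
    }

    /** True once every expected task has acknowledged, i.e. the checkpoint may be completed. */
    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }

    long getCheckpointId() {
        return checkpointId;
    }
}
```

A caller would check isFullyAcknowledged() after each successful acknowledge() and only then run the completion logic shown in the listing.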

How exactly do the confirm tasks pass the confirmation down, layer by layer? Follow the call to Task.notifyCheckpointComplete:

@Override
public void notifyCheckpointComplete(final long checkpointID) {
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    // the operator chain is notified that the checkpoint completed; this calls
                    // StreamTask.notifyCheckpointComplete
                    invokable.notifyCheckpointComplete(checkpointID);
                    // the completion is also passed on to the taskStateManager
                    taskStateManager.notifyCheckpointComplete(checkpointID);
                } catch (Throwable t) {
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        // fail task if checkpoint confirmation failed.
                        failExternally(new RuntimeException(
                                "Error while confirming checkpoint",
                                t));
                    }
                }
            }
        };
        executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " +
                taskNameWithSubtask);
    } else {
        LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);
    }
}

Going one level further down, take Kafka as an example. For the details, see
How Flink Saves Offsets (Flink如何保存Offset)
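The linked article is not reproduced here, so the following is only a generic sketch of the pattern a source can follow, not the Kafka connector's code. All names (`OffsetCommitSketch`, `advance`, `committedOffsets`) are hypothetical. The idea: snapshotState remembers the offsets under the checkpoint ID, and notifyCheckpointComplete commits exactly that remembered copy once the coordinator confirms the checkpoint, dropping older pending entries.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical source-side bookkeeping: commit offsets only after a checkpoint is confirmed. */
final class OffsetCommitSketch {

    private final Map<String, Long> currentOffsets = new HashMap<>();
    private final TreeMap<Long, Map<String, Long>> pendingOffsets = new TreeMap<>();
    private final Map<String, Long> committed = new HashMap<>();

    /** Called while reading records: remember the latest offset per partition. */
    void advance(String partition, long offset) {
        currentOffsets.put(partition, offset);
    }

    /** Called during the snapshot: freeze a copy of the offsets under this checkpoint ID. */
    void snapshotState(long checkpointId) {
        pendingOffsets.put(checkpointId, new HashMap<>(currentOffsets));
    }

    /** Called on confirmation: commit the frozen copy and drop it together with older entries. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsets.get(checkpointId);
        if (offsets == null) {
            return; // unknown or already subsumed checkpoint: nothing to commit
        }
        committed.putAll(offsets);
        pendingOffsets.headMap(checkpointId, true).clear();
    }

    /** Returns a copy of what has been committed so far (stand-in for the external system). */
    Map<String, Long> committedOffsets() {
        return new HashMap<>(committed);
    }
}
```

Committing the frozen copy rather than the live offsets matters, because the source keeps reading between the snapshot and the confirmation.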

At this point the checkpoint of the source task (the trigger task) is complete.

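As noted above, this is where the barrier first appears in the whole flow, and a barrier can be regarded as a special kind of message. What happens once it has been broadcast downstream? For the details, see
Understanding How Flink Processes Barriers (一文搞懂 Flink 处理 Barrier 全过程)
From that article we know that once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint(receivedBarrier) is called, which in the end calls StreamTask.performCheckpoint again. With that, the non-source task operator chain has finished its checkpoint as well, and the cycle repeats for the next checkpoint.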
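The counting condition can be sketched as a small counter. This is a simplification with hypothetical names (`BarrierCounterSketch`, `onBarrier`, `onChannelClosed`): it tracks a single checkpoint at a time, assumes each open channel delivers at most one barrier per checkpoint, and ignores what the real handler does when a channel closes in the middle of an alignment.

```java
/** Hypothetical barrier counter for one downstream task; not Flink's barrier handler. */
final class BarrierCounterSketch {

    private final int totalNumberOfInputChannels;
    private int numBarriersReceived;
    private int numClosedChannels;

    BarrierCounterSketch(int totalNumberOfInputChannels) {
        this.totalNumberOfInputChannels = totalNumberOfInputChannels;
    }

    /** A channel reached end of input, so no further barriers are expected from it. */
    void onChannelClosed() {
        numClosedChannels++;
    }

    /**
     * A barrier arrived on some open channel. Returns true when all channels are accounted for,
     * which is the moment the task would start its own checkpoint.
     */
    boolean onBarrier() {
        numBarriersReceived++;
        if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
            numBarriersReceived = 0; // reset for the next checkpoint
            return true;
        }
        return false;
    }
}
```

With three input channels, the third barrier returns true; after one channel closes, the second barrier of the next checkpoint is already enough.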
