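A Brief Look at the Flink Checkpoint Source Code
1. Checkpoint scheduling on the JobManager side
After the Dispatcher hands out a job, it starts the corresponding JobMaster. While the JobMaster is being constructed, the JobGraph is converted into an ExecutionGraph. The source code is as follows: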
// JobMaster class
public JobMaster(
        RpcService rpcService,
        JobMasterConfiguration jobMasterConfiguration,
        ...)
        throws Exception {
    ...
    this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
    this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);
    this.jobStatusListener = null;
    ...
}
// SchedulerBase class
public SchedulerBase(
        final Logger log,
        final JobGraph jobGraph,
        final BackPressureStatsTracker backPressureStatsTracker,
        ...)
        throws Exception {
    ...
    this.executionGraph =
            createAndRestoreExecutionGraph(
                    jobManagerJobMetricGroup,
                    checkNotNull(shuffleMaster),
                    checkNotNull(partitionTracker),
                    checkNotNull(executionDeploymentTracker),
                    initializationTimestamp);
    ...
}

private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws JobExecutionException, JobException {
    ...
    return ExecutionGraphBuilder.buildGraph(
            null,
            jobGraph,
            jobMasterConfiguration,
            ...);
    ...
}
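createAndRestoreExecutionGraph() calls createExecutionGraph(), which ultimately uses ExecutionGraphBuilder to generate the ExecutionGraph.
While the ExecutionGraph is being built (in ExecutionGraphBuilder.buildGraph()), ExecutionGraph.enableCheckpointing() is called. This method is called whether or not checkpointing is enabled for the job. For streaming jobs the JobGraph always carries checkpoint settings, so the `snapshotSettings != null` branch below is taken. enableCheckpointing() creates the CheckpointCoordinator, which is the core class responsible for checkpointing. It also registers a job status listener on the job, CheckpointCoordinatorDeActivator, but only if checkpointing is enabled. CheckpointCoordinatorDeActivator is responsible for starting and stopping checkpointing. The source code is as follows: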
// ExecutionGraphBuilder class
public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ...)
        throws JobExecutionException, JobException {
    ...
    // configure the state checkpointing
    JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
    if (snapshotSettings != null) {
        List<ExecutionJobVertex> triggerVertices =
                idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

        List<ExecutionJobVertex> ackVertices =
                idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

        List<ExecutionJobVertex> confirmVertices =
                idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

        // a series of checkpoint settings: state backend, user-defined hooks, checkpointIdCounter, etc.
        ...
        executionGraph.enableCheckpointing(
                chkConfig,
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpoints,
                rootBackend,
                checkpointStatsTracker);
        ...
    }
    ...
}
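Building the graph fixes three vertex sets:
- triggerVertices: the vertices that receive the message that triggers a checkpoint.
- ackVertices: the vertices whose acknowledgements the JobManager must collect before it can treat a checkpoint as complete.
- confirmVertices: the vertices that are notified once the checkpoint has completed.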
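The sketch below shows how the three lists could be derived from a job graph. It is a minimal Java sketch, not Flink's implementation. It assumes, in line with the comment in startTriggeringCheckpoint() further down, that the trigger list contains every vertex without inputs. It also assumes that every vertex acknowledges and is notified on completion. SimpleVertex and CheckpointVertexSets are hypothetical classes, not Flink types.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified vertex: only an ID and its number of inputs (not Flink's JobVertex). */
final class SimpleVertex {
    final String id;
    final int numInputs;

    SimpleVertex(String id, int numInputs) {
        this.id = id;
        this.numInputs = numInputs;
    }
}

/** Hypothetical holder for the trigger / ack / confirm lists. */
final class CheckpointVertexSets {
    final List<String> toTrigger = new ArrayList<>();
    final List<String> toAcknowledge = new ArrayList<>();
    final List<String> toConfirm = new ArrayList<>();

    static CheckpointVertexSets fromVertices(List<SimpleVertex> vertices) {
        CheckpointVertexSets sets = new CheckpointVertexSets();
        for (SimpleVertex v : vertices) {
            // Only vertices without inputs (sources) receive the trigger RPC;
            // all other tasks start their checkpoint when barriers arrive.
            if (v.numInputs == 0) {
                sets.toTrigger.add(v.id);
            }
            // Assumption: every task acknowledges and is notified on completion.
            sets.toAcknowledge.add(v.id);
            sets.toConfirm.add(v.id);
        }
        return sets;
    }
}
```

For a source -> map -> sink pipeline, only the source ends up in the trigger list. All three vertices must acknowledge and are notified.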
executionGraph.enableCheckpointing() initializes the checkpoint-related classes and registers the checkpoint job status listener. When the JobManager starts scheduling the job, the job state changes from CREATED to RUNNING; this happens in transitionState(). During that transition, the CheckpointCoordinatorDeActivator listener registered above starts the periodic checkpoint task. The call chain is ExecutionGraph.transitionToRunning() -> transitionState() -> notifyJobStatusChange() -> CheckpointCoordinatorDeActivator.jobStatusChanges() -> CheckpointCoordinator.startCheckpointScheduler(). The source code is as follows:
public void transitionToRunning() {
    if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
        throw new IllegalStateException(
                "Job may only be scheduled from state " + JobStatus.CREATED);
    }
}

private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
    ...
    if (state == current) {
        notifyJobStatusChange(newState, error);
        return true;
    }
    ...
}

private void notifyJobStatusChange(JobStatus newState, Throwable error) {
    if (jobStatusListeners.size() > 0) {
        final long timestamp = System.currentTimeMillis();
        final Throwable serializedError =
                error == null ? null : new SerializedThrowable(error);

        for (JobStatusListener listener : jobStatusListeners) {
            try {
                listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);
            } catch (Throwable t) {
                LOG.warn("Error while notifying JobStatusListener", t);
            }
        }
    }
}
// CheckpointCoordinatorDeActivator class
@Override
public void jobStatusChanges(
        JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
    if (newJobStatus == JobStatus.RUNNING) {
        // start the checkpoint scheduler
        coordinator.startCheckpointScheduler();
    } else {
        // anything else should stop the trigger for now
        coordinator.stopCheckpointScheduler();
    }
}
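The listener mechanism can be shown with a small, self-contained sketch that mirrors only the pattern above:
- The state changes only if the current state matches the expected one.
- On success, every listener is notified.
- RUNNING starts the scheduler; any other status stops it.

SimpleJobStatus, SimpleJobStateMachine and SchedulerToggle are hypothetical names. The sketch leaves out Flink's real set of job states and its logging.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, reduced set of job states. */
enum SimpleJobStatus { CREATED, RUNNING, FAILING, CANCELED }

interface SimpleJobStatusListener {
    void jobStatusChanges(SimpleJobStatus newStatus);
}

/** Hypothetical stand-in for the ExecutionGraph state machine. */
final class SimpleJobStateMachine {
    private final List<SimpleJobStatusListener> listeners = new ArrayList<>();
    private SimpleJobStatus state = SimpleJobStatus.CREATED;

    void registerListener(SimpleJobStatusListener listener) {
        listeners.add(listener);
    }

    /** Transition only from the expected state; notify all listeners on success. */
    synchronized boolean transitionState(SimpleJobStatus expected, SimpleJobStatus next) {
        if (state != expected) {
            return false;
        }
        state = next;
        for (SimpleJobStatusListener listener : listeners) {
            try {
                listener.jobStatusChanges(next);
            } catch (RuntimeException e) {
                // a failing listener must not break the transition (the original only logs a warning)
                System.err.println("Error while notifying listener: " + e);
            }
        }
        return true;
    }
}

/** Hypothetical de-activator: RUNNING starts the scheduler, every other status stops it. */
final class SchedulerToggle implements SimpleJobStatusListener {
    boolean schedulerRunning = false;

    @Override
    public void jobStatusChanges(SimpleJobStatus newStatus) {
        schedulerRunning = (newStatus == SimpleJobStatus.RUNNING);
    }
}
```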
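CheckpointCoordinator schedules a timer task that triggers checkpoints periodically. This timer task is the ScheduledTrigger class, and each time it fires it calls triggerCheckpoint().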
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());
    }
}

private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {
    return timer.scheduleAtFixedRate(
            new ScheduledTrigger(), initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
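Here is a hedged sketch of the same periodic trigger using java.util.concurrent.ScheduledExecutorService. PeriodicCheckpointTrigger is hypothetical. The range of the random initial delay, between the minimum pause and the base interval, is an assumption labeled in the code; Flink's getRandomInitDelay() may choose it differently.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical periodic trigger modelled on startCheckpointScheduler()/scheduleTriggerWithDelay(). */
final class PeriodicCheckpointTrigger {
    private final Object lock = new Object();
    private final ScheduledExecutorService timer;
    private final long baseIntervalMs;
    private final long minPauseMs;
    private final Runnable triggerAction;
    private final AtomicInteger triggerCount = new AtomicInteger();
    private ScheduledFuture<?> currentPeriodicTrigger;

    PeriodicCheckpointTrigger(long baseIntervalMs, long minPauseMs, Runnable triggerAction) {
        if (minPauseMs > baseIntervalMs) {
            throw new IllegalArgumentException("minPause must not exceed the interval");
        }
        this.baseIntervalMs = baseIntervalMs;
        this.minPauseMs = minPauseMs;
        this.triggerAction = triggerAction;
        this.timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "checkpoint-timer");
            t.setDaemon(true); // do not keep the JVM alive because of the timer
            return t;
        });
    }

    /** Assumption: random first delay in [minPause, baseInterval] to spread out first checkpoints. */
    long randomInitDelay() {
        return ThreadLocalRandom.current().nextLong(minPauseMs, baseIntervalMs + 1);
    }

    void start() {
        synchronized (lock) {
            stop(); // make sure all prior timers are cancelled (the lock is reentrant)
            currentPeriodicTrigger = timer.scheduleAtFixedRate(() -> {
                triggerCount.incrementAndGet();
                triggerAction.run(); // stands in for ScheduledTrigger calling triggerCheckpoint()
            }, randomInitDelay(), baseIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        synchronized (lock) {
            if (currentPeriodicTrigger != null) {
                currentPeriodicTrigger.cancel(false);
                currentPeriodicTrigger = null;
            }
        }
    }

    boolean isScheduled() {
        synchronized (lock) {
            return currentPeriodicTrigger != null;
        }
    }

    int triggerCount() {
        return triggerCount.get();
    }
}
```

Calling stop() before scheduling mirrors the "make sure all prior timers are cancelled" step. Restarting the scheduler therefore never leaves two timers running.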
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic) {

    if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE
            && !(props.isSynchronous() && props.isSavepoint())) {
        return FutureUtils.completedExceptionally(
                new IllegalArgumentException(
                        "Only synchronous savepoints are allowed to advance the watermark to MAX."));
    }

    CheckpointTriggerRequest request =
            new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic);
    // First run pre-checks to decide whether a checkpoint may be triggered, mainly the
    // maximum number of concurrent checkpoints and the minimum interval between checkpoints;
    // then pick one request from the backlog (if any) to process
    chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);
    return request.onCompletionPromise;
}
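The pre-check described in the comment above can be modelled as a small queue. It looks at the maximum number of concurrent checkpoints and the minimum pause between checkpoints. TriggerRequestGate and its methods are hypothetical and deliberately simplified; Flink's real decision logic handles more cases, for example savepoints, which this sketch ignores.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

/** Hypothetical, simplified gate for trigger requests (not Flink's decision logic). */
final class TriggerRequestGate {
    private final int maxConcurrentCheckpoints;
    private final long minPauseMs;
    private final Deque<String> queuedRequests = new ArrayDeque<>();
    private int pendingCheckpoints = 0;
    private long lastCompletionMs;

    TriggerRequestGate(int maxConcurrentCheckpoints, long minPauseMs) {
        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        this.minPauseMs = minPauseMs;
        this.lastCompletionMs = -minPauseMs; // so the very first request is not delayed
    }

    /** Enqueue a new request, then return the oldest queued request that may run now, if any. */
    Optional<String> chooseRequestToExecute(String request, long nowMs) {
        queuedRequests.addLast(request);
        return chooseQueuedRequestToExecute(nowMs);
    }

    /** Re-check the backlog, e.g. after a checkpoint completed. */
    Optional<String> chooseQueuedRequestToExecute(long nowMs) {
        if (queuedRequests.isEmpty()) {
            return Optional.empty();
        }
        if (pendingCheckpoints >= maxConcurrentCheckpoints) {
            return Optional.empty(); // too many checkpoints in flight
        }
        if (nowMs - lastCompletionMs < minPauseMs) {
            return Optional.empty(); // minimum pause since the last completion not yet elapsed
        }
        pendingCheckpoints++;
        return Optional.of(queuedRequests.pollFirst());
    }

    void onCheckpointCompleted(long nowMs) {
        pendingCheckpoints--;
        lastCompletionMs = nowMs;
    }
}
```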
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
    try {
        synchronized (lock) {
            preCheckGlobalState(request.isPeriodic);
        }

        // Collect the tasks that must receive the trigger message into executions
        // (tasksToTrigger, fixed when the JobGraph is generated: all vertices without inputs).
        // Check if all tasks that we need to trigger are running. If not, abort the checkpoint.
        final Execution[] executions = getTriggerExecutions();
        // Collect the tasks that must acknowledge the checkpoint into ackTasks;
        // they are passed in when the PendingCheckpoint is constructed.
        // Check if all tasks that need to acknowledge the checkpoint are running. If not, abort the checkpoint
        final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();
        // Create the PendingCheckpoint and trigger user-defined hooks
        ...
        // no exception, no discarding, everything is OK
        final long checkpointId = checkpoint.getCheckpointId();
        snapshotTaskState(
                timestamp,
                checkpointId,
                checkpoint.getCheckpointStorageLocation(),
                request.props,
                executions);

        coordinatorsToCheckpoint.forEach(
                (ctx) -> ctx.afterSourceBarrierInjection(checkpointId));
        ...
    } catch (Throwable throwable) {
        onTriggerFailure(request, throwable);
    }
}
private void snapshotTaskState(
        long timestamp,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CheckpointProperties props,
        Execution[] executions) {
    ...
    // send the messages to the tasks that trigger their checkpoint
    for (Execution execution : executions) {
        // Both calls end up in the same underlying method; they differ only in semantics
        if (props.isSynchronous()) {
            execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);
        } else {
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
        }
    }
}
Execution.triggerCheckpoint() boils down to an RPC to the TaskManager's triggerCheckpoint() method, made through triggerCheckpointHelper():
private void triggerCheckpointHelper(
        long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    ...
    final LogicalSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        taskManagerGateway.triggerCheckpoint(
                attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug(
                "The execution has no slot assigned. This indicates that the execution is no longer running.");
    }
}
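2. Checkpoint execution in SourceStreamTask
The TaskManager's triggerCheckpoint() method, implemented in TaskExecutor, first looks up the Task for the given execution attempt. Here that Task is a source task, i.e. a SourceStreamTask. It then calls Task.triggerCheckpointBarrier(). triggerCheckpointBarrier() triggers the checkpoint asynchronously through invokable.triggerCheckpointAsync(), so the RPC returns immediately. As the StreamTask code below shows, the actual checkpoint work is queued into the task's mailbox and executed there.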
// TaskExecutor class
public CompletableFuture<Acknowledge> triggerCheckpoint(
        ExecutionAttemptID executionAttemptID,
        long checkpointId,
        long checkpointTimestamp,
        CheckpointOptions checkpointOptions) {
    log.debug(
            "Trigger checkpoint {}@{} for {}.",
            checkpointId,
            checkpointTimestamp,
            executionAttemptID);
    ...
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } else {
        ...
    }
}
// Task class
public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    ...
    try {
        // invokable is in fact a StreamTask, which in turn delegates to more specific
        // classes, all the way down to the user code
        invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    }
    ...
}
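The invokable then triggers the checkpoint. Every trigger task is the head of a source operator chain, so the call lands in SourceStreamTask. Unless the source induces its checkpoints externally (externallyInducedCheckpoints), SourceStreamTask simply delegates to StreamTask: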
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    if (!externallyInducedCheckpoints) {
        return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    } else {
        // we do not trigger checkpoints here, we simply state whether we can trigger them
        synchronized (lock) {
            return CompletableFuture.completedFuture(isRunning());
        }
    }
}
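The core of the checkpoint is implemented in StreamTask.performCheckpoint(). This method delegates the per-subtask work to SubtaskCheckpointCoordinatorImpl.checkpointState(), shown below. There are three main steps:
1. Do some preparation before the checkpoint. In the common case operators do nothing in this phase.
2. Immediately broadcast the CheckpointBarrier downstream, so that downstream tasks receive it as early as possible and start their own checkpoints.
3. Take the state snapshot, i.e. the checkpoint itself.
While performCheckpoint() runs, the task thread cannot process records, because checkpointing and record processing never run at the same time. In the code below this is done through the mailbox. triggerCheckpointAsync() enqueues the checkpoint action into the task's mailbox, and the task's main thread executes it: the same thread that processes records.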
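The ordering matters for two reasons. The barrier leaves the task before the potentially slow snapshot starts. And all three steps run as one action on the task's own thread, so no record can slip in between them. The sketch below models this with a single-threaded ExecutorService standing in for the task mailbox. MailboxTask and its string log are hypothetical. The "barrier" and "snapshot" steps are placeholders, not real network or storage I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical task: records and checkpoint actions run on one "mailbox" thread. */
final class MailboxTask {
    // A single-threaded executor stands in for the task's mailbox thread.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    // Only touched from the mailbox thread, so no extra locking is needed.
    private final List<String> log = new ArrayList<>();
    private long state = 0;

    Future<?> processRecord(long value) {
        return mailbox.submit(() -> {
            state += value;
            log.add("record " + value);
        });
    }

    /** The three steps run as one mailbox action, so no record can interleave with them. */
    Future<Long> triggerCheckpointAsync(long checkpointId) {
        return mailbox.submit(() -> {
            log.add("prepare " + checkpointId);  // step 1: pre-barrier work (usually nothing)
            log.add("barrier " + checkpointId);  // step 2: placeholder for broadcasting the barrier
            long snapshot = state;               // step 3: synchronous part of the snapshot
            log.add("snapshot " + checkpointId + "=" + snapshot);
            return snapshot;
        });
    }

    /** Stops the mailbox and returns the log once all queued actions have run. */
    List<String> shutdownAndGetLog() throws InterruptedException {
        mailbox.shutdown();
        if (!mailbox.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("mailbox did not drain in time");
        }
        return log;
    }
}
```

A record enqueued after the checkpoint action is not part of that snapshot, because the mailbox processes actions strictly in order.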
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    ...
    triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions));
    ...
}

private boolean triggerCheckpointAsyncInMailbox(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    ...
    subtaskCheckpointCoordinator.initCheckpoint(
            checkpointMetaData.getCheckpointId(), checkpointOptions);

    boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    if (!success) {
        declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
    ...
}

// SubtaskCheckpointCoordinatorImpl class
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetricsBuilder metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {

    // Check whether the checkpoint has to be aborted
    // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
    // if necessary.
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
        // checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        LOG.info(
                "Checkpoint {} has been notified as aborted, would not trigger any checkpoint.",
                metadata.getCheckpointId());
        return;
    }

    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    // The pre-barrier work should be nothing or minimal in the common case.
    // (usually a no-op)
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

    // Step (2): Send the checkpoint barrier downstream
    // The barrier is wrapped in a priority buffer and added to the ResultSubpartition's
    // PrioritizedDeque, updating the buffer and backlog counts. When
    // notifyDataAvailable == true the downstream consumer is notified; the downstream
    // CheckpointedInputGate recognizes the buffer as a checkpoint event and reacts to it.
    operatorChain.broadcastEvent(
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            options.isUnalignedCheckpoint());

    // Step (3): Prepare to spill the in-flight buffers for input and output
    // (skipped for aligned checkpoints)
    if (options.isUnalignedCheckpoint()) {
        // output data already written while broadcasting event
        channelStateWriter.finishOutput(metadata.getCheckpointId());
    }

    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
    // progress of the streaming topology
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
            new HashMap<>(operatorChain.getNumberOfOperators());
    try {
        // takeSnapshotSync is the entry point of the core checkpoint logic
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
            // finishAndReportAsync: once the snapshot is done, report to the JobMaster
            finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    } catch (Exception ex) {
        cleanup(snapshotFutures, metadata, metrics, ex);
        throw ex;
    }
}

private boolean takeSnapshotSync(
        Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
        CheckpointMetaData checkpointMetaData,
        CheckpointMetricsBuilder checkpointMetrics,
        CheckpointOptions checkpointOptions,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {
    ...
    // Where the checkpoint is stored (depends on the state backend: Memory/FS/RocksDB)
    CheckpointStreamFactory storage =
            checkpointStorage.resolveCheckpointStorageLocation(
                    checkpointId, checkpointOptions.getTargetLocation());

    try {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
            if (!operatorWrapper.isClosed()) {
                operatorSnapshotsInProgress.put(
                        operatorWrapper.getStreamOperator().getOperatorID(),
                        // entry point of the per-operator checkpoint
                        buildOperatorSnapshotFutures(
                                checkpointMetaData,
                                checkpointOptions,
                                operatorChain,
                                operatorWrapper.getStreamOperator(),
                                isRunning,
                                channelStateWriteResult,
                                storage));
            }
        }
    } finally {
        checkpointStorage.clearCacheFor(checkpointId);
    }
    ...
}

// StreamOperatorStateHandler class
// (reached from buildOperatorSnapshotFutures() via the operator's snapshotState())
void snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        OperatorSnapshotFutures snapshotInProgress,
        StateSnapshotContextSynchronousImpl snapshotContext,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    try {
        ...
        // Snapshot the state that must be persisted.
        // For example, a map operation produces a StreamMap, a subclass of
        // AbstractUdfStreamOperator that encapsulates the snapshotState logic;
        // if the user function does not implement a checkpoint interface, this step is skipped
        streamOperator.snapshotState(snapshotContext);

        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }

        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } ...
}
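If the user function implements a checkpoint interface, its state is persisted to the configured state backend; otherwise this step is skipped.
Take AbstractUdfStreamOperator as an example. The operators behind map, filter and similar transformations all extend this abstract class: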
// AbstractUdfStreamOperator class
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    // Check whether userFunction is an instance of CheckpointedFunction or ListCheckpointed;
    // if so, call the user's snapshotState implementation.
    // For example, FlinkKafkaConsumerBase implements the CheckpointedFunction interface itself
    StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}

// StreamingFunctionUtils class
public static void snapshotFunctionState(
        StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)
        throws Exception {

    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(backend);

    while (true) {
        // check whether the user defined custom checkpoint logic, and if so run it
        if (trySnapshotFunctionState(context, backend, userFunction)) {
            break;
        }

        // inspect if the user function is wrapped, then unwrap and try again if we can snapshot
        // the inner function
        if (userFunction instanceof WrappingFunction) {
            userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
        } else {
            break;
        }
    }
}

private static boolean trySnapshotFunctionState(
        StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)
        throws Exception {

    // check whether the user function implements CheckpointedFunction
    if (userFunction instanceof CheckpointedFunction) {
        // run the user-defined snapshot logic
        ((CheckpointedFunction) userFunction).snapshotState(context);

        return true;
    }

    if (userFunction instanceof ListCheckpointed) {
        // ListCheckpointed is deprecated and not covered here
        ...
    }

    return false;
}
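In a real job, a stateful user function implements Flink's CheckpointedFunction, with snapshotState(FunctionSnapshotContext) and initializeState(FunctionInitializationContext). A runnable example against Flink would need the Flink dependency. The sketch below therefore re-creates only the dispatch logic of snapshotFunctionState(): try the function itself, and if it is a wrapper, unwrap it and retry. UserFunction, SnapshottingFunction, WrapperFunction and CountingFunction are hypothetical stand-ins, not Flink interfaces. Unlike the original, the sketch returns a boolean so the outcome can be checked.

```java
/** Hypothetical minimal stand-ins; these are NOT Flink's interfaces. */
interface UserFunction {}

interface SnapshottingFunction extends UserFunction {
    void snapshotState(long checkpointId);
}

interface WrapperFunction extends UserFunction {
    UserFunction getWrappedFunction();
}

final class FunctionSnapshots {
    /** Mirrors the unwrap-and-retry loop; returns true if some function took a snapshot. */
    static boolean snapshotFunctionState(long checkpointId, UserFunction fn) {
        while (true) {
            if (fn instanceof SnapshottingFunction) {
                ((SnapshottingFunction) fn).snapshotState(checkpointId);
                return true;
            }
            if (fn instanceof WrapperFunction) {
                fn = ((WrapperFunction) fn).getWrappedFunction(); // try the inner function
            } else {
                return false; // plain function: nothing to snapshot
            }
        }
    }
}

/** Hypothetical user function that counts records and copies the count on snapshot. */
final class CountingFunction implements SnapshottingFunction {
    long count = 0;
    long lastSnapshotCount = -1;
    long lastCheckpointId = -1;

    void process() {
        count++;
    }

    @Override
    public void snapshotState(long checkpointId) {
        lastCheckpointId = checkpointId;
        lastSnapshotCount = count;
    }
}
```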
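3. The task reports checkpoint information
Once the synchronous part of the snapshot is done, Flink calls finishAndReportAsync(). This method hands the snapshot futures to an AsyncCheckpointRunnable on a separate thread pool, which completes the asynchronous part and then sends the completion report to the JobManager: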
private void finishAndReportAsync(
        Map<OperatorID, OperatorSnapshotFutures> snapshotFutures,
        CheckpointMetaData metadata,
        CheckpointMetricsBuilder metrics,
        Supplier<Boolean> isRunning) {
    // we are transferring ownership over snapshotInProgressList for cleanup to the thread,
    // active on submit
    asyncOperationsThreadPool.execute(
            new AsyncCheckpointRunnable(
                    snapshotFutures,
                    metadata,
                    metrics,
                    System.nanoTime(),
                    taskName,
                    registerConsumer(),
                    unregisterConsumer(),
                    env,
                    asyncExceptionHandler,
                    isRunning));
}
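The work goes to asyncOperationsThreadPool because a snapshot is split into two phases: a short synchronous one on the task thread and a longer asynchronous one elsewhere. That is also why the state backends' snapshot() methods shown earlier return futures. The sketch below is a hypothetical illustration of that split using a FutureTask:
- The synchronous part only copies the state.
- The asynchronous part "persists" the copy (here it just builds a string) and then calls an acknowledge callback, which stands in for the report to the JobManager.

AsyncSnapshotReporter is not a Flink class.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.function.BiConsumer;

/** Hypothetical sketch of the synchronous/asynchronous split of a task-side snapshot. */
final class AsyncSnapshotReporter {
    private final ExecutorService asyncOperationsThreadPool = Executors.newCachedThreadPool();

    /** Synchronous part (task thread): capture a copy, defer the expensive write. */
    static RunnableFuture<String> snapshotSync(Map<String, Long> liveState) {
        Map<String, Long> copy = new TreeMap<>(liveState); // copy taken while the task is paused
        // The "write to durable storage" only runs when the future itself is run.
        return new FutureTask<>(() -> "persisted:" + copy);
    }

    /** Asynchronous part (pool thread): run the deferred write, then acknowledge. */
    CompletableFuture<Void> finishAndReportAsync(
            long checkpointId, RunnableFuture<String> snapshot, BiConsumer<Long, String> acknowledge) {
        return CompletableFuture.runAsync(() -> {
            snapshot.run();
            try {
                acknowledge.accept(checkpointId, snapshot.get());
            } catch (Exception e) {
                throw new IllegalStateException("asynchronous snapshot part failed", e);
            }
        }, asyncOperationsThreadPool);
    }

    void shutdown() {
        asyncOperationsThreadPool.shutdown();
    }
}
```

Changes the task makes after the synchronous part, like the extra key in the test, do not leak into the persisted snapshot.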
On the JobManager side, this eventually reaches CheckpointCoordinator.receiveAcknowledgeMessage():
public boolean receiveAcknowledgeMessage(
        AcknowledgeCheckpoint message, String taskManagerLocationInfo)
        throws CheckpointException {
    ...
    synchronized (lock) {
        // we need to check inside the lock for being shutdown as well, otherwise we
        // get races and invalid error log messages
        if (shutdown) {
            return false;
        }

        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDisposed()) {

            switch (checkpoint.acknowledgeTask(
                    message.getTaskExecutionId(),
                    message.getSubtaskState(),
                    message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug(
                            "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                            checkpointId,
                            message.getTaskExecutionId(),
                            message.getJob(),
                            taskManagerLocationInfo);

                    if (checkpoint.isFullyAcknowledged()) {
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                ...
            }
            ...
        }
        ...
    }
}
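The bookkeeping behind acknowledgeTask() and isFullyAcknowledged() can be sketched as a set of tasks that have not yet acknowledged. SimplePendingCheckpoint is hypothetical and only loosely modelled on PendingCheckpoint. The result names SUCCESS, DUPLICATE and UNKNOWN are chosen for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical simplified pending checkpoint: complete once every expected task has acked. */
final class SimplePendingCheckpoint {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Map<String, String> taskStates = new HashMap<>();

    SimplePendingCheckpoint(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    AckResult acknowledgeTask(String taskId, String stateHandle) {
        if (taskStates.containsKey(taskId)) {
            return AckResult.DUPLICATE; // this task already acknowledged
        }
        if (!notYetAcknowledged.remove(taskId)) {
            return AckResult.UNKNOWN; // not one of the tasks we are waiting for
        }
        taskStates.put(taskId, stateHandle); // keep the reported state handle
        return AckResult.SUCCESS;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }
}
```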
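Once all tasks have acknowledged, the JobManager does the following:
1. It converts the PendingCheckpoint into a CompletedCheckpoint, which marks the end of the checkpoint. The CompletedCheckpoint holds the checkpoint's metadata, such as the checkpoint path and the size of the state data. The metadata is persisted as well, and expired checkpoints are deleted.
2. It notifies all tasks to commit.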
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) {
    ...
    // create the CompletedCheckpoint (this also writes out the checkpoint metadata)
    completedCheckpoint =
            pendingCheckpoint.finalizeCheckpoint(
                    checkpointsCleaner, this::scheduleTriggerRequest, executor);

    // register the checkpoint in the CompletedCheckpointStore; older checkpoints beyond
    // the retention limit are subsumed and cleaned up
    completedCheckpointStore.addCheckpoint(
            completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
    ...
    // notify the tasks that the checkpoint is complete
    // send the "notify complete" call to all vertices, coordinators, etc.
    sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
}

private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
    // commit tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }

    // commit coordinators
    for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
        coordinatorContext.notifyCheckpointComplete(checkpointId);
    }
}
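Retention of completed checkpoints, followed by the commit notification, can be sketched as follows. RetainingCheckpointStore is hypothetical. It assumes a fixed maximum number of retained checkpoints and models cleanup as a callback. It notifies tasks only after the store has accepted the checkpoint, following the order in completePendingCheckpoint() above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical store that retains only the newest N completed checkpoints. */
final class RetainingCheckpointStore {
    private final int maxRetained;
    private final Deque<Long> completed = new ArrayDeque<>();
    private final LongConsumer discarder; // stands in for cleaning up a subsumed checkpoint's data

    RetainingCheckpointStore(int maxRetained, LongConsumer discarder) {
        this.maxRetained = maxRetained;
        this.discarder = discarder;
    }

    /** Add the checkpoint, subsume the oldest ones beyond the limit, then notify tasks to commit. */
    void completeCheckpoint(long checkpointId, List<LongConsumer> tasksToCommitTo) {
        completed.addLast(checkpointId);
        while (completed.size() > maxRetained) {
            discarder.accept(completed.removeFirst());
        }
        // Notify only after the checkpoint is safely registered.
        for (LongConsumer task : tasksToCommitTo) {
            task.accept(checkpointId);
        }
    }

    List<Long> retained() {
        return new ArrayList<>(completed);
    }
}
```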
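4. The JobManager notifies tasks to commit
When a task receives the message, Task.notifyCheckpointComplete() is called, which eventually calls StreamOperator.notifyCheckpointComplete(). Usually this does nothing, but operators such as AbstractUdfStreamOperator may do additional work: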
// TaskExecutor class
public CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
    ...
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.notifyCheckpointComplete(checkpointId);

        return CompletableFuture.completedFuture(Acknowledge.get());
    }
    ...
}

// Task class
public void notifyCheckpointComplete(final long checkpointID) {
    // invokable is in fact a StreamTask, which in turn delegates to more specific
    // classes, all the way down to the user code
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            invokable.notifyCheckpointCompleteAsync(checkpointID);
        }
        ...
    }
}
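AbstractUdfStreamOperator is the base class of operators that wrap user-defined functions, such as StreamMap and StreamSource. If the user's Function implements the CheckpointListener interface, some extra work happens here. For example, FlinkKafkaConsumerBase commits the consumed offsets to Kafka, and TwoPhaseCommitSinkFunction subclasses such as FlinkKafkaProducer commit their transactions.

One caveat: with exactly-once enabled, data written to Kafka is committed only when the checkpoint completes. A Kafka consumer with isolation level read_committed therefore sees the data only after that checkpoint has completed, so consumption lags by anywhere from 0 up to roughly one checkpoint interval.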
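The commit-on-notify behavior that causes this latency can be sketched as a minimal two-phase-commit sink. TwoPhaseSinkSketch is hypothetical and is not Flink's TwoPhaseCommitSinkFunction API. An in-memory list stands in for the records a read_committed consumer can see. As an assumption, notifyCheckpointComplete() commits every pre-committed transaction up to the given checkpoint ID, so a skipped notification does not leave data uncommitted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical two-phase-commit sink (not Flink's TwoPhaseCommitSinkFunction API). */
final class TwoPhaseSinkSketch {
    // What a read_committed consumer can see.
    private final List<String> committed = new ArrayList<>();
    // Pre-committed transactions, ordered by checkpoint ID.
    private final TreeMap<Long, List<String>> pendingCommits = new TreeMap<>();
    // Open transaction collecting records written since the last checkpoint.
    private List<String> currentTransaction = new ArrayList<>();

    void invoke(String record) {
        currentTransaction.add(record);
    }

    /** Checkpoint (snapshot) time: pre-commit the open transaction and start a new one. */
    void snapshotState(long checkpointId) {
        pendingCommits.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    /** Checkpoint complete: commit every pre-committed transaction up to this checkpoint ID. */
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingCommits.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            committed.addAll(it.next().getValue());
            it.remove(); // removing from the head-map view also removes from pendingCommits
        }
    }

    List<String> visibleToReadCommitted() {
        return List.copyOf(committed);
    }
}
```

Records written after a snapshot stay invisible until a later checkpoint completes, which is the 0-to-one-interval delay described above.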
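5. Checkpointing in non-source tasks
So far we have covered checkpointing in source tasks. The JobManager triggers a source task's checkpoint, and the source task broadcasts the CheckpointBarrier downstream. Downstream tasks then receive that barrier, and their checkpoint starts when it arrives. A non-source task reads messages from upstream in a loop and checks the type of each one. If a message is a CheckpointBarrier, the task decides whether it still has to align or can take the checkpoint. This logic lives in CheckpointedInputGate#pollNext():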
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
    Optional<BufferOrEvent> next = inputGate.pollNext();

    if (!next.isPresent()) {
        return handleEmptyBuffer();
    }

    BufferOrEvent bufferOrEvent = next.get();

    if (bufferOrEvent.isEvent()) {
        return handleEvent(bufferOrEvent);
    } else if (bufferOrEvent.isBuffer()) {
        barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
    }
    return next;
}

private Optional<BufferOrEvent> handleEvent(BufferOrEvent bufferOrEvent)
        throws IOException, InterruptedException {
    Class<? extends AbstractEvent> eventClass = bufferOrEvent.getEvent().getClass();
    if (eventClass == CheckpointBarrier.class) {
        CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
        // handle the barrier
        barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
    }
    ...
}

// SingleCheckpointBarrierHandler class: barrier alignment
// (CheckpointBarrierTracker is the at-least-once variant)
public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo)
        throws IOException {
    long barrierId = barrier.getId();
    LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);

    // Stale barrier: a newer checkpoint is already in progress, or this checkpoint has timed
    // out and is no longer pending. Drop the barrier and release the channel for consumption.
    if (currentCheckpointId > barrierId
            || (currentCheckpointId == barrierId && !isCheckpointPending())) {
        controller.obsoleteBarrierReceived(channelInfo, barrier);
        return;
    }

    // The current checkpoint is behind: a barrier of a newer checkpoint arrived,
    // so start the new checkpoint
    if (currentCheckpointId < barrierId) {
        if (isCheckpointPending()) {
            // cancel the older checkpoint
            cancelSubsumedCheckpoint(barrierId);
        }

        if (getNumOpenChannels() == 1) {
            // only one upstream channel: no alignment needed
            markAlignmentStartAndEnd(barrierId, barrier.getTimestamp());
        } else {
            // several upstream channels: start aligning
            markAlignmentStart(barrierId, barrier.getTimestamp());
        }
        currentCheckpointId = barrierId;
        numBarriersReceived = 0;
        allBarriersReceivedFuture = new CompletableFuture<>();
        try {
            // handle the first barrier of this checkpoint
            if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
                LOG.debug(
                        "{}: Triggering checkpoint {} on the first barrier at {}.",
                        taskName,
                        barrier.getId(),
                        barrier.getTimestamp());
                notifyCheckpoint(barrier);
            }
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
            return;
        }
    }

    // receive the barrier and block the corresponding channel
    controller.barrierReceived(channelInfo, barrier);

    if (currentCheckpointId == barrierId) {
        if (++numBarriersReceived == numOpenChannels) {
            if (getNumOpenChannels() > 1) {
                markAlignmentEnd();
            }
            numBarriersReceived = 0;
            // all barriers have arrived: handle the last one
            if (controller.postProcessLastBarrier(channelInfo, barrier)) {
                LOG.debug(
                        "{}: Triggering checkpoint {} on the last barrier at {}.",
                        taskName,
                        barrier.getId(),
                        barrier.getTimestamp());
                // Start the checkpoint: this again ends up in StreamTask.performCheckpoint(),
                // and the rest is the same as for the source task above
                notifyCheckpoint(barrier);
            }
            allBarriersReceivedFuture.complete(null);
        }
    }
}
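The core idea of aligned barriers fits in a few lines:
- Block each channel once its barrier has arrived.
- Trigger the checkpoint when every channel has delivered the barrier.
- Let a newer checkpoint subsume an unfinished older one.

AlignedBarrierSketch is a hypothetical simplification of SingleCheckpointBarrierHandler. It tracks channel indexes only and does not buffer records, cancel checkpoints, or handle unaligned mode.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified aligned (exactly-once) barrier handler; not Flink's class. */
final class AlignedBarrierSketch {
    private final int numChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentCheckpointId = -1;
    private long lastTriggeredCheckpoint = -1;

    AlignedBarrierSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true if this barrier completes alignment and the checkpoint should be triggered. */
    boolean processBarrier(long barrierId, int channel) {
        if (barrierId < currentCheckpointId
                || (barrierId == currentCheckpointId && lastTriggeredCheckpoint == barrierId)) {
            return false; // obsolete barrier: ignore it
        }
        if (barrierId > currentCheckpointId) {
            // a newer checkpoint subsumes any alignment still in progress
            blockedChannels.clear();
            currentCheckpointId = barrierId;
        }
        blockedChannels.add(channel); // block this channel until alignment completes
        if (blockedChannels.size() == numChannels) {
            blockedChannels.clear(); // alignment done: unblock all channels
            lastTriggeredCheckpoint = barrierId;
            return true;
        }
        return false;
    }

    /** Records from a blocked channel would have to wait until alignment completes. */
    boolean isBlocked(int channel) {
        return blockedChannels.contains(channel);
    }
}
```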
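Summary: the checkpoint is started by a change of the job status. The JobManager then triggers the checkpoint on the source tasks. The source tasks send barriers to downstream operators to tell them to start their own checkpoints, and each operator reports back once it is done. The overall process is fairly clear, although I did not dig into every detail. This article is only a brief analysis of the barrier flow. It does not cover how state is actually persisted, nor unaligned checkpoints; interested readers can explore these on their own, and I may cover them in follow-up articles.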
Flink Checkpoint源码浅析
1. JobManager 端checkpoint调度
dispatcher分发任务后会启动相应的jobMaster, 在创建jobMaster 构建过程中会执行jobGraph -> executeGraph的转换,源码如下:
// JobMaster类
public JobMaster(RpcService rpcService,JobMasterConfiguration jobMasterConfiguration,...)throws Exception {...this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);this.schedulerNG = createScheduler(executionDeploymentTracker, jobManagerJobMetricGroup);this.jobStatusListener = null;...
}
// SchedulerBase类
public SchedulerBase(final Logger log,final JobGraph jobGraph,final BackPressureStatsTracker backPressureStatsTracker,...)throws Exception {...this.executionGraph =createAndRestoreExecutionGraph(jobManagerJobMetricGroup,checkNotNull(shuffleMaster),checkNotNull(partitionTracker),checkNotNull(executionDeploymentTracker),initializationTimestamp);...
}
private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup,ShuffleMaster<?> shuffleMaster,final JobMasterPartitionTracker partitionTracker,ExecutionDeploymentTracker executionDeploymentTracker,long initializationTimestamp)throws JobExecutionException, JobException {...return ExecutionGraphBuilder.buildGraph(null,jobGraph,jobMasterConfiguration,...);...}
createAndRestoreExecutionGraph()方法调用了createExecutionGraph()方法最终使用ExecutionGraphBuilder进行了ExecuteGraph的生成。
在构建ExecutionGraph过程中(ExecutionGraphBuilder.buildGraph()方法),会调用ExecutionGraph.enableCheckpointing()方法,这个方法不管任务里有没有设置checkpoint都会调用的。在enableCheckpointing()方法里会创建CheckpointCoordinator,这是负责checkpoint的核心实现类,同时会给job添加一个监听器CheckpointCoordinatorDeActivator(只有设置了checkpoint才会注册这个监听器),CheckpointCoordinatorDeActivator负责checkpoint的启动和停止。源码如下:
// ExecutionGraphBuilder类
public static ExecutionGraph buildGraph(@Nullable ExecutionGraph prior,JobGraph jobGraph,Configuration jobManagerConfig,...)throws JobExecutionException, JobException {...// configure the state checkpointingJobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();if (snapshotSettings != null) {List<ExecutionJobVertex> triggerVertices =idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
List<ExecutionJobVertex> ackVertices =idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
List<ExecutionJobVertex> confirmVertices =idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);// 一系列的checkpoint设置,包括statebackend, user-define hook, checkpointIdCounter等...executionGraph.enableCheckpointing(chkConfig,triggerVertices,ackVertices,confirmVertices,hooks,checkpointIdCounter,completedCheckpoints,rootBackend,checkpointStatsTracker);...
}
在 build graph 时确定了 triggerVertices ( 用来触发 chekcpoint),ackVertices ( 用来接收 checkpoint 已经完成的报告 )以及 confirmVertices ( 用来确认 checkpoint 已经完成 )。
executionGraph.enableCheckpointing()中做了一些checkpoint相关类的初始化操作,以及checkpoint状态监听器的注册。在JobManager端开始进行任务调度的时候,会对job的状态进行转换,由CREATED转成RUNNING,实现在transitionState()方法中,在这个过程中刚才设置的job监听器CheckpointCoordinatorDeActivator就开始启动checkpoint的定时任务了,调用链为ExecutionGraph.transitionToRunning() -> transitionState() -> notifyJobStatusChange() -> CheckpointCoordinatorDeActivator.jobStatusChanges() -> CheckpointCoordinator.startCheckpointScheduler()源码如下:
public void transitionToRunning() {if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);}
}
private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {...if (state == current) {notifyJobStatusChange(newState, error);return true;}...
}
private void notifyJobStatusChange(JobStatus newState, Throwable error) {if (jobStatusListeners.size() > 0) {final long timestamp = System.currentTimeMillis();final Throwable serializedError = error == null ? null : new SerializedThrowable(error);
for (JobStatusListener listener : jobStatusListeners) {try {listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);} catch (Throwable t) {LOG.warn("Error while notifying JobStatusListener", t);}}}
}
// CheckpointCoordinatorDeActivator类
@Overridepublic void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {if (newJobStatus == JobStatus.RUNNING) {// start the checkpoint schedulercoordinator.startCheckpointScheduler();} else {// anything else should stop the trigger for nowcoordinator.stopCheckpointScheduler();}}
CheckpointCoordinator会部署一个定时任务,用于周期性的触发checkpoint,这个定时任务就是ScheduledTrigger类。
public void startCheckpointScheduler() {synchronized (lock) {if (shutdown) {throw new IllegalArgumentException("Checkpoint coordinator is shut down");}
// make sure all prior timers are cancelledstopCheckpointScheduler();
periodicScheduling = true;currentPeriodicTrigger = scheduleTriggerWithDelay(getRandomInitDelay());}
}
private ScheduledFuture<?> scheduleTriggerWithDelay(long initDelay) {return timer.scheduleAtFixedRate(new ScheduledTrigger(), initDelay, baseInterval, TimeUnit.MILLISECONDS);
}
public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointProperties props,@Nullable String externalSavepointLocation,boolean isPeriodic) {
if (props.getCheckpointType().getPostCheckpointAction() == PostCheckpointAction.TERMINATE&& !(props.isSynchronous() && props.isSavepoint())) {return FutureUtilspletedExceptionally(new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX."));}
CheckpointTriggerRequest request =new CheckpointTriggerRequest(props, externalSavepointLocation, isPeriodic);// 首先做一些前置校验,看是否能触发checkpoint,主要就是检查最大并发checkpoint数,checkpoint间隔时间// 在积压(如果有)的checkpoint中选一个进行处理chooseRequestToExecute(request).ifPresent(this::startTriggeringCheckpoint);return request.onCompletionPromise;
}
private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {try {synchronized (lock) {preCheckGlobalState(request.isPeriodic);}// 找出需要发送checkpoint消息的task(即tasksToTrigger,由生成JobGraph时生成,由所有不包含输入的顶点组成)放入executions// Check if all tasks that we need to trigger are running. If not, abort the checkpoint.final Execution[] executions = getTriggerExecutions();// 找出需要返回checkpoint的ack反馈信息的task放入ackTasks,并将其作为构造PendingCheckpoint的参数// Check if all tasks that need to acknowledge the checkpoint are running. If not, abort the checkpointfinal Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();// 创建PendingCheckpoint, 用户自定义hook触发...... // no exception, no discarding, everything is OKfinal long checkpointId =checkpoint.getCheckpointId();snapshotTaskState(timestamp,checkpointId,checkpoint.getCheckpointStorageLocation(),request.props,executions);coordinatorsToCheckpoint.forEach((ctx) ->ctx.afterSourceBarrierInjection(checkpointId));...} catch (Throwable throwable) {onTriggerFailure(request, throwable);}
}
private void snapshotTaskState(long timestamp,long checkpointID,CheckpointStorageLocation checkpointStorageLocation,CheckpointProperties props,Execution[] executions) {...// send the messages to the tasks that trigger their checkpointfor (Execution execution : executions) {// 两者底层调用的是同一个方法,只有语义上的区别if (props.isSynchronous()) {execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);} else {execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}}
}
Execution.triggerCheckpoint()就是远程调用TaskManager的triggerCheckpoint()方法:
private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {...final LogicalSlot slot = assignedResource;
if (slot != null) {final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);} else {LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");}
}
2. SourceStreamTask的Checkpoint执行
TaskManager的triggerCheckpoint()方法首先获取到source task(即SourceStreamTask),调用Task.triggerCheckpointBarrier(),triggerCheckpointBarrier()会异步的去执行一个独立线程,这个线程来负责source task的checkpoint执行。
// TaskExecutor类
public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID,long checkpointId,long checkpointTimestamp,CheckpointOptions checkpointOptions) {log.debug("Trigger checkpoint {}@{} for {}.",checkpointId,checkpointTimestamp,executionAttemptID);...final Task task = taskSlotTable.getTask(executionAttemptID);
if (task != null) {task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
return CompletableFuturepletedFuture(Acknowledge.get());} else {...}
}
// Task class
public void triggerCheckpointBarrier(
        final long checkpointID,
        final long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
    ...
    try {
        // invokable is in fact the StreamTask, which in turn delegates to ever more
        // specific classes, all the way down to the user code
        invokable.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    }
    ...
}
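The TaskExecutor side boils down to a lookup followed by an asynchronous hand-off. The RPC is acknowledged as soon as the trigger has been handed over, not when the checkpoint finishes. The Java sketch below models this under stated assumptions:

- ModelTaskExecutor and ModelTask are hypothetical.
- A ConcurrentHashMap stands in for taskSlotTable.
- A single-thread executor stands in for the task's own thread.
- The failure branch, which is elided in the excerpt above, is assumed to return an exceptionally completed future.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of a running task that remembers the last checkpoint it was asked to take.
class ModelTask {
    final AtomicLong lastTriggered = new AtomicLong(-1);
    private final ExecutorService taskThread = Executors.newSingleThreadExecutor();

    // Like Task.triggerCheckpointBarrier(): returns immediately, the work runs on the task's thread.
    void triggerCheckpointBarrier(long checkpointId) {
        taskThread.execute(() -> lastTriggered.set(checkpointId));
    }

    void shutdownAndWait() {
        taskThread.shutdown();
        try {
            taskThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical model of the TaskExecutor RPC endpoint.
class ModelTaskExecutor {
    final Map<String, ModelTask> taskSlotTable = new ConcurrentHashMap<>();

    CompletableFuture<String> triggerCheckpoint(String attemptId, long checkpointId) {
        ModelTask task = taskSlotTable.get(attemptId);
        if (task != null) {
            task.triggerCheckpointBarrier(checkpointId);
            // Acknowledged before the checkpoint itself has run.
            return CompletableFuture.completedFuture("ACK");
        }
        // Assumed failure handling for an unknown attempt ID.
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("No task for attempt " + attemptId));
        return failed;
    }
}
```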
The invokable then triggers the checkpoint. All trigger tasks are source operator chains, so the call ends up in SourceStreamTask:
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    if (!externallyInducedCheckpoints) {
        return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
    } else {
        // we do not trigger checkpoints here, we simply state whether we can trigger them
        synchronized (lock) {
            return CompletableFuture.completedFuture(isRunning());
        }
    }
}
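The core of the checkpoint is implemented in StreamTask.performCheckpoint(), which hands the actual work to SubtaskCheckpointCoordinatorImpl.checkpointState() (shown below). The method consists of three main steps:
1. Do some preparation before the checkpoint. Operators normally do nothing at this stage.
2. Immediately broadcast the CheckpointBarrier downstream, so that downstream tasks receive it as early as possible and start their own checkpoints.
3. Take the state snapshot, i.e. the actual checkpoint.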
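The order of these steps matters: the barrier leaves the task before the potentially slow snapshot starts, so downstream tasks can begin their own work promptly. The Java sketch below records the call order for a small operator chain. ChainedOperator and PerformCheckpointSketch are hypothetical simplifications of OperatorChain, prepareSnapshotPreBarrier() and broadcastEvent(). They are not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified operator exposing only the two hooks relevant here.
interface ChainedOperator {
    String name();

    default void prepareSnapshotPreBarrier(long checkpointId, List<String> log) {
        // Usually a no-op, mirroring the "nothing or minimal" comment in Flink's code.
    }

    default void snapshotState(long checkpointId, List<String> log) {
        log.add("snapshot " + name() + " #" + checkpointId);
    }
}

class PerformCheckpointSketch {
    static List<String> performCheckpoint(long checkpointId, List<ChainedOperator> chain) {
        List<String> log = new ArrayList<>();
        // Step 1: pre-barrier work for every operator in the chain.
        for (ChainedOperator op : chain) {
            op.prepareSnapshotPreBarrier(checkpointId, log);
        }
        // Step 2: emit the barrier downstream as early as possible.
        log.add("broadcast barrier #" + checkpointId);
        // Step 3: snapshot each operator's state.
        for (ChainedOperator op : chain) {
            op.snapshotState(checkpointId, log);
        }
        return log;
    }
}
```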
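While performCheckpoint() is running, the task cannot process records. The checkpoint is executed as a mail by the task's own mailbox thread (triggerCheckpointAsyncInMailbox()), so it never runs concurrently with record processing. For source tasks, the checkpoint and data emission are also synchronized on the same checkpoint lock: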
public Future<Boolean> triggerCheckpointAsync(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
    ...
    triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions));
    ...
}

private boolean triggerCheckpointAsyncInMailbox(
        CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
        throws Exception {
    ...
    subtaskCheckpointCoordinator.initCheckpoint(
            checkpointMetaData.getCheckpointId(), checkpointOptions);

    boolean success = performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    if (!success) {
        declineCheckpoint(checkpointMetaData.getCheckpointId());
    }
    return success;
    ...
}

// SubtaskCheckpointCoordinatorImpl class
public void checkpointState(
        CheckpointMetaData metadata,
        CheckpointOptions options,
        CheckpointMetricsBuilder metrics,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {
    // Check whether this checkpoint has to be aborted
    // Step (0): Record the last triggered checkpointId and abort the sync phase of checkpoint
    // if necessary.
    lastCheckpointId = metadata.getCheckpointId();
    if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
        // broadcast cancel checkpoint marker to avoid downstream back-pressure due to
        // checkpoint barrier align.
        operatorChain.broadcastEvent(new CancelCheckpointMarker(metadata.getCheckpointId()));
        LOG.info(
                "Checkpoint {} has been notified as aborted, would not trigger any checkpoint.",
                metadata.getCheckpointId());
        return;
    }

    // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
    // The pre-barrier work should be nothing or minimal in the common case.
    // (usually there is no logic here)
    operatorChain.prepareSnapshotPreBarrier(metadata.getCheckpointId());

    // Step (2): Send the checkpoint barrier downstream
    // The barrier is wrapped in a priority buffer and added to the ResultSubpartition's
    // PrioritizedDeque, and the buffer and backlog counts are updated.
    // When notifyDataAvailable == true, the downstream consumer is notified.
    // When the downstream CheckpointedInputGate receives the buffer and recognizes it as a
    // checkpoint event, it reacts accordingly.
    operatorChain.broadcastEvent(
            new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options),
            options.isUnalignedCheckpoint());

    // Step (3): Prepare to spill the in-flight buffers for input and output
    // (skipped for aligned checkpoints)
    if (options.isUnalignedCheckpoint()) {
        // output data already written while broadcasting event
        channelStateWriter.finishOutput(metadata.getCheckpointId());
    }

    // Step (4): Take the state snapshot. This should be largely asynchronous, to not impact
    // progress of the streaming topology
    Map<OperatorID, OperatorSnapshotFutures> snapshotFutures =
            new HashMap<>(operatorChain.getNumberOfOperators());
    try {
        // takeSnapshotSync is the entry point of the core checkpoint logic
        if (takeSnapshotSync(snapshotFutures, metadata, metrics, options, operatorChain, isRunning)) {
            // finishAndReportAsync: once the snapshot is done, report to the JobMaster
            finishAndReportAsync(snapshotFutures, metadata, metrics, isRunning);
        } else {
            cleanup(snapshotFutures, metadata, metrics, new Exception("Checkpoint declined"));
        }
    } catch (Exception ex) {
        cleanup(snapshotFutures, metadata, metrics, ex);
        throw ex;
    }
}

private boolean takeSnapshotSync(
        Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
        CheckpointMetaData checkpointMetaData,
        CheckpointMetricsBuilder checkpointMetrics,
        CheckpointOptions checkpointOptions,
        OperatorChain<?, ?> operatorChain,
        Supplier<Boolean> isRunning)
        throws Exception {
    ...
    // Where the checkpoint is stored (Memory/FS/RocksDB)
    CheckpointStreamFactory storage =
            checkpointStorage.resolveCheckpointStorageLocation(
                    checkpointId, checkpointOptions.getTargetLocation());

    try {
        for (StreamOperatorWrapper<?, ?> operatorWrapper : operatorChain.getAllOperators(true)) {
            if (!operatorWrapper.isClosed()) {
                operatorSnapshotsInProgress.put(
                        operatorWrapper.getStreamOperator().getOperatorID(),
                        // entry point for snapshotting a single operator
                        buildOperatorSnapshotFutures(
                                checkpointMetaData,
                                checkpointOptions,
                                operatorChain,
                                operatorWrapper.getStreamOperator(),
                                isRunning,
                                channelStateWriteResult,
                                storage));
            }
        }
    } finally {
        checkpointStorage.clearCacheFor(checkpointId);
    }
    ...
}

// StreamOperatorStateHandler class
void snapshotState(
        CheckpointedStreamOperator streamOperator,
        Optional<InternalTimeServiceManager<?>> timeServiceManager,
        String operatorName,
        long checkpointId,
        long timestamp,
        CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory,
        OperatorSnapshotFutures snapshotInProgress,
        StateSnapshotContextSynchronousImpl snapshotContext,
        boolean isUsingCustomRawKeyedState)
        throws CheckpointException {
    try {
        ...
        // Run the operations that persist state.
        // For example, a map operation produces a StreamMap, a subclass of
        // AbstractUdfStreamOperator that encapsulates the snapshotState logic; if the user
        // function does not implement a checkpoint interface, the user-level snapshot is skipped
        streamOperator.snapshotState(snapshotContext);

        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }

        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } ...
}
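A toy single-threaded mailbox in Java shows why the checkpoint cannot overlap with record processing. MiniMailbox and its methods are hypothetical and far simpler than Flink's MailboxProcessor. The sketch only demonstrates one point: a checkpoint enqueued as a mail, like triggerCheckpointAsyncInMailbox() above, runs strictly between records on the same thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded "mailbox": every action, whether processing a record or
// taking a checkpoint, runs on the same thread in FIFO order, so they never overlap.
class MiniMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    final List<String> trace = new ArrayList<>();
    private long processedRecords = 0;

    void putRecord(String record) {
        mails.add(() -> {
            processedRecords++;
            trace.add("process " + record);
        });
    }

    // Similar in spirit to triggerCheckpointAsync() enqueueing triggerCheckpointAsyncInMailbox().
    void triggerCheckpointAsync(long checkpointId) {
        mails.add(() -> trace.add("checkpoint #" + checkpointId + " after " + processedRecords + " records"));
    }

    // Drains the mailbox on the calling thread, which plays the role of the task thread.
    void runMailboxLoop() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }
}
```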
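If the user function implements a checkpoint interface (CheckpointedFunction), its state is persisted to the configured state backend. Otherwise this step is skipped.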
Take AbstractUdfStreamOperator as an example (operators such as map and filter all extend this abstract class):
// AbstractUdfStreamOperator class
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    // Check whether userFunction is an instance of CheckpointedFunction or ListCheckpointed;
    // if so, call the user's snapshotState implementation.
    // FlinkKafkaConsumerBase, for example, implements the CheckpointedFunction interface itself.
    StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
}

// StreamingFunctionUtils class
public static void snapshotFunctionState(
        StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)
        throws Exception {
    Preconditions.checkNotNull(context);
    Preconditions.checkNotNull(backend);

    while (true) {
        // Check whether the user defined custom checkpoint logic, and run it if so
        if (trySnapshotFunctionState(context, backend, userFunction)) {
            break;
        }

        // inspect if the user function is wrapped, then unwrap and try again if we can snapshot
        // the inner function
        if (userFunction instanceof WrappingFunction) {
            userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
        } else {
            break;
        }
    }
}

private static boolean trySnapshotFunctionState(
        StateSnapshotContext context, OperatorStateBackend backend, Function userFunction)
        throws Exception {
    // Does the user function implement CheckpointedFunction?
    if (userFunction instanceof CheckpointedFunction) {
        // Run the user-defined snapshot logic
        ((CheckpointedFunction) userFunction).snapshotState(context);
        return true;
    }

    if (userFunction instanceof ListCheckpointed) {
        // ListCheckpointed is deprecated and not covered here
        ...
    }

    return false;
}
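The unwrap-and-retry loop in snapshotFunctionState() is easy to miss. The following Java sketch reproduces only its control flow. UserFunction, SnapshottingFunction and WrapperFunction are hypothetical stand-ins for Function, CheckpointedFunction and WrappingFunction. The ListCheckpointed branch is omitted.

```java
// Hypothetical stand-ins for Flink's Function, CheckpointedFunction and WrappingFunction;
// they are NOT the real Flink types.
interface UserFunction {}

interface SnapshottingFunction extends UserFunction {
    void snapshotState(long checkpointId);
}

interface WrapperFunction extends UserFunction {
    UserFunction getWrappedFunction();
}

class FunctionSnapshotUtil {
    // Same loop shape as StreamingFunctionUtils.snapshotFunctionState():
    // try the function itself, and if it is only a wrapper, unwrap and retry.
    static boolean snapshotFunctionState(long checkpointId, UserFunction userFunction) {
        while (true) {
            if (userFunction instanceof SnapshottingFunction) {
                ((SnapshottingFunction) userFunction).snapshotState(checkpointId);
                return true;
            }
            if (userFunction instanceof WrapperFunction) {
                userFunction = ((WrapperFunction) userFunction).getWrappedFunction();
            } else {
                return false; // the function keeps no user-level checkpointed state
            }
        }
    }
}
```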
3. The task reports checkpoint information
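When the synchronous part of the snapshot is done, Flink calls finishAndReportAsync(). It waits for the asynchronous parts on a separate thread pool and then sends the completion report to the JobMaster: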
private void finishAndReportAsync(
        Map<OperatorID, OperatorSnapshotFutures> snapshotFutures,
        CheckpointMetaData metadata,
        CheckpointMetricsBuilder metrics,
        Supplier<Boolean> isRunning) {
    // we are transferring ownership over snapshotInProgressList for cleanup to the thread,
    // active on submit
    asyncOperationsThreadPool.execute(
            new AsyncCheckpointRunnable(
                    snapshotFutures,
                    metadata,
                    metrics,
                    System.nanoTime(),
                    taskName,
                    registerConsumer(),
                    unregisterConsumer(),
                    env,
                    asyncExceptionHandler,
                    isRunning));
}
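The hand-off in finishAndReportAsync() works roughly like this. The task thread only submits a job to a separate pool, and that job waits for every operator's snapshot future before sending a single acknowledgement. This Java sketch is a simplification of AsyncCheckpointRunnable built on assumptions. The operator IDs, the Long "state size" results and the acknowledgeToJobMaster callback are hypothetical. If any future fails, join() throws, so no acknowledgement is sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical simplification of AsyncCheckpointRunnable: wait for every operator's
// snapshot future off the task thread, then send one acknowledgement for the whole task.
class AsyncReportSketch {
    static CompletableFuture<Void> finishAndReportAsync(
            long checkpointId,
            Map<String, CompletableFuture<Long>> snapshotFutures, // operatorId -> state size
            ExecutorService asyncOperationsThreadPool,
            BiConsumer<Long, Long> acknowledgeToJobMaster) {
        return CompletableFuture.runAsync(() -> {
            long totalSize = 0;
            for (CompletableFuture<Long> future : snapshotFutures.values()) {
                totalSize += future.join(); // blocks a pool thread, not the task thread
            }
            acknowledgeToJobMaster.accept(checkpointId, totalSize);
        }, asyncOperationsThreadPool);
    }
}
```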
On the JobManager, this eventually reaches CheckpointCoordinator.receiveAcknowledgeMessage():
public boolean receiveAcknowledgeMessage(
        AcknowledgeCheckpoint message, String taskManagerLocationInfo)
        throws CheckpointException {
    ...
    synchronized (lock) {
        // we need to check inside the lock for being shutdown as well, otherwise we
        // get races and invalid error log messages
        if (shutdown) {
            return false;
        }

        final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

        if (checkpoint != null && !checkpoint.isDisposed()) {

            switch (checkpoint.acknowledgeTask(
                    message.getTaskExecutionId(),
                    message.getSubtaskState(),
                    message.getCheckpointMetrics())) {
                case SUCCESS:
                    LOG.debug(
                            "Received acknowledge message for checkpoint {} from task {} of job {} at {}.",
                            checkpointId,
                            message.getTaskExecutionId(),
                            message.getJob(),
                            taskManagerLocationInfo);

                    if (checkpoint.isFullyAcknowledged()) {
                        completePendingCheckpoint(checkpoint);
                    }
                    break;
                ...
            }
            ...
        }
        ...
    }
}
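The bookkeeping behind acknowledgeTask() and isFullyAcknowledged() comes down to a set of tasks that still owe an ack. The Java sketch below is a hypothetical model, not PendingCheckpoint itself. The AckResult values are chosen for illustration. The real method also receives each task's state and metrics, as the call above shows.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, minimal model of the acknowledgement bookkeeping of a pending checkpoint.
class PendingCheckpointSketch {
    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN }

    final long checkpointId;
    private final Set<String> notYetAcknowledged;
    private final Set<String> acknowledged = new HashSet<>();

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToAck) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToAck);
    }

    AckResult acknowledgeTask(String executionId) {
        if (notYetAcknowledged.remove(executionId)) {
            acknowledged.add(executionId);
            return AckResult.SUCCESS;
        }
        // Either this task already acked, or it was never part of this checkpoint.
        return acknowledged.contains(executionId) ? AckResult.DUPLICATE : AckResult.UNKNOWN;
    }

    boolean isFullyAcknowledged() {
        return notYetAcknowledged.isEmpty();
    }
}
```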
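Once the JobManager has received acks from all tasks, it does the following:
1. It converts the PendingCheckpoint into a CompletedCheckpoint, which marks the end of the checkpoint. The CompletedCheckpoint holds the checkpoint's metadata, such as the checkpoint path and the size of the state data. The metadata is also persisted, and the data of expired checkpoints is deleted.
2. It notifies all tasks to perform their commit.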
private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) {
    ...
    // Create the CompletedCheckpoint
    completedCheckpoint =
            pendingCheckpoint.finalizeCheckpoint(
                    checkpointsCleaner, this::scheduleTriggerRequest, executor);
    // Register the completed checkpoint in the CompletedCheckpointStore
    completedCheckpointStore.addCheckpoint(
            completedCheckpoint, checkpointsCleaner, this::scheduleTriggerRequest);
    ...
    // Notify tasks that the checkpoint is complete
    // send the "notify complete" call to all vertices, coordinators, etc.
    sendAcknowledgeMessages(checkpointId, completedCheckpoint.getTimestamp());
}

private void sendAcknowledgeMessages(long checkpointId, long timestamp) {
    // commit tasks
    for (ExecutionVertex ev : tasksToCommitTo) {
        Execution ee = ev.getCurrentExecutionAttempt();
        if (ee != null) {
            ee.notifyCheckpointComplete(checkpointId, timestamp);
        }
    }

    // commit coordinators
    for (OperatorCoordinatorCheckpointContext coordinatorContext : coordinatorsToCheckpoint) {
        coordinatorContext.notifyCheckpointComplete(checkpointId);
    }
}
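The two follow-up steps can be sketched as follows: register the completed checkpoint and drop older ones, then notify every task. CompletionSketch is hypothetical. Its retention limit plays the role of Flink's state.checkpoints.num-retained option (default 1), and the LongConsumer callbacks stand in for Execution.notifyCheckpointComplete().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical model of the completion step: keep at most maxRetained completed
// checkpoints, discard older ones, then notify every committing task.
class CompletionSketch {
    private final Deque<Long> retained = new ArrayDeque<>();
    private final int maxRetained;
    final List<Long> discarded = new ArrayList<>();

    CompletionSketch(int maxRetained) {
        this.maxRetained = maxRetained;
    }

    void completePendingCheckpoint(long checkpointId, List<LongConsumer> tasksToCommitTo) {
        // 1. Register the new completed checkpoint and drop the oldest beyond the limit.
        retained.addLast(checkpointId);
        while (retained.size() > maxRetained) {
            discarded.add(retained.removeFirst());
        }
        // 2. Send the "checkpoint complete" notification to every task.
        for (LongConsumer task : tasksToCommitTo) {
            task.accept(checkpointId);
        }
    }

    List<Long> retainedIds() {
        return new ArrayList<>(retained);
    }
}
```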
4. The JobManager notifies tasks to commit
On receiving the message, the task calls Task.notifyCheckpointComplete(), which eventually calls StreamOperator.notifyCheckpointComplete(). This usually does nothing, but operators such as AbstractUdfStreamOperator may do additional work:
// TaskExecutor class
public CompletableFuture<Acknowledge> confirmCheckpoint(
        ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
    ...
    final Task task = taskSlotTable.getTask(executionAttemptID);

    if (task != null) {
        task.notifyCheckpointComplete(checkpointId);

        return CompletableFuture.completedFuture(Acknowledge.get());
    } ...
}

// Task class
public void notifyCheckpointComplete(final long checkpointID) {
    // invokable is in fact the StreamTask, which in turn delegates to ever more specific
    // classes, all the way down to the user code
    final AbstractInvokable invokable = this.invokable;

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        try {
            invokable.notifyCheckpointCompleteAsync(checkpointID);
        }
        ...
    }
}
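AbstractUdfStreamOperator is the operator for user-defined functions, e.g. StreamMap, StreamSource and so on. If the user's Function implements the CheckpointListener interface, some extra work is done. For example, FlinkKafkaConsumerBase commits the consumed offsets to Kafka, and subclasses of TwoPhaseCommitSinkFunction such as FlinkKafkaProducer commit their transactions. Note that with exactly-once enabled, committing data to Kafka depends on checkpoint completion. A downstream Kafka consumer that sets isolation.level to read_committed can only read the data after the checkpoint completes. Consumption therefore lags by up to roughly one checkpoint interval, plus the time the checkpoint itself takes.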
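The following is a minimal Java sketch of the two-phase-commit idea behind TwoPhaseCommitSinkFunction, showing where the read_committed delay comes from. TwoPhaseSinkSketch and its methods are hypothetical and do not reproduce Flink's API. The lifecycle is:

1. Records go into an open transaction.
2. The transaction is pre-committed when the checkpoint snapshot is taken.
3. It is committed, and so becomes visible, only when notifyCheckpointComplete() arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical two-phase-commit sink illustrating the idea (NOT TwoPhaseCommitSinkFunction's API).
class TwoPhaseSinkSketch {
    private List<String> currentTransaction = new ArrayList<>();
    // checkpointId -> pre-committed transaction, kept in checkpoint order
    private final NavigableMap<Long, List<String>> pendingCommits = new TreeMap<>();
    final List<String> committedLog = new ArrayList<>(); // what a read_committed reader sees

    void invoke(String record) {
        currentTransaction.add(record);
    }

    // Called during the synchronous snapshot: pre-commit and open a fresh transaction.
    void snapshotState(long checkpointId) {
        pendingCommits.put(checkpointId, currentTransaction);
        currentTransaction = new ArrayList<>();
    }

    // Called once the JobManager reports the checkpoint as complete.
    void notifyCheckpointComplete(long checkpointId) {
        // Commit every pre-committed transaction up to and including this checkpoint.
        NavigableMap<Long, List<String>> done = pendingCommits.headMap(checkpointId, true);
        for (List<String> txn : done.values()) {
            committedLog.addAll(txn);
        }
        done.clear(); // removes the committed entries from pendingCommits
    }
}
```

A record written just after a snapshot is only committed when the next checkpoint completes. That is the up-to-one-interval delay a read_committed reader experiences.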
5. Checkpointing in non-source tasks
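The sections above cover checkpointing in source tasks. Source task checkpoints are triggered by the JobManager, and the source task broadcasts a CheckpointBarrier downstream. Downstream tasks receive this CheckpointBarrier, and their checkpoint starts when it arrives. A non-source task keeps reading messages from upstream in a loop and checks the type of each message it receives. If the message is a CheckpointBarrier, the task then decides whether it must align first or can take the checkpoint. This logic lives in CheckpointedInputGate#pollNext():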
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
    Optional<BufferOrEvent> next = inputGate.pollNext();

    if (!next.isPresent()) {
        return handleEmptyBuffer();
    }

    BufferOrEvent bufferOrEvent = next.get();

    if (bufferOrEvent.isEvent()) {
        return handleEvent(bufferOrEvent);
    } else if (bufferOrEvent.isBuffer()) {
        barrierHandler.addProcessedBytes(bufferOrEvent.getBuffer().getSize());
    }
    return next;
}

private Optional<BufferOrEvent> handleEvent(BufferOrEvent bufferOrEvent)
        throws IOException, InterruptedException {
    Class<? extends AbstractEvent> eventClass = bufferOrEvent.getEvent().getClass();
    if (eventClass == CheckpointBarrier.class) {
        CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
        // handle the barrier
        barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelInfo());
    }
    ...
}

// SingleCheckpointBarrierHandler class: barrier alignment
// (CheckpointBarrierTracker is the at-least-once counterpart)
public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo)
        throws IOException {
    long barrierId = barrier.getId();
    LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);

    // Late barrier: a newer checkpoint is already in progress, or this checkpoint has timed
    // out and is no longer pending. Drop it and release the channel for consumption.
    if (currentCheckpointId > barrierId
            || (currentCheckpointId == barrierId && !isCheckpointPending())) {
        controller.obsoleteBarrierReceived(channelInfo, barrier);
        return;
    }

    // The current checkpoint is behind: a barrier of a newer checkpoint arrived,
    // so start the new checkpoint
    if (currentCheckpointId < barrierId) {
        if (isCheckpointPending()) {
            // cancel the old checkpoint
            cancelSubsumedCheckpoint(barrierId);
        }

        if (getNumOpenChannels() == 1) {
            // only one upstream channel: the checkpoint can be triggered directly
            markAlignmentStartAndEnd(barrierId, barrier.getTimestamp());
        } else {
            // several upstream channels: start aligning
            markAlignmentStart(barrierId, barrier.getTimestamp());
        }
        currentCheckpointId = barrierId;
        numBarriersReceived = 0;
        allBarriersReceivedFuture = new CompletableFuture<>();
        try {
            // handle the first barrier received
            if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
                LOG.debug(
                        "{}: Triggering checkpoint {} on the first barrier at {}.",
                        taskName,
                        barrier.getId(),
                        barrier.getTimestamp());
                notifyCheckpoint(barrier);
            }
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
            return;
        }
    }

    // receive the barrier and block the corresponding channel
    controller.barrierReceived(channelInfo, barrier);

    if (currentCheckpointId == barrierId) {
        if (++numBarriersReceived == numOpenChannels) {
            if (getNumOpenChannels() > 1) {
                markAlignmentEnd();
            }
            numBarriersReceived = 0;
            // all barriers have arrived: handle the last barrier
            if (controller.postProcessLastBarrier(channelInfo, barrier)) {
                LOG.debug(
                        "{}: Triggering checkpoint {} on the last barrier at {}.",
                        taskName,
                        barrier.getId(),
                        barrier.getTimestamp());
                // start the checkpoint: this ends up in the same StreamTask.performCheckpoint()
                // as above, and the rest matches the source task checkpoint
                notifyCheckpoint(barrier);
            }
            allBarriersReceivedFuture.complete(null);
        }
    }
}
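The aligned case can be condensed into a small Java model. BarrierAlignerSketch is hypothetical and deliberately ignores unaligned checkpoints, cancellation markers and end-of-partition handling. It keeps only the rules shown above:

- Late barriers are ignored.
- A newer barrier subsumes the current alignment.
- Each channel that has delivered the barrier is blocked.
- The checkpoint fires once every open channel has delivered the barrier, or immediately when there is only one channel.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.LongConsumer;

// Hypothetical, simplified model of aligned barrier handling (not SingleCheckpointBarrierHandler).
class BarrierAlignerSketch {
    private final int numOpenChannels;
    private final LongConsumer triggerCheckpoint;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private long currentCheckpointId = -1;
    private boolean pending = false;

    BarrierAlignerSketch(int numOpenChannels, LongConsumer triggerCheckpoint) {
        this.numOpenChannels = numOpenChannels;
        this.triggerCheckpoint = triggerCheckpoint;
    }

    void processBarrier(long barrierId, int channel) {
        // Late barrier: an older checkpoint, or the current one is no longer pending.
        if (barrierId < currentCheckpointId || (barrierId == currentCheckpointId && !pending)) {
            return;
        }
        // Barrier of a newer checkpoint: abandon (subsume) the old alignment and start over.
        if (barrierId > currentCheckpointId) {
            blockedChannels.clear();
            currentCheckpointId = barrierId;
            pending = true;
        }
        blockedChannels.add(channel); // stop consuming this channel until alignment ends
        if (blockedChannels.size() == numOpenChannels) {
            blockedChannels.clear(); // alignment done: unblock all channels
            pending = false;
            triggerCheckpoint.accept(barrierId);
        }
    }

    boolean isBlocked(int channel) {
        return blockedChannels.contains(channel);
    }
}
```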
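Summary: the checkpoint scheduler is started by a change of job status, i.e. the job switching to RUNNING. The source tasks are then located and checkpointed, and the barrier is sent to downstream operators to tell them to start their own checkpoints. Each operator reports back through a callback once it is done. The overall flow is fairly clear, although I did not dig into every detail. This article is only a brief analysis of the barrier flow; it covers neither the actual state persistence nor unaligned barriers. Interested readers can explore those on their own, and a follow-up article may cover them.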
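Copyright notice: The article "A Brief Analysis of Flink Checkpoint Source Code" (Flink Checkpoint源码浅析) was contributed voluntarily by a user, and the views expressed are the author's own. To reprint, please contact the author and cite the source: http://it.en369.cn/IT/1694671930a254880.html. This site only provides information storage space, does not own the copyright and assumes no related legal liability. Any content on this site found to be plagiarized, infringing or illegal will be removed immediately once verified.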