Flink checkpoint source code analysis (2)

sanmutongzi
Published 2020-03-04 15:48:42
Column: stream process

When reposting, please credit the original source: http://www.cnblogs.com/dongxiao-yang/p/8260370.html

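Part 1 of this series, "Flink checkpoint source code analysis (1)", covered the JobManager-side code that periodically generates TriggerCheckpoint messages. This article continues with the TaskManager side: how it handles a received TriggerCheckpoint message and performs the corresponding state snapshot.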

Inside the TaskManager, a TriggerCheckpoint message is processed along the path handleMessage -> handleCheckpointingMessage -> Task.triggerCheckpointBarrier

    public void triggerCheckpointBarrier(
            final long checkpointID,
            long checkpointTimestamp,
            final CheckpointOptions checkpointOptions) {

        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

        if (executionState == ExecutionState.RUNNING && invokable != null) {
            if (invokable instanceof StatefulTask) {
                // build a local closure
                final StatefulTask statefulTask = (StatefulTask) invokable;
                final String taskName = taskNameWithSubtask;
                final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                    FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        // set safety net from the task's context for checkpointing thread
                        LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                        try {
                            boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                            if (!success) {
                                checkpointResponder.declineCheckpoint(
                                        getJobID(), getExecutionId(), checkpointID,
                                        new CheckpointDeclineTaskNotReadyException(taskName));
                            }
                        }
                        catch (Throwable t) {
                            if (getExecutionState() == ExecutionState.RUNNING) {
                                failExternally(new Exception(
                                    "Error while triggering checkpoint " + checkpointID + " for " +
                                        taskNameWithSubtask, t));
                            } else {
                                LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                    taskNameWithSubtask, executionId, t);
                            }
                        } finally {
                            FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                        }
                    }
                };
                executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
            }
            else {
                checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                        new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));

                LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
                        taskNameWithSubtask, executionId);

            }
        }
        else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                    new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }
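
The accept/decline decision in triggerCheckpointBarrier can be condensed into a small model. This is only a sketch: TriggerSketch, Checkpointable and the string responses are hypothetical stand-ins for Task, StatefulTask and checkpointResponder, not Flink classes. Unlike the real method, the sketch also records a "triggered" response on success and reports an exception as "failed" instead of failing the task, purely so that the outcome is observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of the decision logic; none of these types are Flink classes.
class TriggerSketch {
    enum State { CREATED, RUNNING, FINISHED }

    /** Hypothetical stand-in for StatefulTask: returns false if the task is not ready. */
    interface Checkpointable {
        boolean triggerCheckpoint(long checkpointId) throws Exception;
    }

    // Stand-in for the messages sent through checkpointResponder.
    private final List<String> responses = new ArrayList<>();
    private final ExecutorService asyncCalls = Executors.newSingleThreadExecutor();

    void triggerCheckpointBarrier(State state, Object invokable, long checkpointId) {
        if (state != State.RUNNING || invokable == null) {
            respond("declined " + checkpointId + ": task not ready");
        } else if (!(invokable instanceof Checkpointable)) {
            respond("declined " + checkpointId + ": not a checkpointing task");
        } else {
            Checkpointable task = (Checkpointable) invokable;
            // The trigger runs on a separate thread so the caller is not blocked.
            asyncCalls.execute(() -> {
                try {
                    boolean success = task.triggerCheckpoint(checkpointId);
                    respond(success
                            ? "triggered " + checkpointId
                            : "declined " + checkpointId + ": task not ready");
                } catch (Exception e) {
                    respond("failed " + checkpointId + ": " + e.getMessage());
                }
            });
        }
    }

    private synchronized void respond(String message) {
        responses.add(message);
    }

    /** Stops the executor, waits for pending triggers and returns a copy of all responses. */
    List<String> awaitResponses() {
        asyncCalls.shutdown();
        try {
            asyncCalls.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (this) {
            return new ArrayList<>(responses);
        }
    }
}
```

A single-threaded executor is used in the sketch so that asynchronous triggers complete in submission order, which keeps the example deterministic.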

Under normal conditions, triggerCheckpointBarrier calls the triggerCheckpoint() method implemented by StreamTask, which then proceeds along the following call chain:

triggerCheckpoint -> performCheckpoint -> checkpointState -> CheckpointingOperation.executeCheckpointing

The code of executeCheckpointing:

        public void executeCheckpointing() throws Exception {
            startSyncPartNano = System.nanoTime();

            boolean failed = true;
            try {
                for (StreamOperator<?> op : allOperators) {
                    checkpointStreamOperator(op);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                            checkpointMetaData.getCheckpointId(), owner.getName());
                }

                startAsyncPartNano = System.nanoTime();

                checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

                // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
                runAsyncCheckpointingAndAcknowledge();
                failed = false;

                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}." +
                            "Alignment duration: {} ms, snapshot duration {} ms",
                        owner.getName(), checkpointMetaData.getCheckpointId(),
                        checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                        checkpointMetrics.getSyncDurationMillis());
                }

executeCheckpointing performs two operations. The first is to call checkpointStreamOperator(op) on every StreamOperator that belongs to the task.

The code of checkpointStreamOperator:

        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                // first call the legacy checkpoint code paths
                nonPartitionedStates.add(op.snapshotLegacyOperatorState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions));

                OperatorSnapshotResult snapshotInProgress = op.snapshotState(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                snapshotInProgressList.add(snapshotInProgress);
            } else {
                nonPartitionedStates.add(null);
                OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
                snapshotInProgressList.add(emptySnapshotInProgress);
            }
        }
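
The loop over operators can be illustrated with a minimal sketch. It assumes, as the else branch above suggests, that a placeholder is added for a null operator so that the result at index i still belongs to operator i. SyncPhaseSketch and its Operator interface are hypothetical, and a CompletableFuture stands in for OperatorSnapshotResult.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative model of the synchronous phase; not Flink code.
class SyncPhaseSketch {
    /** Hypothetical operator: the call returns a handle to work that may finish later. */
    interface Operator {
        CompletableFuture<String> snapshotState(long checkpointId);
    }

    static List<CompletableFuture<String>> snapshotAll(List<Operator> chain, long checkpointId) {
        List<CompletableFuture<String>> inProgress = new ArrayList<>(chain.size());
        for (Operator op : chain) {
            if (op != null) {
                inProgress.add(op.snapshotState(checkpointId));
            } else {
                // Empty placeholder keeps the list positions aligned with the operator chain.
                inProgress.add(CompletableFuture.completedFuture(null));
            }
        }
        return inProgress;
    }
}
```

The returned list plays the role of snapshotInProgressList: the synchronous phase only collects handles, and the results are resolved later.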

The snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) method declared by the StreamOperator interface is given a final implementation in the abstract class AbstractStreamOperator:

    @Override
    public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {

        KeyGroupRange keyGroupRange = null != keyedStateBackend ?
                keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

        OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

        CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);

        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
                checkpointId,
                timestamp,
                factory,
                keyGroupRange,
                getContainingTask().getCancelables())) {

            snapshotState(snapshotContext);

            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (null != keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }

            throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
                getOperatorName() + '.', snapshotException);
        }

        return snapshotInProgress;
    }

The snapshotState(snapshotContext) method called in the code above is overridden by each concrete operator with its own implementation.
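
This split between a final entry point and an overridable hook is the template method pattern. A minimal sketch follows; BaseOperatorSketch and CountingOperatorSketch are hypothetical classes, and a plain Map stands in for the snapshot context, so none of this is Flink's actual API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Template-method sketch with illustrative stand-in classes.
abstract class BaseOperatorSketch {
    /** Fixed skeleton: subclasses cannot override this entry point. */
    public final Map<String, Object> snapshotState(long checkpointId) {
        Map<String, Object> context = new LinkedHashMap<>();
        context.put("checkpointId", checkpointId);
        try {
            snapshotState(context); // operator-specific hook
        } catch (RuntimeException e) {
            throw new IllegalStateException("Could not complete snapshot " + checkpointId, e);
        }
        return context;
    }

    /** Hook with an empty default body; concrete operators override it. */
    protected void snapshotState(Map<String, Object> context) {
    }
}

class CountingOperatorSketch extends BaseOperatorSketch {
    private long count;

    void processElement(String ignored) {
        count++;
    }

    @Override
    protected void snapshotState(Map<String, Object> context) {
        context.put("count", count); // write this operator's own state into the context
    }
}
```

Keeping the entry point final means the error handling and bookkeeping are written once, while each operator only decides what state to write.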

The second operation in executeCheckpointing is the call to runAsyncCheckpointingAndAcknowledge, which persists all state to files and then sends acknowledgeCheckpoint back to the JobManager.

    private static final class AsyncCheckpointRunnable implements Runnable, Closeable {
.....
.....

                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                        CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

                    owner.getEnvironment().acknowledgeCheckpoint(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetrics,
                        subtaskState);
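
The compareAndSet guard in the excerpt above can be shown in isolation. The sketch below assumes that the purpose of the guard is to let the acknowledgement happen only if the checkpoint was not cancelled in the meantime. AsyncAckSketch, its State enum and the list of acknowledged ids are hypothetical, not the real AsyncCheckpointRunnable.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of "acknowledge only if still RUNNING"; not Flink code.
class AsyncAckSketch implements Runnable {
    enum State { RUNNING, DISCARDED, COMPLETED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
    private final long checkpointId;
    private final List<Long> acknowledged; // stand-in for the message to the JobManager

    AsyncAckSketch(long checkpointId, List<Long> acknowledged) {
        this.checkpointId = checkpointId;
        this.acknowledged = acknowledged;
    }

    @Override
    public void run() {
        // The slow work of writing state would happen here.
        // Only a successful RUNNING -> COMPLETED transition may acknowledge.
        if (state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            acknowledged.add(checkpointId);
        }
    }

    /** Cancels the checkpoint; returns true only if it had not completed yet. */
    boolean close() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }
}
```

Because both transitions start from RUNNING, exactly one of run() and close() can win, so a cancelled checkpoint is never acknowledged.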

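One addition: inside the performCheckpoint method mentioned above, before calling checkpointState, Flink first sends the CheckpointBarrier to the downstream tasks so that downstream operators can start their own checkpoints as early as possible. This is also the place where Flink's barrier mechanism creates the barriers.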

        synchronized (lock) {
            if (isRunning) {
                // we can do a checkpoint

                // Since both state checkpointing and downstream barrier emission occurs in this
                // lock scope, they are an atomic operation regardless of the order in which they occur.
                // Given this, we immediately emit the checkpoint barriers, so the downstream operators
                // can start their checkpoint work as soon as possible
                operatorChain.broadcastCheckpointBarrier(
                        checkpointMetaData.getCheckpointId(),
                        checkpointMetaData.getTimestamp(),
                        checkpointOptions);

                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;

The code of broadcastCheckpointBarrier:

    public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
        try {
            CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
            for (RecordWriterOutput<?> streamOutput : streamOutputs) {
                streamOutput.broadcastEvent(barrier);
            }
        }
        catch (InterruptedException e) {
            throw new IOException("Interrupted while broadcasting checkpoint barrier");
        }
    }
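
The comment in performCheckpoint states that barrier emission and state snapshotting form one atomic step because both happen inside the same lock scope. A toy model of that idea is sketched below. BarrierOrderSketch is hypothetical: a list of strings stands in for the downstream channel, and a counter stands in for the task's state.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "barrier and snapshot under one lock"; not Flink code.
class BarrierOrderSketch {
    private final Object lock = new Object();
    private final List<String> output = new ArrayList<>(); // stand-in for the downstream channel
    private long counter;                                  // the task's state

    void emitRecord() {
        synchronized (lock) {
            counter++;
            output.add("record-" + counter);
        }
    }

    /** Emits the barrier and returns the snapshotted state. */
    long performCheckpoint(long checkpointId) {
        synchronized (lock) {
            // The barrier goes out first so downstream work can start early.
            output.add("barrier-" + checkpointId);
            // The snapshot is taken in the same critical section, so no record can slip in between.
            return counter;
        }
    }

    List<String> output() {
        synchronized (lock) {
            return new ArrayList<>(output);
        }
    }
}
```

In this model the snapshot value always equals the number of records that precede the barrier in the output, regardless of which of the two steps runs first inside the lock.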

The call path described above applies to source tasks. For the remaining, non-source operators, the path is:

StreamInputProcessor/StreamTwoInputProcessor.processInput() -> barrierHandler.getNextNonBlocked() -> processBarrier -> notifyCheckpoint -> triggerCheckpointOnBarrier
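
For a non-source task the checkpoint is therefore driven by barriers arriving on its inputs rather than by a message from the JobManager. The sketch below models one way a barrier handler can behave, assuming exactly-once alignment with a single checkpoint in flight: a channel that has delivered its barrier is blocked and its records are buffered until every channel has delivered the barrier. AlignmentSketch is a hypothetical class and is far simpler than Flink's real barrier handlers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Simplified barrier alignment with one checkpoint in flight; not Flink code.
class AlignmentSketch {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();
    private final Queue<String> buffered = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    void onRecord(int channel, String record) {
        if (blocked.contains(channel)) {
            buffered.add(record);   // arrived after the barrier: hold it back
        } else {
            processed.add(record);
        }
    }

    void onBarrier(int channel, long checkpointId) {
        blocked.add(channel);
        if (blocked.size() == numChannels) {
            // All inputs aligned: stand-in for triggerCheckpointOnBarrier.
            processed.add("checkpoint-" + checkpointId);
            blocked.clear();
            while (!buffered.isEmpty()) {
                processed.add(buffered.poll());
            }
        }
    }
}
```

With two channels, a record that arrives on channel 0 after its barrier is processed only after the checkpoint marker, while records on the still-open channel 1 continue to flow.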


Originally published: 2018-01-12