
Understanding the Full Flink Checkpoint Snapshot Process in One Article

shengjk1
Published 2020-07-07 15:03:11
Collected in the column: 码字搬砖
Preface

In the previous article we walked through the full checkpoint flow. Here we look in detail at how the snapshot is actually taken during a checkpoint.

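Main text

From the full checkpoint flow, we know that executeCheckpointing is the entry point that snapshots every operator of the task: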

```java
public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    try {
        // Entry point that calls snapshotState on each StreamOperator.
        // Source-side operators first (flatMap -> source), then sink-side operators (sink -> filter).
        for (StreamOperator<?> op : allOperators) {
            // Snapshot each operator and store the result in operatorSnapshotsInProgress
            // (what is stored is a reference to the asynchronous checkpoint); the local
            // checkpoint store and the JobManager ack are then handled separately.
            // Catching the barrier is really part of processing input data, which
            // corresponds to StreamInputProcessor.processInput().
            checkpointStreamOperator(op);
        }
        // ...... (the rest of the method is elided in the original)
}
```

The checkpointStreamOperator call is where the snapshot logic runs. The details are in AbstractStreamOperator.snapshotState:

```java
// State is persisted uniformly from this method.
@Override
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {

    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
            keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

    try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables())) {

        snapshotState(snapshotContext);

        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

        // Persist the state.
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                // Triggers an asynchronous snapshot in DefaultOperatorStateBackend (internal).
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        // Example pipeline: source -> flatMap -> rebalance -> filter -> keyBy -> sink
        // Only the sink (the operator after keyBy) has a non-null keyedStateBackend,
        // so only there does the keyed backend snapshot run.
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                // Triggers an asynchronous snapshot in the keyed StateBackend.
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        try {
            snapshotInProgress.cancel();
        } catch (Exception e) {
            snapshotException.addSuppressed(e);
        }

        String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " +
            getOperatorName() + ".";

        if (!getContainingTask().isCanceled()) {
            LOG.info(snapshotFailMessage, snapshotException);
        }
        throw new Exception(snapshotFailMessage, snapshotException);
    }

    return snapshotInProgress;
}
```
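The shape of snapshotState can be reduced to a small toy model: the synchronous part only collects futures that have not run yet, one per non-null backend, and cancels them if anything goes wrong, while the actual work happens later when the futures are executed. This is a sketch using only the JDK. `ToyBackend`, `snapshotState` and `runAll` here are hypothetical names for illustration, not Flink classes or methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Toy model only: none of these types are Flink classes. */
final class SnapshotFuturesSketch {

    /** Hypothetical stand-in for a state backend that returns a lazy snapshot future. */
    interface ToyBackend {
        RunnableFuture<String> snapshot(long checkpointId);
    }

    /** Synchronous part: collect one not-yet-run future per non-null backend. */
    static List<RunnableFuture<String>> snapshotState(
            long checkpointId, ToyBackend operatorBackend, ToyBackend keyedBackend) {
        List<RunnableFuture<String>> inProgress = new ArrayList<>();
        try {
            if (operatorBackend != null) {
                inProgress.add(operatorBackend.snapshot(checkpointId));
            }
            if (keyedBackend != null) {
                inProgress.add(keyedBackend.snapshot(checkpointId));
            }
        } catch (RuntimeException e) {
            // Mirror the cancel-on-failure step: drop whatever was already started.
            for (RunnableFuture<String> future : inProgress) {
                future.cancel(true);
            }
            throw e;
        }
        return inProgress;
    }

    /** Asynchronous part: run the collected futures and gather their results. */
    static List<String> runAll(List<RunnableFuture<String>> futures) throws Exception {
        List<String> results = new ArrayList<>();
        for (RunnableFuture<String> future : futures) {
            future.run();
            results.add(future.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ToyBackend operatorBackend = id -> new FutureTask<String>(() -> "operator-state@" + id);
        // No keyed backend, as for an operator that is not behind a keyBy.
        List<RunnableFuture<String>> futures = snapshotState(7L, operatorBackend, null);
        System.out.println(futures.get(0).isDone()); // false: nothing has run yet
        System.out.println(runAll(futures));         // [operator-state@7]
    }
}
```

Returning futures instead of results is what lets the caller decide when, and on which thread, the expensive part of the snapshot runs.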

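As snapshotState shows, operatorStateBackend.snapshot is invoked when operatorStateBackend is non-null, and keyedStateBackend.snapshot is invoked when keyedStateBackend is non-null. Here we follow the keyed path and take RocksDBIncrementalSnapshotOperation as the example (for operatorStateBackend.snapshot, the comments in the code are already clear enough). The doSnapshot method below creates that operation: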

```java
@Nonnull
@Override
protected RunnableFuture<SnapshotResult<KeyedStateHandle>> doSnapshot(
    long checkpointId,
    long checkpointTimestamp,
    @Nonnull CheckpointStreamFactory checkpointStreamFactory,
    @Nonnull CheckpointOptions checkpointOptions) throws Exception {

    final SnapshotDirectory snapshotDirectory = prepareLocalSnapshotDirectory(checkpointId);
    LOG.trace("Local RocksDB checkpoint goes to backup path {}.", snapshotDirectory);

    // kvStateInformation is populated in RocksDBIncrementalRestoreOperation:
    // kvStateInformation.put(columnFamilyName, registeredColumn(RocksDBKeyedStateBackend.RocksDbKvStateInfo));
    final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots = new ArrayList<>(kvStateInformation.size());
    final Set<StateHandleID> baseSstFiles = snapshotMetaData(checkpointId, stateMetaInfoSnapshots);

    // Take a native RocksDB checkpoint, in preparation for
    // RocksDBIncrementalSnapshotOperation.uploadSstFiles.
    takeDBNativeCheckpoint(snapshotDirectory);

    // Build the snapshot operation; it runs later as an asynchronous task.
    final RocksDBIncrementalSnapshotOperation snapshotOperation =
        new RocksDBIncrementalSnapshotOperation(
            checkpointId,
            checkpointStreamFactory,
            snapshotDirectory,
            baseSstFiles,
            stateMetaInfoSnapshots);

    return snapshotOperation.toAsyncSnapshotFutureTask(cancelStreamRegistry);
}
```
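To see why the native checkpoint is taken before the asynchronous task is returned, here is a rough JDK-only sketch of the idea. It assumes, as RocksDB checkpoints do on a single file system, that the snapshot directory holds hard links to the current files, so the live database can keep changing while the pinned files are uploaded later. `takeLocalCheckpoint` and `toAsyncUpload` are hypothetical names, and the "upload" is only a directory listing.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

/** Toy model only: plain files stand in for RocksDB sst files. */
final class NativeCheckpointSketch {

    /** Synchronous part: pin the current files by linking them into snapshotDir. */
    static void takeLocalCheckpoint(Path dbDir, Path snapshotDir) throws IOException {
        Files.createDirectories(snapshotDir);
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dbDir)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue;
                }
                Path target = snapshotDir.resolve(file.getFileName());
                try {
                    Files.createLink(target, file); // hard link, no data is copied
                } catch (UnsupportedOperationException | IOException e) {
                    Files.copy(file, target); // fallback where hard links are unavailable
                }
            }
        }
    }

    /** Asynchronous part: a task that later "uploads" (here: lists) the pinned files. */
    static RunnableFuture<Set<String>> toAsyncUpload(Path snapshotDir) {
        return new FutureTask<Set<String>>(() -> {
            Set<String> names = new TreeSet<>();
            try (DirectoryStream<Path> files = Files.newDirectoryStream(snapshotDir)) {
                for (Path file : files) {
                    names.add(file.getFileName().toString());
                }
            }
            return names;
        });
    }

    public static void main(String[] args) throws Exception {
        Path db = Files.createTempDirectory("toy-db");
        Path snapshot = Files.createTempDirectory("toy-snap").resolve("chk-1");
        Files.writeString(db.resolve("000001.sst"), "a");
        Files.writeString(db.resolve("000002.sst"), "b");

        takeLocalCheckpoint(db, snapshot);
        RunnableFuture<Set<String>> upload = toAsyncUpload(snapshot);

        // The live database keeps changing after the synchronous part has returned.
        Files.delete(db.resolve("000001.sst"));
        Files.writeString(db.resolve("000003.sst"), "c");

        upload.run();
        System.out.println(upload.get()); // [000001.sst, 000002.sst]
    }
}
```

The task still sees exactly the files that existed when the checkpoint was taken, which is the property the asynchronous upload relies on.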

Stepping into RocksDBIncrementalSnapshotOperation:

```java
@Override
protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {

    boolean completed = false;

    // Handle to the meta data file
    SnapshotResult<StreamStateHandle> metaStateHandle = null;
    // Handles to new sst files since the last completed checkpoint will go here
    final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
    // Handles to the misc files in the current snapshot will go here
    final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();

    try {

        // Write the meta data (in full) to HDFS.
        metaStateHandle = materializeMetaData();

        // Sanity checks - they should never fail
        Preconditions.checkNotNull(metaStateHandle, "Metadata was not properly created.");
        Preconditions.checkNotNull(metaStateHandle.getJobManagerOwnedSnapshot(),
            "Metadata for job manager was not properly created.");

        // Upload the newly created sst files and the misc files to checkpointFs.
        uploadSstFiles(sstFiles, miscFiles);

        synchronized (materializedSstFiles) {
            materializedSstFiles.put(checkpointId, sstFiles.keySet());
        }

        final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
            new IncrementalRemoteKeyedStateHandle(
                backendUID,
                keyGroupRange,
                checkpointId,
                sstFiles,
                miscFiles,
                metaStateHandle.getJobManagerOwnedSnapshot());

        // PermanentSnapshotDirectory
        final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle();
        final SnapshotResult<KeyedStateHandle> snapshotResult;
        if (directoryStateHandle != null && metaStateHandle.getTaskLocalSnapshot() != null) {

            // Incremental local snapshot
            IncrementalLocalKeyedStateHandle localDirKeyedStateHandle =
                new IncrementalLocalKeyedStateHandle(
                    backendUID,
                    checkpointId,
                    directoryStateHandle,
                    keyGroupRange,
                    metaStateHandle.getTaskLocalSnapshot(),
                    sstFiles.keySet());

            // The local snapshot is reported to the local state manager;
            // the JobManager state (jmIncrementalKeyedStateHandle) is reported to the JobManager.
            snapshotResult = SnapshotResult.withLocalState(jmIncrementalKeyedStateHandle, localDirKeyedStateHandle);
        } else {
            // Only the JobManager state (jmIncrementalKeyedStateHandle) is reported to the JobManager.
            snapshotResult = SnapshotResult.of(jmIncrementalKeyedStateHandle);
        }

        completed = true;

        return snapshotResult;
    } finally {
        if (!completed) {
            // Discard everything written so far if the snapshot did not complete.
            final List<StateObject> statesToDiscard =
                new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
            statesToDiscard.add(metaStateHandle);
            statesToDiscard.addAll(miscFiles.values());
            statesToDiscard.addAll(sstFiles.values());
            cleanupIncompleteSnapshot(statesToDiscard);
        }
    }
}
```
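The completed flag combined with the finally block is a general cleanup pattern: anything already written is discarded unless the method reaches its normal return. A minimal JDK-only sketch of that pattern follows. The `upload` method, its `failOn` parameter and the `discarded` list are hypothetical and exist only to make the behavior observable.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only: strings stand in for state handles. */
final class CleanupOnFailureSketch {

    /**
     * "Uploads" the files in order and returns the uploaded handles.
     * If the upload fails part-way, the handles created so far are added to discarded.
     */
    static List<String> upload(List<String> files, String failOn, List<String> discarded) {
        boolean completed = false;
        List<String> uploaded = new ArrayList<>();
        try {
            for (String file : files) {
                if (file.equals(failOn)) {
                    throw new IllegalStateException("upload failed: " + file);
                }
                uploaded.add(file);
            }
            completed = true;
            return uploaded;
        } finally {
            if (!completed) {
                // Runs on any exception path, whatever the exception type.
                discarded.addAll(uploaded);
            }
        }
    }

    public static void main(String[] args) {
        List<String> discarded = new ArrayList<>();
        System.out.println(upload(List.of("meta", "1.sst"), null, discarded)); // [meta, 1.sst]
        try {
            upload(List.of("meta", "1.sst", "2.sst"), "2.sst", discarded);
        } catch (IllegalStateException e) {
            System.out.println("discarded " + discarded); // discarded [meta, 1.sst]
        }
    }
}
```

Compared with a catch block, the flag keeps the cleanup independent of which exception was thrown and leaves the original exception untouched.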

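In short, the metadata is persisted in full on every checkpoint, while for the data only the newly created sst files and misc files are uploaded to the checkpoint file system.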
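The incremental part can be illustrated with a toy model that compares file names against the previous checkpoint. Because sst files are immutable once written, a file name already known from the base checkpoint does not need to be uploaded again. This sketch assumes every checkpoint completes, so the previous one always serves as the base. The class, `checkpoint` and `Result` are hypothetical names, not Flink APIs.

```java
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;

/** Toy model only: file names stand in for sst files and their state handles. */
final class IncrementalUploadSketch {

    /** Which files one toy checkpoint had to upload and which it could reuse. */
    static final class Result {
        final Set<String> uploaded;
        final Set<String> reused;

        Result(Set<String> uploaded, Set<String> reused) {
            this.uploaded = uploaded;
            this.reused = reused;
        }
    }

    /** sst files referenced by each earlier checkpoint, keyed by checkpoint id. */
    private final SortedMap<Long, Set<String>> materializedSstFiles = new TreeMap<>();

    /** Uploads only the files that the base (previous) checkpoint does not already hold. */
    Result checkpoint(long checkpointId, Set<String> currentSstFiles) {
        Set<String> base = materializedSstFiles.isEmpty()
                ? Set.of()
                : materializedSstFiles.get(materializedSstFiles.lastKey());
        Set<String> uploaded = new TreeSet<>();
        Set<String> reused = new TreeSet<>();
        for (String file : currentSstFiles) {
            if (base.contains(file)) {
                reused.add(file);   // already on the checkpoint file system
            } else {
                uploaded.add(file); // new since the base checkpoint
            }
        }
        materializedSstFiles.put(checkpointId, new TreeSet<>(currentSstFiles));
        return new Result(uploaded, reused);
    }

    public static void main(String[] args) {
        IncrementalUploadSketch backend = new IncrementalUploadSketch();
        Result first = backend.checkpoint(1L, Set.of("1.sst", "2.sst"));
        System.out.println(first.uploaded + " " + first.reused);   // [1.sst, 2.sst] []
        // 1.sst was compacted away and 3.sst is new.
        Result second = backend.checkpoint(2L, Set.of("2.sst", "3.sst"));
        System.out.println(second.uploaded + " " + second.reused); // [3.sst] [2.sst]
    }
}
```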

Originally published on 2020-07-06 on the author's personal site/blog.
