Please credit the original address when reposting: http://www.cnblogs.com/dongxiao-yang/p/8260370.html
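The article "Flink checkpoint source code analysis (part 1)" covered the JobManager-side code that periodically generates TriggerCheckpoint messages. This article continues with the TaskManager side: how a received TriggerCheckpoint message is handled and how the corresponding snapshot (backup) operation is carried out.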
Inside the TaskManager, a TriggerCheckpoint message is handled along the path handleMessage -> handleCheckpointingMessage -> Task.triggerCheckpointBarrier:
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        if (invokable instanceof StatefulTask) {
            // build a local closure
            final StatefulTask statefulTask = (StatefulTask) invokable;
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    }
                    catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
        else {
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotCheckpointingException(taskNameWithSubtask));

            LOG.error("Task received a checkpoint request, but is not a checkpointing task - {} ({}).",
                taskNameWithSubtask, executionId);
        }
    }
    else {
        LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);

        // send back a message that we did not do the checkpoint
        checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
            new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
    }
}
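The three branches of triggerCheckpointBarrier (decline a non-running task, decline a task that cannot checkpoint, otherwise trigger asynchronously and decline if the trigger returns false) can be condensed into a small self-contained model. This is a sketch, not Flink code: ExecutionState is cut down to three values, CheckpointableTask is a hypothetical stand-in for StatefulTask, the hypothetical Outcome enum replaces the messages sent through checkpointResponder, and an ExecutorService plays the role of executeAsyncCallRunnable. Unlike Flink, the sketch waits for the asynchronous call so that it can return a result.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TriggerSketch {

    enum ExecutionState { CREATED, RUNNING, FINISHED }

    // Hypothetical stand-in for StatefulTask: returns false if it cannot checkpoint right now.
    interface CheckpointableTask {
        boolean triggerCheckpoint(long checkpointId);
    }

    // Hypothetical outcome; in Flink the declines are sent through checkpointResponder.
    enum Outcome { TRIGGERED, DECLINED_NOT_READY, DECLINED_NOT_CHECKPOINTING, FAILED }

    static Outcome triggerCheckpointBarrier(ExecutionState state, Object invokable,
                                            long checkpointId, ExecutorService asyncCalls) {
        if (state != ExecutionState.RUNNING || invokable == null) {
            // Outer else-branch: a task that is not running declines as "not ready".
            return Outcome.DECLINED_NOT_READY;
        }
        if (!(invokable instanceof CheckpointableTask)) {
            // Inner else-branch: the invokable cannot take checkpoints at all.
            return Outcome.DECLINED_NOT_CHECKPOINTING;
        }
        final CheckpointableTask task = (CheckpointableTask) invokable;
        // The trigger runs off the caller's thread, like executeAsyncCallRunnable.
        Future<Outcome> pending = asyncCalls.submit(() -> {
            try {
                return task.triggerCheckpoint(checkpointId)
                    ? Outcome.TRIGGERED
                    : Outcome.DECLINED_NOT_READY;   // triggerCheckpoint returned false
            } catch (Throwable t) {
                return Outcome.FAILED;              // Flink calls failExternally while RUNNING
            }
        });
        try {
            // Flink does not wait here; the sketch blocks only to report the outcome.
            return pending.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.FAILED;
        } catch (ExecutionException e) {
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        ExecutorService asyncCalls = Executors.newSingleThreadExecutor();
        try {
            CheckpointableTask ready = id -> true;
            System.out.println(triggerCheckpointBarrier(ExecutionState.RUNNING, ready, 1L, asyncCalls));
            System.out.println(triggerCheckpointBarrier(ExecutionState.FINISHED, ready, 2L, asyncCalls));
            System.out.println(triggerCheckpointBarrier(ExecutionState.RUNNING, "plain", 3L, asyncCalls));
        } finally {
            asyncCalls.shutdown();
        }
    }
}
```

Running the trigger on a separate thread mirrors the reason Flink uses executeAsyncCallRunnable: the thread that delivers the message is not blocked while the task takes its snapshot.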
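In the normal case (the task is RUNNING and its invokable is a StatefulTask), triggerCheckpointBarrier asynchronously calls the triggerCheckpoint() method implemented in StreamTask, which follows the call chain
triggerCheckpoint -> performCheckpoint -> checkpointState -> CheckpointingOperation.executeCheckpointing: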
public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();
    boolean failed = true;
    try {
        for (StreamOperator<?> op : allOperators) {
            checkpointStreamOperator(op);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
                checkpointMetaData.getCheckpointId(), owner.getName());
        }
        startAsyncPartNano = System.nanoTime();
        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
        // at this point we are transferring ownership over snapshotInProgressList for cleanup to the thread
        runAsyncCheckpointingAndAcknowledge();
        failed = false;
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} - finished synchronous part of checkpoint {}." +
                    "Alignment duration: {} ms, snapshot duration {} ms",
                owner.getName(), checkpointMetaData.getCheckpointId(),
                checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
                checkpointMetrics.getSyncDurationMillis());
        }
        // ... (remainder of the method omitted)
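The failed flag and the two System.nanoTime() readings in executeCheckpointing can be illustrated with a stripped-down model of the synchronous phase. Operator, SyncResult and the cancelled list are hypothetical, and the sketch assumes that if an operator snapshot throws before the hand-off to the asynchronous phase, the synchronous code must release the snapshots already started (that cleanup is not shown in the excerpt above).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SyncPhaseSketch {

    // Hypothetical operator: the synchronous snapshot returns a handle for the async phase.
    interface Operator {
        String snapshot(long checkpointId);
    }

    // Hypothetical result of the synchronous phase.
    static final class SyncResult {
        final List<String> inProgress;
        final long syncDurationMillis;

        SyncResult(List<String> inProgress, long syncDurationMillis) {
            this.inProgress = inProgress;
            this.syncDurationMillis = syncDurationMillis;
        }
    }

    static SyncResult executeCheckpointing(List<Operator> allOperators, long checkpointId,
                                           List<String> cancelled) {
        long startSyncPartNano = System.nanoTime();
        List<String> snapshotInProgressList = new ArrayList<>();
        boolean failed = true;
        try {
            for (Operator op : allOperators) {
                snapshotInProgressList.add(op.snapshot(checkpointId));
            }
            long startAsyncPartNano = System.nanoTime();
            long syncMillis = (startAsyncPartNano - startSyncPartNano) / 1_000_000;
            // From here on the (assumed) async phase owns snapshotInProgressList and its cleanup.
            SyncResult result = new SyncResult(snapshotInProgressList, syncMillis);
            failed = false;
            return result;
        } finally {
            if (failed) {
                // No async phase will run, so release the snapshots that were already started.
                cancelled.addAll(snapshotInProgressList);
            }
        }
    }

    public static void main(String[] args) {
        List<String> cancelled = new ArrayList<>();
        Operator a = id -> "a-" + id;
        Operator b = id -> "b-" + id;
        SyncResult r = executeCheckpointing(Arrays.asList(a, b), 7L, cancelled);
        System.out.println(r.inProgress + " sync=" + r.syncDurationMillis + "ms, cancelled=" + cancelled);
    }
}
```

Setting the flag only after the hand-off is what lets a single finally block decide who is responsible for cleanup.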
executeCheckpointing performs two operations. The first is the synchronous part: it calls checkpointStreamOperator(op) on every StreamOperator belonging to this task.
The code of checkpointStreamOperator:
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
    if (null != op) {
        // first call the legacy checkpoint code paths
        nonPartitionedStates.add(op.snapshotLegacyOperatorState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions));
        OperatorSnapshotResult snapshotInProgress = op.snapshotState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions);
        snapshotInProgressList.add(snapshotInProgress);
    } else {
        nonPartitionedStates.add(null);
        OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
        snapshotInProgressList.add(emptySnapshotInProgress);
    }
}
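checkpointStreamOperator adds an entry to both lists even for a null operator, presumably so that the position of each snapshot keeps matching the position of its operator in the chain. A minimal sketch of that idea, with a hypothetical Operator interface and Optional.empty() standing in for the empty OperatorSnapshotResult:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class OperatorSlotsSketch {

    // Hypothetical operator interface; only the snapshot call matters here.
    interface Operator {
        String snapshotState(long checkpointId);
    }

    static List<Optional<String>> checkpointAll(List<Operator> allOperators, long checkpointId) {
        List<Optional<String>> snapshotInProgressList = new ArrayList<>(allOperators.size());
        for (Operator op : allOperators) {
            if (op != null) {
                snapshotInProgressList.add(Optional.of(op.snapshotState(checkpointId)));
            } else {
                // Empty placeholder: slot i still belongs to operator i of the chain.
                snapshotInProgressList.add(Optional.empty());
            }
        }
        return snapshotInProgressList;
    }

    public static void main(String[] args) {
        Operator map = id -> "map@" + id;
        Operator sink = id -> "sink@" + id;
        System.out.println(checkpointAll(Arrays.asList(map, null, sink), 3L));
    }
}
```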
The snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) method of StreamOperator is ultimately implemented, as a final method, in AbstractStreamOperator:
@Override
public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
    CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
    try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables())) {
        snapshotState(snapshotContext);
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        if (null != operatorStateBackend) {
            snapshotInProgress.setOperatorStateManagedFuture(
                operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    } catch (Exception snapshotException) {
        try {
            snapshotInProgress.cancel();
        } catch (Exception e) {
            snapshotException.addSuppressed(e);
        }
        throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
            getOperatorName() + '.', snapshotException);
    }
    return snapshotInProgress;
}
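The snapshotState(snapshotContext) call in the code above is implemented separately by each concrete operator. After it returns, the futures for raw keyed state and raw operator state are taken from the snapshot context, while the futures for managed state come from operatorStateBackend.snapshot(...) and keyedStateBackend.snapshot(...); all of them are collected in the returned OperatorSnapshotResult.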
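The contract of snapshotState can be reduced to two rules: the synchronous call only creates futures, which a later asynchronous phase runs, and if anything fails, every future created so far is cancelled before the exception is rethrown (with cancellation errors attached via addSuppressed). The sketch below models just that contract. Backend and SnapshotResult are hypothetical simplifications of the state backends and OperatorSnapshotResult, and an unchecked IllegalStateException replaces Flink's checked Exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotFuturesSketch {

    // Hypothetical backend: snapshot() does the synchronous work and returns the
    // asynchronous part as a not-yet-run future.
    interface Backend {
        RunnableFuture<String> snapshot(long checkpointId);
    }

    // Hypothetical, simplified analogue of OperatorSnapshotResult.
    static final class SnapshotResult {
        final List<RunnableFuture<String>> futures = new ArrayList<>();

        void cancel() {
            for (RunnableFuture<String> future : futures) {
                future.cancel(true);
            }
        }
    }

    static SnapshotResult snapshotState(long checkpointId, List<Backend> backends) {
        SnapshotResult snapshotInProgress = new SnapshotResult();
        try {
            for (Backend backend : backends) {
                snapshotInProgress.futures.add(backend.snapshot(checkpointId));
            }
        } catch (RuntimeException snapshotException) {
            try {
                snapshotInProgress.cancel();   // release futures that were already created
            } catch (RuntimeException e) {
                snapshotException.addSuppressed(e);
            }
            throw new IllegalStateException("Could not complete snapshot " + checkpointId, snapshotException);
        }
        return snapshotInProgress;
    }

    public static void main(String[] args) throws Exception {
        Backend keyed = id -> new FutureTask<String>(() -> "keyed-state-" + id);
        Backend operator = id -> new FutureTask<String>(() -> "operator-state-" + id);
        SnapshotResult result = snapshotState(4L, Arrays.asList(keyed, operator));
        for (RunnableFuture<String> future : result.futures) {
            future.run();   // the asynchronous phase would run these on another thread
            System.out.println(future.get());
        }
    }
}
```

Returning unstarted futures keeps the synchronous part, during which the task is not processing records, as short as possible.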
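The second operation in executeCheckpointing is the call to runAsyncCheckpointingAndAcknowledge, which hands the in-progress snapshots to an AsyncCheckpointRunnable.
That runnable performs all of the work of persisting the state to files and then sends acknowledgeCheckpoint back to the JobManager: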
private static final class AsyncCheckpointRunnable implements Runnable, Closeable {
    // .....
    // .....
            if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
                owner.getEnvironment().acknowledgeCheckpoint(
                    checkpointMetaData.getCheckpointId(),
                    checkpointMetrics,
                    subtaskState);
    // .....
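The compareAndSet on asyncCheckpointState means the acknowledgement is sent only if the state is still RUNNING, so at most one of "acknowledge" and a competing transition can win. A minimal sketch of that pattern follows. AsyncState, AsyncCheckpoint and the acknowledged list are hypothetical stand-ins, and the close() behaviour (moving RUNNING to DISCARDED) is an assumption, since the excerpt does not show it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class AsyncAckSketch {

    enum AsyncState { RUNNING, COMPLETED, DISCARDED }

    static final class AsyncCheckpoint implements Runnable, AutoCloseable {
        private final AtomicReference<AsyncState> state = new AtomicReference<>(AsyncState.RUNNING);
        private final long checkpointId;
        private final List<Long> acknowledged;   // stands in for the JobManager

        AsyncCheckpoint(long checkpointId, List<Long> acknowledged) {
            this.checkpointId = checkpointId;
            this.acknowledged = acknowledged;
        }

        @Override
        public void run() {
            // ... the snapshot futures would be run here, writing state to durable storage ...
            if (state.compareAndSet(AsyncState.RUNNING, AsyncState.COMPLETED)) {
                acknowledged.add(checkpointId);   // plays the role of acknowledgeCheckpoint(...)
            }
        }

        @Override
        public void close() {
            // Assumed cancel path: only succeeds if the acknowledgement has not been sent yet.
            state.compareAndSet(AsyncState.RUNNING, AsyncState.DISCARDED);
        }

        AsyncState state() {
            return state.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Long> acks = new CopyOnWriteArrayList<>();
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        asyncPool.execute(new AsyncCheckpoint(1L, acks));
        AsyncCheckpoint cancelled = new AsyncCheckpoint(2L, acks);
        cancelled.close();   // cancelled before its asynchronous part runs
        asyncPool.execute(cancelled);
        asyncPool.shutdown();
        asyncPool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("acknowledged: " + acks);
    }
}
```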
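One more detail: inside the performCheckpoint method mentioned above, Flink broadcasts the checkpoint barrier to the downstream tasks before it calls checkpointState, so that downstream operators can start their own checkpoints as early as possible.
This is also the place where Flink's barrier mechanism creates the barriers: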
synchronized (lock) {
    if (isRunning) {
        // we can do a checkpoint
        // Since both state checkpointing and downstream barrier emission occurs in this
        // lock scope, they are an atomic operation regardless of the order in which they occur.
        // Given this, we immediately emit the checkpoint barriers, so the downstream operators
        // can start their checkpoint work as soon as possible
        operatorChain.broadcastCheckpointBarrier(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions);
        checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        return true;
    // ... (remainder omitted)
OperatorChain.broadcastCheckpointBarrier wraps the checkpoint id, timestamp and options in a CheckpointBarrier and broadcasts it as an event on every stream output:
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
    try {
        CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
        for (RecordWriterOutput<?> streamOutput : streamOutputs) {
            streamOutput.broadcastEvent(barrier);
        }
    }
    catch (InterruptedException e) {
        throw new IOException("Interrupted while broadcasting checkpoint barrier");
    }
}
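Why it is safe to emit the barrier before taking the snapshot can be shown with a toy source task in which record emission and performCheckpoint share one lock, so no record can slip in between the barrier and the snapshot. SourceTask, Barrier and the list-based outputs are hypothetical; the sketch assumes records are emitted under the same lock as the checkpoint, which is what Flink 1.x sources do with the checkpoint lock from their SourceContext.

```java
import java.util.ArrayList;
import java.util.List;

public class BarrierEmissionSketch {

    // Hypothetical barrier event; Flink's CheckpointBarrier also carries CheckpointOptions.
    static final class Barrier {
        final long id;
        final long timestamp;

        Barrier(long id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "Barrier(" + id + ")";
        }
    }

    static final class SourceTask {
        private final Object lock = new Object();
        private final List<List<Object>> outputs = new ArrayList<>();
        private final List<Long> snapshots = new ArrayList<>();   // state = records emitted so far
        private long emitted = 0;
        private boolean isRunning = true;

        SourceTask(int numOutputs) {
            for (int i = 0; i < numOutputs; i++) {
                outputs.add(new ArrayList<>());
            }
        }

        // Records are emitted under the same lock that the checkpoint takes.
        void emit(Object record) {
            synchronized (lock) {
                for (List<Object> out : outputs) {
                    out.add(record);
                }
                emitted++;
            }
        }

        boolean performCheckpoint(long checkpointId, long timestamp) {
            synchronized (lock) {
                if (!isRunning) {
                    return false;
                }
                Barrier barrier = new Barrier(checkpointId, timestamp);
                for (List<Object> out : outputs) {   // like broadcastCheckpointBarrier
                    out.add(barrier);
                }
                snapshots.add(emitted);              // like checkpointState
                return true;
            }
        }

        void cancel() {
            synchronized (lock) {
                isRunning = false;
            }
        }

        List<Object> output(int index) {
            synchronized (lock) {
                return new ArrayList<>(outputs.get(index));
            }
        }

        List<Long> snapshots() {
            synchronized (lock) {
                return new ArrayList<>(snapshots);
            }
        }
    }

    public static void main(String[] args) {
        SourceTask task = new SourceTask(2);
        task.emit("a");
        task.emit("b");
        task.performCheckpoint(1L, System.currentTimeMillis());
        task.emit("c");
        System.out.println(task.output(0) + " " + task.output(1) + " snapshots=" + task.snapshots());
    }
}
```

Every record before the barrier is covered by the snapshot and every record after it is not, on every output, which is exactly the consistency the lock-scope comment in performCheckpoint relies on.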
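The checkpoint-triggering path described so far is the one taken by source tasks, which receive TriggerCheckpoint from the JobManager. All other, non-source tasks are triggered by the barriers arriving on their inputs,
along the call chain StreamInputProcessor/StreamTwoInputProcessor.processInput() -> barrierHandler.getNextNonBlocked() -> processBarrier -> notifyCheckpoint -> triggerCheckpointOnBarrier.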
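With exactly-once semantics, the barrierHandler on a non-source task (BarrierBuffer in Flink 1.x) blocks each input channel once its barrier has arrived, buffers that channel's later records, and only triggers the checkpoint once barriers from all channels are in. The sketch below models this alignment under strong simplifying assumptions: Aligner and Barrier are hypothetical, checkpoints never overlap, each channel delivers exactly one barrier per checkpoint, and mismatched checkpoint ids and cancellation are ignored.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class BarrierAlignmentSketch {

    // Hypothetical barrier event carrying only the checkpoint id.
    static final class Barrier {
        final long id;

        Barrier(long id) {
            this.id = id;
        }
    }

    static final class Aligner {
        private final boolean[] blocked;
        private final Deque<Object> buffered = new ArrayDeque<>();
        private final List<Long> triggered = new ArrayList<>();
        private int barriersReceived = 0;
        private long currentCheckpointId = -1;

        Aligner(int numChannels) {
            this.blocked = new boolean[numChannels];
        }

        // Returns the records the operator may process now, in order.
        List<Object> onElement(int channel, Object element) {
            List<Object> ready = new ArrayList<>();
            if (element instanceof Barrier) {
                processBarrier(channel, ((Barrier) element).id, ready);
            } else if (blocked[channel]) {
                buffered.add(element);   // channel is already past the barrier: hold the record back
            } else {
                ready.add(element);
            }
            return ready;
        }

        private void processBarrier(int channel, long checkpointId, List<Object> ready) {
            if (blocked[channel]) {
                throw new IllegalStateException("overlapping checkpoints are not modelled in this sketch");
            }
            if (barriersReceived == 0) {
                currentCheckpointId = checkpointId;
            }
            blocked[channel] = true;
            barriersReceived++;
            if (barriersReceived == blocked.length) {
                // All inputs aligned: this is where notifyCheckpoint -> triggerCheckpointOnBarrier happens.
                triggered.add(currentCheckpointId);
                Arrays.fill(blocked, false);
                barriersReceived = 0;
                ready.addAll(buffered);   // buffered records belong after the snapshot
                buffered.clear();
            }
        }

        List<Long> triggered() {
            return triggered;
        }
    }

    public static void main(String[] args) {
        Aligner aligner = new Aligner(2);
        System.out.println(aligner.onElement(0, "a"));
        System.out.println(aligner.onElement(0, new Barrier(1)));
        System.out.println(aligner.onElement(0, "b"));
        System.out.println(aligner.onElement(1, "x"));
        System.out.println(aligner.onElement(1, new Barrier(1)));
        System.out.println("triggered: " + aligner.triggered());
    }
}
```

Record "b" arrives after channel 0's barrier, so it is held back until channel 1's barrier has arrived and the checkpoint has been triggered; it therefore never leaks into the snapshot of checkpoint 1.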
References: