A Look at Flink's CheckpointScheduler

This article takes a look at Flink's CheckpointScheduler.

CheckpointCoordinatorDeActivator

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java

```java
/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {
 private final CheckpointCoordinator coordinator;
 public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
 this.coordinator = checkNotNull(coordinator);
 }
 @Override
 public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
 if (newJobStatus == JobStatus.RUNNING) {
 // start the checkpoint scheduler
 coordinator.startCheckpointScheduler();
 } else {
 // anything else should stop the trigger for now
 coordinator.stopCheckpointScheduler();
 }
 }
}
```

- CheckpointCoordinatorDeActivator implements the JobStatusListener interface; when jobStatusChanges fires, it calls coordinator.startCheckpointScheduler if the new status is RUNNING, and coordinator.stopCheckpointScheduler for any other status

CheckpointCoordinator.ScheduledTrigger

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

```java
public class CheckpointCoordinator {
 /** Map from checkpoint ID to the pending checkpoint */
 private final Map<Long, PendingCheckpoint> pendingCheckpoints;
 /** The number of consecutive failed trigger attempts */
 private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
 //......
 public void startCheckpointScheduler() {
 synchronized (lock) {
 if (shutdown) {
 throw new IllegalArgumentException("Checkpoint coordinator is shut down");
 }
 // make sure all prior timers are cancelled
 stopCheckpointScheduler();
 periodicScheduling = true;
 long initialDelay = ThreadLocalRandom.current().nextLong(
 minPauseBetweenCheckpointsNanos / 1_000_000L, baseInterval + 1L);
 currentPeriodicTrigger = timer.scheduleAtFixedRate(
 new ScheduledTrigger(), initialDelay, baseInterval, TimeUnit.MILLISECONDS);
 }
 }
 public void stopCheckpointScheduler() {
 synchronized (lock) {
 triggerRequestQueued = false;
 periodicScheduling = false;
 if (currentPeriodicTrigger != null) {
 currentPeriodicTrigger.cancel(false);
 currentPeriodicTrigger = null;
 }
 for (PendingCheckpoint p : pendingCheckpoints.values()) {
 p.abortError(new Exception("Checkpoint Coordinator is suspending."));
 }
 pendingCheckpoints.clear();
 numUnsuccessfulCheckpointsTriggers.set(0);
 }
 }
 private final class ScheduledTrigger implements Runnable {
 @Override
 public void run() {
 try {
 triggerCheckpoint(System.currentTimeMillis(), true);
 }
 catch (Exception e) {
 LOG.error("Exception while triggering checkpoint for job {}.", job, e);
 }
 }
 }
 //......
}
```

- CheckpointCoordinator's startCheckpointScheduler method first calls stopCheckpointScheduler to cancel any prior timers and pending checkpoints, then uses timer.scheduleAtFixedRate to schedule a new ScheduledTrigger

- stopCheckpointScheduler calls PendingCheckpoint.abortError on each pending checkpoint, then clears pendingCheckpoints (`Map<Long, PendingCheckpoint>`) and resets numUnsuccessfulCheckpointsTriggers (`AtomicInteger`) to zero

- ScheduledTrigger implements the Runnable interface; its run method simply calls triggerCheckpoint with the isPeriodic parameter set to true (a sketch of how the periodic interval is configured from the user API follows this list)
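
For reference, this periodic trigger only runs when checkpointing has been enabled on the job. Below is a minimal sketch of the user-facing side (assuming the standard DataStream API; the 60-second interval is only an example): the interval passed to enableCheckpointing is what the coordinator uses as the baseInterval of scheduleAtFixedRate.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnablePeriodicCheckpointing {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable periodic checkpoints every 60 seconds; this interval becomes the
        // base interval of the coordinator's ScheduledTrigger
        env.enableCheckpointing(60_000L);
        // ... define sources/operators/sinks and call env.execute(...) here
    }
}
```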

CheckpointCoordinator.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

```java
public class CheckpointCoordinator {
 /** Tasks who need to be sent a message when a checkpoint is started */
 private final ExecutionVertex[] tasksToTrigger;
 /** Tasks who need to acknowledge a checkpoint before it succeeds */
 private final ExecutionVertex[] tasksToWaitFor;
 /** Map from checkpoint ID to the pending checkpoint */
 private final Map<Long, PendingCheckpoint> pendingCheckpoints;
 /** The maximum number of checkpoints that may be in progress at the same time */
 private final int maxConcurrentCheckpointAttempts;
 /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
  * enforce minimum processing time between checkpoint attempts */
 private final long minPauseBetweenCheckpointsNanos;
 public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
 return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic).isSuccess();
 }
 @VisibleForTesting
 public CheckpointTriggerResult triggerCheckpoint(
 long timestamp,
 CheckpointProperties props,
 @Nullable String externalSavepointLocation,
 boolean isPeriodic) {
 // make some eager pre-checks
 synchronized (lock) {
 // abort if the coordinator has been shutdown in the meantime
 if (shutdown) {
 return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
 }
 // Don't allow periodic checkpoint if scheduling has been disabled
 if (isPeriodic && !periodicScheduling) {
 return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
 }
 // validate whether the checkpoint can be triggered, with respect to the limit of
 // concurrent checkpoints, and the minimum time between checkpoints.
 // these checks are not relevant for savepoints
 if (!props.forceCheckpoint()) {
 // sanity check: there should never be more than one trigger request queued
 if (triggerRequestQueued) {
 LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 }
 // if too many checkpoints are currently in progress, we need to mark that a request is queued
 if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 triggerRequestQueued = true;
 if (currentPeriodicTrigger != null) {
 currentPeriodicTrigger.cancel(false);
 currentPeriodicTrigger = null;
 }
 return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
 }
 // make sure the minimum interval between checkpoints has passed
 final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
 final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
 if (durationTillNextMillis > 0) {
 if (currentPeriodicTrigger != null) {
 currentPeriodicTrigger.cancel(false);
 currentPeriodicTrigger = null;
 }
 // Reassign the new trigger to the currentPeriodicTrigger
 currentPeriodicTrigger = timer.scheduleAtFixedRate(
 new ScheduledTrigger(),
 durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
 return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 }
 }
 }
 // check if all tasks that we need to trigger are running.
 // if not, abort the checkpoint
 Execution[] executions = new Execution[tasksToTrigger.length];
 for (int i = 0; i < tasksToTrigger.length; i++) {
 Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
 if (ee == null) {
 LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
 tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
 job);
 return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 } else if (ee.getState() == ExecutionState.RUNNING) {
 executions[i] = ee;
 } else {
 LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
 tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
 job,
 ExecutionState.RUNNING,
 ee.getState());
 return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 }
 }
 // next, check if all tasks that need to acknowledge the checkpoint are running.
 // if not, abort the checkpoint
 Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
 for (ExecutionVertex ev : tasksToWaitFor) {
 Execution ee = ev.getCurrentExecutionAttempt();
 if (ee != null) {
 ackTasks.put(ee.getAttemptId(), ev);
 } else {
 LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
 ev.getTaskNameWithSubtaskIndex(),
 job);
 return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
 }
 }
 // we will actually trigger this checkpoint!
 // we lock with a special lock to make sure that trigger requests do not overtake each other.
 // this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
 // may issue blocking operations. Using a different lock than the coordinator-wide lock,
 // we avoid blocking the processing of 'acknowledge/decline' messages during that time.
 synchronized (triggerLock) {
 final CheckpointStorageLocation checkpointStorageLocation;
 final long checkpointID;
 //......
 final PendingCheckpoint checkpoint = new PendingCheckpoint(
 job,
 checkpointID,
 timestamp,
 ackTasks,
 props,
 checkpointStorageLocation,
 executor);
 if (statsTracker != null) {
 PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
 checkpointID,
 timestamp,
 props);
 checkpoint.setStatsCallback(callback);
 }
 // schedule the timer that will clean up the expired checkpoints
 final Runnable canceller = () -> {
 synchronized (lock) {
 // only do the work if the checkpoint is not discarded anyways
 // note that checkpoint completion discards the pending checkpoint object
 if (!checkpoint.isDiscarded()) {
 LOG.info("Checkpoint {} of job {} expired before completing.", checkpointID, job);
 checkpoint.abortExpired();
 pendingCheckpoints.remove(checkpointID);
 rememberRecentCheckpointId(checkpointID);
 triggerQueuedRequests();
 }
 }
 };
 try {
 // re-acquire the coordinator-wide lock
 synchronized (lock) {
 // since we released the lock in the meantime, we need to re-check
 // that the conditions still hold.
 if (shutdown) {
 return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
 }
 else if (!props.forceCheckpoint()) {
 if (triggerRequestQueued) {
 LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);
 return new CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
 }
 if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
 triggerRequestQueued = true;
 if (currentPeriodicTrigger != null) {
 currentPeriodicTrigger.cancel(false);
 currentPeriodicTrigger = null;
 }
 return new CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);
 }
 // make sure the minimum interval between checkpoints has passed
 final long earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;
 final long durationTillNextMillis = (earliestNext - System.nanoTime()) / 1_000_000;
 if (durationTillNextMillis > 0) {
 if (currentPeriodicTrigger != null) {
 currentPeriodicTrigger.cancel(false);
 currentPeriodicTrigger = null;
 }
 // Reassign the new trigger to the currentPeriodicTrigger
 currentPeriodicTrigger = timer.scheduleAtFixedRate(
 new ScheduledTrigger(),
 durationTillNextMillis, baseInterval, TimeUnit.MILLISECONDS);
 return new CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);
 }
 }
 LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
 pendingCheckpoints.put(checkpointID, checkpoint);
 ScheduledFuture<?> cancellerHandle = timer.schedule(
 canceller,
 checkpointTimeout, TimeUnit.MILLISECONDS);
 if (!checkpoint.setCancellerHandle(cancellerHandle)) {
 // checkpoint is already disposed!
 cancellerHandle.cancel(false);
 }
 // trigger the master hooks for the checkpoint
 final List<MasterState> masterStates = MasterHooks.triggerMasterHooks(masterHooks.values(),
 checkpointID, timestamp, executor, Time.milliseconds(checkpointTimeout));
 for (MasterState s : masterStates) {
 checkpoint.addMasterState(s);
 }
 }
 // end of lock scope
 final CheckpointOptions checkpointOptions = new CheckpointOptions(
 props.getCheckpointType(),
 checkpointStorageLocation.getLocationReference());
 // send the messages to the tasks that trigger their checkpoint
 for (Execution execution: executions) {
 execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
 }
 numUnsuccessfulCheckpointsTriggers.set(0);
 return new CheckpointTriggerResult(checkpoint);
 }
 catch (Throwable t) {
 // guard the map against concurrent modifications
 synchronized (lock) {
 pendingCheckpoints.remove(checkpointID);
 }
 int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
 LOG.warn("Failed to trigger checkpoint {} for job {}. ({} consecutive failed attempts so far)",
 checkpointID, job, numUnsuccessful, t);
 if (!checkpoint.isDiscarded()) {
 checkpoint.abortError(new Exception("Failed to trigger checkpoint", t));
 }
 try {
 checkpointStorageLocation.disposeOnFailure();
 }
 catch (Throwable t2) {
 LOG.warn("Cannot dispose failed checkpoint storage location {}", checkpointStorageLocation, t2);
 }
 return new CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
 }
 } // end trigger lock
 }
 //......
}
```

- First, for a non-forced checkpoint (props.forceCheckpoint() is false), it checks whether the current number of pendingCheckpoints has reached maxConcurrentCheckpointAttempts; if so, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS). It then checks whether at least minPauseBetweenCheckpointsNanos has elapsed since lastCheckpointCompletionNanos; if not, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS), ensuring checkpoints are not triggered too frequently (both limits come from the user's checkpoint configuration; see the sketch after this list)

- Next, it checks whether all tasksToTrigger tasks (`the tasks that need to be sent a message when a checkpoint is started`) are in RUNNING state; if not, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING)

- It then checks whether every tasksToWaitFor task (`the tasks that need to acknowledge a checkpoint before it succeeds`) currently has an execution attempt; if any of them is not being executed, it fails fast and returns CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING)

- Only after these pre-checks pass does the actual triggering start: it obtains a checkpointID and initializes the checkpointStorageLocation, returning CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION) on failure; it then creates a PendingCheckpoint together with a canceller (`used to abort the checkpoint when it expires`); after re-acquiring the coordinator lock, non-forced checkpoints go through another round of the TOO_MANY_CONCURRENT_CHECKPOINTS and MINIMUM_TIME_BETWEEN_CHECKPOINTS checks

- Finally, it iterates over the executions and triggers each execution's triggerCheckpoint operation; on success it returns CheckpointTriggerResult(checkpoint), and if an exception occurs it returns CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION)
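
The limits enforced by these pre-checks come from the job's checkpoint configuration. Below is a hedged sketch (the values are illustrative only) of setting them through CheckpointConfig in the DataStream API, assuming the usual mapping: setMinPauseBetweenCheckpoints feeds minPauseBetweenCheckpointsNanos, setMaxConcurrentCheckpoints feeds maxConcurrentCheckpointAttempts, and setCheckpointTimeout is the timeout used when scheduling the canceller.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointLimitsExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // periodic checkpoints every 60 seconds with exactly-once semantics
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig config = env.getCheckpointConfig();
        // require at least 30 seconds between the end of one checkpoint and the start of the
        // next; otherwise triggerCheckpoint declines with MINIMUM_TIME_BETWEEN_CHECKPOINTS
        config.setMinPauseBetweenCheckpoints(30_000L);
        // allow only one checkpoint in flight; additional attempts are declined
        // with TOO_MANY_CONCURRENT_CHECKPOINTS
        config.setMaxConcurrentCheckpoints(1);
        // pending checkpoints that run longer than 10 minutes are expired by the canceller
        config.setCheckpointTimeout(600_000L);

        // ... build the topology and call env.execute(...) as usual
    }
}
```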

### Execution.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/executiongraph/Execution.java

```java
public class Execution implements AccessExecution, Archiveable<ArchivedExecution>, LogicalSlot.Payload {
 public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 final LogicalSlot slot = assignedResource;
 if (slot != null) {
 final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
 } else {
 LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
 "no longer running.");
 }
 }
 //......
}
```

- triggerCheckpoint mainly delegates to taskManagerGateway.triggerCheckpoint; the taskManagerGateway here is an RpcTaskManagerGateway

RpcTaskManagerGateway

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java

```java
/**
 * Implementation of the {@link TaskManagerGateway} for Flink's RPC system.
 */
public class RpcTaskManagerGateway implements TaskManagerGateway {
 private final TaskExecutorGateway taskExecutorGateway;
 public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 taskExecutorGateway.triggerCheckpoint(
 executionAttemptID,
 checkpointId,
 timestamp,
 checkpointOptions);
 }
 //......
}
```

- RpcTaskManagerGateway's triggerCheckpoint method calls taskExecutorGateway.triggerCheckpoint; the taskExecutorGateway here is an AkkaInvocationHandler proxy, which notifies the TaskExecutor via RPC

TaskExecutor.triggerCheckpoint

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskexecutor/TaskExecutor.java

```java
/**
 * TaskExecutor implementation. The task executor is responsible for the execution of multiple
 * {@link Task}.
 */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 public CompletableFuture<Acknowledge> triggerCheckpoint(
 ExecutionAttemptID executionAttemptID,
 long checkpointId,
 long checkpointTimestamp,
 CheckpointOptions checkpointOptions) {
 log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 final Task task = taskSlotTable.getTask(executionAttemptID);
 if (task != null) {
 task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 return CompletableFuture.completedFuture(Acknowledge.get());
 } else {
 final String message = "TaskManager received a checkpoint request for unknown task " + executionAttemptID + '.';
 log.debug(message);
 return FutureUtils.completedExceptionally(new CheckpointException(message));
 }
 }
 //......
}
```

- TaskExecutor's triggerCheckpoint method looks up the Task from the taskSlotTable and calls task.triggerCheckpointBarrier

Task.triggerCheckpointBarrier

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/taskmanager/Task.java

```java
public class Task implements Runnable, TaskActions, CheckpointListener {
 @Nullable
 private volatile AbstractInvokable invokable;
 public void triggerCheckpointBarrier(
 final long checkpointID,
 long checkpointTimestamp,
 final CheckpointOptions checkpointOptions) {
 final AbstractInvokable invokable = this.invokable;
 final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
 if (executionState == ExecutionState.RUNNING && invokable != null) {
 // build a local closure
 final String taskName = taskNameWithSubtask;
 final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
 FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
 Runnable runnable = new Runnable() {
 @Override
 public void run() {
 // set safety net from the task's context for checkpointing thread
 LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
 FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 try {
 boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 if (!success) {
 checkpointResponder.declineCheckpoint(
 getJobID(), getExecutionId(), checkpointID,
 new CheckpointDeclineTaskNotReadyException(taskName));
 }
 }
 catch (Throwable t) {
 if (getExecutionState() == ExecutionState.RUNNING) {
 failExternally(new Exception(
 "Error while triggering checkpoint " + checkpointID + " for " +
 taskNameWithSubtask, t));
 } else {
 LOG.debug("Encountered error while triggering checkpoint {} for " +
 "{} ({}) while being not in state running.", checkpointID,
 taskNameWithSubtask, executionId, t);
 }
 } finally {
 FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
 }
 }
 };
 executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
 }
 else {
 LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
 // send back a message that we did not do the checkpoint
 checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
 new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
 }
 }
 //......
}
```

- Task's triggerCheckpointBarrier method first checks that executionState is RUNNING and that invokable is not null; if either condition fails, it calls checkpointResponder.declineCheckpoint

- If both conditions hold, it calls executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId))

- Inside that runnable, invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions) is executed; for a source task the invokable is a SourceStreamTask

### SourceStreamTask.triggerCheckpoint

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java

```java
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
 extends StreamTask<OUT, OP> {
 private volatile boolean externallyInducedCheckpoints;
 @Override
 public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 if (!externallyInducedCheckpoints) {
 return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 }
 else {
 // we do not trigger checkpoints here, we simply state whether we can trigger them
 synchronized (getCheckpointLock()) {
 return isRunning();
 }
 }
 }
 //......
}
```

- SourceStreamTask's triggerCheckpoint first checks externallyInducedCheckpoints; if it is false, it delegates to the parent class StreamTask's triggerCheckpoint

StreamTask.triggerCheckpoint

```java
@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 extends AbstractInvokable
 implements AsyncExceptionHandler {
 @Override
 public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
 try {
 // No alignment if we inject a checkpoint
 CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
 .setBytesBufferedInAlignment(0L)
 .setAlignmentDurationNanos(0L);
 return performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
 }
 catch (Exception e) {
 // propagate exceptions only if the task is still in "running" state
 if (isRunning) {
 throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
 " for operator " + getName() + '.', e);
 } else {
 LOG.debug("Could not perform checkpoint {} for operator {} while the " +
 "invokable was not in state running.", checkpointMetaData.getCheckpointId(), getName(), e);
 return false;
 }
 }
 }
 private boolean performCheckpoint(
 CheckpointMetaData checkpointMetaData,
 CheckpointOptions checkpointOptions,
 CheckpointMetrics checkpointMetrics) throws Exception {
 LOG.debug("Starting checkpoint ({}) {} on task {}",
 checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), getName());
 synchronized (lock) {
 if (isRunning) {
 // we can do a checkpoint
 // All of the following steps happen as an atomic step from the perspective of barriers and
 // records/watermarks/timers/callbacks.
 // We generally try to emit the checkpoint barrier as soon as possible to not affect downstream
 // checkpoint alignments
 // Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.
 //           The pre-barrier work should be nothing or minimal in the common case.
 operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
 // Step (2): Send the checkpoint barrier downstream
 operatorChain.broadcastCheckpointBarrier(
 checkpointMetaData.getCheckpointId(),
 checkpointMetaData.getTimestamp(),
 checkpointOptions);
 // Step (3): Take the state snapshot. This should be largely asynchronous, to not
 //           impact progress of the streaming topology
 checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
 return true;
 }
 else {
 final CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
 Exception exception = null;
 for (StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>> streamRecordWriter : streamRecordWriters) {
 try {
 streamRecordWriter.broadcastEvent(message);
 } catch (Exception e) {
 exception = ExceptionUtils.firstOrSuppressed(
 new Exception("Could not send cancel checkpoint marker to downstream tasks.", e),
 exception);
 }
 }
 if (exception != null) {
 throw exception;
 }
 return false;
 }
 }
 }
 private void checkpointState(
 CheckpointMetaData checkpointMetaData,
 CheckpointOptions checkpointOptions,
 CheckpointMetrics checkpointMetrics) throws Exception {
 CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
 checkpointMetaData.getCheckpointId(),
 checkpointOptions.getTargetLocation());
 CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
 this,
 checkpointMetaData,
 checkpointOptions,
 storage,
 checkpointMetrics);
 checkpointingOperation.executeCheckpointing();
 }
 //......
}
```

- The main logic of StreamTask's triggerCheckpoint method lives in performCheckpoint, which behaves differently depending on whether the task's isRunning flag is true

- When isRunning is true, it proceeds in three steps: step one calls operatorChain.prepareSnapshotPreBarrier, step two calls operatorChain.broadcastCheckpointBarrier, and step three calls checkpointState, which creates a CheckpointingOperation and invokes checkpointingOperation.executeCheckpointing()

- When isRunning is false, it calls streamRecordWriter.broadcastEvent(message) on each writer, where the message is a CancelCheckpointMarker

OperatorChain.prepareSnapshotPreBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

```java
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
 public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
 // go forward through the operator chain and tell each operator
 // to prepare the checkpoint
 final StreamOperator<?>[] operators = this.allOperators;
 for (int i = operators.length - 1; i >= 0; --i) {
 final StreamOperator<?> op = operators[i];
 if (op != null) {
 op.prepareSnapshotPreBarrier(checkpointId);
 }
 }
 }
 //......
}
```

- OperatorChain's prepareSnapshotPreBarrier iterates over allOperators (from the last operator back to the first) and calls each StreamOperator's prepareSnapshotPreBarrier method

OperatorChain.broadcastCheckpointBarrier

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/OperatorChain.java

```java
@Internal
public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements StreamStatusMaintainer {
 public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
 CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
 for (RecordWriterOutput<?> streamOutput : streamOutputs) {
 streamOutput.broadcastEvent(barrier);
 }
 }
 //......
}
```

- OperatorChain's broadcastCheckpointBarrier method creates a CheckpointBarrier and iterates over streamOutputs, calling each streamOutput's broadcastEvent method

CheckpointingOperation.executeCheckpointing

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/StreamTask.java

```java
private static final class CheckpointingOperation {
 //......
 public void executeCheckpointing() throws Exception {
 startSyncPartNano = System.nanoTime();
 try {
 for (StreamOperator<?> op : allOperators) {
 checkpointStreamOperator(op);
 }
 if (LOG.isDebugEnabled()) {
 LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}",
 checkpointMetaData.getCheckpointId(), owner.getName());
 }
 startAsyncPartNano = System.nanoTime();
 checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);
 // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
 AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 owner,
 operatorSnapshotsInProgress,
 checkpointMetaData,
 checkpointMetrics,
 startAsyncPartNano);
 owner.cancelables.registerCloseable(asyncCheckpointRunnable);
 owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
 if (LOG.isDebugEnabled()) {
 LOG.debug("{} - finished synchronous part of checkpoint {}. " +
 "Alignment duration: {} ms, snapshot duration {} ms",
 owner.getName(), checkpointMetaData.getCheckpointId(),
 checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
 checkpointMetrics.getSyncDurationMillis());
 }
 } catch (Exception ex) {
 // Cleanup to release resources
 for (OperatorSnapshotFutures operatorSnapshotResult : operatorSnapshotsInProgress.values()) {
 if (null != operatorSnapshotResult) {
 try {
 operatorSnapshotResult.cancel();
 } catch (Exception e) {
 LOG.warn("Could not properly cancel an operator snapshot result.", e);
 }
 }
 }
 if (LOG.isDebugEnabled()) {
 LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. " +
 "Alignment duration: {} ms, snapshot duration {} ms",
 owner.getName(), checkpointMetaData.getCheckpointId(),
 checkpointMetrics.getAlignmentDurationNanos() / 1_000_000,
 checkpointMetrics.getSyncDurationMillis());
 }
 owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);
 }
 }
 @SuppressWarnings("deprecation")
 private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 if (null != op) {
 OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
 checkpointMetaData.getCheckpointId(),
 checkpointMetaData.getTimestamp(),
 checkpointOptions,
 storageLocation);
 operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
 }
 }
 private enum AsyncCheckpointState {
 RUNNING,
 DISCARDED,
 COMPLETED
 }
 }
```

- CheckpointingOperation is defined inside the StreamTask class; its executeCheckpointing method first runs checkpointStreamOperator for every StreamOperator (which calls the operator's snapshotState method), and then creates an AsyncCheckpointRunnable and submits it for asynchronous execution

## AbstractStreamOperator.snapshotState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java

```java
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
 implements StreamOperator<OUT>, Serializable {
 @Override
 public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
 CheckpointStreamFactory factory) throws Exception {
 KeyGroupRange keyGroupRange = null != keyedStateBackend ?
 keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
 OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
 try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
 checkpointId,
 timestamp,
 factory,
 keyGroupRange,
 getContainingTask().getCancelables())) {
 snapshotState(snapshotContext);
 //......
 } catch (Exception snapshotException) {
 //......
 }
 return snapshotInProgress;
 }
 public void snapshotState(StateSnapshotContext context) throws Exception {
 final KeyedStateBackend<?> keyedStateBackend = getKeyedStateBackend();
 //TODO all of this can be removed once heap-based timers are integrated with RocksDB incremental snapshots
 if (keyedStateBackend instanceof AbstractKeyedStateBackend &&
 ((AbstractKeyedStateBackend<?>) keyedStateBackend).requiresLegacySynchronousTimerSnapshots()) {
 KeyedStateCheckpointOutputStream out;
 try {
 out = context.getRawKeyedOperatorStateOutput();
 } catch (Exception exception) {
 throw new Exception("Could not open raw keyed operator state stream for " +
 getOperatorName() + '.', exception);
 }
 try {
 KeyGroupsList allKeyGroups = out.getKeyGroupList();
 for (int keyGroupIdx : allKeyGroups) {
 out.startNewKeyGroup(keyGroupIdx);
 timeServiceManager.snapshotStateForKeyGroup(
 new DataOutputViewStreamWrapper(out), keyGroupIdx);
 }
 } catch (Exception exception) {
 throw new Exception("Could not write timer service of " + getOperatorName() +
 " to checkpoint state stream.", exception);
 } finally {
 try {
 out.close();
 } catch (Exception closeException) {
 LOG.warn("Could not close raw keyed operator state stream for {}. This " +
 "might have prevented deleting some state data.", getOperatorName(), closeException);
 }
 }
 }
 }
 //......
}
```

- AbstractStreamOperator's snapshotState(StateSnapshotContext) method only does real work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots is true, in which case it calls timeServiceManager.snapshotStateForKeyGroup(new DataOutputViewStreamWrapper(out), keyGroupIdx) for each key group; subclasses such as AbstractUdfStreamOperator may override snapshotState

AbstractUdfStreamOperator

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java

```java
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
 extends AbstractStreamOperator<OUT>
 implements OutputTypeConfigurable<OUT> {
 @Override
 public void snapshotState(StateSnapshotContext context) throws Exception {
 super.snapshotState(context);
 StreamingFunctionUtils.snapshotFunctionState(context, getOperatorStateBackend(), userFunction);
 }
 //......
}
```

- AbstractUdfStreamOperator overrides the parent class AbstractStreamOperator's snapshotState method and adds a call to StreamingFunctionUtils.snapshotFunctionState

StreamingFunctionUtils.snapshotFunctionState

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/util/functions/StreamingFunctionUtils.java

```java
@Internal
public final class StreamingFunctionUtils {
 public static void snapshotFunctionState(
 StateSnapshotContext context,
 OperatorStateBackend backend,
 Function userFunction) throws Exception {
 Preconditions.checkNotNull(context);
 Preconditions.checkNotNull(backend);
 while (true) {
 if (trySnapshotFunctionState(context, backend, userFunction)) {
 break;
 }
 // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
 if (userFunction instanceof WrappingFunction) {
 userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
 } else {
 break;
 }
 }
 }
 private static boolean trySnapshotFunctionState(
 StateSnapshotContext context,
 OperatorStateBackend backend,
 Function userFunction) throws Exception {
 if (userFunction instanceof CheckpointedFunction) {
 ((CheckpointedFunction) userFunction).snapshotState(context);
 return true;
 }
 if (userFunction instanceof ListCheckpointed) {
 @SuppressWarnings("unchecked")
 List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
 snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
 ListState<Serializable> listState = backend.
 getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
 listState.clear();
 if (null != partitionableState) {
 try {
 for (Serializable statePartition : partitionableState) {
 listState.add(statePartition);
 }
 } catch (Exception e) {
 listState.clear();
 throw new Exception("Could not write partitionable state to operator " +
 "state backend.", e);
 }
 }
 return true;
 }
 return false;
 }
 //......
}
```

- snapshotFunctionState delegates to trySnapshotFunctionState, which dispatches on the type of userFunction: if it implements the CheckpointedFunction interface, CheckpointedFunction.snapshotState is called; if it implements the ListCheckpointed interface, ListCheckpointed.snapshotState is called, and note that the ListState is first cleared before ListState.add copies the returned List into the ListState (a user-function sketch follows below)
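
To make this dispatch concrete, below is a minimal sketch of a user function that implements CheckpointedFunction; the class and state names are made up for illustration and are not part of the Flink sources. Because the function implements CheckpointedFunction, StreamingFunctionUtils.snapshotFunctionState calls its snapshotState method during the synchronous part of the checkpoint.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

    // operator state handle, restored in initializeState and written in snapshotState
    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // invoked on the synchronous part of the checkpoint, reached via
        // StreamingFunctionUtils.snapshotFunctionState -> trySnapshotFunctionState
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long restored : checkpointedCount.get()) {
            count += restored;
        }
    }
}
```

A function implementing ListCheckpointed instead would return its state as a List from ListCheckpointed.snapshotState and have it copied into the default operator ListState, as trySnapshotFunctionState shows above.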

Summary

- Flink's CheckpointCoordinatorDeActivator calls CheckpointCoordinator's startCheckpointScheduler when the job status changes to RUNNING, and calls CheckpointCoordinator's stopCheckpointScheduler for any other status

- CheckpointCoordinator's startCheckpointScheduler mainly registers the periodic ScheduledTrigger task, whose run method calls triggerCheckpoint; triggerCheckpoint performs a series of checks before actually triggering a checkpoint and fails fast when they are not satisfied (possible decline reasons include CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS, CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS, and NOT_ALL_REQUIRED_TASKS_RUNNING); when the checks pass, it iterates over the executions and calls Execution.triggerCheckpoint, which uses taskManagerGateway.triggerCheckpoint to invoke TaskExecutor's triggerCheckpoint method via RPC

- TaskExecutor's triggerCheckpoint mainly calls Task's triggerCheckpointBarrier method, which asynchronously executes a runnable whose run method calls invokable.triggerCheckpoint; for a source the invokable is a SourceStreamTask, which mostly delegates to the parent StreamTask's triggerCheckpoint method, whose main logic lives in performCheckpoint; when isRunning is true, performCheckpoint proceeds in three steps: step one calls operatorChain.prepareSnapshotPreBarrier, step two calls operatorChain.broadcastCheckpointBarrier, and step three calls checkpointState, which creates a CheckpointingOperation and invokes checkpointingOperation.executeCheckpointing()

- CheckpointingOperation's executeCheckpointing method runs checkpointStreamOperator for every StreamOperator, and checkpointStreamOperator calls the operator's snapshotState method; AbstractStreamOperator's snapshotState only does real work when keyedStateBackend is an AbstractKeyedStateBackend and requiresLegacySynchronousTimerSnapshots is true

- AbstractUdfStreamOperator overrides the parent class AbstractStreamOperator's snapshotState method and adds a StreamingFunctionUtils.snapshotFunctionState call, which dispatches on the type of userFunction (`if it implements the CheckpointedFunction interface, CheckpointedFunction.snapshotState is called; if it implements the ListCheckpointed interface, ListCheckpointed.snapshotState is called`)

doc

- [Working with State](https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#working-with-state)
