
Source Code Analysis of the Checkpoint Alignment Mechanism

Author: Flink实战剖析
Published 2022-04-18 13:18:42
This article is included in the column: Flink实战剖析

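Checkpointing is the mechanism that makes Flink state fault tolerant. Checkpoints can provide different data semantics, the familiar Exactly-Once and At-Least-Once, and which one you get depends on how checkpoint barriers are handled, namely on the checkpoint alignment mechanism: with alignment, checkpoints provide Exactly-Once semantics; without alignment, they provide At-Least-Once semantics. (Here "non-aligned" means the BarrierTracker path analyzed below; it is not the same as the unaligned checkpoints introduced later in Flink 1.11, which still provide Exactly-Once.)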

The official documentation explains:

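Alignment matters for operators that receive multiple upstream input streams, for example after a keyBy or in a join. The rest of this article analyzes how alignment is implemented, starting from the source code.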

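Checkpoint handling happens in StreamInputProcessor/StreamTwoInputProcessor. These classes read data from remote upstream tasks and hand it to the StreamOperator. The reading itself is delegated to a CheckpointBarrierHandler, which also implements alignment in its getNextNonBlocked method. CheckpointBarrierHandler has two implementations, BarrierBuffer and BarrierTracker: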

Code language: Java
// Called from StreamInputProcessor/StreamTwoInputProcessor to create the CheckpointBarrierHandler
public static CheckpointBarrierHandler createCheckpointBarrierHandler(
      StreamTask<?, ?> checkpointedTask,
      CheckpointingMode checkpointMode,
      IOManager ioManager,
      InputGate inputGate,
      Configuration taskManagerConfig) throws IOException {

    CheckpointBarrierHandler barrierHandler;
    if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
      long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
      if (!(maxAlign == -1 || maxAlign > 0)) {
        throw new IllegalConfigurationException(
          TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
          + " must be positive or -1 (infinite)");
      }
      if (taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)) {
        barrierHandler = new BarrierBuffer(inputGate, new CachedBufferBlocker(inputGate.getPageSize()), maxAlign);
      } else {
        barrierHandler = new BarrierBuffer(inputGate, new BufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);
      }
    } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
      barrierHandler = new BarrierTracker(inputGate);
    } else {
      throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }

    if (checkpointedTask != null) {
      barrierHandler.registerCheckpointEventHandler(checkpointedTask);
    }
    return barrierHandler;
}

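So BarrierBuffer implements the aligned mechanism (CheckpointingMode.EXACTLY_ONCE) and BarrierTracker implements the non-aligned one (CheckpointingMode.AT_LEAST_ONCE). For BarrierBuffer, the data held back during alignment goes to an in-memory CachedBufferBlocker when the credit-based network model is enabled, and to a disk-spilling BufferSpiller otherwise. TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT caps how much data one alignment may buffer (-1 means unlimited); when the cap is exceeded, the checkpoint is aborted.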
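The selection rule can be restated as a small, self-contained Java sketch. It is not Flink code: `HandlerSelection`, `Mode` and `selectHandler` are hypothetical names, and the method only returns the name of the handler the factory above would construct, so the validation and branching can be exercised without a Flink dependency.

```java
import java.util.Objects;

/**
 * Hypothetical restatement of the factory logic above: returns the name of the
 * handler that would be built instead of constructing it. Not Flink code.
 */
public class HandlerSelection {
    // Local stand-in for Flink's CheckpointingMode
    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE }

    static String selectHandler(Mode mode, long maxAlign, boolean creditBased) {
        Objects.requireNonNull(mode, "mode");
        switch (mode) {
            case EXACTLY_ONCE:
                // Same validation as the snippet: -1 means "no limit", otherwise the limit must be > 0
                if (!(maxAlign == -1 || maxAlign > 0)) {
                    throw new IllegalArgumentException("alignment limit must be positive or -1 (infinite)");
                }
                // Credit-based model: blocked data cached in memory; otherwise spilled to disk
                return creditBased ? "BarrierBuffer+CachedBufferBlocker" : "BarrierBuffer+BufferSpiller";
            case AT_LEAST_ONCE:
                // No alignment, so the alignment limit is not checked
                return "BarrierTracker";
            default:
                throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(selectHandler(Mode.EXACTLY_ONCE, -1, true));  // BarrierBuffer+CachedBufferBlocker
        System.out.println(selectHandler(Mode.AT_LEAST_ONCE, 0, false)); // BarrierTracker
    }
}
```

Note that only the EXACTLY_ONCE branch validates the alignment limit, exactly as in the original factory.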

Aligned: BarrierBuffer

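BarrierBuffer holds several member fields that alignment relies on: bufferBlocker, of type BufferBlocker, and blockedChannels, a boolean array. The BufferBlocker caches the data that arrives during alignment (CachedBufferBlocker, for example, keeps it in an ArrayDeque), and blockedChannels records, per input channel, whether that channel is currently blocked for alignment.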
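A minimal sketch of these two fields, assuming records are plain strings and using an ArrayDeque in place of a real BufferBlocker. The class `AlignmentState` and its methods are hypothetical illustrations, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of the two alignment fields: a boolean[] of blocked channels
 * plus a FIFO queue standing in for BufferBlocker. Not Flink code.
 */
public class AlignmentState {
    private final boolean[] blockedChannels;                       // true = barrier already seen on this channel
    private final ArrayDeque<String> parked = new ArrayDeque<>();  // data held back during alignment

    AlignmentState(int numChannels) {
        blockedChannels = new boolean[numChannels];
    }

    void block(int channel) { blockedChannels[channel] = true; }

    boolean isBlocked(int channel) { return blockedChannels[channel]; }

    /** Data read from a blocked channel is queued instead of being handed to the operator. */
    void park(String data) { parked.addLast(data); }

    int parkedCount() { return parked.size(); }

    /** Alignment done: unblock every channel and hand back the parked data in arrival order. */
    List<String> releaseAll() {
        Arrays.fill(blockedChannels, false);
        List<String> released = new ArrayList<>(parked);
        parked.clear();
        return released;
    }

    public static void main(String[] args) {
        AlignmentState state = new AlignmentState(2);
        state.block(0);                          // barrier arrived on channel 0 first
        if (state.isBlocked(0)) state.park("b1");
        if (state.isBlocked(0)) state.park("b2");
        System.out.println(state.releaseAll());  // [b1, b2]
        System.out.println(state.isBlocked(0));  // false
    }
}
```

Parked data comes back in arrival order, so the operator later consumes it exactly as if it had never been held back.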

The method implementing the alignment flow (getNextNonBlocked):

Code language: Java
  @Override
  public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {

      //.....
      BufferOrEvent bufferOrEvent = next.get();
      if (isBlocked(bufferOrEvent.getChannelIndex())) {
        // the channel this data came from is blocked (aligning): park the data
        // in the cache, i.e. in the BufferBlocker
        bufferBlocker.add(bufferOrEvent);
        checkSizeLimit();
      }
      else if (bufferOrEvent.isBuffer()) {
        // a buffer from an unblocked channel is returned directly
        return bufferOrEvent;
      }
      else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
        if (!endOfStream) {
          // process the CheckpointBarrier
          processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
        }
      }
      //.......
    }
  }

The processBarrier method:

Code language: Java
  private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    // barrierId is the ID of the checkpoint this barrier belongs to
    final long barrierId = receivedBarrier.getId();
    // single input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
      if (barrierId > currentCheckpointId) {
        // new checkpoint
        currentCheckpointId = barrierId;
        notifyCheckpoint(receivedBarrier);
      }
      return;
    }
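    // multiple input channels: numBarriersReceived is the number of channels
    // that have already delivered the barrier of the current checkpoint;
    // numBarriersReceived > 0 means an alignment is in progress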
    if (numBarriersReceived > 0) {
      // this is only true if some alignment is already progress and was not canceled
      if (barrierId == currentCheckpointId) {
        // regular case
        onBarrier(channelIndex);
      }
      else if (barrierId > currentCheckpointId) {
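        // The incoming barrierId (i.e. checkpointId) is larger than the checkpoint
        // currently being aligned, so the current checkpoint is aborted (for example
        // because it timed out), the parked data is released, blockedChannels and
        // numBarriersReceived are reset, and alignment starts for checkpoint barrierId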
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
            "Skipping current checkpoint.",
          inputGate.getOwningTaskName(),
          barrierId,
          currentCheckpointId);

        notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
        releaseBlocksAndResetBarriers();
        beginNewAlignment(barrierId, channelIndex);
      }
      else {
        // ignore trailing barrier from an earlier checkpoint (obsolete now)
        return;
      }
    }
    else if (barrierId > currentCheckpointId) {
      // numBarriersReceived == 0: start a new checkpoint and mark the
      // channel's blockedChannels entry as blocked (true)
      beginNewAlignment(barrierId, channelIndex);
    }
    else {
      // either the current checkpoint was canceled (numBarriers == 0) or
      // this barrier is from an old subsumed checkpoint
      return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
      // actually trigger checkpoint
      if (LOG.isDebugEnabled()) {
        LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
          inputGate.getOwningTaskName(),
          receivedBarrier.getId(),
          receivedBarrier.getTimestamp());
      }
      // alignment complete: the cached data (in the BufferBlocker) is put back
      // into the consumption queue, then the checkpoint is triggered
      releaseBlocksAndResetBarriers();
      notifyCheckpoint(receivedBarrier);
    }
  }
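The branching in processBarrier is easier to follow as a small state machine. The sketch below keeps only the counters and the blocked flags; it is a hypothetical simplification (`AlignmentDecision` and `Outcome` are invented names) that omits buffering, closed channels and abort notifications, and it folds the single-channel shortcut into the general path, which gives the same result.

```java
import java.util.Arrays;

/**
 * Hypothetical simplification of BarrierBuffer.processBarrier's bookkeeping:
 * no data buffering, no closed channels, no abort notifications. Not Flink code.
 */
public class AlignmentDecision {
    enum Outcome { ALIGNING, TRIGGERED, SUBSUMED_RESTARTED, IGNORED }

    private final int totalChannels;
    private final boolean[] blocked;
    private long currentCheckpointId = -1;
    private int numBarriersReceived = 0;

    AlignmentDecision(int totalChannels) {
        this.totalChannels = totalChannels;
        this.blocked = new boolean[totalChannels];
    }

    Outcome onBarrier(long barrierId, int channel) {
        Outcome outcome;
        if (numBarriersReceived > 0) {                 // an alignment is in progress
            if (barrierId == currentCheckpointId) {
                block(channel);                        // regular case
                outcome = Outcome.ALIGNING;
            } else if (barrierId > currentCheckpointId) {
                reset();                               // newer checkpoint subsumes the current one
                begin(barrierId, channel);
                outcome = Outcome.SUBSUMED_RESTARTED;
            } else {
                return Outcome.IGNORED;                // trailing barrier of an older checkpoint
            }
        } else if (barrierId > currentCheckpointId) {
            begin(barrierId, channel);                 // first barrier of a new checkpoint
            outcome = Outcome.ALIGNING;
        } else {
            return Outcome.IGNORED;                    // canceled or already completed checkpoint
        }
        if (numBarriersReceived == totalChannels) {    // barriers from every channel: trigger
            reset();
            return Outcome.TRIGGERED;
        }
        return outcome;
    }

    private void begin(long id, int channel) {
        currentCheckpointId = id;
        block(channel);
    }

    private void block(int channel) {
        if (blocked[channel]) {
            throw new IllegalStateException("repeated barrier on channel " + channel);
        }
        blocked[channel] = true;
        numBarriersReceived++;
    }

    private void reset() {                             // the reset part of releaseBlocksAndResetBarriers()
        Arrays.fill(blocked, false);
        numBarriersReceived = 0;
    }

    public static void main(String[] args) {
        AlignmentDecision d = new AlignmentDecision(2);
        System.out.println(d.onBarrier(1, 0)); // ALIGNING
        System.out.println(d.onBarrier(1, 1)); // TRIGGERED
        System.out.println(d.onBarrier(2, 0)); // ALIGNING
        System.out.println(d.onBarrier(3, 1)); // SUBSUMED_RESTARTED
    }
}
```

currentCheckpointId is deliberately not reset after a trigger: that is what makes a late barrier of an already completed checkpoint fall into the IGNORED branch.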

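Overall, alignment works like this: when a task has multiple upstream input channels and receives a CheckpointBarrier from one of them, it marks that channel as blocked and parks all data subsequently read from it in the cache. Once the barriers of all channels have arrived, the parked data is made consumable again and the checkpoint is triggered.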
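A toy trace illustrates the ordering this produces. The sketch assumes a made-up event format (`"c:x"` is record x on channel c, `"c:|"` a barrier on channel c), barriers arriving in checkpoint order and no aborts; `AlignedSimulation` is a hypothetical name. It lists what the operator sees and marks with CHECKPOINT where the snapshot is taken.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy simulation of aligned barrier handling under the assumptions above. Not Flink code. */
public class AlignedSimulation {
    static List<String> run(int numChannels, List<String> input) {
        boolean[] blocked = new boolean[numChannels];
        ArrayDeque<String> parked = new ArrayDeque<>();    // plays the role of the BufferBlocker
        ArrayDeque<String> pending = new ArrayDeque<>(input);
        List<String> seenByOperator = new ArrayList<>();
        int barriers = 0;
        while (!pending.isEmpty()) {
            String event = pending.pollFirst();
            int sep = event.indexOf(':');
            int channel = Integer.parseInt(event.substring(0, sep));
            String payload = event.substring(sep + 1);
            if (blocked[channel]) {
                parked.addLast(event);                     // channel is aligning: hold the data back
            } else if (!payload.equals("|")) {
                seenByOperator.add(payload);               // ordinary record: pass straight through
            } else {
                blocked[channel] = true;                   // barrier seen on this channel
                if (++barriers == numChannels) {           // last barrier: alignment complete
                    seenByOperator.add("CHECKPOINT");      // the snapshot is taken here
                    Arrays.fill(blocked, false);
                    barriers = 0;
                    // parked data is consumed next, ahead of the remaining input, in arrival order
                    while (!parked.isEmpty()) {
                        pending.addFirst(parked.pollLast());
                    }
                }
            }
        }
        return seenByOperator;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("0:a", "1:x", "0:|", "0:b", "1:y", "1:|", "1:z");
        System.out.println(run(2, input)); // [a, x, y, CHECKPOINT, b, z]
    }
}
```

Record b arrives on channel 0 after that channel's barrier, so it is parked and reaches the operator only after CHECKPOINT: the snapshot reflects exactly the records that precede the barrier on every channel.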

Non-aligned: BarrierTracker

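The non-aligned mechanism is comparatively simple: no data is cached, records from every channel keep flowing to the operator, and the checkpoint is triggered as soon as the barriers of all channels have arrived.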

Code language: Java
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
      Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();
      if (!next.isPresent()) {
        // buffer or input exhausted
        return null;
      }

      BufferOrEvent bufferOrEvent = next.get();
      if (bufferOrEvent.isBuffer()) {
        return bufferOrEvent;
      }
      else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
        processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
      }
      else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
        processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
      }
      else {
        // some other event
        return bufferOrEvent;
      }
    }
}
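Because buffers are returned immediately, records that follow a barrier on a fast channel can reach the operator before the checkpoint is triggered. A toy trace makes the At-Least-Once consequence concrete; `NonAlignedSimulation` is a hypothetical name, and the event format is an assumption (`"c:x"` is record x on channel c, `"c:|"` a barrier on channel c).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy simulation of non-aligned barrier handling under the assumptions above. Not Flink code. */
public class NonAlignedSimulation {
    static List<String> run(int numChannels, List<String> input) {
        List<String> seenByOperator = new ArrayList<>();
        int barriers = 0;
        for (String event : input) {
            String payload = event.substring(event.indexOf(':') + 1);
            if (!payload.equals("|")) {
                seenByOperator.add(payload);           // records are never held back
            } else if (++barriers == numChannels) {    // barriers are only counted
                seenByOperator.add("CHECKPOINT");      // the snapshot is taken here
                barriers = 0;
            }
        }
        return seenByOperator;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("0:a", "1:x", "0:|", "0:b", "1:y", "1:|", "1:z");
        System.out.println(run(2, input)); // [a, x, b, y, CHECKPOINT, z]
    }
}
```

Record b comes after channel 0's barrier but is processed before CHECKPOINT, so its effect is included in the snapshot while the upstream position saved for channel 0 lies before b. After a restore from this checkpoint, b is sent again and processed a second time: At-Least-Once rather than Exactly-Once.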

The processBarrier method:

Code language: Java
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();
    // only one input channel: trigger the checkpoint directly
    if (totalNumberOfInputChannels == 1) {
      notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
      return;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
      LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount cbc = null;
    int pos = 0;
    // look for the pending entry of the same checkpoint
    for (CheckpointBarrierCount next : pendingCheckpoints) {
      if (next.checkpointId == barrierId) {
        cbc = next;
        break;
      }
      pos++;
    }

    if (cbc != null) {
      // add one to the count to that barrier and check for completion
      int numBarriersNew = cbc.incrementBarrierCount();
      if (numBarriersNew == totalNumberOfInputChannels) {
        // all barriers collected, like gathering all seven Dragon Balls: the checkpoint can be triggered
        for (int i = 0; i <= pos; i++) {
          pendingCheckpoints.pollFirst();
        }

        // notify the listener
        if (!cbc.isAborted()) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Received all barriers for checkpoint {}", barrierId);
          }

          notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
        }
      }
    }
    else {
      // first barrier of a new checkpoint
      if (barrierId > latestPendingCheckpointID) {
        latestPendingCheckpointID = barrierId;
        pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

        // make sure we do not track too many checkpoints
        if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
          pendingCheckpoints.pollFirst();
        }
      }
    }
}

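Overall, non-aligned handling works like this: with multiple upstream input channels, no data is cached for any checkpoint; everything is handed straight downstream for processing. Pending checkpoints are tracked in a queue of CheckpointBarrierCount entries, each recording a checkpoint ID and how many channels have delivered its barrier so far. When that count equals the number of channels, the checkpoint is triggered, and its entry plus any older entries in front of it are removed from the queue.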
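The counting logic can be exercised in isolation with a sketch like the following. It is a hypothetical simplification: `BarrierCounting` is an invented name, `maxTracked` stands in for MAX_CHECKPOINTS_TO_TRACK, and CancelCheckpointMarker/abort handling is omitted. An entry starts with a count of 1 because it is created by the first barrier of its checkpoint.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simplification of the BarrierTracker counting logic. Not Flink code. */
public class BarrierCounting {
    private static final class Count {
        final long checkpointId;
        int barriers = 1;                       // created when the first barrier for this id arrives
        Count(long checkpointId) { this.checkpointId = checkpointId; }
    }

    private final int totalChannels;
    private final int maxTracked;               // plays the role of MAX_CHECKPOINTS_TO_TRACK
    private final ArrayDeque<Count> pending = new ArrayDeque<>();
    private long latestPendingId = -1;
    final List<Long> triggered = new ArrayList<>();

    BarrierCounting(int totalChannels, int maxTracked) {
        this.totalChannels = totalChannels;
        this.maxTracked = maxTracked;
    }

    void onBarrier(long id) {
        if (totalChannels == 1) {               // single input: trigger immediately
            triggered.add(id);
            return;
        }
        Count found = null;
        int pos = 0;
        for (Count c : pending) {               // oldest entry first
            if (c.checkpointId == id) { found = c; break; }
            pos++;
        }
        if (found != null) {
            if (++found.barriers == totalChannels) {
                // remove this checkpoint and every older one queued in front of it
                for (int i = 0; i <= pos; i++) {
                    pending.pollFirst();
                }
                triggered.add(id);
            }
        } else if (id > latestPendingId) {      // first barrier of a newer checkpoint
            latestPendingId = id;
            pending.addLast(new Count(id));
            if (pending.size() > maxTracked) {
                pending.pollFirst();            // stop tracking the oldest checkpoint
            }
        }                                       // otherwise: checkpoint already completed or dropped
    }

    int pendingCount() { return pending.size(); }

    public static void main(String[] args) {
        BarrierCounting tracker = new BarrierCounting(2, 5);
        tracker.onBarrier(1);   // checkpoint 1, channel A
        tracker.onBarrier(2);   // checkpoint 2, channel A
        tracker.onBarrier(2);   // checkpoint 2, channel B (suppose B never delivers barrier 1)
        System.out.println(tracker.triggered);      // [2]
        System.out.println(tracker.pendingCount()); // 0
    }
}
```

The main method shows the case the pollFirst loop exists for: once checkpoint 2 completes, the older, still incomplete checkpoint 1 is discarded, and any late barrier for it is ignored because its ID is not larger than latestPendingId.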

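This article is part of the Tencent Cloud self-media sync exposure program and is shared from a WeChat official account.
Originally published: 2020-01-27. In case of infringement, please contact cloudcommunity@tencent.com for removal.

This article is shared from the Flink实战剖析 WeChat official account.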
