
Understanding How Flink Processes Barriers, End to End

Author: shengjk1
Published 2020-07-06 22:41:04
This article is part of the column 码字搬砖.

Previously we covered the complete flow of Flink checkpoint barriers, as well as the complete flow of how Flink consumes messages.

Types

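Flink processes barriers in one of two ways:

  1. with barrier alignment
  2. without barrier alignment

The corresponding classes are BarrierBuffer (alignment, used for exactly-once) and BarrierTracker (no alignment, used for at-least-once).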

We will use BarrierBuffer (barrier alignment) as the example.
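For contrast, a handler that does not align never blocks a channel; conceptually it only counts, per checkpoint ID, how many channels have delivered that barrier. The Java sketch below illustrates just that counting idea under simplifying assumptions (a fixed channel count, no closed channels, no cancellation). The class and method names are hypothetical and are not the BarrierTracker API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch (not Flink's BarrierTracker): barriers are only counted
// per checkpoint id; no channel is ever blocked, so records arriving after a
// barrier keep flowing downstream immediately.
class BarrierCountingSketch {
    private final int numChannels;
    private final Map<Long, Integer> counts = new HashMap<>();
    private long latestTriggered = -1L;

    BarrierCountingSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Returns true if this barrier completes checkpoint {@code id}. */
    boolean onBarrier(long id) {
        if (id <= latestTriggered) {
            return false; // barrier of an already triggered (or older) checkpoint
        }
        int seen = counts.merge(id, 1, Integer::sum);
        if (seen == numChannels) {
            latestTriggered = id;
            // forget this checkpoint and any older pending ones
            counts.keySet().removeIf(k -> k <= id);
            return true;
        }
        return false;
    }

    /** In this mode data is never held back. */
    boolean isBlocked(int channel) {
        return false;
    }
}
```

Because nothing is held back, records that follow a barrier on a fast channel may be processed before the checkpoint is triggered, so their effects can end up in the snapshot and be replayed again after a failure. That is why the non-aligned mode only provides at-least-once guarantees.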

Walkthrough

The key is the getNextNonBlocked method:

	@Override
	// fetches data from the ResultSubPartition and handles barriers
	public BufferOrEvent getNextNonBlocked() throws Exception {
		while (true) {
			// process buffered BufferOrEvents before grabbing new ones
			Optional<BufferOrEvent> next;
			// currentBuffered is non-null only after a barrier alignment has been released
			// (it then holds the buffered data to replay); otherwise it is null
			if (currentBuffered == null) {
				// no buffered data to replay: read directly from the InputGate, whose
				// InputChannels fetch the data from the upstream ResultSubPartitions
				next = inputGate.getNextBufferOrEvent();
			}else {
				// after a barrier block is released, read the next buffered element (empty once the sequence is exhausted)
				next = Optional.ofNullable(currentBuffered.getNext());
				if (!next.isPresent()) {
					// the buffered sequence is exhausted: finish consuming it
					completeBufferedSequence();
					return getNextNonBlocked();
				}
			}

			if (!next.isPresent()) {
				if (!endOfStream) {
					// end of input stream. stream continues with the buffered data
					endOfStream = true;
					releaseBlocksAndResetBarriers();
					return getNextNonBlocked();
				}
				else {
					// final end of both input and buffered data
					return null;
				}
			}
			
			// once all barriers are aligned, the data in bufferBlocker's ArrayDeque<BufferOrEvent> currentBuffers is consumed first
			BufferOrEvent bufferOrEvent = next.get();
			if (isBlocked(bufferOrEvent.getChannelIndex())) {
				// if the channel is blocked, we just store the BufferOrEvent
				// barrier alignment: buffer the data
				bufferBlocker.add(bufferOrEvent);
				checkSizeLimit();
			}
			else if (bufferOrEvent.isBuffer()) {
				return bufferOrEvent;
			}
			// handle a checkpoint barrier
			else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
				if (!endOfStream) {
					// process barriers only if there is a chance of the checkpoint completing
					// every operator except the trigger tasks (the sources) takes its checkpoint here; a barrier
					// has passed the upstream operator only once it is consumed here via processInput
					processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
				}
			}
			else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
				processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
			}
			else {
				if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
					processEndOfPartition();
				}
				return bufferOrEvent;
			}
		}
	}

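As long as no barrier alignment has completed, currentBuffered == null and data is read directly from the InputGate (currentBuffered is the sequence of buffered data waiting to be replayed). When the BufferOrEvent is a data buffer, it is returned and consumed normally, following the message-consumption flow covered in the earlier article; when it is a barrier, processBarrier takes over: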

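	// An operator may start its own checkpoint only after it has received the barrier with the same ID from every input channel.
	// If some channels are faster, the data that follows the barrier on those channels must be buffered.
	// An input channel that has been closed will not send any more barriers.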
	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
		final long barrierId = receivedBarrier.getId();

		// fast path for single channel cases
		if (totalNumberOfInputChannels == 1) {
			if (barrierId > currentCheckpointId) {
				// new checkpoint
				currentCheckpointId = barrierId;
				// trigger the checkpoint
				notifyCheckpoint(receivedBarrier);
			}
			return;
		}

		// -- general code path for multiple input channels --
		// an alignment is in progress, so this is at least the second barrier received
		if (numBarriersReceived > 0) {
			// this is only true if some alignment is already in progress and was not canceled

			if (barrierId == currentCheckpointId) {
				// regular case
				// block the channel at channelIndex: blockedChannels[channelIndex] = true, and numBarriersReceived++
				onBarrier(channelIndex);
			}else if (barrierId > currentCheckpointId) {
				// we did not complete the current checkpoint, another started before
				LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
						"Skipping current checkpoint.",
					inputGate.getOwningTaskName(),
					barrierId,
					currentCheckpointId);

				// let the task know we are not completing this
				notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

				// abort the current checkpoint
				releaseBlocksAndResetBarriers();

				// begin the new checkpoint
				beginNewAlignment(barrierId, channelIndex);
			}else {
				// ignore trailing barrier from an earlier checkpoint (obsolete now)
				return;
			}
		}else if (barrierId > currentCheckpointId) {
			// first barrier of a new checkpoint
			beginNewAlignment(barrierId, channelIndex);
		}else {
			// either the current checkpoint was canceled (numBarriers == 0) or
			// this barrier is from an old subsumed checkpoint
			return;
		}

		// check if we have all barriers - since canceled checkpoints always have zero barriers
		// this can only happen on a non canceled checkpoint
		if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
			// actually trigger checkpoint
			if (LOG.isDebugEnabled()) {
				LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
					inputGate.getOwningTaskName(),
					receivedBarrier.getId(),
					receivedBarrier.getTimestamp());
			}

			releaseBlocksAndResetBarriers();
			// once all barriers have arrived, notifyCheckpoint() is called; it in turn calls
			// StreamTask's triggerCheckpointOnBarrier, the same path as for the preceding operators
			notifyCheckpoint(receivedBarrier);
		}
	}
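With several input channels, the if/else chain of processBarrier above reduces to a small decision table plus the trigger condition. The sketch below restates it as pure functions so the cases can be checked in isolation. `BarrierDecisionSketch` and its `Action` values are hypothetical names; the single-channel fast path and side effects such as notifyAbort and releaseBlocksAndResetBarriers are deliberately left out.

```java
// Hypothetical restatement of processBarrier()'s multi-channel branches (not Flink code).
class BarrierDecisionSketch {
    enum Action { IGNORE, BLOCK_CHANNEL, ABORT_AND_RESTART, START_ALIGNMENT }

    /** What processBarrier does with a barrier, given the current alignment state. */
    static Action decide(long barrierId, long currentCheckpointId, int numBarriersReceived) {
        if (numBarriersReceived > 0) {
            // an alignment is in progress
            if (barrierId == currentCheckpointId) {
                return Action.BLOCK_CHANNEL;     // regular case: onBarrier(channelIndex)
            }
            if (barrierId > currentCheckpointId) {
                return Action.ABORT_AND_RESTART; // a newer checkpoint subsumes the current one
            }
            return Action.IGNORE;                // trailing barrier of an older checkpoint
        }
        if (barrierId > currentCheckpointId) {
            return Action.START_ALIGNMENT;       // first barrier of a new checkpoint
        }
        return Action.IGNORE;                    // canceled or subsumed checkpoint
    }

    /** Trigger condition evaluated after each accepted barrier. */
    static boolean allBarriersIn(int numBarriersReceived, int numClosedChannels, int totalChannels) {
        return numBarriersReceived + numClosedChannels == totalChannels;
    }
}
```

For example, with three input channels of which one is already closed, the checkpoint triggers as soon as two barriers have been received: allBarriersIn(2, 1, 3) is true.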

numBarriersReceived starts at 0, so the first barrier of a new checkpoint (barrierId > currentCheckpointId) goes into beginNewAlignment:

private void beginNewAlignment(long checkpointId, int channelIndex) throws IOException {
		currentCheckpointId = checkpointId;
		// numBarriersReceived++ and mark the channel at channelIndex as blocked
		onBarrier(channelIndex);

		startOfAlignmentTimestamp = System.nanoTime();

		if (LOG.isDebugEnabled()) {
			LOG.debug("{}: Starting stream alignment for checkpoint {}.",
				inputGate.getOwningTaskName(),
				checkpointId);
		}
	}
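The state that beginNewAlignment and onBarrier touch is small: the current checkpoint ID, a blocked flag per channel, the barrier count, and the alignment start time, which releaseBlocksAndResetBarriers later turns into the alignment duration. The following sketch models only that bookkeeping. The class name is hypothetical, and rejecting a repeated barrier on an already blocked channel with an exception is an assumption of this sketch.

```java
import java.util.Arrays;

// Hypothetical sketch of the bookkeeping done by beginNewAlignment() and onBarrier().
class AlignmentStateSketch {
    final boolean[] blockedChannels;
    int numBarriersReceived;
    long currentCheckpointId = -1L;
    long startOfAlignmentTimestamp;      // 0 means "no alignment running"
    long latestAlignmentDurationNanos;

    AlignmentStateSketch(int channels) {
        blockedChannels = new boolean[channels];
    }

    void beginNewAlignment(long checkpointId, int channel) {
        currentCheckpointId = checkpointId;
        onBarrier(channel);
        startOfAlignmentTimestamp = System.nanoTime();
    }

    void onBarrier(int channel) {
        // assumption of this sketch: a second barrier on a blocked channel is an error
        if (blockedChannels[channel]) {
            throw new IllegalStateException("repeated barrier on channel " + channel);
        }
        blockedChannels[channel] = true;
        numBarriersReceived++;
    }

    /** Mirrors the reset part of releaseBlocksAndResetBarriers(). */
    void reset() {
        Arrays.fill(blockedChannels, false);
        numBarriersReceived = 0;
        if (startOfAlignmentTimestamp > 0) {
            latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
            startOfAlignmentTimestamp = 0;
        }
    }
}
```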

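When further barriers with the same ID arrive, barrierId == currentCheckpointId is true and onBarrier blocks their channels. Once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, notifyCheckpoint is triggered, reporting the amount of data buffered during alignment and the alignment time. (Teaser: an article on the full checkpoint flow is coming soon, stay tuned.)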

If the barrier on some other channel is delayed, i.e. numBarriersReceived + numClosedChannels != totalNumberOfInputChannels, any data arriving on channels that have already received the barrier goes into bufferBlocker.

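bufferBlocker stores this data in an ArrayDeque<BufferOrEvent> named currentBuffers, so by default bufferBlocker.currentBuffers can grow without bound; only if a maximum alignment buffer size is configured does checkSizeLimit() abort the checkpoint once that size is exceeded.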
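A minimal sketch of that buffering, assuming held-back buffers are plain byte arrays: they are appended to an ArrayDeque and their total size is tracked, and a limit check in the spirit of checkSizeLimit() only fires when a positive maximum is set. The class name and the convention that a limit of zero or less means "unbounded" are assumptions of this sketch.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of a blocker that queues held-back buffers in an ArrayDeque.
class BlockerSketch {
    private final ArrayDeque<byte[]> currentBuffers = new ArrayDeque<>();
    private long bytesBlocked;

    void add(byte[] buffer) {
        currentBuffers.add(buffer);   // nothing is ever dropped while aligning
        bytesBlocked += buffer.length;
    }

    long bytesBlocked() {
        return bytesBlocked;
    }

    int size() {
        return currentBuffers.size();
    }

    /** True if the alignment should be aborted because too much data is buffered. */
    static boolean exceedsLimit(long bytesBlocked, long maxBufferedBytes) {
        return maxBufferedBytes > 0 && bytesBlocked > maxBufferedBytes;
    }
}
```

With no limit set, the deque simply keeps growing for as long as the slowest channel's barrier is missing, which is the risk described above.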

Once numBarriersReceived + numClosedChannels == totalNumberOfInputChannels, releaseBlocksAndResetBarriers() runs first, and only then notifyCheckpoint().

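The main purpose of releaseBlocksAndResetBarriers is to unblock all channels, reset the barrier count, and make sure the data already buffered is consumed next, before any new input: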

/**
	 * Releases the blocks on all channels and resets the barrier count.
	 * Makes sure the just written data is the next to be consumed.
	 */
	// hands the BufferOrEvents buffered in bufferBlocker over to currentBuffered for replay
	private void releaseBlocksAndResetBarriers() throws IOException {
		LOG.debug("{}: End of stream alignment, feeding buffered data back.",
			inputGate.getOwningTaskName());

		for (int i = 0; i < blockedChannels.length; i++) {
			blockedChannels[i] = false;
		}

		if (currentBuffered == null) {
			// common case: no more buffered data
			currentBuffered = bufferBlocker.rollOverReusingResources();
			if (currentBuffered != null) {
				currentBuffered.open();
			}
		}else {
			// uncommon case: buffered data pending
			// push back the pending data, if we have any
			LOG.debug("{}: Checkpoint skipped via buffered data:" +
					"Pushing back current alignment buffers and feeding back new alignment data first.",
				inputGate.getOwningTaskName());

			// since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
			BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
			if (bufferedNow != null) {
				bufferedNow.open();
				queuedBuffered.addFirst(currentBuffered);
				numQueuedBytes += currentBuffered.size();
				currentBuffered = bufferedNow;
			}
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("{}: Size of buffered data: {} bytes",
				inputGate.getOwningTaskName(),
				currentBuffered == null ? 0L : currentBuffered.size());
		}

		// the next barrier that comes must assume it is the first
		numBarriersReceived = 0;

		if (startOfAlignmentTimestamp > 0) {
			latestAlignmentDurationNanos = System.nanoTime() - startOfAlignmentTimestamp;
			startOfAlignmentTimestamp = 0;
		}
	}
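The subtle part of releaseBlocksAndResetBarriers is the ordering in the uncommon case: if a previous buffered sequence has not been fully replayed yet, it is pushed to the front of queuedBuffered and the newly buffered data is replayed first; the older sequence resumes once the new one is exhausted. The sketch below models sequences as ArrayDeques of strings. `ReplayOrderSketch`, `release` and `next` are hypothetical names, and `next` folds in what completeBufferedSequence does.

```java
import java.util.ArrayDeque;
import java.util.List;

// Hypothetical model of how buffered sequences are ordered for replay.
class ReplayOrderSketch {
    private ArrayDeque<String> currentBuffered;   // sequence being replayed, or null
    private final ArrayDeque<ArrayDeque<String>> queuedBuffered = new ArrayDeque<>();

    /** Called when an alignment ends with these held-back elements. */
    void release(List<String> newlyBlocked) {
        if (newlyBlocked.isEmpty()) {
            return;                                // nothing was buffered
        }
        ArrayDeque<String> seq = new ArrayDeque<>(newlyBlocked);
        if (currentBuffered != null) {
            queuedBuffered.addFirst(currentBuffered); // push the unfinished sequence back
        }
        currentBuffered = seq;                     // the newest buffered data is replayed first
    }

    /** Next buffered element, or null when everything buffered is drained. */
    String next() {
        while (currentBuffered != null) {
            String s = currentBuffered.poll();
            if (s != null) {
                return s;
            }
            // like completeBufferedSequence(): resume the next queued sequence, if any
            currentBuffered = queuedBuffered.pollFirst();
        }
        return null;
    }
}
```

For instance, after release(List.of("a1", "a2")) and one call to next(), a second release(List.of("b1")) makes next() return "b1", then "a2", then null.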

Once releaseBlocksAndResetBarriers has run, currentBuffered != null (provided some data was buffered), so getNextNonBlocked takes the else branch:

				// after a barrier block is released, read the next buffered element
				next = Optional.ofNullable(currentBuffered.getNext());

and then consumes that data directly:

			// once all barriers are aligned, the data in bufferBlocker's ArrayDeque<BufferOrEvent> currentBuffers is consumed first
			BufferOrEvent bufferOrEvent = next.get();
			if (isBlocked(bufferOrEvent.getChannelIndex())) {
				// if the channel is blocked, we just store the BufferOrEvent
				// barrier alignment: buffer the data
				bufferBlocker.add(bufferOrEvent);
				checkSizeLimit();
			}
			else if (bufferOrEvent.isBuffer()) {
				return bufferOrEvent;
			}

It keeps consuming the buffered data until it is exhausted; during this time nothing new is read from the inputGate:

next = Optional.ofNullable(currentBuffered.getNext());
				if (!next.isPresent()) {
					// the buffered sequence is exhausted: finish consuming it
					completeBufferedSequence();
					return getNextNonBlocked();
				}

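Once the buffered data is drained, currentBuffered becomes null again (unless further buffered sequences are queued), reading continues from the InputGate just as it did the first time through, and the cycle repeats.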
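Putting the pieces together, the loop can be modelled in a few dozen lines. This is a deliberately simplified, hypothetical model rather than Flink code: only one checkpoint is aligned at a time, there are no checkpoint IDs, and closed channels, end of stream, cancellation and size limits are ignored. Plain ArrayDeques stand in for the InputGate, currentBuffered and bufferBlocker.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical model of BarrierBuffer.getNextNonBlocked() (not Flink code).
class AlignmentLoopSketch {
    static final class Item {
        final int channel;
        final boolean barrier;
        final String payload;

        Item(int channel, boolean barrier, String payload) {
            this.channel = channel;
            this.barrier = barrier;
            this.payload = payload;
        }
    }

    private final ArrayDeque<Item> input = new ArrayDeque<>(); // plays the InputGate
    private ArrayDeque<Item> replay = new ArrayDeque<>();      // plays currentBuffered
    private ArrayDeque<Item> held = new ArrayDeque<>();        // plays bufferBlocker
    private final boolean[] blocked;                           // plays blockedChannels
    private int barriersReceived;                              // plays numBarriersReceived
    final List<String> log = new ArrayList<>();                // order in which things happen

    AlignmentLoopSketch(int channels) {
        blocked = new boolean[channels];
    }

    void offer(int channel, boolean barrier, String payload) {
        input.add(new Item(channel, barrier, payload));
    }

    /** Next record that may be processed, or null once input and buffered data are drained. */
    Item nextNonBlocked() {
        while (true) {
            // replay released data before reading anything new from the input
            Item item = replay.isEmpty() ? input.poll() : replay.poll();
            if (item == null) {
                return null;
            }
            if (blocked[item.channel]) {
                held.add(item);                   // channel already aligned: hold it back
            } else if (!item.barrier) {
                return item;                      // ordinary record
            } else {
                blocked[item.channel] = true;     // like onBarrier(channelIndex)
                if (++barriersReceived == blocked.length) {
                    // all barriers in: unblock, queue held data for replay, then checkpoint
                    Arrays.fill(blocked, false);
                    barriersReceived = 0;
                    held.addAll(replay);          // newest held data first, older replay after
                    replay = held;
                    held = new ArrayDeque<>();
                    log.add("checkpoint");
                }
            }
        }
    }

    /** Pulls records until nothing is left, logging their payloads. */
    void drain() {
        for (Item i = nextNonBlocked(); i != null; i = nextNonBlocked()) {
            log.add(i.payload);
        }
    }
}
```

With two channels, offering a1, a barrier and a2 on channel 0, followed by c1, a barrier and c2 on channel 1, yields the log a1, c1, checkpoint, a2, c2: a2 is held back until channel 1's barrier arrives and is replayed before anything new is read from the input.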

Summary
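This article is part of the Tencent Cloud self-media sharing program and is shared from the author's personal site/blog.
Originally published: 2020-07-03. For infringement concerns, contact cloudcommunity@tencent.com to have it removed.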
