前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂 Flink window 元素的顺序问题

一文搞懂 Flink window 元素的顺序问题

作者头像
shengjk1
发布2020-05-28 17:28:16
9040
发布2020-05-28 17:28:16
举报
文章被收录于专栏:码字搬砖码字搬砖

1. 起因

在我们使用 evictor 算子的时候,官网有这样的一句话:

Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last

大概的意思就是说,evictor 尽管可以从窗口开始时移除元素但是并不保证,这个元素是第一个或最后一个到达窗口的。 why?

2. 解释

代码语言:javascript
复制
env.addSource(consumer).uid("orderAndRegisterUserIdSource")
				
				.keyBy(new KeySelector<String, String>() {
					@Override
					public String getKey(String value) throws Exception {
						return "a";
					}
				})
				.timeWindow(Time.seconds(1000))
			.evictor(new Evictor<String, TimeWindow>() {
				@Override
				public void evictBefore(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
					System.out.println("evictBefore");
				}

				@Override
				public void evictAfter(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
					System.out.println("evictAfter");
				}
			})
				.trigger(new CountAndTimeTrigger(2L))
				.process(new ProcessWindowFunctionImp()).uid("process");

通常我们会写出这样的代码,但当我们在 evictBefore 或者 evictAfter输出第一个元素时,它竟然保证是第一个或者最后一个进入 window 的。

我们一起来看一下源码,进入 EvictingWindowOperator

代码语言:javascript
复制
public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
				element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window : elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window,
						new MergingWindowSet.MergeFunction<W>() {
							@Override
							public void merge(W mergeResult,
									Collection<W> mergedWindows, W stateWindowResult,
									Collection<W> mergedStateWindows) throws Exception {

								if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
									throw new UnsupportedOperationException("The end timestamp of an " +
											"event-time window cannot become earlier than the current watermark " +
											"by merging. Current watermark: " + internalTimerService.currentWatermark() +
											" window: " + mergeResult);
								} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
									throw new UnsupportedOperationException("The end timestamp of a " +
											"processing-time window cannot become earlier than the current processing time " +
											"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
											" window: " + mergeResult);
								}

								triggerContext.key = key;
								triggerContext.window = mergeResult;

								triggerContext.onMerge(mergedWindows);

								for (W m : mergedWindows) {
									triggerContext.window = m;
									triggerContext.clear();
									deleteCleanupTimer(m);
								}

								// merge the merged state windows into the newly resulting state window
								evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
							}
						});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				evictingWindowState.setCurrentNamespace(stateWindow);
				evictingWindowState.add(element);

				triggerContext.key = key;
				triggerContext.window = actualWindow;
				evictorContext.key = key;
				evictorContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
					if (contents == null) {
						// if we have no state, there is nothing to do
						continue;
					}
					emitWindowContents(actualWindow, contents, evictingWindowState);
				}

				if (triggerResult.isPurge()) {
					evictingWindowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window : elementWindows) {

				// check if the window is already inactive
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				evictingWindowState.setCurrentNamespace(window);
				evictingWindowState.add(element);

				triggerContext.key = key;
				triggerContext.window = window;
				evictorContext.key = key;
				evictorContext.window = window;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
					if (contents == null) {
						// if we have no state, there is nothing to do
						continue;
					}
					emitWindowContents(window, contents, evictingWindowState);
				}

				if (triggerResult.isPurge()) {
					evictingWindowState.clear();
				}
				registerCleanupTimer(window);
			}
		}

		// side output input event if
		// element not handled by any window
		// late arriving tag has been set
		// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null){
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}

这个里的 evictingWindowState 是一个 RocksDBListState。

Flink key state 为何仅与 key 有关的 ,我们知道 evictingWindowState.get 时也仅仅会得到当前 key 对应的值 。

当窗口触发时,传递给 emitWindowContents 时,也仅仅是当前 key 的值。( 对于同一个 key 而言是可以保证顺序的 )。故当 evictor 处理数据时也仅仅是当前 key 的值,而非整个 window 的值。故 evictor 处理的第一个数据不一定是 第一个或最后一个到达 window 的。而 window 也不保证元素顺序(进入window 窗口的顺序)

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 起因
  • 2. 解释
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档