Flink Window Internals for Busy People

shengjk1
Published 2020-06-11 10:21:11
Included in the column: 码字搬砖

Window is arguably one of Flink's indispensable operators and is used in a wide range of scenarios. In this article we look at how windows are implemented.

window types

Tumbling Window

Sliding Window

Session Window

Global Window
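The four window types differ mainly in how an element is mapped to windows. As a rough sketch in plain Java (not the Flink API; the class and method names are illustrative), the start-time formula below is the one used by Flink's TimeWindow.getWindowStartWithOffset, the sliding loop follows the same idea as the sliding assigners, and the session part shows the "open [ts, ts + gap) and merge overlaps" behaviour. A global window is just one window per key that, with its default trigger, never fires, so it is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of how Flink's built-in time-window assigners place an element.
// Window and the helper names are illustrative stand-ins, not the Flink API.
final class WindowAssignmentSketch {

    // Half-open interval [start, end), like Flink's TimeWindow.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // Last timestamp that still belongs to the window, as in TimeWindow.maxTimestamp().
        long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Same formula as TimeWindow.getWindowStartWithOffset(timestamp, offset, windowSize).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - (timestamp - offset + size) % size;
    }

    // Tumbling: every element lands in exactly one fixed-size window.
    static Window tumbling(long timestamp, long size) {
        long start = windowStart(timestamp, 0, size);
        return new Window(start, start + size);
    }

    // Sliding: walk back from the latest window start in steps of `slide`.
    static List<Window> sliding(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        for (long start = windowStart(timestamp, 0, slide); start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // Session: each element opens [ts, ts + gap); overlapping or touching windows merge.
    // Expects timestamps in ascending order to keep the sketch short.
    static List<Window> sessions(long[] sortedTimestamps, long gap) {
        List<Window> merged = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            Window w = new Window(ts, ts + gap);
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).end >= w.start) {
                Window prev = merged.get(last);
                merged.set(last, new Window(prev.start, Math.max(prev.end, w.end)));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

For example, with a 10 ms window sliding by 5 ms, an element at timestamp 7 lands in both [5, 15) and [0, 10).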

window operator

evictor

The evictor lets you remove (evict) elements from a window, either before the user's window function runs or after it. For details, see the evictBefore and evictAfter methods of org.apache.flink.streaming.api.windowing.evictors.Evictor.
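To make the before/after distinction concrete, here is a minimal plain-Java model. The names are assumptions, not Flink's Evictor interface (whose methods also receive the element count, the window and a context). keepLast behaves roughly like Flink's CountEvictor evicting before the window function: it drops elements from the head until maxCount remain.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of where an evictor runs relative to the window function.
// Loosely mirrors Flink's CountEvictor; not the Flink API.
final class EvictorSketch {

    interface SimpleEvictor<T> {
        void evictBefore(List<T> elements); // runs before the window function
        void evictAfter(List<T> elements);  // runs after the window function
    }

    // Keeps only the last maxCount elements, removing from the head before the function runs.
    static <T> SimpleEvictor<T> keepLast(int maxCount) {
        return new SimpleEvictor<T>() {
            @Override
            public void evictBefore(List<T> elements) {
                int toRemove = elements.size() - maxCount;
                Iterator<T> it = elements.iterator();
                while (toRemove > 0 && it.hasNext()) {
                    it.next();
                    it.remove();
                    toRemove--;
                }
            }

            @Override
            public void evictAfter(List<T> elements) {
                // nothing to do after the function in this variant
            }
        };
    }

    // Firing a window with an evictor: evictBefore -> window function -> evictAfter.
    static <T, R> R fire(List<T> contents, SimpleEvictor<T> evictor, Function<List<T>, R> windowFunction) {
        evictor.evictBefore(contents);
        R result = windowFunction.apply(contents);
        evictor.evictAfter(contents);
        return result;
    }
}
```

With window contents 1..5 and keepLast(2), the window function only sees 4 and 5.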

trigger

A trigger decides whether a window should fire. Every WindowAssigner comes with a default trigger; if it does not meet your needs, write your own class that extends Trigger. The Trigger methods and their meanings are:

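  • onElement() called for every element added to a window
  • onEventTime() called when a registered event-time timer fires
  • onProcessingTime() called when a registered processing-time timer fires
  • onMerge() called when windows are merged (e.g. session windows), to merge the triggers' state
  • clear() called when the window is removed, to clean up the trigger's state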

The first three methods return a TriggerResult, which can be one of:

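  • CONTINUE do nothing
  • FIRE fire the window (evaluate the window function and emit the result)
  • PURGE clear the window's elements without firing (window metadata and trigger state are kept)
  • FIRE_AND_PURGE fire the window, then clear its elements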
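The sketch below models how WindowOperator reacts to a TriggerResult: the enum mirrors the fire/purge flags of Flink's TriggerResult, and CountTrigger imitates the counting logic of Flink's built-in CountTrigger. Everything else (class names, the run helper, keeping trigger state in a single field) is an illustrative simplification in plain Java.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of TriggerResult and of how WindowOperator acts on it.
// Only the fire/purge flags mirror Flink's TriggerResult; the rest is illustrative.
final class TriggerSketch {

    enum Result {
        CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

        private final boolean fire;
        private final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }

        boolean isFire() { return fire; }
        boolean isPurge() { return purge; }
    }

    // Like Flink's CountTrigger: FIRE once maxCount elements arrived, then reset the counter.
    static final class CountTrigger {
        private final long maxCount;
        private long count; // kept per key and window in Flink; a single field here

        CountTrigger(long maxCount) { this.maxCount = maxCount; }

        Result onElement() {
            count++;
            if (count >= maxCount) {
                count = 0;
                return Result.FIRE;
            }
            return Result.CONTINUE;
        }
    }

    // Feeds elements into one window and collects what is emitted, as processElement does.
    static List<List<Integer>> run(List<Integer> input, CountTrigger trigger, boolean purgeOnFire) {
        List<Integer> windowState = new ArrayList<>();
        List<List<Integer>> emitted = new ArrayList<>();
        for (int value : input) {
            windowState.add(value);
            Result r = trigger.onElement();
            if (purgeOnFire && r == Result.FIRE) {
                r = Result.FIRE_AND_PURGE;
            }
            if (r.isFire()) {
                emitted.add(new ArrayList<>(windowState)); // snapshot handed to the window function
            }
            if (r.isPurge()) {
                windowState.clear();
            }
        }
        return emitted;
    }
}
```

Feeding 1, 2, 3, 4 with a count of 2 emits [1, 2] and then [1, 2, 3, 4] when only FIRE is returned, but [1, 2] and [3, 4] with FIRE_AND_PURGE, because purging empties the window state after each fire.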
window code
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * @author shengjk1
 * @date 2019/9/4
 */
public class Main {
	protected final static org.slf4j.Logger logger = LoggerFactory.getLogger(Main.class);
	
	public static void main(String[] args) throws Exception {
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
		
		env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
		env.getCheckpointConfig().setCheckpointTimeout(60000);
		env.getCheckpointConfig().setMaxConcurrentCheckpoints(5);
		env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
		env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
		
		env.setParallelism(1);
		
		StateBackend backend =
			new RocksDBStateBackend("file:////Users/iss/sourceCode/spark/flink/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/checkpoints", true);
		env.setStateBackend(backend);
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "bigdata-dev-mq:9092");
		properties.setProperty("group.id", "test");
		properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "1000");
		
		
		FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		consumer.setStartFromEarliest();
		
		env.addSource(consumer).uid("orderAndRegisterUserIdSource")
				.rebalance()
				.keyBy(new KeySelector<String, String>() {
					@Override
					public String getKey(String value) throws Exception {
						return value;
					}
				})
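				// 2-second tumbling window (processing time, as configured above)
				.timeWindow(Time.seconds(2))
				// custom trigger from the author's article referenced below; 2L is presumably the element count
				.trigger(new CountAndTimeTrigger(2L))
				.process(new ProcessWindowFunctionImp()).uid("process");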
		
		
		// execute program
		env.execute("realTimeDataWareHouse");
	}
}

CountAndTimeTrigger is described in the article Flink 自定义触发器实现带超时时间的 countAndTimeTrigger ("A custom Flink trigger: countAndTimeTrigger with a timeout").
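The referenced article is not reproduced here. As a hypothetical sketch only, assuming CountAndTimeTrigger(2L) fires after every 2 elements and flushes whatever is left when the window's processing-time timer goes off, the decision logic could look like this. The class, the Decision enum and the timer timestamp are assumptions, not the author's code or Flink's Trigger API.

```java
// Hypothetical "count or timeout" trigger decision, in plain Java.
// Assumption: fire every maxCount elements, and flush the rest when the timer
// at the window's max timestamp fires. The author's CountAndTimeTrigger may differ.
final class CountOrTimeoutTriggerSketch {

    enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final long maxCount;
    private final long windowMaxTimestamp; // assumed timer timestamp (window end - 1)
    private long count;                    // elements seen since the last fire

    CountOrTimeoutTriggerSketch(long maxCount, long windowMaxTimestamp) {
        this.maxCount = maxCount;
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    // Called for every element: fire as soon as maxCount elements have arrived.
    Decision onElement() {
        count++;
        if (count >= maxCount) {
            count = 0;
            return Decision.FIRE;
        }
        return Decision.CONTINUE;
    }

    // Called when a processing-time timer goes off: at window end, fire if anything
    // arrived since the last fire and purge so the window state is released.
    Decision onProcessingTime(long time) {
        if (time >= windowMaxTimestamp && count > 0) {
            count = 0;
            return Decision.FIRE_AND_PURGE;
        }
        return Decision.CONTINUE;
    }
}
```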

window internals

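First, once the program starts consuming messages (see 一文搞定 Flink 消费消息的全流程, "The full flow of how Flink consumes messages"), each record enters WindowOperator's processElement method: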

// WindowOperator.processElement
	public void processElement(StreamRecord<IN> element) throws Exception {
		final Collection<W> elementWindows = windowAssigner.assignWindows(
			element.getValue(), element.getTimestamp(), windowAssignerContext);

		//if element is handled by none of assigned elementWindows
		boolean isSkippedElement = true;

		final K key = this.<K>getKeyedStateBackend().getCurrentKey();

		if (windowAssigner instanceof MergingWindowAssigner) {
			MergingWindowSet<W> mergingWindows = getMergingWindowSet();

			for (W window: elementWindows) {

				// adding the new window might result in a merge, in that case the actualWindow
				// is the merged window and we work with that. If we don't merge then
				// actualWindow == window
				W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
					@Override
					public void merge(W mergeResult,
							Collection<W> mergedWindows, W stateWindowResult,
							Collection<W> mergedStateWindows) throws Exception {

						if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
							throw new UnsupportedOperationException("The end timestamp of an " +
									"event-time window cannot become earlier than the current watermark " +
									"by merging. Current watermark: " + internalTimerService.currentWatermark() +
									" window: " + mergeResult);
						} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
							throw new UnsupportedOperationException("The end timestamp of a " +
									"processing-time window cannot become earlier than the current processing time " +
									"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
									" window: " + mergeResult);
						}

						triggerContext.key = key;
						triggerContext.window = mergeResult;

						triggerContext.onMerge(mergedWindows);

						for (W m: mergedWindows) {
							triggerContext.window = m;
							triggerContext.clear();
							deleteCleanupTimer(m);
						}

						// merge the merged state windows into the newly resulting state window
						windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
					}
				});

				// drop if the window is already late
				if (isWindowLate(actualWindow)) {
					mergingWindows.retireWindow(actualWindow);
					continue;
				}
				isSkippedElement = false;

				W stateWindow = mergingWindows.getStateWindow(actualWindow);
				if (stateWindow == null) {
					throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
				}

				windowState.setCurrentNamespace(stateWindow);
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = actualWindow;

				TriggerResult triggerResult = triggerContext.onElement(element);

				if (triggerResult.isFire()) {
					// e.g. RocksDBListState or RocksDBReducingState
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					emitWindowContents(actualWindow, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
				registerCleanupTimer(actualWindow);
			}

			// need to make sure to update the merging state in state
			mergingWindows.persist();
		} else {
			for (W window: elementWindows) {

				// drop if the window is already late
				if (isWindowLate(window)) {
					continue;
				}
				isSkippedElement = false;

				windowState.setCurrentNamespace(window);
				// incoming data is first stored in windowState until the window fires
				windowState.add(element.getValue());

				triggerContext.key = key;
				triggerContext.window = window;

				// call the trigger's onElement (user code when a custom trigger is used)
				TriggerResult triggerResult = triggerContext.onElement(element);
				// when the window fires, read the data from windowState; in this example windowState is a RocksDBListState
				if (triggerResult.isFire()) {
					// e.g. RocksDBListState or RocksDBReducingState
					ACC contents = windowState.get();
					if (contents == null) {
						continue;
					}
					// when the window fires, its data is sent downstream by calling the user's process method
					emitWindowContents(window, contents);
				}

				if (triggerResult.isPurge()) {
					windowState.clear();
				}
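				// register a cleanup timer, which is essentially a scheduled task; underneath it is implemented with ScheduledThreadPoolExecutor.schedule(...)
				// each key in each window has exactly one such timer (deduplication is partly done through a map)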
				registerCleanupTimer(window);
			}
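		}

		// side-output the element if no window handled it and it is late
		if (isSkippedElement && isElementLate(element)) {
			if (lateDataOutputTag != null) {
				sideOutput(element);
			} else {
				this.numLateRecordsDropped.inc();
			}
		}
	}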
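Stripped of state backends and merging, the non-merging branch of processElement boils down to: assign a window, buffer the element in that window's state, ask the trigger, emit on FIRE, and (re)register one cleanup timer per window. A minimal plain-Java model for a single key and a processing-time tumbling window follows; the count rule standing in for the trigger and all names are assumptions for illustration. Processing-time windows are never considered late, so the late-window check is omitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Simplified plain-Java model of the non-merging branch of WindowOperator.processElement:
// one key, processing-time tumbling windows, and a count rule standing in for the trigger.
// All names are illustrative; this is not Flink code.
final class ProcessElementSketch {
    private final long windowSize;
    private final long maxCount;                                          // assumed trigger: FIRE every maxCount elements
    private final Map<Long, List<String>> windowState = new HashMap<>(); // window start -> buffered elements
    private final Map<Long, Long> triggerCount = new HashMap<>();        // per-window trigger state
    final TreeSet<Long> cleanupTimers = new TreeSet<>();                 // a set, so each window keeps one timer
    final List<String> emitted = new ArrayList<>();                      // what would reach the window function

    ProcessElementSketch(long windowSize, long maxCount) {
        this.windowSize = windowSize;
        this.maxCount = maxCount;
    }

    void processElement(String value, long processingTime) {
        // 1. assign the window (tumbling, offset 0)
        long start = processingTime - (processingTime + windowSize) % windowSize;
        long maxTimestamp = start + windowSize - 1;

        // 2. buffer the element in that window's state
        List<String> contents = windowState.computeIfAbsent(start, w -> new ArrayList<>());
        contents.add(value);

        // 3. ask the trigger
        long count = triggerCount.merge(start, 1L, Long::sum);
        if (count >= maxCount) {
            triggerCount.put(start, 0L);
            // 4. FIRE without PURGE: emit a snapshot, keep the buffered elements
            emitted.add(start + ":" + contents);
        }

        // 5. register the cleanup timer at maxTimestamp; adding it again is a no-op
        cleanupTimers.add(maxTimestamp);
    }
}
```

With a 2000 ms window and a count of 2, elements at 100 ms and 500 ms fire window 0 once, an element at 2100 ms opens window 2000, and only two cleanup timers (1999 and 3999) exist.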

For the ordering of elements within a window, see 一文搞懂 Flink window 元素的顺序问题 ("Understanding element order in Flink windows").

When a registered timer expires, onProcessingTime is called.
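The timer side can be modeled as a priority queue ordered by timestamp plus a set that rejects duplicate (key, window, timestamp) registrations, which is why registering the same cleanup timer for every element is harmless. This is a plain-Java sketch in the spirit of Flink's internal timer service, not its real code: when processing time advances, every due timer is popped in timestamp order, and that is the point where WindowOperator.onProcessingTime(timer) is invoked.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Plain-Java model of a deduplicating processing-time timer queue; illustrative only.
final class TimerQueueSketch {

    // A timer is identified by (key, window namespace, timestamp).
    static final class Timer {
        final String key;
        final String window;
        final long timestamp;

        Timer(String key, String window, long timestamp) {
            this.key = key;
            this.window = window;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer t = (Timer) o;
            return timestamp == t.timestamp && key.equals(t.key) && window.equals(t.window);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, window, timestamp);
        }
    }

    private final PriorityQueue<Timer> queue = new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    private final Set<Timer> registered = new HashSet<>();

    // Returns false when an identical timer is already registered.
    boolean register(String key, String window, long timestamp) {
        Timer t = new Timer(key, window, timestamp);
        if (!registered.add(t)) {
            return false;
        }
        queue.add(t);
        return true;
    }

    // Fires every timer with timestamp <= now, earliest first.
    List<Timer> advanceTo(long now) {
        List<Timer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= now) {
            Timer t = queue.poll();
            registered.remove(t);
            fired.add(t); // the window operator's onProcessingTime(t) would run here
        }
        return fired;
    }
}
```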

// This method is invoked by a timer:
	// registerCleanupTimer(window) in processElement creates the corresponding timer
	public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
		triggerContext.key = timer.getKey();
		triggerContext.window = timer.getNamespace();

		MergingWindowSet<W> mergingWindows;

		if (windowAssigner instanceof MergingWindowAssigner) {
			mergingWindows = getMergingWindowSet();
			W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
			if (stateWindow == null) {
				// Timer firing for non-existent window, this can only happen if a
				// trigger did not clean up timers. We have already cleared the merging
				// window and therefore the Trigger state, however, so nothing to do.
				return;
			} else {
				windowState.setCurrentNamespace(stateWindow);
			}
		} else {
			windowState.setCurrentNamespace(triggerContext.window);
			mergingWindows = null;
		}

		TriggerResult triggerResult = triggerContext.onProcessingTime(timer.getTimestamp());

		if (triggerResult.isFire()) {
			ACC contents = windowState.get();
			if (contents != null) {
				emitWindowContents(triggerContext.window, contents);
			}
		}

		if (triggerResult.isPurge()) {
			windowState.clear();
		}

		if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
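			// clears all state of this window:
			// first windowState.clear(), then the user-defined clear methods, then the state kept in the window context.
			// clearAllState can only be reached when the window is fired from onProcessingTime or onEventTime;
			// any other fire leaves the window alive even though it has fired.
			// In other words: incremental fires first, then a final full fire (one caused by onProcessingTime/onEventTime without PURGE).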
			clearAllState(triggerContext.window, windowState, mergingWindows);
		}

		if (mergingWindows != null) {
			// need to make sure to update the merging state in state
			mergingWindows.persist();
		}
	}
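Whether clearAllState runs is decided by isCleanupTime. The sketch restates WindowOperator's cleanupTime, isCleanupTime and isWindowLate helpers in plain Java, with the window reduced to its maxTimestamp and the assigner reduced to an isEventTime flag (simplifications for illustration).

```java
// Plain-Java restatement of WindowOperator's cleanup-time rules; parameter names are illustrative.
final class CleanupSketch {

    static long cleanupTime(long windowMaxTimestamp, boolean isEventTime, long allowedLateness) {
        if (isEventTime) {
            long cleanup = windowMaxTimestamp + allowedLateness;
            // overflow guard: keep the state forever instead of wrapping to a negative time
            return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
        }
        // processing time has no allowed lateness: clean up right at the window end
        return windowMaxTimestamp;
    }

    // clearAllState runs only when the firing timer is exactly the cleanup timer.
    static boolean isCleanupTime(long windowMaxTimestamp, boolean isEventTime, long allowedLateness, long timerTime) {
        return timerTime == cleanupTime(windowMaxTimestamp, isEventTime, allowedLateness);
    }

    // Only event-time windows can be late: their cleanup time is already behind the watermark.
    static boolean isWindowLate(long windowMaxTimestamp, boolean isEventTime, long allowedLateness, long watermark) {
        return isEventTime && cleanupTime(windowMaxTimestamp, isEventTime, allowedLateness) <= watermark;
    }
}
```

For the processing-time example in this article, the cleanup time is simply the window's maxTimestamp, so a 2-second window [0, 2000) is cleaned up by the timer at 1999.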

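Note that a window is always tied to a key: window state, trigger state and timers are all scoped to the current key together with the window.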
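A minimal way to picture this: window state is addressed by the current key first and the window namespace second, so the same time window holds separate contents for each key. The nested maps below are only a plain-Java illustration with hypothetical names; Flink keeps this in the keyed state backend (RocksDB in this article's example).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model: state is looked up by (key, window start); names are illustrative.
final class KeyedWindowStateSketch {
    private final Map<String, Map<Long, List<String>>> state = new HashMap<>();

    // Like windowState.add(...) after the key and the namespace (window) have been set.
    void add(String key, long windowStart, String value) {
        state.computeIfAbsent(key, k -> new HashMap<>())
             .computeIfAbsent(windowStart, w -> new ArrayList<>())
             .add(value);
    }

    // Contents of one window for one key; empty if nothing arrived for that pair.
    List<String> get(String key, long windowStart) {
        return state.getOrDefault(key, Map.of()).getOrDefault(windowStart, List.of());
    }
}
```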

Summary

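The overall window flow:
  • processElement: the WindowAssigner assigns the element to its window(s), and windows that are already late are skipped
  • the element is added to windowState, which is scoped to the current key and window
  • the trigger's onElement returns a TriggerResult: FIRE emits the window contents to the user's window function, PURGE clears windowState
  • registerCleanupTimer registers a timer for the window
  • when the timer fires, onProcessingTime (or onEventTime) consults the trigger again, and at cleanup time clearAllState removes all state of the window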

This article is part of the Tencent Cloud self-media sharing program and was shared from the author's personal site/blog.
Originally published: 2020-06-10
