前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂 flink state key 的设置方式

一文搞懂 flink state key 的设置方式

作者头像
shengjk1
发布2020-05-29 15:49:10
7090
发布2020-05-29 15:49:10
举报
文章被收录于专栏:码字搬砖码字搬砖

1. 疑问

前一篇文章 一文搞懂 Flink window 元素的顺序问题 我们已经知道了,state 的获取、更新、清除等都与 key 相关。那么 key 是如何设置的呢?

2.解释

这需要从 StreamTask 的 run 方法说起。以 OneInputStreamTask 为例,当程序启动开始消费消息时,会进行 OneInputStreamTask 的 run 方法,

代码语言:javascript
复制
@Override
	protected void run() throws Exception {
		// cache processor reference on the stack, to make the code more JIT friendly
		final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
		//处理输入的消息
		while (running && inputProcessor.processInput()) {
			// all the work happens in the "processInput" method
		}
	}

最终调用的是 inputProcessor.processInput() 方法,除了生成 watermark 之外,就是往下游发送记录

代码语言:javascript
复制
						// now we can do the actual processing
						StreamRecord<IN> record = recordOrMark.asRecord();
						synchronized (lock) {
							numRecordsIn.inc();
							//set KeyContext setCurrentKey
			//设置 keyContext (提供了用来查询和设置 keyed operation 的 current key 的接口)
							streamOperator.setKeyContextElement1(record);
			//这里开始调用用户自己的代码
							streamOperator.processElement(record);
						}

一路追踪下去,到 AbstractStreamOperator

代码语言:javascript
复制
//自定义的 KeySelector 在此处起作用
	private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
		if (selector != null) {
			Object key = selector.getKey(record.getValue());
			setCurrentKey(key);
		}
	}

	@SuppressWarnings({"unchecked", "rawtypes"})
	public void setCurrentKey(Object key) {
		if (keyedStateBackend != null) {
			try {
				// need to work around type restrictions
				@SuppressWarnings("unchecked,rawtypes")
				AbstractKeyedStateBackend rawBackend = (AbstractKeyedStateBackend) keyedStateBackend;
				
				//设置 keyedStateBackend currentKey
				rawBackend.setCurrentKey(key);
			} catch (Exception e) {
				throw new RuntimeException("Exception occurred while setting the current key context.", e);
			}
		}
	}

然后会调用 RocksDBKeyedStateBackend

代码语言:javascript
复制
@Override
	public void setCurrentKey(K newKey) {
		super.setCurrentKey(newKey);
		// 每个 key 都会调用一次 将 key group and key 写入 byte[] 中,每次开始写入前都会清空,后续 state 的操作都会从这个 byte[] 中读
		sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
	}

当查询、更新以及清除 state 时,由 一文搞懂 Flink window 元素的顺序问题 我们可以知道 ,有一个 serializeCurrentKeyWithGroupAndNamespace() 方法,最终进入 buildCompositeKeyNamespace

代码语言:javascript
复制
@Nonnull
	public <N> byte[] buildCompositeKeyNamespace(@Nonnull N namespace, @Nonnull TypeSerializer<N> namespaceSerializer) {
		try {
			// 每次真正操作时候,serializeNamespace
			serializeNamespace(namespace, namespaceSerializer);
			// 将已序列化的 key_group,key,namespace 作为一个整体 copy 出来,这也就是 state key
			final byte[] result = keyOutView.getCopyOfBuffer();
			// 重置,类似于重置游标,去除 namespace bytes
			resetToKey();
			return result;
		} catch (IOException shouldNeverHappen) {
			throw new FlinkRuntimeException(shouldNeverHappen);
		}
	}

至此 state key 就设置完成了,然后就可以按照新设置的 key 进行查询了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-28 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 疑问
  • 2.解释
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档