前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink getRuntimeContext().getMapState的时候发生了什么?

Flink getRuntimeContext().getMapState的时候发生了什么?

作者头像
shengjk1
发布2019-12-20 15:51:22
1.2K0
发布2019-12-20 15:51:22
举报
文章被收录于专栏:码字搬砖码字搬砖
我们都知道,当使用 获取 Mapstate 的时候
代码语言:javascript
复制
public void open(Configuration cfg) {
	          state = getRuntimeContext().getMapState(
	                  new MapStateDescriptor<>("sum", MyType.class, Long.class));
	      }

跟进,进入 DefaultKeyedStateStore

代码语言:javascript
复制
@Override
	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
		requireNonNull(stateProperties, "The state properties must not be null");
		try {
			stateProperties.initializeSerializerUnlessSet(executionConfig);
           //关键性方法,获得到原始的 state
			MapState<UK, UV> originalState = getPartitionedState(stateProperties);
          //返回一个包装之后的 MapState
			return new UserFacingMapState<>(originalState);
		} catch (Exception e) {
			throw new RuntimeException("Error while getting state", e);
		}
	}

我们一起看一下,它是如何获取原始 state 的,跟进到 AbstractKeyedStateBackend

代码语言:javascript
复制
@Override
	public <N, S extends State> S getPartitionedState(
			final N namespace,
			final TypeSerializer<N> namespaceSerializer,
			final StateDescriptor<S, ?> stateDescriptor) throws Exception {

		checkNotNull(namespace, "Namespace");

		/*
		如果 stateDescriptor name 与最新的 lastName 相同,则将最新的 state 返回
		如若第一次访问,lastName==null
		 */
		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
			lastState.setCurrentNamespace(namespace);
			return (S) lastState;
		}

		/*
		第一次 previous ==null ,再次获取直接从缓冲中返回
		 */
		InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
		if (previous != null) {
			lastState = previous;
			lastState.setCurrentNamespace(namespace);
			lastName = stateDescriptor.getName();
			return (S) previous;
		}

		//第一次会创建 对应的columnFamily,并返回相应的Rockdb State 对象 如 RocksDBListState
		final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
		final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

		//对 lastName 赋值
		lastName = stateDescriptor.getName();
		lastState = kvState;
		kvState.setCurrentNamespace(namespace);

		return state;
	}

如果不是第一次访问,则直接从缓存中获取,若为第一个则创建。我们一起来看一下,具体的创建方法

代码语言:javascript
复制
@Override
	@SuppressWarnings("unchecked")
	public <N, S extends State, V> S getOrCreateKeyedState(
			final TypeSerializer<N> namespaceSerializer,
			StateDescriptor<S, V> stateDescriptor) throws Exception {
		checkNotNull(namespaceSerializer, "Namespace serializer");
		checkNotNull(keySerializerProvider, "State key serializer has not been configured in the config. " +
				"This operation cannot use partitioned state.");

		InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
		if (kvState == null) {
			if (!stateDescriptor.isSerializerInitialized()) {
				stateDescriptor.initializeSerializerUnlessSet(executionConfig);
			}
             //创建 state
			kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
				namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
             //添加至缓存		
	        keyValueStatesByName.put(stateDescriptor.getName(), kvState);
            //将 state 注册到相应的 task 中,具体是 task run的时候用的
			publishQueryableStateIfEnabled(stateDescriptor, kvState);
		}
		return (S) kvState;
	}

继续看一下 kvState 具体是如何创建的

代码语言:javascript
复制
public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
		TypeSerializer<N> namespaceSerializer,
		StateDescriptor<S, SV> stateDesc,
		KeyedStateBackend<K> stateBackend,
		TtlTimeProvider timeProvider) throws Exception {
		Preconditions.checkNotNull(namespaceSerializer);
		Preconditions.checkNotNull(stateDesc);
		Preconditions.checkNotNull(stateBackend);
		Preconditions.checkNotNull(timeProvider);
		return  stateDesc.getTtlConfig().isEnabled() ?
			new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
				namespaceSerializer, stateDesc, stateBackend, timeProvider)
				.createState() :
			stateBackend.createInternalState(namespaceSerializer, stateDesc);
	}

咱们就以 stateBackend.createInternalState 为例,二者有很多公用的逻辑 继续跟进至 RocksDBMapState

代码语言:javascript
复制
@SuppressWarnings("unchecked")
	static <UK, UV, K, N, SV, S extends State, IS extends S> IS create(
		StateDescriptor<S, SV> stateDesc,
		Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult,
		RocksDBKeyedStateBackend<K> backend) {
		return (IS) new RocksDBMapState<>(
			registerResult.f0,
			registerResult.f1.getNamespaceSerializer(),
			(TypeSerializer<Map<UK, UV>>) registerResult.f1.getStateSerializer(),
			(Map<UK, UV>) stateDesc.getDefaultValue(),
			backend);
	}

至此为止 RocksDBMapState 创建完成,也就是说至此,第一次调用生成的 MapState 已完成。即

代码语言:javascript
复制
mapState = getRuntimeContext().getMapState(
	                  new MapStateDescriptor<>("sum", MyType.class, Long.class));

对应的 MapState 已生成,该方法调用完毕。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档