前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂 Flink OperatorChain 对象重用

一文搞懂 Flink OperatorChain 对象重用

作者头像
shengjk1
发布2022-06-18 10:35:22
5310
发布2022-06-18 10:35:22
举报
文章被收录于专栏:码字搬砖

OperatorChain 的对象重用,可以提高效率,但什么情况下可以重用,什么情况下不可以重用,我们一起来看你一下代码:

首先,在OperatorChain 类的 createChainedOperator 方法

代码语言:javascript
复制
private <IN, OUT> WatermarkGaugeExposingOutput<StreamRecord<IN>> createChainedOperator(
			StreamTask<?, ?> containingTask,
			StreamConfig operatorConfig,
			Map<Integer, StreamConfig> chainedConfigs,
			ClassLoader userCodeClassloader,
			Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
			List<StreamOperator<?>> allOperators,
			OutputTag<IN> outputTag) {
		...
		//chainingoutput
		if (containingTask.getExecutionConfig().isObjectReuseEnabled()) {
			currentOperatorOutput = new ChainingOutput<>(chainedOperator, this, outputTag);
		}
		else {
			// deep copy
			TypeSerializer<IN> inSerializer = operatorConfig.getTypeSerializerIn1(userCodeClassloader);
			currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
		}
...
	}

如果启用了对象重用,即 isObjectReuseEnabled==true,创建的 outPut 为 ChainingOutput,如果没有启用对象重用,则 outPut 为 CopyingChainingOutput。

需要明确的是 currentOperatorOutput 是为给下游算子输入数据的。而 ChainingOutput 和 CopyingChainingOutput 的区别是 ChainingOutput 是值传递,而 CopyingChainingOutput 是深拷贝。看到这里我们应该就已经明确了什么情况下可以启动对象重用什么情况下不可以启用对象重用。

我们需要明确的一个点对应 java bean 来说,在启动对象重用情况下,如果下游的算子更改了某个属性值,会直接影响上游,以及其下游,这点还是要特别注意的

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-06-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档