前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >一文搞懂 Flink Stream Join原理

一文搞懂 Flink Stream Join原理

作者头像
shengjk1
发布2020-12-02 14:55:26
7160
发布2020-12-02 14:55:26
举报
文章被收录于专栏:码字搬砖
总括
在这里插入图片描述
在这里插入图片描述
详解

一般情况下,我们会写如下的代码

代码语言:javascript
复制
DataStreamSource<Tuple2<Long, Long>> addSource = env.addSource(new WordSource());
		
		addSource.join(addSource).where(new KeySelector<Tuple2<Long, Long>, Long>() {
			@Override
			public Long getKey(Tuple2<Long, Long> value) throws Exception {
//				System.out.println("where "+value.f0);
				return value.f0;
			}
		}).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
			@Override
			public Long getKey(Tuple2<Long, Long> value) throws Exception {
				System.out.println("equalTo "+value.f0);
				return value.f0;
			}
		}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
			.apply(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
				@Override
				public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
//						System.out.println("vvvvv "+first+second);
					return new Tuple2<>(first.f0,first.f1+second.f1);
				}
			})
			.print("join====");

点进去可以得到 join 的入口方法

代码语言:javascript
复制
//join 的入口方法  otherStream 为 stream2,生成 joinedStream
	public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
		return new JoinedStreams<>(this, otherStream);
	}

然后

代码语言:javascript
复制
//对 stream1 应用 keySelector
	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
		requireNonNull(keySelector);
		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		return where(keySelector, keyType);
	}

然后调用 Where 类的 equalTo 方法,保证了 stream1 stream2 相同的 key 进入到同一个窗口

代码语言:javascript
复制
//对 stream2 应用 keySelector 保证 stream1 和 stream2 相同的 key 或者说要关联的 key 在同一个窗口内
		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
			requireNonNull(keySelector);
			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			return equalTo(keySelector, otherKey);
		}

再往下调用 EqualTo 类的 window 方法

代码语言:javascript
复制
@PublicEvolving
			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
			}

然后会调用 WithWindow 的 apply 方法

代码语言:javascript
复制
//应用 apply 方法
		public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
			TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
				function,
				JoinFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				input1.getType(),
				input2.getType(),
				"Join",
				false);

			return apply(function, resultType);
		}

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
			//clean the closure
			function = input1.getExecutionEnvironment().clean(function);

			coGroupedWindowedStream = input1.coGroup(input2)
				.where(keySelector1)
				.equalTo(keySelector2)
				.window(windowAssigner)
				.trigger(trigger)
				.evictor(evictor)
				.allowedLateness(allowedLateness);

			return coGroupedWindowedStream
					.apply(new JoinCoGroupFunction<>(function), resultType);
		}

至此为止,关键性的方法 apply 出现了,通过 apply 的实现,我们可以知道,join 底层是通过 coGroup 实现的,得到 coGroupedWindowedStream,其中的 function 即为我们自定义的 function.

coGroupedWindowedStream 的 apply 方法最终调用了 WindowStream 的 apply 方法

代码语言:javascript
复制
// 转化为 operator
	private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

		final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
		KeySelector<T, K> keySel = input.getKeySelector();

		WindowOperator<K, T, Iterable<T>, R, W> operator;

		if (evictor != null) {
			@SuppressWarnings({"unchecked", "rawtypes"})
			TypeSerializer<StreamRecord<T>> streamRecordSerializer =
					(TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
			
			// 窗口中 state ttl long_max_value
			ListStateDescriptor<StreamRecord<T>> stateDesc =
					new ListStateDescriptor<>("window-contents", streamRecordSerializer);

			operator =
				new EvictingWindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					function,
					trigger,
					evictor,
					allowedLateness,
					lateDataOutputTag);

		} else {
			// 窗口中 state ttl long_max_value
			ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
				input.getType().createSerializer(getExecutionEnvironment().getConfig()));

			operator =
				new WindowOperator<>(windowAssigner,
					windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
					keySel,
					input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
					stateDesc,
					function,
					trigger,
					allowedLateness,
					lateDataOutputTag);
		}

		// StreamOperator 转化为 dataStream
		return input.transform(opName, resultType, operator);
	}

转化为了 windowOperator。当 window 执行的时候,调用了 CoGroupWindowFunction 的 apply 方法

代码语言:javascript
复制
@Override
		// window 在执行的时候,即 userFunction.process
		public void apply(KEY key,
				W window,
				Iterable<TaggedUnion<T1, T2>> values,
				Collector<T> out) throws Exception {
			//会将两个 stream 的数据,添加到 list 当中
			List<T1> oneValues = new ArrayList<>();
			List<T2> twoValues = new ArrayList<>();

			for (TaggedUnion<T1, T2> val: values) {
				if (val.isOne()) {
					oneValues.add(val.getOne());
				} else {
					twoValues.add(val.getTwo());
				}
			}
			wrappedFunction.coGroup(oneValues, twoValues, out);
		}

而 wrappedFunction.coGroup 调用了 JoinCoGroupFunction.coGroup,从而实现双流 join

代码语言:javascript
复制
@Override
		// join 最终执行的地方,其中 first、second 都是窗口中的数据
		public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
			for (T1 val1: first) {
				for (T2 val2: second) {
					//这里执行用户定义的 join 方法
					out.collect(wrappedFunction.join(val1, val2));
				}
			}
		}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020/11/30 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 总括
  • 详解
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档