在Flink-作业目前,我有两个流,一个主要数据流每分钟更新一次从卡夫卡主题,另一个流(广播流),这是用于KeyedBroadcastProcessFunction的进程元素功能的一些计算与主流数据。
现在,我有一个新的要求,增加一个流,这是完全不同的结构与其他两个流。
1)如何通过必须在Flink状态下可用的第三流,以便与主数据和广播状态数据一起进行计算?在keyedBroadcastProcess函数中?
( 2)对于主要数据,我们能有两个广播流吗?
3)由于流数据是完全不同的数据,广播和第三数据流没有更频繁的变化,所以加入不会起作用。它就像一个主数据,在计算中和主数据流一起使用,还没有找到任何解决方案,请帮助。请分享一些链接,我可以参考。
发布于 2020-06-03 21:33:08
Flink不提供具有三个输入的任何类型的流程功能。
你可以把这两个广播流结合在一起(在广播之前)。我很欣赏它们是非常不同的类型,但是你总能找到一些方法让它们共存。如果没有更自然的方法来统一这两种类型,您可以使用任一。要将两个完全不同的类型合并到一个流中,您可以这样做:
DataStream<String> strings = env.fromElements("one", "two", "three");
DataStream<Integer> ints = env.fromElements(1, 2, 3);
DataStream<Either<String, Integer>> stringsOnTheLeft = strings
.map(new MapFunction<String, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(String s) throws Exception {
return Either.Left(s);
}
});
DataStream<Either<String, Integer>> intsOnTheRight = ints
.map(new MapFunction<Integer, Either<String, Integer>>() {
@Override
public Either<String, Integer> map(Integer i) throws Exception {
return Either.Right(i);
}
});
DataStream<Either<String, Integer>> stringsAndInts = stringsOnTheLeft.union(intsOnTheRight);
或者,如果您可以在不同的阶段将广播流应用到主流,那么您可以有两个KeyedBroadcastProcessFunctions序列,其中一个输入到另一个:
events
.keyBy(x -> x.foo)
.connect(broadcast1)
.process(new process1())
.keyBy(x -> x.foo)
.connect(broadcast2)
.process(new process2())
更新:
如果我们像这样合并和广播,如果有任何更新,流将更新广播状态,或者它将在广播状态中创建一个新条目?
这完全在你的控制之下。广播状态总是映射状态;我想您会选择一些简单的键来使用,这样您就可以得到类似于MapState<String, Either<T1, T2>>
的东西了。Map状态与任何hashmap一样工作:如果您重用一个键,它将替换条目,如果您引入一个新键,它将创建一个新条目。
..。我如何提供关键的公共这些广播流?
钥匙不一定是一样的,它们必须是同一类型的。
https://stackoverflow.com/questions/62165508
复制相似问题