有两个数据流与时间戳分配和水印生成器定义如下。
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[A] {
override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)
val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[B] {
override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
element.lastUpdatedMs
}
})
)当这两个流在一个算子中连接时,来自streamA或streamB的最小水印作为连接算子的水印。
class CombineAB extends CoProcessFunction[A, B, C] {
override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
out.collect(C(elem.x, elem.y, time.Now()))
}
}
val streamC: DataStream[C] = streamA.connect(streamB)
.process(new CombineAB)CombineAB算子的水印是A或B的最小值。在此基础上,将C类型的元素标记为迟或不晚。
但是,由于我们没有附加任何分配给C的时间戳,这是否意味着CombineAB运算符中的任何元素都没有延迟标记?因此,在C上加窗不会有任何延迟记录被删除吗?
假设我们将指定的时间戳和水印生成器附加到C中,这是否意味着完全忽略了来自A和B的水印,而CombineAB的水印仅取决于C的时间戳字段和用C定义的延迟性。
streamC.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
.withTimestampAssigner(new SerializableTimestampAssigner[C] {
override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
element.updatedTime
}
})
)难道我没有办法将时间戳分配器附加到C上,而CombineAB的水印仍然是A和B的最小值,C的元素根据C指定的时间戳和CombineAB的wartermark被标记得很晚。
更新:改进CombineAB的实现
发布于 2021-10-22 19:40:57
以下几点:
forBoundedOutOfOrderness[A](Duration.ofSeconds(0))是不寻常的。任何不正常的事件都会迟到。为什么不使用forMonotonousTimestamps()
CombineAB生成的记录将具有时间戳;不需要将assignTimestampsAndWatermarks应用于此流。Collector生成的任何记录的时间戳都是传入记录的时间戳。
如果您确实在流C上调用了assignTimestampsAndWatermarks,则传入的水印将被过滤掉,并且需要生成新的水印。
https://stackoverflow.com/questions/69675498
复制相似问题