首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >TwoInputStreamOperator算子的Apache水印行为

TwoInputStreamOperator算子的Apache水印行为
EN

Stack Overflow用户
提问于 2021-10-22 10:44:19
回答 1查看 182关注 0票数 0

有两个数据流与时间戳分配和水印生成器定义如下。

代码语言:javascript
运行
复制
val streamA: DataStream[A] = kafkaStreamASourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[A](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[A] {
            override def extractTimestamp(element: A, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

val streamA: DataStream[B] = kafkaStreamBSourceOutput.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[B](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[B] {
            override def extractTimestamp(element: B, recordTimestamp: Long): Long = {
              element.lastUpdatedMs
            }
          })
      )

当这两个流在一个算子中连接时,来自streamA或streamB的最小水印作为连接算子的水印。

代码语言:javascript
运行
复制
class CombineAB extends CoProcessFunction[A, B, C] {
   override def processElement1(elem: A, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
   override def processElement2(elem: B, ctx:Context, out: Collector[C]) {
        out.collect(C(elem.x, elem.y, time.Now()))
   }
}

val streamC: DataStream[C] = streamA.connect(streamB)
      .process(new CombineAB)

CombineAB算子的水印是AB的最小值。在此基础上,将C类型的元素标记为迟或不晚。

但是,由于我们没有附加任何分配给C的时间戳,这是否意味着CombineAB运算符中的任何元素都没有延迟标记?因此,在C上加窗不会有任何延迟记录被删除吗?

假设我们将指定的时间戳和水印生成器附加到C中,这是否意味着完全忽略了来自A和B的水印,而CombineAB的水印仅取决于C的时间戳字段和用C定义的延迟性。

代码语言:javascript
运行
复制
     streamC.assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[C](Duration.ofSeconds(0))
          .withTimestampAssigner(new SerializableTimestampAssigner[C] {
            override def extractTimestamp(element: C, recordTimestamp: Long): Long = {
              element.updatedTime
            }
          })
      )

难道我没有办法将时间戳分配器附加到C上,而CombineAB的水印仍然是AB的最小值,C的元素根据C指定的时间戳和CombineAB的wartermark被标记得很晚。

更新:改进CombineAB的实现

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-22 19:40:57

以下几点:

forBoundedOutOfOrderness[A](Duration.ofSeconds(0))是不寻常的。任何不正常的事件都会迟到。为什么不使用forMonotonousTimestamps()

CombineAB生成的记录将具有时间戳;不需要将assignTimestampsAndWatermarks应用于此流。Collector生成的任何记录的时间戳都是传入记录的时间戳。

如果您确实在流C上调用了assignTimestampsAndWatermarks,则传入的水印将被过滤掉,并且需要生成新的水印。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69675498

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档