前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Time

Flink Time

作者头像
awwewwbbb
发布2022-05-15 09:10:30
2930
发布2022-05-15 09:10:30
举报
文章被收录于专栏:chaplinthink的专栏

基础概念

支持两种时间概念:

  • Processing Time 时间递增
  • Event Time 支持一定程度的乱序 上一个 checkpoint 或者 savepoint 进行重放,是不是希望结果完全相同。如果希望结果完全相同,就只能用 Event Time;如果接受结果不同,则可以用 Processing Time。

watermark

一个watermark 代表了 watermark所包含的timestamp 数值,表示后来的数据已经再也没有小于或等于这个时间的了.

Flink 支持两种 watermark 生成方式:

  • 在SourceFunction中产生

collectWithTimestamp 方法发送一条数据 第一个参数就是我们要发送的数据 第二个参数就是这个数据所对应的时间戳 emitWatermark 去产生一条 watermark: 表示接下来不会再有时间戳小于等于这个数值记录

  • 在使用DataStream API 的时候指定
代码语言:javascript
复制
DataStream.assignTimestampsAndWatermarks

建议生成的工作越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。

code demo:

代码语言:javascript
复制
object WaterMakerTest {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val dataStream: DataStream[Order] = env.socketTextStream("localhost", 9999).map(item => {
            val itemArray = item.split(",")
            Order(itemArray(0).toLong, itemArray(1), itemArray(2).toDouble)
        })

        val outputStream: DataStream[Order] = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(0)) {
            override def extractTimestamp(element: Order): Long = element.timestamp * 1000L
        }).keyBy("category").timeWindow(Time.seconds(5)).apply(new MyWindowFunction)

        dataStream.print("Data")
        outputStream.print("Result")

        env.execute()
    }
}

class MyWindowFunction extends WindowFunction[Order, Order, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Order], out: Collector[Order]): Unit = {
        val timestamp = window.maxTimestamp()
        var sum: Double = 0
        for (elem <- input) {
            sum += elem.price
        }

        val category = key.asInstanceOf[Tuple1[String]].f0
        out.collect(Order(timestamp, category, sum))
    }
}

总结

主要了解Flink的时间概念以及Watermark的作用,它可以处理乱序数据,通过watermark来定义关窗的时间点. 可以在SourceFunction和DataStream API 指定生成 Watermark.

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-05-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 基础概念
  • watermark
  • 总结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档