前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Apache Flink中的各个窗口时间的概念区分

Apache Flink中的各个窗口时间的概念区分

作者头像
CainGao
发布2020-04-14 14:44:46
7590
发布2020-04-14 14:44:46
举报
文章被收录于专栏:指尖数虫指尖数虫

Apache Flink中提供了基于时间的窗口计算,例如计算五分钟内的用户数量或每一分钟计算之前五分钟的服务器异常日志占比等。因此Apache Flink在流处理中提供了不同时间的支持。

处理时间(Processing Time)

处理时间是执行相应的操作时的系统时间。一般来说就是Apache Flink在执行某条数据的计算的时刻的系统时间。

处理时间是最简单的时间概念,基于处理时间能够实现最佳的性能与延迟,例如计算五分钟的用户数量,无需设置其他相关的项目直接可以通过系统的当前时间进行计算即可。但是也会有某些影响,例如基于网络或者其他原因造成某些数据无法按照预计的时间到到,或者说在Apache Flink任务重启时都会造成计算结果与预期的结果不符的情况出现。

摄取时间(Ingestion Time)

摄取时间是指Apache Flink读取某条数据的时间,摄取时间是基于事件时间与处理时间之间的,因为摄取时间会在数据到来的时候给予一次时间戳,基于时间的计算需要按照时间戳去进行。所以在操作时会把数据分配到不同的不同的窗口进行计算。但是相对于事件时间来说,它更加简单一些,不需要设置Watermarks。

事件时间(Event Time)

事件时间是比较好理解的一个时间,就是类似于上面展示的log4j输出到日志中的时间,在大部分的场景中我们在进行计算时都会利用这个时间。例如计算五分钟内的日志错误占比等。

Apache Flink能够支持基于事件的时间设置,事件时间是最接近于事实需求的时间。我们通常的数据处理大部分是基于事件时间的处理。 那么在流式计算中做事件时间的处理基于某些原因可能就会存在问题,流处理在事件产生过程中,通过消息队列,到Flink的Source获取、再到Operator。中间的过程都会产生时间消耗。还有一些其他的情况,例如网络抖动造成的数据延迟等就会存在数据乱序。

但是对于数据乱序我们又不能无限期的等待事件到来,(谁知道它还来不来)。那么Apache Flink就有一个Watermark用来解决该问题,Watermark就是保证在一个特定的时间后进行触发window计算的机制。

代码语言:javascript
复制
def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //设置引擎的执行为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val text = env.socketTextStream("localhost",9999)
    //设置时间戳与Watermark
    val eventText = text.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[String] {
      val maxOutOfOrderTime = 10000L  //设置10s的时间,意思是超过10s到达的数据将不会被处理
      var currentTimestamp:Long = _    // 从数据上获取到的当前时间
      override def getCurrentWatermark: Watermark = {
        //根据可容忍的最大延迟时间获取watermark
        new Watermark(currentTimestamp-maxOutOfOrderTime)
      }
      //从String中提取出事件时间
      override def extractTimestamp(str: String, l: Long): Long = {
        val sdf:SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,S")
        //获取到数据的事件时间
        currentTimestamp = sdf.parse(str.split("\\|")(0)).getTime
        currentTimestamp
      }
    })

    val count = eventText.map(res=>{
      val ress  = res.split("\\|")
      (ress(1),1)
    }).keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    //输出结果
    count.print()
    env.execute("Apache Flink Event Time Watermark")
  }

以上代码发布于 https://github.com/CainGao/flink_learn

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 指尖数虫 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档