专栏首页kk大数据Flink(14) 窗口函数(window function) 详解

Flink(14) 窗口函数(window function) 详解

一、概念

在定义好了窗口之后,需要指定对每个窗口的计算逻辑。

Window Function 有四种:

ReduceFunction

AggregateFunction

FoldFunction

ProcessWindowFunction

前面两个会执行的更加有效率,因为在元素到来时,Flink 可以增量的把元素聚合到每个窗口上。

ProcessWindowFunction 提供了一个 Iterable 迭代器,可以获得一个窗口的所有元素以及元素的元数据信息。

ProcessWindowFunction 执行效率不是很好,因为 Flink 内部需要缓存窗口所有元素。

可以结合 ReduceFunction 、 AggregateFunction、FoldFunction ,来增量的获取部分结果,结合 ProcessWindowFunction 提供的元数据信息做综合处理。

二、ReduceFunction

使用 reduce 函数,让两个元素结合起来,产生一个相同类型的元素,它是增量的

env.addSource(consumer)
    .map(f => {
        println(f)
            User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
        })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
        })
    .keyBy(_.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // reduce 返回的类型,应该和输入的类型一样
    // 这里统计的是每个窗口,每个userId 出现的次数,timestamp 是没用的,给了0值
    .reduce { (v1, v2) => User(v1.userId, v1.count + v2.count, 0) }
    .print()

三、AggregateFunction

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数,一个输入类型(IN),一个累加器(ACC),一个输出类型(OUT)。

输入类型,就是输入流的类型。接口中有一个方法,可以把输入的元素和累加器累加。并且可以初始化一个累加器,然后把两个累加器合并成一个累加器,获得输出结果。

我们可以自己定义一个聚合器:

  class MyAggregateFunction extends AggregateFunction[User, User, (String, Int)] {
    override def createAccumulator(): User = User("", 0, 0)

    override def add(value: User, accumulator: User): User = User(value.userId, value.count + accumulator.count, 0)

    override def getResult(accumulator: User): (String, Int) = (accumulator.userId, accumulator.count)

    override def merge(a: User, b: User): User = User(a.userId, a.count + b.count, 0)
  }

然后应用到计算里:

env.addSource(consumer)
      .map(f => {
        println(f)
        User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
      })
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
      })
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      // 使用 aggregate 来计算
      .aggregate(new MyAggregateFunction)
      .print()

四、FoldFunction

官方已经不建议用 Fold 了,使用 aggregate 来代替

五、ProcessWindowFunction

ProcessWindowFunction 有一个 Iterable 迭代器,用来获得窗口中所有的元素。

有一个上下文对象用来获得时间和状态信息,比其他的窗口函数有更大的灵活性。

但是这样做损耗了一部分性能和资源,因为元素不能增量聚合,相反 ,在触发窗口计算时,Flink 需要在内部缓存窗口的所有元素。

自己定义一个 ProcessWindowFunction

class MyProcessFunction extends ProcessWindowFunction[User, String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[User], out: Collector[String]): Unit = {
      var count = 0
      // 遍历,获得窗口所有数据
      for (user <- elements) {
        println(user)
        count += 1
      }
      out.collect(s"Window ${context.window} , count : ${count}")
    }
  }

在算子中计算:

env.addSource(consumer)
      .map(f => {
        println(f)
        User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
      })
      .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
      })
      .keyBy(_.userId)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 使用 ProcessFunction 来处理整个窗口数据
      .process(new MyProcessFunction())
      .print()

六、ProcessWindowFunction 结合 其他 函数一起计算

使用 ReduceFunction 和 AggregateFunction 进行增量计算,计算的结果输出给 ProcessWindowFunction,然后可以使用 context 附加输出一些元数据信息,比如当前窗口信息、当前水印、当前的processTime等等。

如下:我们使用 ReduceFunction 来计算 每个窗口的 count 最小值,然后输出最小值和这个窗口的开始时间:

 class MyReduceFunction extends ReduceFunction[User] {
    override def reduce(value1: User, value2: User): User = {
      if (value1.count > value2.count) value2
      else value1
    }
  }

  class MyProcessFunction extends ProcessWindowFunction[User, (Long, User), String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[User], out: Collector[(Long, User)]): Unit = {
      val min = elements.iterator.next
      out.collect((context.window.getStart, min))
    }
  }
env.addSource(consumer)
    .map(f => {
        println(f)
            User(f.split(",")(0), f.split(",")(1).toInt, f.split(",")(2).toLong)
        })
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[User] {
        override def extractAscendingTimestamp(element: User): Long = element.timestamp
        })
    .keyBy(_.userId)
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    // 使用 reduce 和 processWindowFunction
    .reduce(new MyReduceFunction, new MyProcessFunction)
    .print()

本文分享自微信公众号 - kk大数据(kkbigdata)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-11-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Python 统计次数,我的方法和高手的方法

    kk大数据
  • 贝壳网流式数据的平台化实践与挑战

    (文末有福利!) 今天为大家分享贝壳找房流式数据的平台化实践与挑战,具体介绍下如何建设流式数据平台来满足业务方的需求。

    kk大数据
  • 深度理解 Flink 的 parallelism 和 slot

    (1)比如 kafka 某个 topic 数据量太大,设置了10个分区,但 source 端的算子并行度却为1,只有一个 subTask 去同时消费10个分区,...

    kk大数据
  • JDK8新特性之方法引用

    什么是方法引用 方法引用是只需要使用方法的名字,而具体调用交给函数式接口,需要和Lambda表达式配合使用。 如: List<String> list = Ar...

    Java技术栈
  • Python中的ORM工具:Peewee

    上一篇文章介绍了Pyhton中的ORM工具:SQLAlchemy。本文延续之前的风格,介绍另一个ORM模块:Peewee,希望通过简单的CRUD示例可以帮助大家...

    happyJared
  • 杂谈 论实例化类的第六种方式

    最后一种通过Unsafe实例化的类,里面的age的值竟然是0,而不是10或者20。

    彤哥
  • 设计模式:单例模式

    简介 单例模式使得一个类的实例是唯一的,外部对它的访问都针对同一个对象。 单例模式的使用可以是业务上的原因,比如一个User对象需要全局唯一,或者是性能上的考...

    用户1172465
  • java面试| 精选基础题(1)

    JAVA反射机制是在运行状态中,对于任意一个类,都能够知道这个类的所有属性和方法;对于任意一个对象,都能够调用它的任意一个方法。

    KEN DO EVERTHING
  • MyBatis之foreach

         foreach 元素是非常强大的,它允许你指定一个集合,声明集合项和索引变量,它们可以用在元素体内。它也允许你指定开放和关闭的字符串,在迭代之间放置分...

    Arebirth
  • 地图| 高德地图源码级使用大全

    高德地图提供包括:web前端、Android、iOS、服务器、小程序等平台的地图服务, 地图功能众多,本文记载的只是自己遇到的一些问题,绝大部分功能只要参照官...

    進无尽

扫码关注云+社区

领取腾讯云代金券