首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >附代码|Flink实时计算TopN

附代码|Flink实时计算TopN

作者头像
小晨说数据
发布2022-03-10 10:36:51
1.2K0
发布2022-03-10 10:36:51
举报
文章被收录于专栏:小晨讲Flink小晨讲Flink

这一章从实际的需求TopN来学习Flink中的窗口知识。

在上一章代码中使用了timeWindow,使得我们可以操作Flink流中的一个时间段内的数据,这就引出了Flink中的"窗口"概念:在大多数场景下,数据流都是"无限的",因引我们无法等待数据流终止后才进行一些统计计算,而通常的需求是对一段时间或是一定范围内的数据进行分析。

Flink提供了两种窗口:Time Window和Count Window,而本章涉及到Time Window的部分概念和用法。

package all.in.one.c03

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.{SlidingProcessingTimeWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

object Chapter03 extends App {

  // 使用createLocalEnvironmentWithWebUI可以在本地查看WebUI,在集群提交任务无需此方法
  val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
  // Flink的输入为Source,这里我们构建一个定义Source:C03Source
  val sourceDataStream = env.addSource(new C03Source())
  // 接下来以品类做为key,计算每个品类的总价格
  // 同样keyingBy来自org.apache.flink.streaming.api.scala.extensions._包,这里使用keyBy也可以
  // keyBy操作后会返回一个KeyedStream,保存了key信息
  sourceDataStream
    .keyingBy(_._1)
    // 与Chapter 02不同,这里我们调用window来设置窗口
    // 以下代码说明参见README
    .window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
    // 计算交易额的总和
    .sum(1)
    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
    .process(new ProcessAllWindowFunction[(String, Long), String, TimeWindow] {
      override def process(context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
        val top3 = elements.toSeq
          .sortBy(-_._2)
          .take(3)
          .zipWithIndex
          .map { case ((item, price), idx) => s"   ${idx + 1}. $item: $price" }
          .mkString("\n")
        out.collect(("-" * 16) + "\n" + top3)
      }
    })
    .print()
  env.execute("Chapter 03")

  /**
    * 每100ms产生一条"交易"数据,最终输出品类+价格(随机产生)
    */
  class C03Source extends SourceFunction[(String, Long)] {
    private val items = Array(
      // 男装
      "卫衣",
      "T恤",
      "牛仔裤",
      "西服",
      "风衣",
      // 女装
      "连衣裙",
      "卫衣",
      "衬衫",
      "针织衫",
      "休闲裤",
      // 手机数码
      "手机",
      "手机配件",
      "摄影摄像",
      "影音娱乐",
      "数码配件",
      "智能设备",
      "电子教育",
      // 电脑办公
      "电脑整机",
      "电脑组件",
      "外设",
      "网络产品",
      "办公设备",
      "文具耗材",
      // 家用电器
      "电视",
      "空调",
      "洗衣机",
      "冰箱",
      "厨卫",
      "生活电器",
      // 户外运动
      "运动鞋包",
      "运行服饰",
      "户外鞋服",
      "户外装备",
      "骑行",
      "健身",
      // 家具家装
      "厨房卫浴",
      "灯饰照明",
      "五金工具",
      "客厅家具",
      "餐厅家具",
      // 图书文娱
      "少儿读物",
      "文学",
      "动漫",
      "专业"
    )
    var running = true

    /**
      * Flink会调用run来收集数据
      */
    override def run(sourceContext: SourceFunction.SourceContext[(String, Long)]): Unit = {
      val random = new Random()
      do {
        val item = items(random.nextInt(items.length))
        val price = random.nextInt(3333) + 33
        // context.collect通知Flink新元素进入系统
        sourceContext.collect(item -> price.toLong)
        Thread.sleep(1000)
      } while (running)
    }

    override def cancel(): Unit = running = false

  }

}

TopN需求

假设电商网站有这样一个榜单:展示1分钟内当前用户购买品类交易额的Top3,并且榜单要每10秒刷新一次。而我们现在可以拿到一个交易流,里面记录了交易品类和交易额,要如何实现呢?先看代码效果,启动all.in.one.c03.Chapter03后会看到输出如:

9> ----------------
   1. 厨卫: 3956
   2. 文具耗材: 3174
   3. 摄影摄像: 2738
10> ----------------
   1. 厨卫: 3956
   2. 文具耗材: 3174
   3. 智能设备: 3108
11> ----------------
   1. 影音娱乐: 4304
   2. 风衣: 4286
   3. 厨卫: 3956
12> ----------------
   1. 牛仔裤: 5261
   2. 衬衫: 5155
   3. 厨卫: 4629
  ...
  ...

输入

之前的章节中,我们的输入是监听一个Socket地址读取数据(socketTextStream),这些都是Flink内置简单的输入方式,而本质上Flink Stream的输入就是实现相应的接口来接收数据:SourceFunction,它包括run(Flink调用run方法收集数据)和cancel(任务停止时调用),如socketTextStream就是创建了一个org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction

在代码中,我们实现了一个C03Source,它会约每100ms随机输出品类和价格数据。然后使用env.addSource(new C03Source())来得到相应的数据流DataStream。

窗口操作

根据需求,我们要计算过去60秒内的交易额,所以很容易想到:将时间窗口的时长设置为60秒,然后计算这段时间内每个品类的交易额的和,最后计算Top3就可以了。假设使用上一章的方法timeWindow(Time.seconds(60)),计算的结果是没有问题的,但是你会发现它是每60秒计算一次,无法满足需求每10秒更新一次榜单。此时会引出时间窗口的两个类型(这一章只介绍这两种):滚动(Tumbling)与滑动(Sliding)。

见上图,在定义窗口时指定它的大小,同时再指定触发窗口的间隔或者说滑动距离,这样创建的窗口就是滑动窗口。(timeWindow(Time.seconds(60))的方法实现就是创建一个滚动窗口)

在代码中,我们使用window(SlidingProcessingTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))创建窗口然后计算sum,此时得到了每一个品类过去一分钟内的总交易额。

在这之后,代码中使用windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))方法指定了大小为10秒的滚动窗口。那么windowAllwindow的区别是什么呢?我们不能忘记一件事:Flink是分布式处理引擎,所以计算是同时发生在各个节点的,当使用windowAll时,数据会汇集一个节点去执行我们指定的计算。

windowAll方法返回的是AllWindowedStream类型的对象,使用process方法指定对数据进行何种操作。在process中,我们创建了ProcessAllWindowFunction的匿名子类对象,并将所有元素的Top3拼为字符串并交给Flink。

思考

  1. 计算TopN时我们用到了WindowAll,实际上它就是全局并发为1的操作,那么它的计算受单台机器的限制,且在实际的业务中业务的复杂和量级都可能会出现数据热点,这时要怎么解决呢?
  2. 观察创建时间窗口的类名称:SlidingProcessingTimeWindowsTumblingProcessingTimeWindows,时间窗口的“时间”是什么时间?假如某些数据有延迟很晚才出现在数据流中,如果你来设计Flink会怎么做?

以上问题会在后续的章节中找到答案。

WebUI

可以看到,本章我们使用以下代码创建了ENV:

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

主要目的是为了在本地启动时可以看到WebUI(在集群提交任务无需此方法)。在启动后,日志中会输出类似以下内容:

[Chapter 03 - main] 17:07:13.338 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:139) - Starting rest endpoint.
[Chapter 03 - main] 17:07:13.862 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(RestServerEndpoint.java:242) - Rest endpoint listening at localhost:8081
[Chapter 03 - main] 17:07:13.865 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:702) - Web frontend listening at http://localhost:8081.
[Chapter 03 - mini-cluster-io-thread-1] 17:07:13.866 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint(WebMonitorEndpoint.java:758) - http://localhost:8081 was granted leadership with leaderSessionID=30a5533d-fbf0-4b70-95d3-cd8813bb6492

说明WebUI启动成功,并且监听本地的8081端口,此时在浏览器中打开http://localhost:8081,在RunningJob选择刚刚启动的Job,可以看到类似以下页面:

可以先在页面上熟悉Flink WebUI提供的模块和可获取信息,后续会根据相应功能介绍页面的使用。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小晨说数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TopN需求
    • 输入
      • 窗口操作
        • 思考
        • WebUI
        相关产品与服务
        大数据
        全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档