前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Session Window 六个灵魂拷问

Flink Session Window 六个灵魂拷问

作者头像
kk大数据
发布2019-11-15 20:25:51
2.8K0
发布2019-11-15 20:25:51
举报
文章被收录于专栏:kk大数据kk大数据

一、什么是flink 的 session window

与翻滚窗口(Tumbling Window)和滑动窗口(Sliding Window)相比,会话窗口(Session Window)不重叠并且没有固定的开始和结束时间。

当会话窗口在一段时间内没有接收到元素时,即当发生不活动的间隙时,会话窗口关闭

会话窗口分配器可以设置静态会话间隙和动态会话间隙

二、实际应用问题

每个用户在一个独立的session中平均页面访问时长,session 和 session 的间隔时间是15分钟

我们使用 flink 来解决这个问题

(1)读取 kafka 中的数据

(2)基于用户的 userId,设置 一个 session window 的 gap,在同一个session window 中的数据表示用户活跃的区间

(3)最后使用一个自定义的 window Function

大致实现如下:

val session : DataStream[UserVisit] = sEnv    .addSource(new FlinkKafkaConsumer010[String](...))    .keyBy(_.userId)                 .window(EventTimeSessionWindows.withGap(Time.minutes(15)))                .apply(new UserVisitPageCounts())    .print()

三、额外需要考虑的问题

(1)假如 15 分钟之后,突然来了一条之前5分钟的数据,怎么办?

Flink 提供了 allowedLateness 来处理延迟的数据,假设我们预计有些数据会延迟1个小时到来,那么我们可以通过 allowedLateness 这个参数,来使那些延迟的数据成功的分到某一个 session 的窗口中:

.allowedLateness(Time.minutes(60))

(2)假如由于某种原因,数据仍然延迟了1个小时之后,才到来,如何处理,不能总是一直调大 allowedLateness 参数

flink 提供了 sideOutputLateData 参数 让我们得以把延迟之后的数据保存下来,如:

val outputTag = new OutputTag[User]("late_data"){}val window : DataStream[UserVisit] = sEnv    .addSource(new FlinkKafkaConsumer010[String](...))    .keyBy(_.userId)                 .window(EventTimeSessionWindows.withGap(Time.minutes(15)))    .allowedLateness(Time.minutes(60))    .sideOutputLateData(outputTag)    .apply(new UserVisitPageCounts())
val output = window.getSideOutput(outputTag)// 过时数据可以存储在存储介质中,延后处理output.map(f => {println(s"过时数据:$f")})window.print()

(4)假如 用户对我们的app很感兴趣,一直在访问我们的app,导致 gap 一直没有产生,那么这个用户的数据就一直无法及时产生。这样对于结果反馈来说,时间太长了?

flink 为我们提供了 触发器,使得在用户产生访问日志的过程中,周期性的触发窗口计算

如:

val outputTag = new OutputTag[User]("late_data"){}val window : DataStream[UserVisit] = sEnv    .addSource(new FlinkKafkaConsumer010[String](...))    .keyBy(_.userId)                 .window(EventTimeSessionWindows.withGap(Time.minutes(15)))    .allowedLateness(Time.minutes(60))    .sideOutputLateData(outputTag)// 周期性的触发器,每隔15分钟触发一次窗口计算    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))    .apply(new UserVisitPageCounts())
val output = window.getSideOutput(outputTag)// 过时数据可以存储在存储介质中,延后处理output.map(f => {println(s"过时数据:$f")})window.print()

(5)如何重新处理数据?

如果我改了数据,想用以前的数据测试一下,更改前和更改后的程序的结果?但数据从kafka已经被消费掉了,这也能实现吗?

答案是:可以的。

flink 提供了 “savePoint" 功能。例如每天夜里 12:00 产生 savepoint,当你想重新消费数据时,就从那个 savepoint 开始重新消费 kafka 中的数据就行了。

(6)我在使用流的过程中,如何跟其他的 DataStream / DataSet join,获得其他维度数据?

两个流join 的时候,flink 提供了 双流的 join,DataStream Join DataStream

一个流和一个DataSet join时,我们的DataStream 通过实现 RichXXXFunction,重写 open 方法,在open 方法中,将 DataSet 信息写入一个集合容器。然后对DataStream的每个元素去匹配这个集合,即可。

总之,Flink 技术在不断的发展,为我们提供了很多的现成的解决方案,解决了很多其他开源框架无法解决的问题。善用技术,造福世界!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-11-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 KK架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档