前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink join终结者:SQL Join

Flink join终结者:SQL Join

作者头像
Flink实战剖析
发布2022-04-18 12:08:37
7750
发布2022-04-18 12:08:37
举报
文章被收录于专栏:Flink实战剖析Flink实战剖析

SQL是开发人员与数据分析师必备的技能,Flink也提供了Sql方式编写任务,能够很大程度降低开发运维成本,这篇是flink join的终极篇SQL Join, 首先介绍sql join使用方式、然后介绍global join带来的状态存储成本及解决方式、最后从源码角度分析sql join实现。

一、SQL JOIN使用方式

对于sql join可以分为两类:Global Join、Time-windowed Join

  • Global Join Global Join表示全局join, 也可以称为无限流join, 由于没有时间限制,任何时候流入的数据都可以被关联上,支持inner join、left join、right join、full join 连接语法。使用语法遵循standard ANSI SQL。使用方式:
代码语言:javascript
复制
   SELECT *

   FROM Orders INNER/LEFT/RIGHT/FULL JOIN Product ON Orders.productId = Product.id
  • Time-windowed Join 基于时间窗口的join, 流表的数据关联必须在一定的时间范围内,同样支持inner join、left join、right join、full join,但是不同的是条件中带有时间属性条件,有以下几种使用方式:
代码语言:javascript
复制
 ltime = rtime

 ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

 ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

ltime、rtime表示流表的时间属性字段。 其实现与interval join 使用了相同的实现方式,不同的是: a. Time-windowed Join 即可支持Event-Time,也可支持Processing-Time b. interval join 只支持inner join,Time-windowed Join支持多种类型join 以Flink intervalJoin 使用与原理分析 中订单流与地址流为例,sql实现:

代码语言:javascript
复制
   select o.userId,a.addrId from orders o left join address a on o.addrId=a.addrId

        and o.rtt BETWEEN a.rt - INTERVAL '5' SECOND AND a.rt - INTERVAL '1' SECOND

二、Idle State Retention Time 使用

global join 能够join 上任何时刻的数据,是由于状态中保存了两个流表的所有数据,这些数据都保存在状态中,默认情况下是不会被过期,但是两个流表又是持续输入的,待数日或者数月之后,状态数据会无限增大,但是很多时候我们数据关联具有时效性,例如只要求当天数据关联即可,那么这种方式会内存或者磁盘造成不必要浪费。那我们的目标就是能够设置状态ttl,在到达过期时间能够被自动清除,在DataStream API 可以通过StateTtlConfig 来设置状态的ttl, 但是sql方式就无法通过这种方式设置,好在flink 提供了Idle State Retention Time 空闲状态的保留时间,通过配置StreamQueryConfig来设置ttl时间,并且只能按照Processing-time来清理数据,从数据流入系统到当数据未被读写时间达到ttl 就会被自动清除。先看下其使用方式:

代码语言:javascript
复制
 val config=tabEnv.queryConfig.withIdleStateRetentionTime(Time.minutes(1),Time.minutes(6))

tabEnv.sqlUpdate('"',config)

tabEnv.sqlQuery("",config)

tab.writeToSink(sink,config)

withIdleStateRetentionTime(minTime: Time, maxTime: Time), minTime/maxTime 分别表示空闲保留最小/最大时间,但是必须满足maxTime-minTime>=5min,接下来看下数据的ttl设置: 初始默认的数据ttl = curProcessTime(数据流入当前系统时间) + maxRetentionTime(maxTime),之后每有相同的数据流入,只要满足curProcessTime + minRetentionTime > oldExpiredTime(上一次设置ttl的时间),就将其ttl设置为curProcessTime + maxRetentionTime。

另外还有两点需注意:

  • Idle State Retention Time 不是全局有效,需要在每一个使用sqlUpdate/sqlQuery中单独设置
  • 数据定时清理同样是依赖flink 定时机制,会将定时数据存储在内存状态中,会对内存造成比较大的压力,可以选择rocksDB 来代替内存作为stateBackend

三、源码分析

Flink SQL 中使用了apache calcite来完成sql解析、验证、逻辑计划/物理计划生成以及优化工作,物理计划都需要实现DataStreamRel接口,其中DataStreamWindowJoin与DataStreamJoin 分别对应Time-window join 与 global window的物理执行计划,由于Time-window join 与 interval-join的实现步骤大体相似,最终还是会调用到IntervalJoinOperator,这里不做分析。主要分析一下,Global window 的执行过程,从DataStreamJoin入手。

  • DataStreamJoin中translateToPlan方法。 该方法获取左右两个流表对应的DataStream, 根据不同join 类型选择不同的ProcessFunction,例如inner join 选择NonWindowInnerJoin,将leftDataStream 与 rightDataStream 进行connect 得到ConnectedStreams 然后执行对应的ProcessFunction
  • 以 inner join为例分析NonWindowInnerJoin, 继承了NonWindowJoin,而NonWindowJoin又继承了CoProcessFunction,与ProcessFunction针对一个流相反,CoProcessFunction是针对两个流的low level api, 可以访问状态、注册定时器。join 逻辑在其processElement方法中
代码语言:javascript
复制
override def processElement(

      value: CRow,

      ctx: CoProcessFunction[CRow, CRow, CRow]#Context,

      out: Collector[CRow],

      timerState: ValueState[Long],

      currentSideState: MapState[Row, JTuple2[Long, Long]],

      otherSideState: MapState[Row, JTuple2[Long, Long]],

      isLeft: Boolean): Unit = {



    val inputRow = value.row

    updateCurrentSide(value, ctx, timerState, currentSideState)



    cRowWrapper.setCollector(out)

    cRowWrapper.setChange(value.change)

    val otherSideIterator = otherSideState.iterator()

    // join other side data

    while (otherSideIterator.hasNext) {

      val otherSideEntry = otherSideIterator.next()

      val otherSideRow = otherSideEntry.getKey

      val otherSideCntAndExpiredTime = otherSideEntry.getValue

      // join

      cRowWrapper.setTimes(otherSideCntAndExpiredTime.f0)

      callJoinFunction(inputRow, isLeft, otherSideRow, cRowWrapper)

      // clear expired data. Note: clear after join to keep closer to the original semantics

      if (stateCleaningEnabled && curProcessTime >= otherSideCntAndExpiredTime.f1) {

        otherSideIterator.remove()

      }

    }

  }
 

两个MapState对应两个流的缓存数据,key表示具体的数据ROW,Value表示数据ROW的数量与过期时间,由于数据流入过程中可能会存在多条相同的记录,以数据ROW作为key这种方式可以减少内存使用. ValueState 用于存储数据的过期时间,以便任务失败恢复能够继续对数据执行过期操作。 processElement 执行流程: a. updateCurrentSide 保存数据与更新数据的count与ttl, 同时会注册数据的过期时间,数据的过期时间是根据Idle State Retention Time来设置的,从StreamQueryConfig可以获取到 b. 循环遍历另外一个状态,调用callJoinFunction输出数据,在callJoinFunction里面使用的joinFunction是通过FunctionCodeGenerator动态生成的在,在DataStreamJoin的translateToPlan方法中被调用到,有兴趣可以debug 方式copy下来研读一下。

  • 过期数据的清理定时是在updateCurrentSide注册的,其清理工作是在NonWindowJoin的onTimer方法完成,onTimer方法是从CoProcessFunction中继承过来的。在onTimer主要做过期时间判断并且清理。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-10,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、SQL JOIN使用方式
  • 二、Idle State Retention Time 使用
  • 三、源码分析
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档