前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink SQL中可撤回机制解密

Flink SQL中可撤回机制解密

作者头像
Flink实战剖析
发布2022-04-18 12:57:04
8310
发布2022-04-18 12:57:04
举报
文章被收录于专栏:Flink实战剖析

场景案例

先从一个实际业务场景理解Flink SQL中的撤回机制:设备状态上线/下线数量统计,上游采集设备状态发送到Kafka中,最开始是一个上线状态,此时统计到上线数量+1,过了一段时间该设备下线了,收到的下线的状态,那么此时应该是上线数量-1,下线数量+1,现在需要实现这样一个需求,看一下在Flink SQL里面如何实现

代码语言:javascript
复制
val env=StreamExecutionEnvironment.getExecutionEnvironment

    val tabEnv=TableEnvironment.getTableEnvironment(env)

    tabEnv.registerFunction("latestTimeUdf",newLatestTimeUdf())

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)



    val kafkaConfig=newProperties()

    kafkaConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092")

    kafkaConfig.put(ConsumerConfig.GROUP_ID_CONFIG,"test1")



    val consumer=newFlinkKafkaConsumer011[String]("topic1",newSimpleStringSchema,kafkaConfig)

    val ds=env.addSource(consumer)

.map(x=>{

        val a=x.split(",")

DevData(a(0),a(1).toInt,a(2).toLong)

}).assignTimestampsAndWatermarks(newBoundedOutOfOrdernessTimestampExtractor[DevData](Time.milliseconds(1000)){

overridedef extractTimestamp(element:DevData):Long= element.times

})



    tabEnv.registerDataStream("tbl1",ds,'devId,'status,'times,'rt.rowtime)

    val dw=tabEnv.sqlQuery(

"""

        select st,count(*) from (

                select latestTimeUdf(status,times) st,devId from tbl1 group by devId

                ) a group by st

      """.stripMargin)

    dw.writeToSink(newPaulRetractStreamTableSink)

        env.execute()
 

自定义udf获取最新的设备状态

代码语言:javascript
复制
publicclassLatestTimeUdfextendsAggregateFunction<Integer,TimeAndStatus>{



@OverridepublicTimeAndStatus createAccumulator(){

returnnewTimeAndStatus();

}



publicvoid accumulate(TimeAndStatus acc,Integer status,Long time){

if(time > acc.getTimes()){

            acc.setStatus(status);

            acc.setTimes(time);

}

}



@OverridepublicInteger getValue(TimeAndStatus timeAndStatus){

return timeAndStatus.getStatus();

}

}
 

看一组测试数据: 输入数据 dev1,1,1574347472000 得到结果:

代码语言:javascript
复制
2>(true,1,1) 

继续输入dev1,0,1574347474000 得到结果:

代码语言:javascript
复制
2>(false,1,1)//撤回

2>(true,0,1)

第二条数据输入dev1新的状态数据,导致最后结果的变更。

源码分析

首先分析一下上述得到结果编码Ture或者False是如何确定的: 内部sql1: select latestTimeUdf(times,status) st,devId from tbl1 group by devId,这是一个聚合操作,目的是求出设备当前的状态; 对于外部sql2: select st,count(*) from (sql1) a group by st,同样是一个聚合操作,用于求出不同状态对应的设备数量 输入第一条数据dev1,1,1574347472000对于sql1 来说会产生(true,dev1,1) 的结果,sql2 接受到该结果生成(true,1,1) 就是是结果数据;接着输入第二条数据dev1,0,1574347474000 ,由于dev1的设备状态发生变更,sql1首先发送一条撤回数据(false,dev1,1),sql2收到该条数据判断是撤回数据会将之前的结果撤回产生一条(false,1,1)的数据,sql1同时还会产生一条(true,dev1,0) dev1当前的最新状态,sql2收到该条数据重新计算得到(true,0,1)

那么关于这一整套逻辑在Flink中是如何实现的?代码入口是:DataStreamGroupAggregate 聚合操作的物理执行计划,另外说明在table/SQL api里面数据流动的格式是CRow,包含两个字段:一个是Boolean类型,表示是否是撤回,另外一个是Row类型,真正的数据。

  1. 具体的执行逻辑是通过其translateToPlan来生成,通过AggregateUtil.createGroupAggregateFunction方法动态生成具体的Function,在生成Function 会判断上游消费的数据是否是可撤回来决定是否生成retract方法,比喻说sql1上游是消费kafka 非撤回流,所以在定义LatestTimeUdf 并没有定义retract,sql2 消费sql1的输出,sql1会产生可撤回消息,那么在其内部会生成retract方法,这部分是代码自动生成的
  2. 生成的Function被GroupAggProcessFunction包装,最主要的就是这里面processElement方法的逻辑
  • registerProcessingCleanupTimer注册状态的过期时间,过期配置通过StreamQueryConfig获取,后面定时触发会调用onTimer方法
代码语言:javascript
复制
val currentTime = ctx.timerService().currentProcessingTime()

// register state-cleanup timer

registerProcessingCleanupTimer(ctx, currentTime)
 
  • state 存储中间结果状态、cntState存储流入对应key数量,获取当前中间结果accumulators,如果为空则,通过createAccumulators创建,获取当前对应key数量inputCnt,如果为空,则初始化为0
代码语言:javascript
复制
val input = inputC.row

// get accumulators and input counter

var accumulators = state.value()

var inputCnt = cntState.value()

if(null== accumulators){

      firstRow =true

      accumulators =function.createAccumulators()

}else{

      firstRow =false

}

if(null== inputCnt){

      inputCnt =0L

}
 
  • newRow/prevRow 分别对应新产生结果(撤回标识True)与之前的结果(撤回标识False),setForwardedFields 设置输出的key, setAggregationResults将之前的结果设置到prevRow中
代码语言:javascript
复制
// Set group keys value to the final output

function.setForwardedFields(input, newRow.row)

function.setForwardedFields(input, prevRow.row)

// Set previous aggregate result to the prevRow

function.setAggregationResults(accumulators, prevRow.row)
 
  • 如果输入的是insert即True, 则inputCnt+1, 调用accumulate 将当前流入数据添加到中间结果accumulators中得到新的结果,调用setAggregationResults设置新的结果到newRow结果中, 如果输入的是retract即False, 则inputCnt-1,调用retract从accumulators撤回当前的输入得到新的结果,调用setAggregationResults设置新的结果到newRow结果中
代码语言:javascript
复制
// update aggregate result and set to the newRow

if(inputC.change){

      inputCnt +=1

// accumulate input

function.accumulate(accumulators, input)

function.setAggregationResults(accumulators, newRow.row)

}else{

      inputCnt -=1

// retract input

function.retract(accumulators, input)

function.setAggregationResults(accumulators, newRow.row)

}
 
  • 如果当前的inputCnt!=0, 表明当前中间状态还有数据,那么就更新当前state/cntState, 接下来判断是否发送撤回数据,如果当前没有中间状态,那么就表示需要撤回之前的数据,然后清空状态
代码语言:javascript
复制
if(inputCnt !=0){

// we aggregated at least one record for this key



// update the state

      state.update(accumulators)//更新状态操作

      cntState.update(inputCnt)



// if this was not the first row

if(!firstRow){

if(prevRow.row.equals(newRow.row)&&!stateCleaningEnabled){

//如果处理前后的结果是一致的并且也没有TTL那么就没有发送一条数据到下游,

//这里前后一致不发送很好理解,假如说有ttl, 那也是需要针对下游需要做状态过期时间的更新

return

}else{

// retract previous result

if(generateRetraction){//是否生成撤回数据

out.collect(prevRow)

}

}

}

// emit the new result

out.collect(newRow)//发出新的结果



}else{

// we retracted the last record for this key

// sent out a delete message

out.collect(prevRow)

// and clear all state

      state.clear()//清空状态

      cntState.clear()

}
 

总结

总的来说撤回机制是需要状态、撤回操作的支持,状态是为了保存当前的数据,下次如果需要发生撤回,就将该数据发出去,撤回操作可以理解为function里面的retract方法,能够支持这个数据撤回的计算操作。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-11-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Flink实战剖析 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 场景案例
  • 源码分析
  • 总结
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档