首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Flink SQL自定义聚合函数

本篇幅介绍Flink Table/SQL中如何自定义一个聚合函数,介绍其基本用法、撤回定义以及与源码结合分析每个方法的调用位置。...来说是一个很重要的特性,在Flink SQL中可撤回机制解密中详细分析了撤回的实现,其中retract是一个不可或缺的环节,其表示具体的回撤操作,对于自定义聚合函数,如果其接受到的是撤回流那么就必须实现该方法...,该方法表示将需要撤回的数据从中间结果中去除掉,Flink中默认实现了一些撤回的函数,例如SumWithRetractAggFunction: def retract(acc:SumWithRetractAccumulator...如果流入的数据是Insert类型就会调用accumulate方法,如果是Retract就调用retract方法,并且会调用getValue获取当前的结果数据 if(inputC.change){...input function.retract(accumulators, input) function.setAggregationResults(accumulators, newRow.row

1K20
您找到你想要的搜索结果了吗?
是的
没有找到

2021年大数据Flink(三十三):​​​​​​​Table与SQL相关概念

将表转换为三种不同编码方式的流 Flink中的Table API或者SQL支持三种不同的编码方式。分别是: Append-only流 Retract流 Upsert流 分别来解释下这三种流。...Retract流有几种类型的事件类型: ADD MESSAGE:这种消息对应的就是INSERT操作。 RETRACT MESSAGE:直译过来叫取消消息。这种消息对应的就是DELETE操作。...我们可以看到通过ADD MESSAGE和RETRACT MESSAGE可以很好的向外部系统表达删除和插入操作。那如何进行UPDATE呢?好办!...RETRACT MESSAGE + ADD MESSAGE即可。先把之前的数据进行删除,然后插入一条新的。...完美~ Upsert流 前面我们看到的RETRACT编码方式的流,实现UPDATE是使用DELETE + INSERT模式的。

91420

flink sql 知其所以然(十九):Table 与 DataStream 的转转转(附源码)

1.序篇 废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助: 背景及应用场景介绍:博主期望你能了解到,Flink 支持了 SQL 和 Table...import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment...; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment...3.3.2.Retract 语义 SQL 转 DataStream 注意事项 Retract 语义的 SQL 使用 toDataStream 转换会报错不支持。具体报错截图如下。...Retract error 如果要把 Retract 语义的 SQL 转为 DataStream,我们需要使用 toRetractStream。

2.1K20

一篇文章带你深入了解Flink SQL流处理中的特殊概念

这样得到的表,在 Flink Table API 概念里,就叫做动态表(Dynamic Tables)。...动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。...Flink 的Table API 和 SQL 支持三种方式对动态表的更改进行编码: ① 仅追加(Append-only)流 仅通过插入(Insert)更改,来修改的动态表,可以直接转换为仅追加流...② 撤回(Retract)流 Retract 流是包含两类消息的流,添加(Add)消息和撤回(Retract)消息。...动态表通过将 INSERT 编码为 add 消息、DELETE 编码为 retract 消息、UPDATE 编码为被更改行(前一行)的 retract 消息和更新后行(新行)的 add 消息,转换为 retract

1.4K20

Flink SQL中可撤回机制解密

场景案例 先从一个实际业务场景理解Flink SQL中的撤回机制:设备状态上线/下线数量统计,上游采集设备状态发送到Kafka中,最开始是一个上线状态,此时统计到上线数量+1,过了一段时间该设备下线了...,收到的下线的状态,那么此时应该是上线数量-1,下线数量+1,现在需要实现这样一个需求,看一下在Flink SQL里面如何实现 val env=StreamExecutionEnvironment.getExecutionEnvironment...收到该条数据判断是撤回数据会将之前的结果撤回产生一条(false,1,1)的数据,sql1同时还会产生一条(true,dev1,0) dev1当前的最新状态,sql2收到该条数据重新计算得到(true,0,1) 那么关于这一整套逻辑在Flink...方法,比喻说sql1上游是消费kafka 非撤回流,所以在定义LatestTimeUdf 并没有定义retract,sql2 消费sql1的输出,sql1会产生可撤回消息,那么在其内部会生成retract...input function.retract(accumulators, input) function.setAggregationResults(accumulators, newRow.row

74010

FlinkSQL | 流处理中的特殊概念

这样得到的表,在Flink Table API 概念里,就叫做 “动态表” (Dynamic Tables) 动态表是 Flink 对流数据的 Table API 和 SQL 支持的核心概念。...撤回(Retract)流 Retract流是包含两类消息的流,添加(Add)消息和撤回(Retract)消息。...动态表通过将 INSERT 编码为 add 消息、DELETE 编码为retract消息、UPDATE 编码为被更改行(前一行)的 retract 消息和更新后行(新行)的 add 消息,转换为 retract...需要注意的是,在代码里将动态表转换为DataStream时,仅支持 Append 和Retract流 。...为了处理无序事件,并区分流中的准时和迟到事件;Flink需要从事件数据中,提取时间戳,并用来推进事件时间的进展(watermark)。

1.9K20

flink sql 知其所以然(十三):流 join 很难嘛???(下)

sql left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大, 然后转变思路为使用 flink sql interval join 的方式可以使得数据互相等待一段时间进行...join,这种方式不会存在 retract 问题 flink sql interval join 的解决方案以及原理的介绍:主要介绍 interval join 的在上述实战案例的运行结果及分析源码机制...5.总结与展望 本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到...sql left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大, 然后转变思路为使用 flink sql interval join 的方式可以使得数据互相等待一段时间进行...join,这种方式不会存在 retract 问题 flink sql interval join 的解决方案以及原理的介绍:主要介绍 interval join 的在上述实战案例的运行结果及分析源码机制

87720

flink sql 知其所以然(十):大家都用 cumulate window 计算累计指标啦

本文要介绍的就是周期内累计 PV,UV 指标在 flink 1.13 版本的最优解决方案。 3.预期的效果 先来一个实际案例来看看在具体输入值的场景下,输出值应该长啥样。...1分钟) 但是上述两种解决方案产出的都是 retract 流,关于 retract 流存在的缺点见如下文章: 踩坑记 | flink sql count 还有这种坑!...1.13 及之后 诞生了 cumulate window 解法,具体见官网链接: https://nightlies.apache.org/flink/flink-docs-release-1.13/...https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/timezone/ 4.3.cumulate window...问题1:tumble window + early-fire retract 流问题。 cumulate window 是 append 流,自然没有 retract 流的问题。

2.1K31
领券