首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往
您找到你想要的搜索结果了吗?
是的
没有找到

Flink处理函数实战之二:ProcessFunction

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos Flink处理函数实战系列链接 深入了解ProcessFunction...的状态操作(Flink-1.10); ProcessFunction; KeyedProcessFunction类; ProcessAllWindowFunction(窗口处理); CoProcessFunction...关于ProcessFunction类 处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement...的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉; 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期...demo : sideoutput"); } } 这里对上述代码做个介绍: 数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形; ProcessFunction的匿名子类中,

34710

flink时间系统系列之ProcessFunction 使用分析

使用分析 六、实例讲解:如何做定时输出 ProcessFunction 是flink 提供面向用户low-level 层级的api,通过ProcessFunction可以访问state...目前在flink中,提供了ProcessFunction与KeyedProcessFunction 这两个面向用户的api,但是ProcessFunction却无法帮助我们注册定时器,透过源码(ProcessOperator...首先以官方文档为例来了解其用法,完成单词计数,并且定时输出功能,文档里面是定义了一个继承ProcessFunction 的的类,猜想这里应该是很早之前的版本文档。...做一个简单的代码流程分析:首先得到一个Tuple2[String,String]类型的数据流,然后按照第一个位置的字段进行分组,那么相同的字段发送到下游相同的节点,后面使用继承ProcessFunction...以上就是关于ProcessFunction 对于定时器的使用分析。

56320

深入了解ProcessFunction的状态操作(Flink-1.10)

学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示: ?...如上图,keyed stream的元素是具有key的特征,与ProcessFunction的操作状态时要求匹配,其他steam的元素由于没有key的特征,所以也就没有状态一说了; 另一种状态是Operator...State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费kafka的某个分区的最新offset,而ProcessFunction是用来处理stream元素的,不会涉及到Operator...官方demo 为了学习ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/...要消除这种不适应,要做的第一件事就是提醒自己:processElement是在框架内运行的,很多数据在之前已经由框架准备好了; 接下来要做的,就是把框架准备数据的逻辑看一遍,除了弄明白自己的问题,由于ProcessFunction

87930

ProcessFunction:Flink最底层API使用案例详解

如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。...目前,这个系列函数主要包括KeyedProcessFunction、ProcessFunction、CoProcessFunction、KeyedCoProcessFunction、ProcessJoinFunction...状态的介绍可以参考我的文章:Flink状态管理详解,这里我们重点讲解一下的使用ProcessFunction其他几个特色功能。...ProcessFunction有两个重要的接口processElement和onTimer,其中processElement函数在源码中的Java签名如下: // 处理数据流中的一条元素 public...使用ProcessFunction实现Join 如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunction或KeyedCoProcessFunction。

1.6K43

Flink处理函数实战之一:ProcessFunction

关于ProcessFunction类 处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement...中取得 mainDataStream.print(); env.execute("processfunction demo : simple"); } } 这里对上述代码做个介绍...的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉; 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期...demo : sideoutput"); } } 这里对上述代码做个介绍: 数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形; ProcessFunction的匿名子类中,...至此,处理函数中最简单的ProcessFunction类的学习和实战就完成了,接下来的文章我们会尝试更多了类型的处理函数

95950

ProcessFunction:Flink最底层API使用踩坑记录

ProcessFunction和CoProcessFunction 说明 DataStream与KeyedStreamd都有Process方法, DataStream接收的是ProcessFunction...,而KeyedStream接收的是KeyedProcessFunction(原本也支持ProcessFunction,现在已被废弃) 0.AbstractRichFunction介绍 1.ProcessFunction...每次有事件流入processFunction算子就会触发处理。 为了容错,ProcessFunction可以使用RuntimeContext访问flink内部的keyed state。...5.ProcessFunction与状态的结合使用案例 WordCount,如果某一个key一分钟(事件时间)没有更新,就直接输出。...基本思路: // 1.ValueState内部包含了计数、key和最后修改时间 // 2.对于每一个输入的记录,ProcessFunction都会增加计数,并且修改时间戳 // 3.该函数会在事件时间的后续

2.5K20

Flink处理函数实战之一:深入了解ProcessFunction的状态(Flink-1.10)

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos Flink处理函数实战系列链接 深入了解ProcessFunction...的状态操作(Flink-1.10); ProcessFunction; KeyedProcessFunction类; ProcessAllWindowFunction(窗口处理); CoProcessFunction...(双流处理); 关于ProcessFunction状态的疑惑 学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示...的元素由于没有key的特征,所以也就没有状态一说了; 另一种状态是Operator State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费kafka的某个分区的最新offset,而ProcessFunction...是用来处理stream元素的,不会涉及到Operator State: 官方demo 为了学习ProcessFunction就去看官方demo,地址是:https://ci.apache.org/

25130

编码方式实现Split Distinct Aggregation功能

Aggregation功能,其业务场景是实时计算广告位访客数,流量数据id(广告位ID)、devId(访问ID)、time(访问时间),实现思路: •首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction...计算分桶后的去重数(与MapState方式相同)•然后通过对id、小时级别时间分组,使用另一个ProcessFunction做sum操作,但是这里面需要注意的一个问题是对于相同id与时间其数据可能会来源于上游不同的...task,而上游的每个task的数据都会以全量一直往下发送,如果直接做累加操作会导致重复计算,因此得实现一个类似于sql中retract撤回机制(可参考Flink SQL中可撤回机制解密),也就是上一个ProcessFunction...: class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey1, AdData, Tuple2[Boolean, Tuple3[...聚合实现Distinct2ProcessFunction: class Distinct2ProcessFunction extends KeyedProcessFunction[Tuple2[Int,

43010
领券