ProcessFunction ProcessFunction 函数是低阶流处理算子,可以访问流应用程序所有(非循环)基本构建块: 事件 (数据流元素) 状态 (容错和一致性) 定时器 (事件时间和处理时间...) ProcessFunction 可以被认为是一种提供了对 KeyedState 和定时器访问的 FlatMapFunction。...如果要访问 KeyedState 和定时器,那必须在 KeyedStream 上使用 ProcessFunction。...; import org.apache.flink.streaming.api.functions.ProcessFunction.Context; import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext...import org.apache.flink.streaming.api.functions.ProcessFunction.Context import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
序 本文主要研究一下flink的ProcessFunction apache-flink-training-datastream-api-processfunction-5-638.jpg 实例 import...; import org.apache.flink.streaming.api.functions.ProcessFunction.Context; import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext...that maintains the count and timeouts */ public class CountWithTimeoutFunction extends ProcessFunction...里头使用keyed state以及timer;process方法使用的ProcessFunction是CountWithTimeoutFunction CountWithTimeoutFunction的...的FlatMapFunction,当要使用keyed state或者timer的时候,可以使用ProcessFunction ProcessFunction继承了AbstractRichFunction
; import org.apache.flink.streaming.api.functions.ProcessFunction.Context; import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext...that maintains the count and timeouts */ public class CountWithTimeoutFunction extends ProcessFunction...里头使用keyed state以及timer;process方法使用的ProcessFunction是CountWithTimeoutFunction CountWithTimeoutFunction的.../org/apache/flink/streaming/api/functions/ProcessFunction.java @PublicEvolving public abstract class...的FlatMapFunction,当要使用keyed state或者timer的时候,可以使用ProcessFunction ProcessFunction继承了AbstractRichFunction
第一次执行processElement,时间是12:01:01,因此state中记录的是12:01:01,registerEventTimeTimer入参就是1...
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos Flink处理函数实战系列链接 深入了解ProcessFunction...的状态操作(Flink-1.10); ProcessFunction; KeyedProcessFunction类; ProcessAllWindowFunction(窗口处理); CoProcessFunction...关于ProcessFunction类 处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement...的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉; 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期...demo : sideoutput"); } } 这里对上述代码做个介绍: 数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形; ProcessFunction的匿名子类中,
使用分析 六、实例讲解:如何做定时输出 ProcessFunction 是flink 提供面向用户low-level 层级的api,通过ProcessFunction可以访问state...目前在flink中,提供了ProcessFunction与KeyedProcessFunction 这两个面向用户的api,但是ProcessFunction却无法帮助我们注册定时器,透过源码(ProcessOperator...首先以官方文档为例来了解其用法,完成单词计数,并且定时输出功能,文档里面是定义了一个继承ProcessFunction 的的类,猜想这里应该是很早之前的版本文档。...做一个简单的代码流程分析:首先得到一个Tuple2[String,String]类型的数据流,然后按照第一个位置的字段进行分组,那么相同的字段发送到下游相同的节点,后面使用继承ProcessFunction...以上就是关于ProcessFunction 对于定时器的使用分析。
学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示: ?...如上图,keyed stream的元素是具有key的特征,与ProcessFunction的操作状态时要求匹配,其他steam的元素由于没有key的特征,所以也就没有状态一说了; 另一种状态是Operator...State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费kafka的某个分区的最新offset,而ProcessFunction是用来处理stream元素的,不会涉及到Operator...官方demo 为了学习ProcessFunction就去看官方demo,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.10/...要消除这种不适应,要做的第一件事就是提醒自己:processElement是在框架内运行的,很多数据在之前已经由框架准备好了; 接下来要做的,就是把框架准备数据的逻辑看一遍,除了弄明白自己的问题,由于ProcessFunction
自定义processFunction函数: // 3.2 添加任务,使用{@link ProcessFunction}方便控制: 1. 忽略null数据,2....DetaiEventRuleExecutingProcessor(); 函数内部实现,进行数据的简单四舍五入: public class DetaiEventRuleExecutingProcessor extends ProcessFunction...collector.collect(detailData); } } } 将处理的数据以及旁路数据写入到文件,4一下写入u4, 5以及以上写入b5: //自定义processFunction...sideOutStream(DataStream rawLogStream) { return rawLogStream .process(new ProcessFunction
关于ProcessFunction类 处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement...中取得 mainDataStream.print(); env.execute("processfunction demo : simple"); } } 这里对上述代码做个介绍...的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉; 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期...demo : sideoutput"); } } 这里对上述代码做个介绍: 数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形; ProcessFunction的匿名子类中,...至此,处理函数中最简单的ProcessFunction类的学习和实战就完成了,接下来的文章我们会尝试更多了类型的处理函数
如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。...目前,这个系列函数主要包括KeyedProcessFunction、ProcessFunction、CoProcessFunction、KeyedCoProcessFunction、ProcessJoinFunction...状态的介绍可以参考我的文章:Flink状态管理详解,这里我们重点讲解一下的使用ProcessFunction其他几个特色功能。...ProcessFunction有两个重要的接口processElement和onTimer,其中processElement函数在源码中的Java签名如下: // 处理数据流中的一条元素 public...使用ProcessFunction实现Join 如果想从更细的粒度上实现两个数据流的Join,可以使用CoProcessFunction或KeyedCoProcessFunction。
ProcessFunction和CoProcessFunction 说明 DataStream与KeyedStreamd都有Process方法, DataStream接收的是ProcessFunction...,而KeyedStream接收的是KeyedProcessFunction(原本也支持ProcessFunction,现在已被废弃) 0.AbstractRichFunction介绍 1.ProcessFunction...每次有事件流入processFunction算子就会触发处理。 为了容错,ProcessFunction可以使用RuntimeContext访问flink内部的keyed state。...5.ProcessFunction与状态的结合使用案例 WordCount,如果某一个key一分钟(事件时间)没有更新,就直接输出。...基本思路: // 1.ValueState内部包含了计数、key和最后修改时间 // 2.对于每一个输入的记录,ProcessFunction都会增加计数,并且修改时间戳 // 3.该函数会在事件时间的后续
至此已经学习了DataStream API ,ProcessFunction API 是Flink中最底层的API,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件。...、 Process Function 用来构建事件驱动的应用以及实现自定义的业务逻辑,若窗口函数以及转换算子都无法满足业务的要求时,需要请出ProcessFunction 去完成开发任务。...Flink 提供了 8 个 Process Function如下:ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction...自定义测输出流实现分流操作 SingleOutputStreamOperator highTempStream = dataStream.process(new ProcessFunction
[CRow, CRow] with ResultTypeQueryable[CRow] with Compiler[ProcessFunction[Row, Row]] with Logging...= _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling ProcessFunction...,它的code为CRowProcessRunner的构造器参数,由DataStreamCalc在translateToPlan方法中创建CRowProcessRunner的时候生成 ProcessFunction.../org/apache/flink/streaming/api/functions/ProcessFunction.java @PublicEvolving public abstract class...ProcessFunction extends AbstractRichFunction { private static final long serialVersionUID
欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos Flink处理函数实战系列链接 深入了解ProcessFunction...的状态操作(Flink-1.10); ProcessFunction; KeyedProcessFunction类; ProcessAllWindowFunction(窗口处理); CoProcessFunction...(双流处理); 关于ProcessFunction状态的疑惑 学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示...的元素由于没有key的特征,所以也就没有状态一说了; 另一种状态是Operator State,如下图,这是和多并行度计算时的算子实例绑定的,例如当前算子消费kafka的某个分区的最新offset,而ProcessFunction...是用来处理stream元素的,不会涉及到Operator State: 官方demo 为了学习ProcessFunction就去看官方demo,地址是:https://ci.apache.org/
InternalTimerService}. */ private long currentWatermark = Long.MIN_VALUE; public ProcessOperator(ProcessFunction...String, collectorCode: String, @transient var returnType: TypeInformation[CRow]) extends ProcessFunction...CRow] with ResultTypeQueryable[CRow] with Compiler[Any] with Logging { private var function: ProcessFunction...TableFunctionCollector[_]] this.cRowWrapper = new CRowWrappingCollector() LOG.debug(s"Compiling ProcessFunction...constructor = processClazz.getConstructor(classOf[TableFunctionCollector[_]]) LOG.debug("Instantiating ProcessFunction
Aggregation功能,其业务场景是实时计算广告位访客数,流量数据id(广告位ID)、devId(访问ID)、time(访问时间),实现思路: •首先通过对id、设备id分桶编号、小时级别时间分组,使用一个ProcessFunction...计算分桶后的去重数(与MapState方式相同)•然后通过对id、小时级别时间分组,使用另一个ProcessFunction做sum操作,但是这里面需要注意的一个问题是对于相同id与时间其数据可能会来源于上游不同的...task,而上游的每个task的数据都会以全量一直往下发送,如果直接做累加操作会导致重复计算,因此得实现一个类似于sql中retract撤回机制(可参考Flink SQL中可撤回机制解密),也就是上一个ProcessFunction...: class Distinct1ProcessFunction extends KeyedProcessFunction[AdKey1, AdData, Tuple2[Boolean, Tuple3[...聚合实现Distinct2ProcessFunction: class Distinct2ProcessFunction extends KeyedProcessFunction[Tuple2[Int,
我们可以分为如下三种: 无状态算子 有状态算子 定时处理算子(ProcessFunction) 2....testHarness = new KeyedOneInputStreamOperatorTestHarness( new KeyedProcessOperator(processFunction...), testHarness.extractOutputStreamRecords() ); } } 考虑到 ProcessFunction...的重要性,除了上面可以直接用于测试 ProcessFunction 的 TestHarness 之外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses 的 TestHarness...如下所示: public class MyProcessFunction extends ProcessFunction { @Override public
(processMap)); } private static java.util.MapProcessFunction...extends org.apache.thrift.TBase>> getProcessMap(java.util.MapProcessFunction...(processMap)); } private static java.util.MapProcessFunction...extends org.apache.thrift.TBase>> getProcessMap(java.util.MapProcessFunction...; result.success = iface.helloString(args.what); return result; } } 处理类,继承ProcessFunction
---- The ProcessFunction ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本组件: Events(流中的事件) state (容错, 一致性...,只在Keyed Stream) timers (事件时间和处理时间,仅仅适用于keyed Stream) 可以将ProcessFunction看做是具备访问keyed状态和定时器的FlatMapFunction...注意:想要访问keyed状态和定时器,则必须在键控流上应用ProcessFunction: stream.keyBy(...).process(new MyProcessFunction()) KeyedProcessFunction...return The transformed {@link DataStream}. ---- /** {@link org.apache.flink.streaming.api.functions.ProcessFunction
领取专属 10元无门槛券
手把手带您无忧上云