ProcessFunction 是flink 提供面向用户low-level 层级的api,通过ProcessFunction可以访问state、注册处理时间/事件时间定时器来帮助我们完成一些比较复杂的操作,但是其有一个限制那就是只用使用在keyedStream中,是由于根据getRuntimeContext 得到的StreamingRuntimeContext 只提供了KeyedStateStore的访问权限,所以只能访问keyd state, 另外根据前面的分析可知,注册的定时器必须是与key相关,也就解释了在ProcessFunction中只能在keyedStream做定时器注册。目前在flink中,提供了ProcessFunction与KeyedProcessFunction 这两个面向用户的api,但是ProcessFunction却无法帮助我们注册定时器,透过源码(ProcessOperator)可以发现,注册时会主动抛出UnsupportedOperationException异常。今天重点在于分析KeyedProcessFunction 是如何完成定时功能。
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/ProcessFunction.java
如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,从本章开始,我们一起通过实战来熟悉处理函数(Process Function),看看这一系列的低级算子可以带给我们哪些能力?
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/ScalarFunction.scala
ProcessFunction 函数是低阶流处理算子,可以访问流应用程序所有(非循环)基本构建块:
DataStream与KeyedStreamd都有Process方法, DataStream接收的是ProcessFunction,而KeyedStream接收的是KeyedProcessFunction(原本也支持ProcessFunction,现在已被废弃)
学习Flink的ProcessFunction过程中,官方文档中涉及状态处理的时候,不止一次提到只适用于keyed stream的元素,如下图红框所示:
去重指标作为业务分析里面的一个重要指标,不管是在OLAP存储引擎还是计算引擎都对其实现做了大量工作,在面对不同的数据量、指标精确性要求,都有不同的实现方式,但是总体都逃脱不了硬算、两阶段方式、bitmap、hll等这些实现。本文将分析Split Distinct Aggregation实现原理与使用代码方式实现其功能。
Stub是一段代码,用来转换RPC过程中传递的参数。处理内容包括不同OS之间的大小端问题。另外,Client端一般叫Stub,Server端一般叫Skeleton。
flink-table_2.11-1.7.1-sources.jar!/org/apache/flink/table/functions/UserDefinedFunction.scala
编写单元测试是设计生产应用程序的基本任务之一。如果不进行测试,那么一个很小的代码变更都会导致生产任务的失败。因此,无论是清理数据、模型训练的简单作业,还是复杂的多租户实时数据处理系统,我们都应该为所有类型的应用程序编写单元测试。下面我们将提供有关 Apache Flink 应用程序的单元测试指南。Apache Flink 提供了一个强大的单元测试框架,以确保我们的应用程序在上线后符合我们的预期。
-------------------------------------------------------------------------------------------------------------------------------------------------------
摘要处理函数(ProcessFunction)了。处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。
本文是《Flink处理函数实战》系列的第三篇,上一篇《Flink处理函数实战之二:ProcessFunction类》学习了最简单的ProcessFunction类,今天要了解的KeyedProcessFunction,以及该类带来的一些特性;
ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本组件:
TCompactProtocol:高效率,密集的二进制编码格式,使用了zigzag压缩算法,使用了类似于ProtocolBuffer的Variable-Length Quantity (VLQ) 编码方式;
Flink 的 API 大体上可以划分为三个层次:处于最底层的 ProcessFunction、中间一层的 DataStream API 和最上层的 SQL/Table API,这三层中的每一层都非常依赖于时间属性。时间属性是流处理中最重要的一个方面,是流处理系统的基石之一,贯穿这三层 API。在 DataStream API 这一层中因为封装方面的原因,我们能够接触到时间的地方不是很多,所以我们将重点放在底层的 ProcessFunction 和最上层的 SQL/Table API。
本文是《Flink处理函数实战》系列的第四篇,内容是学习以下两个窗口相关的处理函数:
本文介绍了在 Flink 中使用定时器的一些基本概念和注意事项。开发人员可以使用 Flink 的 ProcessFunction 算子来注册自己的定时器,该算子可以访问流应用程序的一些基本构建块,例如:
熟悉flink的同学(说明次系列篇幅不适合没有flink基础同学)都知道flink优于其他实时计算引擎的一个很重要的特点就是提供了Event Time这样一个概念,也就是我们所说的事件时间,能够让用户按照事件(数据)所发生的时间去处理,从而精确还原数据场景。flink 中提供了三种时间概念:处理时间、事件时间、注入时间,在次系列篇幅中主要分析在实际使用中用户常常关心的处理时间与事件时间,以及在flink runtime中是如何处理这两种时间机制的,将会按照以下几个篇幅介绍:
之前提到的一些算子和函数能够进行一些时间上的操作,但是不能获取算子当前的Processing Time或者是Watermark时间戳,调用起来简单但功能相对受限。如果想获取数据流中Watermark的时间戳,或者在时间上前后穿梭,需要使用ProcessFunction系列函数,它们是Flink体系中最底层的API,提供了对数据流更细粒度的操作权限。Flink SQL是基于这些函数实现的,一些需要高度个性化的业务场景也需要使用这些函数。
本文是《Flink处理函数实战》系列的第二篇,上一篇《Flink处理函数实战之一:ProcessFunction类》学习了最简单的ProcessFunction类,今天要了解的KeyedProcessFunction,以及该类带来的一些特性;
本篇幅介绍Flink Table/SQL中如何自定义一个表函数(TableFunction),介绍其基本用法以及与源码结合分析其调用流程。
运行时遇到如下异常,原因是由于hmget返回的List含有null成员,导致thrift编码时异常: 20160415 14:55:39 ERROR org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:484) [Thread-0] Unexpected throwable while invoking! java.lang.NullPointerException at org.apache.thrift.protocol.TBinaryProtocol.writeString(TBinaryProtocol.java:185) at com.test.redis_cluster_proxy.RedisClusterProxyService$hmget_result$hmget_resultStandardScheme.write(RedisClusterProxyService.java:19434) at com.test.redis_cluster_proxy.RedisClusterProxyService$hmget_result$hmget_resultStandardScheme.write(RedisClusterProxyService.java:1) at com.test.redis_cluster_proxy.RedisClusterProxyService$hmget_result.write(RedisClusterProxyService.java:19337) at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:53) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39) at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:478) at org.apache.thrift.server.TNonblockingServer.requestInvoke(TNonblockingServer.java:115) at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:209) at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.select(TNonblockingServer.java:198) at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.run(TNonblockingServer.java:154) 当redis中没有相应的field时,hmget返回的List会包含null成员。解决此问题有两个办法: 1)保证查询的field一定存在 2)对hmget返回值做处理,null成员替换成空字符串""
在之前总结的文章中有提到过,Flink框架提供了三层API完成流处理任务。至此已经学习了DataStream API ,ProcessFunction API 是Flink中最底层的API,可以访问时间戳、watermark 以及注册定时事件。还可以输出特定的一些事件。、
上一篇幅中对processing Time的整个注册流程与调用流程做了整体分析,并且分析了Flink中时间系统管理涉及的核心类,此篇幅将会介绍Event Time如何注册定时、定时如何触发。
去重计算应该是数据分析业务里面常见的指标计算,例如网站一天的访问用户数、广告的点击用户数等等,离线计算是一个全量、一次性计算的过程通常可以通过distinct的方式得到去重结果,而实时计算是一种增量、长期计算过程,我们在面对不同的场景,例如数据量的大小、计算结果精准度要求等可以使用不同的方案。此篇介绍如何通过编码方式实现精确去重,以一个实际场景为例:计算每个广告每小时的点击用户数,广告点击日志包含:广告位ID、用户设备ID(idfa/imei/cookie)、点击时间。
本文介绍的内容是侧输出流(SideOutput),在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。
SQL是开发人员与数据分析师必备的技能,Flink也提供了Sql方式编写任务,能够很大程度降低开发运维成本,这篇是flink join的终极篇SQL Join, 首先介绍sql join使用方式、然后介绍global join带来的状态存储成本及解决方式、最后从源码角度分析sql join实现。
在Flink中,EventTime即事件时间,能够反映事件在某个时间点发生的真实情况,即使在任务重跑情况也能够被还原,计算某一段时间内的数据,那么只需要将EventTime范围的数据聚合计算即可,但是数据在上报、传输过程中难免会发生数据延时,进而造成数据乱序,就需要考虑何时去触发这个计算,Flink使用watermark来衡量当前数据进度,使用时间戳表示,在数据流中随着数据一起传输,当到watermark达用户设定的允许延时时间,就会触发计算。但是在使用EventTime的语义中,会出现一些不可预知的问题,接下来会介绍笔者在使用过程中遇到的一些问题与解决办法。
宝贝们,还记得前几天博主去的火山引擎大数据场嘛,其中比较令大家感兴趣的就是最后一讲,字节一站式埋点平台的 flink 标准化清洗及拆流任务。
DataGrip连接Hive执行DDL操作报错:「FAILED: ParseException line 1:5 cannot recognize input near ‘show‘ ‘indexeson` in ddl statement」 ❝本文首发于「CSDN」 ❞ 📷 封面 写在前面 ❝搭建离线数仓项目中用DataGrip连接Hive,建立ODS业务表在hiveservice2客户端控制台报错 ❞ FAILED: ParseException line 1:5 cannot recognize i
在本系列的前几篇文章中,我们描述了如何基于动态更新的配置(一组欺诈检测规则)实现灵活的流分区,以及如何利用 Flink 的广播机制在运行时在相关算子之间分配处理配置.
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
接下来咱们开发一个应用来体验CoProcessFunction,功能非常简单,描述如下:
array, map, struct 都有自己的定义方法,而他们的调用方法基本都相同:
storm客户端提交topology失败: java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:141) at backtype.storm.StormSubmitter.submitTopologyWithProgressBar(Storm
理解状态:《深入了解ProcessFunction的状态操作(Flink-1.10)》
在flink中窗口划分可以基于时间、基于数量,我们这里所涉及到的窗口是针对时间类型窗口:processing-time window与event-time window,时间系统在时间窗口应用主要用来注册窗口触发时间点,来决定窗口什么时候开始执行窗口函数。接下来从源码的角度分析窗口是如何使用时间系统的。
在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。
规则引擎通常对我们的理解就是用来做模式匹配的,在数据流里面检测满足规则要求的数据。有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当然是不行的,规则引擎里面通常会维护很多不同的规则,例如在监控告警的场景下,如果每个人修改一下自己的监控阈值,就重启一下服务,必然会影响其他人的使用,因此需要线上满足规则动态变更加载。本篇基于Flink-Cep 来实现规则动态变更加载,同时参考了Flink中文社区刘博老师的分享(https://developer.aliyun.com/article/738454),在这个分享里面是针对在处理流中每一个Key使用不同的规则,本篇的讲解将不区分key的规则。
Flink作为流批一体的计算引擎,其面对的是业务场景,面向的使用者是开发人员和运维管理人员。
在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。
领取专属 10元无门槛券
手把手带您无忧上云