本文字数:45723,阅读时长大约30分钟
导读:Flink是由德国几所大学发起的的学术项目,后来不断发展壮大,并于2014年末成为Apache顶级项目。Flink如何在流处理中多得王者地位?带着问题在文章寻找答案吧。
作者
857技术社区 刘浩
“Flink是由德国几所大学发起的的学术项目,后来不断发展壮大,并于2014年末成为Apache顶级项目。Flink主要面向流处理,如果说Spark是批处理界的王者,那么Flink就是流处理领域的冉冉升起的新星。在 Flink之前,不乏流式处理引擎,比较著名的有Storm、Spark Streaming,但某些特性远不如Flink。 ”
“第一代被广泛采用的流处理框架是Strom。在多项基准测试中,Storm的数据吞吐量和延迟都远逊于Flink。Storm只支持"at least once"和"at most once",即数据流里的事件投递只能保证至少一次或至多一次,不能保证只有一次。对于很多对数据准确性要求较高的应用,Storm有一定劣势。第二代非常流行的流处理框架是Spark Streaming。Spark Streaming使用mini-batch的思想,每次处理一小批数据,一小批数据包含多个事件,以接近实时处理的效果。因为它每次计算一小批数据,因此总有一些延迟。但Spark Streaming的优势是拥有Spark这个靠山,用户从Spark迁移到Spark Streaming的成本较低,因此能给用户提供一个批量和流式于一体的计算框架。 Flink是与上述两代框架都不太一样的新一代计算框架,它是一个支持在有界和无界数据流上做有状态计算的大数据引擎。它以事件为单位,并且支持SQL、State、WaterMark等特性。它支持"exactly once",即事件投递保证只有一次,不多也不少,这样数据的准确性能得到提升。比起Storm,它的吞吐量更高,延迟更低,准确性能得到保障;比起Spark Streaming,它以事件为单位,达到真正意义上的实时计算,且所需计算资源相对更少。 之前提到,数据都是以流的形式产生的。数据可以分为有界(bounded)和无界(unbounded),批量处理其实就是一个有界的数据流,是流处理的一个特例。Flink基于这种思想,逐步发展成一个可支持流式和批量处理的大数据框架。 经过几年的发展,Flink的API已经非常完善,可以支持Java、Scala和Python,并且支持SQL。Flink的Scala版API与Spark非常相似,有Spark经验的程序员可以用一个小时的时间熟悉Flink API。 与Spark类似,Flink目前主要面向计算,并且可以与Hadoop生态高度集成。Spark和Flink各有所长,也在相互借鉴,一边竞争,一边学习,究竟最终谁能一统江湖,我们拭目以待。 ”
“Apache Flink 是由 Apache 软件基金会开发的开源流处理框架,其核心是用 Java 和 Scala 编写的分布式流数据流引擎。Flink 以数据并行和流水线方式执行任意流数据程序,Flink 的 流水线运行时系统可以执行批处理和流处理程序。此外,Flink 的运行时本身也支持迭代算 法的执行。 ”
Flink的官网主页地址: https://flink.apache.org/
“流数据更真实的反映了我们的生活方式 传统的数据架构是基于有限数据集的 针对无限和有限数据流进行有状态计算的分布式执行引擎框架。集群部署,随意扩容;内存计算,速度快。 ”
“可以由流处理框架构建和执行的应用程序类型是由框架对 流、状态、时间 的支持程度来决定的。在下文中,我们将对上述这些流处理应用的基本组件逐一进行描述,并对 Flink 处理它们的方法进行细致剖析。 ”
流 | |
---|---|
状态 | 在一定时间内存储所接收的事件或中间结果 |
时间 | 事件时间,根据事件本身自带的时间戳进行结果的计算,保证结果的准确性和一致性。处理时间,根据处理引擎的机器时钟触发计算,低延迟需求,并且能够容忍近似结果。 |
1.3.1 应用场景
事件驱动型应用 | 从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。 |
---|---|
数据分析应用 | 从原始数据中提取有价值的信息和指标。 |
数据管道应用 | 数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。但数据管道是以持续流模式运行,而非周期性触发。 |
提取-转换-加载(ETL):一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。 |
“数据处理有不同的方式。 对于具体应用来说,有些场景数据是一个一个来的,是一组有序的数据序列,我们把它叫作“数据流”;而有些场景的数据,本身就是一批同时到来,是一个有限的数据集,这就是批量数据(有时也直接叫数据集)。 处理数据流,应该“来一个就处理一个”,这种数据处理模式就叫作流处理;因为这种处理是即时的,所以也叫实时处理。与之对应,处理批量数据自然就应该一批读入、一起计算,这种方式就叫作批处理,也叫作离线处理。 流数据更真实地反映了我们的生活方式。真实场景中产生的,一般都是数据流。 ”
“IT互联网公司往往会用不同的应用程序来处理各种业务。比如内部使用的企业资源规划(ERP)系统、客户关系管理(CRM)系统,还有面向客户的Web应用程序。 ”
“这些应用程序在处理数据的模式上有共同之处:接收的数据是持续生成的事件,比如用户的点击行为,客户下的订单,或者操作人员发出的请求。处理事件时,应用程序需要先读取远程数据库的状态,然后按照处理逻辑得到结果,将响应返回给用户,并更新数据库状态。一般来说,一个数据库系统可以服务于多个应用程序,它们有时会访问相同的数据库或表。这就是传统的“事务处理”架构。 ”
“我们可以把流处理需要的额外数据保存成一个“状态”,然后针对这条数据进行处理,并且更新状态。这就是所谓的“有状态的流处理”。 为了加快访问速度,我们可以直接将状态保存在本地内存。当应用收到一个新事件时,它可以从状态中读取数据,也可以更新状态。而当状态是从内存中读写的时候,这就和访问本地变量没什么区别了,实时性可以得到极大的提升。 另外,数据规模增大时,我们也不需要做重构,只需要构建分布式集群,各自在本地计算就可以了,可扩展性也变得更好。 因为采用的是一个分布式系统,所以还需要保护本地状态,防止在故障时数据丢失。我们可以定期地将应用状态的一致性检查点(checkpoint)存盘,写入远程的持久化存储,遇到故障时再去读取进行恢复,这样就保证了更好的容错性。 ”
有状态的流处理是一种通用而且灵活的设计架构,可用于许多不同的场景。具体来说,有以下几种典型应用。
“有状态的流处理架构上其实并不复杂,很多用户基于这种思想开发出了自己的流处理系统,这就是第一代流处理器。Apache Storm就是其中的代表。Storm 提供了低延迟的流处理,但很难实现高吞吐,而且无法保证结果的正确性。 ”
“与批处理器相比,第一代流处理器牺牲了结果的准确性,用来换取更低的延迟。而批处理器恰好反过来,牺牲了实时性,换取了结果的准确。 我们自然想到,如果可以让二者做个结合,不就可以同时提供快速和准确的结果了吗?正是基于这样的思想,产生了所谓的Lambda架构。 ”
“Lambda架构主体是传统批处理架构的增强。它的“批处理层”(Batch Layer)就是由传统的批处理器和存储组成,而“实时层”(Speed Layer)则由低延迟的流处理器实现。数据到达之后,一方面由流处理器进行实时处理,另一方面写入批处理存储空间,等待批处理器批量计算。流处理器快速计算出一个近似结果,并将它们写入“流处理表”中。而批处理器会定期处理存储中的数据,将准确的结果写入批处理表,并从快速表中删除不准确的结果。最终,应用程序会合并快速表和批处理表中的结果,并展示出来。 Lambda架构现在已经不再是最先进的,但仍在许多地方使用。它的优点非常明显,就是兼具了批处理器和第一代流处理器的特点,同时保证了低延迟和结果的准确性。而它的缺点同样非常明显。首先,Lambda架构本身就很难建立和维护;而且,它需要我们对一个应用程序,做出两套语义上等效的逻辑实现,因为批处理和流处理是两套完全独立的系统,它们的API也完全不同。 ”
“之前的分布式流处理架构,都有明显的缺陷,人们也一直没有放弃对流处理器的改进和完善。终于,在原有流处理器的基础上,新一代分布式开源流处理器诞生了。为了与之前的系统区分,我们一般称之为第三代流处理器,代表当然就是Flink。 第三代流处理器通过巧妙的设计,完美解决了乱序数据对结果正确性的影响。这一代系统还做到了精确一次(exactly-once)的一致性保障,是第一个具有一致性和准确结果的开源流处理器。另外,先前的流处理器仅能在高吞吐和低延迟中二选一,而新一代系统能够同时提供这两个特性。所以可以说,这一代流处理器仅凭一套系统就完成了Lambda架构两套系统的工作。 ”
“Flink为全球许多公司和企业的关键业务应用提供了强大的支持。以下是Flink官网列出的知名企业用户,如图所示,他们在生产环境中有各种各样有趣的应用。 ”
“可以看到,各种行业的众多公司都在使用Flink。具体来看,一些行业中的典型应用有:
举例:实时数据报表、广告投放、实时推荐
举例:传感器实时数据采集和显示、实时报警,交通运输业
举例:订单状态实时更新、通知信息推送
举例:实时结算和通知推送,实时检测异常行为 ”
“
”
Flink是第三代分布式流处理器,它的功能丰富而强大。主要特性如下。
“
”
除了上述这些特性之外,Flink还是一个非常易于开发的框架,因为它拥有易于使用的分层API,整体API分层如图所示。
“最底层级的抽象仅仅提供了有状态流,它通过处理函数(Process Function)嵌入到DataStream API中。底层处理函数(Process Function) 与 DataStream API 相集成,使其可以对某些特定的操作进行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的事件,并使用一致的容错的状态。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。 实际上,大多数应用并不需要底层抽象,而是直接针对核心API(Core APIs) 进行编程,比如DataStream API以及DataSet API。这些API为数据处理提供了通用的构建模块,比如转换(transformations),连接(joins),聚合(aggregations),窗口(windows)操作等。 Table API 是以表为中心的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵循关系模型:表有二维数据结构,类似于关系数据库中的表;同时API提供可比较的操作,例如select、project、join、group-by、aggregate等。我们可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。 Flink提供的最高层级的抽象是SQL。这一层抽象在语法与表达能力上与 Table API 类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。 目前Flink SQL和Table API还在开发完善的过程中,很多大厂都会二次开发符合自己需要的工具包。而DataSet作为批处理API实际应用较少,2020年12月8日发布的新版本1.12.0, 已经完全实现了真正的流批一体,DataSet API已处于软性弃用(soft deprecated)的状态。用Data Stream API写好的一套代码, 即可以处理流数据, 也可以处理批数据。这与之前版本处理有界流的方式是不一样的,Flink已专门对批处理数据做了优化处理。所以我们以介绍DataStream API为主,采用的是目前的最新版本Flink 1.13.0。 ”
“
也称之为 Master,用于协调分布式执行,它用来调度 task,协调检查点,协调失败时恢复 等。Flink 运行时至少存在一个 master,如果配置高可用模式则会存在多个 master,它们其 中有一个是 leader,而其他的都是 standby。
也称之为 Worker,用于执行一个 dataflow 的 task、数据缓冲和 Data Streams 的数据交换, Flink 运行时至少会存在一个 TaskManager。JobManager 和 TaskManager 可以直接运行在物理 机上,或者运行 YARN 这样的资源调度框架,TaskManager 通过网络连接到 JobManager,通 过 RPC 通信告知自身的可用性进而获得任务分配。
Flink 用来提交任务的客户端,可以用命令提交,也可以用浏览器提交
Task 是一个阶段多个功能相同 suntask 的集合,类似 spark 中的 taskset
Subtask 是 flink 中任务执行最小单元,是一个 java 类的实例,这份 java 类中有属性和方法, 完成具体的计算逻辑
没有 shuffle 的多个算子合并在一个 subtask 中就形成了 Operator chain,类似 spark 中的 pipeline
Flink 中计算资源进行隔离的单元,一个 slot 中可以运行多个 subtask,但是这些 subtask 必须 是来自同一个 job 的不同 task 的 subtask
Flink 任务运行过程中计算的中间结果
Flink 用来将中间结果持久化的指定的存储系统的一种定期执行的机制
Flink 用来存储中间计算结果的存储系统,flink 支持三种 statebackend。分别是 memory, fsbackend,rocksDB ”
“下面比较Spark和Flink的不同。一些方法在两个框架中都是相同的,而有些方法有很大不同。 ”
Spark Streaming | Flink |
---|---|
DStream | DataStream |
Trasnformation | Trasnformation |
Action | Sink |
Task | SubTask |
Pipeline | Oprator chains |
DAG | DataFlow Graph |
Master + Driver | JobManager |
Worker + Executor | TaskManager |
框架 | 优点 | 缺点 |
---|---|---|
Storm | 低延迟 | 吞吐量低、不能保证 exactly-once、编程 API 不 丰富 |
Spark Streaming | 吞吐量高、可以保证 exactly-once、编程 API 丰富 | 延迟较高 |
Flink | 低延迟、吞吐量高、可以保证 exactly-once、编程 API 丰富 | 快速迭代中,API 变化比较快 |
“Spark 就是为离线计算而设计的,在 Spark 生态体系中,不论是流处理和批处理都是底层引 擎都是 Spark Core,Spark Streaming 将微批次小任务不停的提交到 Spark 引擎,从而实现准 实时计算,SparkStreaming 只不过是一种特殊的批处理而已。 ”
“Flink 就是为实时计算而设计的,Flink 可以同时实现批处理和流处理,Flink 将批处理(即有 有界数据)视作一种特殊的流处理。 ”
“从根本上说,Spark和Flink采用了完全不同的数据处理方式。可以说,两者的世界观是截然不同的。 Spark以批处理为根本,并尝试在批处理之上支持流计算;在Spark的世界观中,万物皆批次,离线数据是一个大批次,而实时数据则是由一个一个无限的小批次组成的。所以对于流处理框架Spark Streaming而言,其实并不是真正意义上的“流”处理,而是“微批次”(micro-batching)处理。 ”
“而Flink则认为,流处理才是最基本的操作,批处理也可以统一为流处理。在Flink的世界观中,万物皆流,实时数据是标准的、没有界限的流,而离线数据则是有界限的流。 ”
“正因为这种架构上的不同,Spark和Flink在不同的应用领域上表现会有差别。一般来说, Spark 基于微批处理的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处理的低延迟上做到极致。在低延迟流处理场景,Flink 已经有明显的优势。而在海量数据的批处理领域,Spark能够处理的吞吐量更大,加上其完善的生态和成熟易用的API,目前同样优势比较明显。 ”
“Spark底层数据模型是弹性分布式数据集(RDD),Spark Streaming 进行微批处理的底层接口DStream,实际上处理的也是一组组小批数据RDD的集合。 而Flink的基本数据模型是数据流(DataFlow),以及事件(Event)序列。 数据模型不同,对应在运行处理的流程上,自然也会有不同的架构。Spark做批计算,需要将任务对应的DAG划分阶段(Stage),一个完成后经过shuffle再进行下一阶段的计算。而Flink是标准的流式执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理。 ”
“Spark和Flink可以说目前是各擅胜场,批处理领域Spark称王,而在流处理方面Flink当仁不让。具体到项目应用中,不仅要看是流处理还是批处理,还需要在延迟、吞吐量、可靠性,以及开发容易度等多个方面进行权衡。 如果在工作中需要从Spark和Flink这两个主流框架中选择一个来进行实时流处理,我们更加推荐使用Flink,主要的原因有:
当然,在海量数据的批处理方面,Spark还是具有明显的优势。而且Spark的生态更加成熟,也会使其在应用中更为方便。相信随着Flink的快速发展和完善,这方面的差距会越来越小。 另外,这两大框架也在不停地互相借鉴、取长补短。Spark 2.0之后新增的Structured Streaming流处理引擎借鉴DataFlow进行了大量优化,同样做到了低延迟、时间正确性以及精确一次性语义保证;Spark 2.3以后引入的连续处理(Continuous Processing)模式,更是可以在至少一次语义保证下做到1毫秒的延迟。而Flink自1.9版本合并Blink以来,在SQL的表达和批处理的能力上同样有了长足的进步。 ”
“在进行代码的编写之前,先将我们使用的开发环境和工具介绍一下: 系统环境为Windows 10。 需提前安装Java 8。 集成开发环境(IDE)使用IntelliJ IDEA,具体的安装流程参见IntelliJ官网。https://www.jetbrains.com/zh-cn/idea/download/#section=windows 另外需要特别说明的是: 课程中全部程序采用Java语言编写; 课程中使用的Flink版本为1.13.0。 ”
“在准备好所有的开发环境之后,我们就可以开始开发自己的第一个Flink程序了。首先我们要做的,就是在IDEA中搭建一个Flink项目的骨架。我们会使用Java项目中常见的Maven来进行依赖管理。 ”
“在项目的pom文件中,增加标签设置属性,然后增加标签引入需要的依赖。我们需要添加的依赖最重要的就是Flink的相关组件,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。另外,为了方便查看运行日志,我们引入slf4j和log4j进行日志管理。 ”
<properties>
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<!-- 引入Flink相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 引入日志管理相关依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
</dependencies>
“这里做一点解释:在属性中,我们定义了<scala.binary.version>,这指代的是所依赖的Scala版本。这有一点奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。我们本书中用到的Scala版本为2.12。 ”
在目录src/main/resources下添加文件:log4j.properties,内容配置如下:
log4j.rootLogger=error, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
“接下来我们用一个最简单的示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的WordCount程序。 源码位于src/main/java目录下。首先新建一个包,命名为com.liuhao,在这个包下我们将编写Flink入门的WordCount程序。 ”
对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一个文本文档,然后读取这个文件处理数据就可以了。
hello flink
hello world
hello spark
hello hadoop
hello hive
我们进行单词频次统计的基本思路是:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
具体代码实现如下:
package com.liuhao.wordcountP;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class wordcountP {
public static void main(String[] args) throws Exception {
// 1.获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 2.读取文件数据
DataSource<String> DataSource = env.readTextFile("input/wordcount.txt");
// 3.处理转换,变成(hello,1)二元组
FlatMapOperator<String, Tuple2<String, Long>> stringTuple2FlatMapOperator = DataSource
.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] word = line.split(" ");
for (String s : word) {
out.collect(Tuple2.of(s, 1l));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG));
// 4.按照key进行分组
UnsortedGrouping<Tuple2<String, Long>> groupBy = stringTuple2FlatMapOperator.groupBy(0);
// 5.按照第二个字段进行sum
AggregateOperator<Tuple2<String, Long>> sum = groupBy.sum(1);
// 6.输出
sum.print();
}
}
需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
$ bin/flink run -Dexecution.runtime-mode=BATCH WordCount.jar
这样,DataSet API就没什么用了,在实际应用中我们只要维护一套DataStream API就可以。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。
对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。
我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。整体思路与之前的批处理非常类似,代码模式也基本一致。
在com.liuhao包下新建Java类WordCountL,在静态main方法中编写测试代码。具体代码实现如下:
package com.liuhao.WordCountL;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class WordCountL {
public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文件
DataStreamSource<String> lineStream =
env.readTextFile("input/words.txt");
// 3. 转换、分组、求和,得到统计结果
SingleOutputStreamOperator<Tuple2<String, Long>> sum =
lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(data -> data.f0)
.sum(1);
// 4. 打印
sum.print();
// 5. 执行
env.execute();
}
}
主要观察与批处理程序WordCountP的不同:
“创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。 转换处理之后,得到的数据对象类型不同。 分组操作调用的是keyBy方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。 代码末尾需要调用env的execute方法,开始执行任务。 ”
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要保持一个监听事件的状态,持续地处理捕获的数据。
为了模拟这种场景,我们就不再通过读取文件来获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过的单词的个数。具体实现上,我们只要对WordCountL代码中读取数据的步骤稍做修改,就可以实现对真正无界流的处理。
package com.liuhao.SocketWordCountL ;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class SocketWordCountL {
public static void main(String[] args) throws Exception {
// 1. 创建流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. 读取文本流
DataStreamSource<String> lineStream = env.socketTextStream("hadoop102", 7777);
// 3. 转换、分组、求和,得到统计结果
SingleOutputStreamOperator<Tuple2<String, Long>> sum =
lineStream.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
String[] words = line.split(" ");
for (String word : words) {
out.collect(Tuple2.of(word, 1L));
}
}).returns(Types.TUPLE(Types.STRING, Types.LONG))
.keyBy(data -> data.f0)
.sum(1);
// 4. 打印
result.print();
// 5. 执行
env.execute();
}
}
代码说明和注意事项
socket文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码中指定了主机“hadoop102”的7777端口作为发送数据的socket端口,读者可以根据测试环境自行配置。
在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者传入程序运行参数的方式来指定。
socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。
[hadoop@hadoop ~]$ nc -lk 7777
“我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。 ”
“我们会发现,输出的结果与之前读取文件的流处理非常相似。而且可以非常明显地看到,每输入一条数据,就有一次对应的输出。具体对应关系是:输入“hello flink”,就会输出两条统计结果(flink,1)和(hello,1);之后再输入“hello world”,同样会将hello和world的个数统计输出,hello的个数会对应增长为2。 ”
“Flink提交作业和执行任务,需要几个关键组件:客户端(Client)、作业管理器(JobManager)和任务管理器(TaskManager)。我们的代码由客户端获取并做转换,之后提交给JobManger。所以JobManager就是Flink集群里的“管事人”,对作业进行中央调度管理;而它获取到要执行的作业后,会进一步处理转换,然后分发任务给众多的TaskManager。这里的TaskManager,就是真正“干活的人”,数据的处理操作都是它们来做的。 ”
“Flink是一个非常灵活的处理框架,它支持多种不同的部署场景,还可以和不同的资源管理平台方便地集成。所以接下来我们会先做一个简单的介绍,让大家有一个初步的认识,之后再展开讲述不同情形下的Flink部署。 ”
Flink是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行Flink安装部署的学习时,需要准备3台Linux机器。具体要求如下:
“系统环境为CentOS 7.5版本。 安装Java 8。 安装Hadoop集群,Hadoop建议选择Hadoop 2.7.5以上版本。 配置集群节点服务器间时间同步以及免密登录,关闭防火墙。 本书中三台服务器的具体设置如下: 节点服务器1,IP地址为192.168.10.100,主机名为hadoop01。 节点服务器2,IP地址为192.168.10.101,主机名为hadoop02。 节点服务器3,IP地址为192.168.10.102,主机名为hadoop103。 ”
最简单的启动方式,其实是不搭建集群,直接本地启动。本地部署非常简单,直接解压安装包就可以使用,不用进行任何配置;一般用来做一些简单的测试。
具体安装步骤如下:
“进入Flink官网,下载1.13.0版本安装包flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对应scala版本为scala 2.12的安装包。 ”
在hadoop100节点服务器上创建安装目录/opt/module,将flink安装包放在该目录下,并执行解压命令,解压至当前目录。
$ tar -zxvf flink-1.13.0-bin-scala_2.12.tgz -C /opt/module/
flink-1.13.0/
flink-1.13.0/log/
flink-1.13.0/LICENSE
flink-1.13.0/lib/
……
进入解压后的目录,执行启动命令,并查看进程。
$ cd flink-1.13.0/
$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop100.
Starting taskexecutor daemon on host hadoop100.
$ jps
10589 StandaloneSessionClusterEntrypoint
10697 TaskManagerRunner
10898 Jps
启动成功后,访问http://hadoop100:8081,可以对flink集群和任务进行监控管理,如图所示。
$ bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 10680) on host hadoop100.
Stopping standalonesession daemon (pid: 10369) on host hadoop100.
可以看到,Flink本地启动非常简单,直接执行start-cluster.sh就可以了。如果我们想要扩展成集群,其实启动命令是不变的,主要是需要指定节点之间的主从关系。
Flink是典型的Master-Slave架构的分布式数据处理框架,其中Master角色对应着JobManager,Slave角色则对应TaskManager。我们对三台节点服务器的角色分配如表所示。
集群角色分配
节点服务器 | hadoop100 | hadoop101 | hadoop102 |
---|---|---|---|
角色 | JobManager | TaskManager | TaskManager |
具体安装部署步骤如下:
具体操作与上节相同。
$ cd conf/
$ vim flink-conf.yaml
jobmanager.rpc.address: hadoop100
这就指定了hadoop100节点服务器为JobManager节点。
$ vim workers
hadoop101
hadoop102
这样就指定了hadoop101和hadoop102为TaskManager节点。
jobmanager.memory.process.size:对JobManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.memory.process.size:对TaskManager进程可使用到的全部内存进行配置,包括JVM元空间和其他开销,默认为1600M,可以根据集群规模进行适当调整。
taskmanager.numberOfTaskSlots:对每个TaskManager能够分配的Slot数量进行配置,默认为1,可根据TaskManager所在的机器能够提供给Flink的CPU数量决定。所谓Slot就是TaskManager中具体运行一个任务所分配的计算资源。
parallelism.default:Flink任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。
关于Slot和并行度的概念,我们会在下一章做详细讲解。
配置修改完毕后,将Flink安装目录发给另外两个节点服务器。
$ scp -r ./flink-1.13.0 atguigu@hadoop103:/opt/module
$ scp -r ./flink-1.13.0 atguigu@hadoop104:/opt/module
$ bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host hadoop100.
Starting taskexecutor daemon on host hadoop101.
Starting taskexecutor daemon on host hadoop102.
[hadoop@hadoop100 flink-1.13.0]$ jps
13859 Jps
13782 StandaloneSessionClusterEntrypoint
[hadoop@hadoop101 flink-1.13.0]$ jps
12215 Jps
12124 TaskManagerRunner
[hadoop@hadoop102 flink-1.13.0]$ jps
11602 TaskManagerRunner
11694 Jps
启动成功后,同样可以访问http://hadoop100:8081对flink集群和任务进行监控管理,如图所示。
这里可以明显看到,当前集群的TaskManager数量为2;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为2。
在上一章中,我们已经编写了词频统计的批处理和流处理的示例程序,并在开发环境的模拟集群上做了运行测试。现在既然已经有了真正的集群环境,那接下来我们就要把作业提交上去执行了。
本节我们将以流处理的程序为例,演示如何将任务提交到集群中进行执行。具体步骤如下。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
[INFO] -----------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------------
[INFO] Total time: 21.665 s
[INFO] Finished at: 2021-06-01T17:21:26+08:00
[INFO] Final Memory: 141M/770M
[INFO] -----------------------------------------------------------------------
打包完成后,在target目录下即可找到所需JAR包,JAR包会有两个,FlinkTutorial-1.0-SNAPSHOT.jar和FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,因为集群中已经具备任务运行所需的所有依赖,所以建议使用FlinkTutorial-1.0-SNAPSHOT.jar。
上传完成后,如图所示:
除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。这里为方便起见,我们可以先把jar包直接上传到目录flink-1.13.0下
$ bin/start-cluster.sh
$ nc -lk 7777
$ bin/flink run -m hadoop100:8081 -c com.liuhao.WordCountL ./FlinkTutorial-1.0-SNAPSHOT.jar --host hadoop102 --port 7777
这里的参数 –m指定了提交到的JobManager,-c指定了入口类。
用netcat输入数据,可以在TaskManager的标准输出(Stdout)看到对应的统计结果。
$ cat flink-atguigu-taskexecutor-0-hadoop102.out
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/flink-1.13.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
(hello,1)
(hello,2)
(flink,1)
(hello,3)
(scala,1)
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink为各种场景提供了不同的部署模式,主要有以下三种:
“
”
它们的区别主要在于:集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行——客户端(Client)还是JobManager。
会话模式其实最符合常规思维。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定,所以所有提交的作业会竞争集群中的资源。
会话模式比较适合于单个规模小、执行时间短的大量作业。
会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式。
单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给JobManager,进而分发给TaskManager执行。作业作业完成后,集群就会关闭,所有资源也会释放。
这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式。
需要注意的是,Flink本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如YARN、Kubernetes。
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager;加上很多情况下我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
所以解决办法就是,我们不要客户端了,直接把应用提交到JobManger上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager也就关闭了,这就是所谓的应用模式。
应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由JobManager执行应用程序的,并且即使应用包含了多个作业,也只创建一个集群。
这里我们所讲到的部署模式,相对是比较抽象的概念。实际应用时,一般需要和资源管理平台结合起来,选择特定的模式来分配资源、部署应用。接下来,我们就针对不同的资源提供者(Resource Provider)的场景,具体介绍Flink的部署方式。
独立模式(Standalone)是部署Flink最基本也是最简单的方式:所需要的所有Flink组件,都只是操作系统上运行的一个JVM进程。
独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:如果资源不足,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
我们在第7.1节用的就是独立(Standalone)集群的会话模式部署。
在7.4.2节中我们提到,Flink本身无法直接以单作业方式启动集群,一般需要借助一些资源管理平台。所以Flink的独立(Standalone)集群并不支持单作业模式部署。
应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager。
具体步骤如下:
$ cp ./FlinkTutorial-1.0-SNAPSHOT.jar lib/
$ ./bin/standalone-job.sh start --job-classname com.liuhao.WordCountL
这里我们直接指定作业入口类,脚本会到lib目录扫描所有的jar包。
$ ./bin/taskmanager.sh start
$ ./bin/standalone-job.sh stop
$ ./bin/taskmanager.sh stop
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager, Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
在将Flink任务部署至YARN集群之前,需要确认集群是否安装有Hadoop,保证Hadoop版本至少在2.2以上,并且集群中安装有HDFS服务。
具体配置步骤如下:
$ sudo vim /etc/profile.d/my_env.sh
HADOOP_HOME=/opt/module/hadoop-2.7.5
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
[hadoop@hadoop100 ~]$ start-dfs.sh
[hadoop@hadoop101 ~]$ start-yarn.sh
YARN的会话模式与独立集群略有不同,需要首先申请一个YARN会话(YARN session)来启动Flink集群。具体步骤如下:
$ bin/yarn-session.sh -nm test
可用参数解读:
-d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
-jm(--jobManagerMemory):配置JobManager所需内存,默认单位MB。
-nm(--name):配置在YARN UI界面上显示的任务名。
-qu(--queue):指定YARN队列名。
-tm(--taskManager):配置每个TaskManager所使用内存。
注意:Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量,YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲,YARN的会话模式也不会把集群资源固定,同样是动态分配的。
YARN Session启动之后会给出一个web UI地址以及一个YARN application ID,如下所示,用户可以通过web UI或者命令行两种方式提交作业。
2022-11-25 15:54:27,069 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - YARN application has been deployed successfully.
2022-11-25 15:54:27,070 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop104:39735 of application 'application_1622535605178_0003'.
JobManager Web Interface: http://hadoop104:39735
“这种方式比较简单,与上文所述Standalone部署模式基本相同。 ”
“
”
$ bin/flink run -c com.liuhao.WordCountL FlinkTutorial-1.0-SNAPSHOT.jar
客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。
任务提交成功后,可在YARN的Web UI界面查看运行情况。
从图中可以看到我们创建的Yarn-Session实际上是一个Yarn的Application,并且有唯一的Application ID。
也可以通过Flink的Web UI页面查看提交任务的运行情况。
在YARN环境中,由于有了外部平台做资源调度,所以我们也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群
$ bin/flink run -d -t yarn-per-job -c com.liuhao.WordCountL FlinkTutorial-1.0-SNAPSHOT.jar
$ bin/flink run -m yarn-cluster -c com.liuhao.WordCountL FlinkTutorial-1.0-SNAPSHOT.jar
注意这里是通过参数-m yarn-cluster指定向YARN集群提交任务。
点击可以打开Flink Web UI页面进行监控:
$ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
这里的application_XXXX_YY是当前应用的ID,是作业的ID。注意如果取消作业,整个Flink集群也会停掉。
应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。
$ bin/flink run-application -t yarn-application -c com.liuhao.WordCountL FlinkTutorial-1.0-SNAPSHOT.jar
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
$ ./bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir" hdfs://myhdfs/jars/my-application.jar
这种方式下jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。
容器化部署是如今业界流行的一项技术,基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetes(k8s),而Flink也在最近的版本中支持了k8s部署模式。基本原理与YARN是类似的,具体配置可以参见官网说明,这里我们就不做过多讲解了。
1. 提交应用
2. 启动并提交应用
3. 请求slots
4. 任务启动
5. 注册slots
6. 发出提供slot的指令
7. 提供slots
8. 提交要在slots中执行的任务
9. 交换数据
1. Flink任务提交后,Client向HDFS上传Flink的Jar包和配置
2. 随后向Yarn ResourceManager 提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动
3. ApplicationMaster,ApplicationMaster 启动后加载Flink的Jar包和配置构建环境
4. 然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager
5. ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
6. NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
7. TaskManager启动后向JobManager发送心跳包,并等待JobManager向其分配任务。
在会话模式下,我们需要先启动一个YARN session,这个会话会创建一个Flink集群。
这里只启动了JobManager,而TaskManager可以根据需要动态地启动。在JobManager内部,由于还没有提交作业,所以只有ResourceManager和Dispatcher在运行。
可见,整个流程除了请求资源时要“上报”YARN的资源管理器,其他与7.2.1节所述抽象流程几乎完全一样。
在单作业模式下,Flink集群不会预先启动,而是在提交作业时,才启动新的JobManager。
可见,区别只在于JobManager的启动方式,以及省去了分发器。当第2步作业提交给JobMaster,之后的流程就与会话模式完全一样了。
应用模式与单作业模式的提交流程非常相似,只是初始提交给YARN资源管理器的不再是具体的作业,而是整个应用。一个应用中可能包含了多个作业,这些作业都将在Flink集群中启动各自对应的JobMaster。
1.Flink中每一个Taskmanageri都是一个JMM进程,它可能会在独立的线程上执行一个或多个subtask
2.为了控制一个Taskmanageri能接收多少个task, Taskmanager通过task slot来进行控制(一个Taskmanager至少有一个slot)
假如一个TaskManager有三个slot,那么它会将管理的内存平均分成三份,每个slot独自占据一份。这样一来,我们在slot上执行一个子任务时,相当于划定了一块内存“专款专用”,就不需要跟来自其他作业的任务去竞争内存资源了。所以现在我们只要2个TaskManager,就可以并行处理分配好的5个任务了。
3.默认情况下,Fink允许子任务共享slot,即使它们是不同任务的子任务。这样的结果是,一个slot可以保存作业的整个管道。
4.Task Slot是静态的概念,是指Taskmanager具有的并发执行能力
两者关系:
Slot和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说,task slot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度(parallelism)是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。
下面我们再举一个具体的例子。假设一共有3个TaskManager,每一个TaskManager中的slot数量设置为3个,那么一共有9个task slot,表示集群最多能并行执行9个任务。
而我们定义wordcount程序的处理操作是四个转换算子:
“source→ flatmap→ reduce→ sink ”
当所有算子并行度相同时,容易看出source和flatmap可以合并算子链,于是最终有三个任务节点。
如果我们没有任何并行度设置,而配置文件中默认parallelism.default=1,那么程序运行的默认并行度为1,总共有3个任务。由于不同算子的任务可以共享任务槽,所以最终占用的slot只有1个。9个slot只用了1个,有8个空闲,如图中的Example 1所示。
1. 所有的Flink程序都是由三部分组成的:Source、 Transformation和Sink
2. Source负责读取数据源,Transformation利用各种算子进行处理加工,Sink负责输出
3. 在运行时,Flink上运行的程序会被映射成 “逻辑数据流”( dataflows),它包含了这三部分
4. 每一个dataflow以一个或多个Sources开始以一个或多个sinks结束。dataflow类以于任意的有向无环图(DAG)
5. 在大部分情况下,程序中的转换运算( transformations)跟 dataflow中的算子
Flink中的执行图可以分成四层:
“Streamgraph -> Jobgraph -> Executiongraph -> 物理执行图 ”
1.Streamgraph:是根据用户通过Stream API编写的代码生成的最初的图。用来表示程序的拓扑结构。
2.Jobgraph: Streamgraph经过优化后生成了Jobgraph,提交给Jobmanager的数据结构。主要的优化为,将多个符合条件的节点chain在一起作为一个节点Execution Graph: Jobmanager根据Jobgraph生成
3.ExecutiongraphExecution Graph是 Job Graphi的并行化版本,是调度层最核心的数据结构。
4.物理执行图:Jobmanager根据Executiongraph对Job进行调度后,在各个Taskmanager上部署Task后形成的“ 图”,并不是一个具体的数据结构。
1. 特定算子的子任务( subtask)的个数被称之为其并行度( parallelism)般情况下,一个 stream的并行度,可以认为就是其所有算子中最大的并行度。
2. 一个程序中,不同的算子可能具有不同的并行度
3. 算子之间传输数据的形式可以是 one-to-one( (forwarding)的模式也可以是 redistributing的模式,具体是哪一种形式,取決于算子的种类
4. One-to-one: stream维护着分区以及元素的顺序(比如 Sources和map之间)。这意味着map算子的子任务看到的元素的个数以及顺序跟 Source算子的子任务生产的元素的个数、顺序相同。map、 fliter、flatmap等算子都是one-to-one的对应关系
5. Redistributing: stream的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如keyby基于 hash Code重分区、而broadcast和rebalance会随机重新分区,这些算子都会引起distributer过程,而redistribute过程就类似于Spark中的shuffle过程。
并行度的设置
“stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); ”
这种方式设置的并行度,只针对当前算子有效。
另外,我们也可以直接调用执行环境的setParallelism()方法,全局设定并行度:
“env.setParallelism(2); ”
这样代码中所有算子,默认的并行度就都为2了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。
这里要注意的是,由于keyBy不是算子,所以无法对keyBy设置并行度。
bin/flink run –p 2 –c com.atguigu.wc.StreamWordCount
./FlinkTutorial-1.0-SNAPSHOT.jar
如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度。
“parallelism.default: 2这个设置对于整个集群上提交的所有作业有效,初始值为1。无论在代码中设置、还是提交时的-p参数,都不是必须的;所以在没有指定并行度的时候,就会采用配置文件中的集群默认并行度。在开发环境中,没有配置文件,默认并行度就是当前机器的CPU核心数。 ”
1. Flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求 ,必须将两个或多个算子设为相同的并行度,并通过本地转发( ocal forward)的方式进行连接
2. 相同并行度的one-to-one操作, Flink这样相连的算子链接在一起形成一个task,原来的算子成为里面的subtask并行度相同、并且是one-to-one操作,两个条件缺一不可
算子间的数据传输
一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通 (forwarding)模式,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。
这种模式下,数据流维护着分区以及元素的顺序。比如图中的source和map算子,source算子读取数据之后,可以直接发送给map算子做处理,它们之间不需要重新分区,也不需要调整数据的顺序。这就意味着map 算子的子任务,看到的元素个数和顺序跟source 算子的子任务产生的完全一样,保证着“一对一”的关系。map、filter、flatMap等算子都是这种one-to-one的对应关系。
这种关系类似于Spark中的窄依赖。
在这种模式下,数据流的分区会发生改变。比图中的map和后面的keyBy/window算子之间(这里的keyBy是数据传输算子,后面的window、apply方法共同构成了window算子),以及keyBy/window算子和Sink算子之间,都是这样的关系。
每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。例如,keyBy()是分组操作,本质上基于键(key)的哈希值(hashCode)进行了重分区;而当并行度改变时,比如从并行度为2的window算子,要传递到并行度为1的Sink算子,这时的数据传输方式是再平衡(rebalance),会把数据均匀地向下游子任务分发出去。这些传输方式都会引起重分区(redistribute)的过程,这一过程类似于Spark中的shuffle。
总体说来,这种算子间的关系类似于Spark中的宽依赖。
在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分。每个task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。
比如在图中的例子中,Source和map之间满足了算子链的要求,所以可以直接合并在一起,形成了一个任务;因为并行度为2,所以合并后的任务也有两个并行子任务。这样,这个数据流图所表示的作业最终会有5个任务,由5个线程并行执行。
将算子链接成task是非常有效的优化:可以减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提升吞吐量。
Flink默认会按照算子链的原则进行链接合并,如果我们想要禁止合并或者自行定义,也可以在代码中对算子做一些特定的设置:
.map(word -> Tuple2.of(word, 1L)).disableChaining();
.map(word -> Tuple2.of(word, 1L)).startNewChain()
我们已经彻底了解了由代码生成任务的过程,现在来做个梳理总结。
“由Flink程序直接映射成的数据流图(dataflow graph),也被称为逻辑流图(logical StreamGraph),因为它们表示的是计算逻辑的高级视图。到具体执行环节时,我们还要考虑并行子任务的分配、数据在任务间的传输,以及合并算子链的优化。为了说明最终应该怎样执行一个流处理程序,Flink需要将逻辑流图进行解析,转换为物理数据流图。 ”
在这个转换过程中,有几个不同的阶段,会生成不同层级的图,其中最重要的就是作业图(JobGraph)和执行图(ExecutionGraph)。Flink中任务调度执行的图,按照生成顺序可以分成四层:
“逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。 ”
我们可以回忆一下之前处理socket文本流的WordCountL程序:
env.socketTextStream().flatMap(…).keyBy(0).sum(1).print();
如果提交时设置并行度为2:
bin/flink run –p 2 –c com.liuhao.WordCountL
./FlinkTutorial-1.0-SNAPSHOT.jar
那么根据之前的分析,除了socketTextStream()是非并行的Source算子,它的并行度始终为1,其他算子的并行度都为2。
这是根据用户通过 DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构。这一步一般在客户端完成。
逻辑流图中的节点,完全对应着代码中的四步算子操作:
源算子Source(socketTextStream())→扁平映射算子Flat Map(flatMap()) →分组聚合算子Keyed Aggregation(keyBy/sum()) →输出算子Sink(print())
StreamGraph经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为: 将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的,在作业提交时传递给JobMaster。
分组聚合算子(Keyed Aggregation)和输出算子Sink(print)并行度都为2,而且是一对一的关系,满足算子链的要求,所以会合并在一起,成为一个任务节点。
JobMaster收到JobGraph后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。
JobMaster生成执行图后, 会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。
物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算了。
所以我们可以看到,程序里定义了四个算子操作:源(Source)->转换(flatmap)->分组聚合(keyBy/sum)->输出(print);合并算子链进行优化之后,就只有三个任务节点了;再考虑并行度后,一共有5个并行子任务,最终需要5个线程来执行。
DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。具体来说,代码基本上都由以下几部分构成。
获取执行环境(execution environment)
读取数据源(source)
定义基于数据的转换操作(transformations)
定义计算结果的输出位置(sink)
触发程序执行(execute)
其中,获取环境和触发执行,都可以认为是针对执行环境的操作。所以我们就从执行环境、数据源(source)、转换操作(transformation)、输出(sink)四大部分,对常用的DataStream API做基本介绍。
Flink程序可以在各种上下文环境中运行:我们可以在本地JVM中执行程序,也可以提交到远程集群上运行。
不同的环境,代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时,首先必须获取当前Flink的运行环境,从而建立起与Flink框架之间的联系。只有获取了环境上下文信息,才能将具体的任务调度到不同的TaskManager执行。
编写Flink程序的第一步,就是创建执行环境。我们要获取的执行环境,是StreamExecutionEnvironment类的对象,这是所有Flink程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种。
最简单的方式,就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
这种“智能”的方式不需要我们额外做判断,用起来简单高效,是最常用的一种创建执行环境的方式。
这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
.createRemoteEnvironment(
"host", // JobManager主机名
1234, // JobManager进程端口号
"path/to/jarFile.jar" // 提交给JobManager的JAR包
);
在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制。
上节中我们获取到的执行环境,是一个StreamExecutionEnvironment,顾名思义它应该是做流处理的。那对于批处理,又应该怎么获取执行环境呢?
在之前的Flink版本中,批处理的执行环境与流处理类似,是调用类ExecutionEnvironment的静态方法,返回它的对象:
// 批处理环境
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
// 流处理环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
基于ExecutionEnvironment读入数据创建的数据集合,就是DataSet;对应的调用的一整套转换方法,就是DataSet API。这些我们在第二章的批处理word count程序中已经有了基本了解。
而从1.12.0版本起,Flink实现了API上的流批统一。DataStream API新增了一个重要特性:可以支持不同的“执行模式”(execution mode),通过简单的设置就可以让一段Flink程序在流处理和批处理之间切换。这样一来,DataSet API也就没有存在的必要了。
“这是DataStream API最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是STREAMING执行模式。 ”
“专门用于批处理的执行模式, 这种模式下,Flink处理作业的方式类似于MapReduce框架。对于不会持续计算的有界数据,我们用这种模式处理会更方便。 ”
在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式。
由于Flink程序默认是STREAMING模式,我们这里重点介绍一下BATCH模式的配置。主要有两种方式:
bin/flink run -Dexecution.runtime-mode=BATCH ...
在提交作业时,增加execution.runtime-mode参数,指定值为BATCH。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
在代码中,直接基于执行环境调用setRuntimeMode方法,传入BATCH模式。
实际应用中一般不会在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐。
有了执行环境,我们就可以构建程序的处理流程了:基于环境读取数据源,进而进行各种转换操作,最后输出结果到外部系统。
需要注意的是,写完输出(sink)操作并不代表程序已经结束。因为当main()方法被调用时,其实只是定义了作业的每个执行操作,然后添加到数据流图中;这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的,只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”(lazy execution)。
所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。
env.execute();
创建环境之后,就可以构建数据处理的业务逻辑了,如图5-2所示,本节将主要讲解Flink的源算子(Source)。想要处理数据,先得有数据,所以首要任务就是把数据读进来。
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
Flink代码中通用的添加source的方式,是调用执行环境的addSource()方法:
DataStream<String> stream = env.addSource(...);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的source function,通常情况下足以应对我们的实际需求。
为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的url,用户访问url的时间戳),所以在这里,我们可以创建一个类Event,将用户行为包装成它的一个对象。Event包含了以下一些字段,如表所示:
Event类字段设计
字段名 | 数据类型 | 说明 |
---|---|---|
user | String | 用户名 |
url | String | 用户访问的url |
timestamp | Long | 用户访问url的时间戳 |
具体代码如下:
import java.sql.Timestamp;
public class Event {
public String user;
public String url;
public Long timestamp;
public Event() {
}
public Event(String user, String url, Long timestamp) {
this.user = user;
this.url = url;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "Event{" +
"user='" + user + '\'' +
", url='" + url + '\'' +
", timestamp=" + new Timestamp(timestamp) +
'}';
}
}
这里需要注意,我们定义的Event,有这样几个特点:
类是公有(public)的
有一个无参的构造方法
所有属性都是公有(public)的
所有属性的类型都是可以序列化的
Flink会把这样的类作为一种特殊的POJO数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了toString方法,主要是为了测试输出显示更清晰。
我们这里自定义的Event POJO类会在后面的代码中频繁使用,所以在后面的代码中碰到Event,把这里的POJO类导入就好了。
最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ArrayList<Event> clicks = new ArrayList<>();
clicks.add(new Event("Mary","./home",1000L));
clicks.add(new Event("Bob","./cart",2000L));
DataStream<Event> stream = env.fromCollection(clicks);
stream.print();
env.execute();
}
我们也可以不构建集合,直接将元素列举出来,调用fromElements方法进行读取数据:
DataStreamSource<Event> stream2 = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。
DataStream<String> stream = env.readTextFile("clicks.csv");
说明:
参数可以是目录,也可以是文件;还可以从hdfs目录下读取,使用路径hdfs://...;
路径可以是相对路径,也可以是绝对路径;
相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录;
不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。
一个简单的方式,就是我们之前用到的读取socket文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStream<String> stream = env.socketTextStream("localhost", 7777);
Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
然后调用env.addSource(),传入FlinkKafkaConsumer的对象实例就可以了。
public class SourceKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "hadoop102:9092");
properties.setProperty("group.id", "consumer-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
stream.print("Kafka");
env.execute();
}
}
创建FlinkKafkaConsumer时需要传入三个参数:
第一个参数topic,定义了从哪些主题中读取数据。可以是一个topic,也可以是topic列表,还可以是匹配所有想要读取的topic的正则表达式。当从多个topic中读取数据时,Kafka连接器将会处理所有topic的分区,将这些分区的数据放到一条流中去。
第二个参数是一个DeserializationSchema或者KeyedDeserializationSchema。Kafka消息被存储为原始的字节数据,所以需要反序列化成Java或者Scala对象。上面代码中使用的SimpleStringSchema,是一个内置的DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共接口,所以我们也可以自定义反序列化逻辑。
第三个参数是一个Properties对象,设置了Kafka客户端的一些属性。
接下来我们创建一个自定义的数据源,实现SourceFunction接口。主要重写两个关键方法:run()和cancel()。
run()方法:通过运行时上下文对象SourcContext循环生成数据,并发送到流中;
cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
代码如下:
public class ClickSource implements SourceFunction<Event> {
private Boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
Random random = new Random();
String[] users = {"Mary", "Bob"};
String[] urls = {"./home", "./cart", “./fav”, “./prod?id=1”, “./prod?id=2”};
while (running) {
sourceContext.collect(new Event(
users[random.nextInt(users.length)],
urls[random.nextInt(urls.length)],
Calendar.getInstance().getTimeInMillis()
));
}
}
@Override
public void cancel() {
running = false;
}
}
这个数据源,我们后面会频繁使用,所以在后面的代码中涉及到ClickSource()数据源,使用上面的代码就可以了。
下面的代码我们来读取一下自定义的数据源。有了自定义的source function,接下来只要调用addSource()就可以了:
env.addSource(new ClickSource())
下面是完整的代码:
public class SourceCustom {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//有了自定义的source function,调用addSource方法
DataStreamSource<Event> stream = env.addSource(new ClickSource());
stream.print("SourceCustom");
env.execute();
}
}
为了方便地处理数据,Flink有自己一整套类型系统。Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
简单来说,对于常见的Java和Scala数据类型,Flink都是支持的。Flink在内部,Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到:
“所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。 ”
“包括基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY) ”
Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段
Scala 样例类及Scala元组:不支持空字段
行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段
POJO:Flink自定义的类似于Java bean模式的类
“Option、Either、List、Map等 ”
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键(key)的定义中直接使用字段名,这会让我们的代码可读性大大增加。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类);
类有一个公共的无参构造方法;
类中的所有字段是public且非final的;或者有一个公共的getter和setter方法,这些方法需要符合Java bean的命名规范。
所以我们看到,之前的UserBehavior,就是我们创建的符合Flink POJO定义的数据类型。
为了解决这类问题,Java API提供了专门的“类型提示”(type hints)。
回忆一下之前的word count流处理程序,我们在将String类型的每个词转换成(word, count)二元组后,就明确地用returns指定了返回的类型。因为对于map里传入的Lambda表达式,系统只能推断出返回的是Tuple2类型,而无法得到Tuple2<String, Long>。只有显式地告诉系统当前的返回类型,才能正确地解析出完整数据。
.map(word -> Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
Flink还专门提供了TypeHint类,它可以捕获泛型的类型信息,并且一直记录下来,为运行时提供足够的信息。我们同样可以通过.returns()方法,明确地指定转换之后的DataStream里元素的类型。
returns(new TypeHint<Tuple2<Integer, SomeType>>(){})
数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。一个Flink程序的核心,其实就是所有的转换操作,它们决定了处理的业务逻辑。
我们可以针对一条流进行转换处理,也可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑。在本节中,我们将重点介绍基本的单数据流的转换,多流转换的内容我们将在后续章节展开。
map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素,如图所示。
我们只需要基于DataStrema调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现;返回值类型还是DataStream,不过泛型(流中的元素类型)可能改变。
下面的代码用不同的方式,实现了提取Event中的user字段的功能。
public class TransMap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
// 传入匿名类,实现MapFunction
stream.map(new MapFunction<Event, String>() {
@Override
public String map(Event e) throws Exception {
return e.user;
}
});
// 传入MapFunction的实现类
stream.map(new UserExtractor()).print();
env.execute();
}
public static class UserExtractor implements MapFunction<Event, String> {
@Override
public String map(Event e) throws Exception {
return e.user;
}
}
}
上面代码中,MapFunction实现类的泛型类型,与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还需要重写一个map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。
进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。
下面的代码会将数据流中用户Mary的浏览行为过滤出来 。
public class TransFilter {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
// 传入匿名类实现FilterFunction
stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event e) throws Exception {
return e.user.equals("Mary");
}
});
// 传入FilterFunction实现类
stream.filter(new UserFilter()).print();
env.execute();
}
public static class UserFilter implements FilterFunction<Event> {
@Override
public boolean filter(Event e) throws Exception {
return e.user.equals("Mary");
}
}
}
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生0到多个元素。flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理,如图5-7所示。我们此前WordCount程序的第一步分词操作,就用到了flatMap。
同map一样,flatMap也可以使用Lambda表达式或者FlatMapFunction接口实现类的方式来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。
flatMap操作会应用在每一个输入事件上面, FlatMapFunction接口中定义了flatMap方法,用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回0个、1个或多个结果数据。因此flatMap并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调用,也可以不调用。所以flatMap方法也可以实现map方法和filter方法的功能,当返回结果是0个的时候,就相当于对数据进行了过滤,当返回结果是1个的时候,相当于对数据进行了简单的转换操作。
flatMap的使用非常灵活,可以对结果进行任意输出,下面就是一个例子:
public class TransFlatmap {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
stream.flatMap(new MyFlatMap()).print();
env.execute();
}
public static class MyFlatMap implements FlatMapFunction<Event, String> {
@Override
public void flatMap(Event value, Collector<String> out) throws Exception {
if (value.user.equals("Mary")) {
out.collect(value.user);
} else if (value.user.equals("Bob")) {
out.collect(value.user);
out.collect(value.url);
}
}
}
}
在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前word count程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着MapReduce中的reduce操作。
对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。
keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。
基于不同的key,流中的数据将被分配到不同的分区中去,如图所示;这样一来,所有具有相同的key的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个slot中进行处理了。
在内部,是通过计算key的哈希值(hash code),对分区数进行取模运算来实现的。所以这里key如果是POJO的话,必须要重写hashCode()方法。
keyBy()方法需要传入一个参数,这个参数指定了一个或一组key。有很多不同的方法来指定key:比如对于Tuple数据类型,可以指定字段的位置或者多个位置的组合;对于POJO类型,可以指定字段的名称(String);另外,还可以传入Lambda表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取key的逻辑。
我们可以以id作为key做一个分区操作,代码实现如下:
public class TransKeyBy {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
// 使用Lambda表达式
KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);
// 使用匿名类实现KeySelector
KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
@Override
public String getKey(Event e) throws Exception {
return e.user;
}
});
env.execute();
}
}
需要注意的是,keyBy得到的结果将不再是DataStream,而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”,它是对DataStream按照key的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定key的类型。
KeyedStream也继承自DataStream,所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同,只是一个流的分区操作,并不是一个转换算子。KeyedStream是一个非常重要的数据结构,只有基于它才可以做后续的聚合操作(比如sum,reduce)。
有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:
sum():在输入流上,对指定的字段做叠加求和的操作。
min():在输入流上,对指定的字段求最小值。
max():在输入流上,对指定的字段求最大值。
minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。
简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称。
对于元组类型的数据,可以使用这两种方式来指定字段。需要注意的是,元组中字段的名称,是以f0、f1、f2、…来命名的。
如果数据流的类型是POJO类,那么就只能通过字段名称来指定,不能通过位置来指定了。
public class TransAggregation {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
stream.keyBy(e -> e.user).max("timestamp"); // 指定字段名称
env.execute();
}
}
简单聚合算子返回的,同样是一个SingleOutputStreamOperator,也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解:keyBy和聚合是成对出现的,先分区、后聚合,得到的依然是一个DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。
一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个key的数据流上。
如果说简单聚合是对一些特定统计需求的实现,那么reduce算子就是一个一般化的聚合统计操作了。它可以对已有的数据进行归约处理,把每一个新输入的数据和当前已经归约出来的值,再做一个聚合计算。
与简单聚合类似,reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
调用KeyedStream的reduce方法时,需要传入一个参数,实现ReduceFunction接口。接口在源码中的定义如下:
public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
ReduceFunction接口里需要实现reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,这也就是reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。
我们可以单独定义一个函数类实现ReduceFunction接口,也可以直接传入一个匿名类。当然,同样也可以通过传入Lambda表达式实现类似的功能。
与简单聚合类似,reduce操作也会将KeyedStream转换为DataStrema。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。
下面我们来看一个稍复杂的例子。
我们将数据流按照用户id进行分区,然后用一个reduce算子实现sum的功能,统计每个用户访问的频次;进而将所有统计结果分到一组,用另一个reduce算子实现maxBy的功能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。
public class TransReduce {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// 这里的ClickSource()使用了之前自定义数据源小节中的ClickSource()
env.addSource(new ClickSource())
// 将Event数据类型转换成元组类型
.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event e) throws Exception {
return Tuple2.of(e.user, 1L);
}
})
.keyBy(r -> r.f0) // 使用用户名来进行分流
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 每到一条数据,用户pv的统计值加1
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
.keyBy(r -> true) // 为每一条数据分配同一个key,将聚合结果发送到一条流中去
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
})
.print();
env.execute();
}
}
reduce同简单聚合算子一样,也要针对每一个key保存状态。因为状态不会清空,所以我们需要将reduce算子作用在一个有限key的流上。
在前面的介绍我们可以发现,Flink的DataStream API编程风格其实是一致的:基本上都是基于DataStream调用一个方法,表示要做一个转换操作;方法需要传入一个参数,这个参数都是需要实现一个接口。很容易发现,这些接口有一个共同特点:全部都以算子操作名称 + Function命名,例如源算子需要实现SourceFunction接口,map算子需要实现MapFunction接口,reduce算子需要实现ReduceFunction接口。
这些接口又都继承自Function接口。所以我们不仅可以通过自定义函数类或者匿名类来实现接口,也可以直接传入Lambda表达式。这就是所谓的用户自定义函数(user-defined function,UDF)。
接下来我们就对这几种编程方式做一个梳理总结。
对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等
所以最简单直接的方式,就是自定义一个函数类,实现对应的接口。之前我们对于API的练习,主要就是基于这种方式。
下面例子实现了FilterFunction接口,用来从用户的点击数据中筛选包含“home”的内容:
public class TransFunctionUDF {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> clicks = env.readTextFile("clicks.csv");
DataStream<String> stream = clicks.filter(new FlinkFilter());
stream.print();
env.execute();
}
public static class FlinkFilter implements FilterFunction<String> {
@Override
public boolean filter(String value) throws Exception {
return value.contains("home");
}
}
当然还可以通过匿名类来实现FilterFunction接口:
DataStream<String> stream = clicks.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
return value.contains("home");
}
});
为了类可以更加通用,我们还可以将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。
DataStream<String> clicks = env.readTextFile("clicks.csv");
DataStream<String> stream = clicks.filter(new KeyWordFilter("home"));
public static class KeyWordFilter implements FilterFunction<String> {
private String keyWord;
KeyWordFilter(String keyWord) { this.keyWord = keyWord; }
@Override
public boolean filter(String value) throws Exception {
return value.contains(this.keyWord);
}
}
匿名函数(Lambda表达式)是Java 8 引入的新特性,方便我们更加快速清晰地写代码。Lambda 表达式允许以简洁的方式实现函数,以及将函数作为参数来进行传递,而不必声明额外的(匿名)类。
Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码,但是,当 Lambda 表达式使用 Java 的泛型时,我们需要显式的声明类型信息。
下例演示了如何使用Lambda表达式来实现一个简单的 map() 函数,我们使用 Lambda 表达式来计算输入的平方。在这里,我们不需要声明 map() 函数的输入 i 和输出参数的数据类型,因为 Java 编译器会对它们做出类型推断。
public class TransFunctionLambda {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Event> clicks = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);
//map函数使用Lambda表达式,不需要进行类型声明
DataStream<String> stream = clicks.map(event -> event.url);
stream.print();
env.execute();
}
}
由于 OUT 是 Integer 类型而不是泛型,所以 Flink 可以从函数签名 OUT map(IN value) 的实现中自动提取出结果的类型信息。
但是对于像 flatMap() 这样的函数,它的函数签名 void flatMap(IN value, Collectorout) 被 Java 编译器编译成了 void flatMap(IN value, Collector out),也就是说将Collector的泛型信息擦除掉了。这样 Flink 就无法自动推断输出的类型信息了。
同样地,使用map()时只要涉及到泛型擦除也会有同样的问题。比如返回Flink自定义的元组类型,在WordCount程序中就遇到了类似的问题。
一般来说,可以显式地调用.returns()方法,通过明确指定返回类型来解决这个问题:
DataStream<String> stream = clicks
.map(event -> Tuple2.of(event.user, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));
当然,也可以用类或者匿名类的方式来解决。
“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。所以像文件IO的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在open()方法中完成。。
close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。
来看一个例子:
public class RichFunctionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env
.fromElements(1,2,3,4)
.map(new RichMapFunction<Integer, Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");
}
@Override
public Integer map(Integer integer) throws Exception {
return integer + 1;
}
@Override
public void close() throws Exception {
super.close();
System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");
}
})
.print();
env.execute();
}
}
一个常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在open()中建立连接,在map()中读写数据,而在close()中关闭连接。
public class MyFlatMap extends RichFlatMapFunction<IN, OUT>> {
@Override
public void open(Configuration configuration) {
// 做一些初始化工作
// 例如建立一个和MySQL的连接
}
@Override
public void flatMap(IN in, Collector<OUT out) {
// 对数据库进行读写
}
@Override
public void close() {
// 清理工作,关闭和MySQL数据库的连接。
}
}
常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast)。
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。
随机分区服从均匀分布(uniform distribution),所以可以把流中的数据随机打乱,均匀地传递到下游任务分区,如图5-9所示。因为是完全随机的,所以对于同样的输入数据, 每次执行得到的结果也不会相同。
经过随机分区之后,得到的依然是一个DataStream。
我们可以做个简单测试:将数据读入之后直接打印到控制台,将输出的并行度设置为2,中间经历一次shuffle。执行多次,观察结果是否相同。
public class ShuffleExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> stream = env.fromElements(1, 2, 3, 4).setParallelism(1);
stream.shuffle().print().setParallelism(2);
env.execute();
}
}
轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,如图5-10所示。通过调用DataStream的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。
重缩放分区和轮询分区非常相似。当调用rescale()方法时,其实底层也是使用Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图5-11所示。也就是说,“发牌人”如果有多个,那么rebalance的方式是每个发牌人都面向所有人发牌;而rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌。
从底层实现上看,rebalance和rescale的根本区别在于任务之间的连接机制不同。rebalance将会针对所有上游任务(发送数据方)和所有下游任务(接收数据方)之间建立通信通道,这是一个笛卡尔积的关系;而rescale仅仅针对每一个任务和下游对应的部分任务之间建立通信通道,节省了很多资源。
这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用DataStream的broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去。
全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1,所以使用这个操作需要非常谨慎,可能对程序造成很大的。
当Flink提供的所有分区策略都不能满足用户的需求时,我们可以通过使用partitionCustom()方法来自定义分区策略。
“Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。 ”
Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。
与Source算子非常类似,除去一些Flink预实现的Sink,一般情况下Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSource的参数需要实现一个SourceFunction接口;类似地,addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。
当然,SinkFuntion多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如图5-13所示,列出了Flink官方目前支持的第三方系统连接器:
我们可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、文件系统(FileSystem)、JDBC等数据存储系统,则只提供了输出写入的sink连接器。
除Flink官方之外,Apache Bahir作为给Spark和Flink提供扩展支持的项目,也实现了一些其他第三方系统与Flink的连接器,如图所示。
除此以外,就需要用户自定义实现sink连接器了。
Flink专门提供了一个流式文件系统的连接器:StreamingFileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。
StreamingFileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink的静态方法:
行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
下面我们就以行编码为例,将一些测试数据直接写入文件:
public class SinkFile {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> input = env.fromElements("hello world", "hello flink");
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path("./output"),
new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1024 * 1024 * 1024)
.build())
.build();
input.addSink(sink);
env.execute();
}
}
这里我们创建了一个简单的文件Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下3种情况下,我们就会滚动分区文件:
至少包含15分钟的数据
最近5分钟没有收到新的数据
文件大小已达到1 GB
由于我们已经测试过从Kafka数据源读取数据,连接器相关依赖已经引入,这里就不重复介绍了。
我们可以直接将用户行为数据保存为文件clicks.csv,读取后不做转换直接写入Kafka,主题(topic)命名为“clicks”。
public class SinkKafka {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Properties properties = new Properties();
properties.put("bootstrap.servers", "hadoop102:9092");
DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");
stream
.addSink(new FlinkKafkaProducer<String>(
"clicks",
new SimpleStringSchema(),
properties
));
env.execute();
}
}
bin/kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic clicks
Flink没有直接提供官方的Redis连接器,不过Bahir项目还是担任了合格的辅助角色,为我们提供了Flink-Redis的连接工具。但版本升级略显滞后,目前连接器版本为1.0,支持的Scala版本最新到2.11。由于我们的测试不涉及到Scala的相关版本变化,所以并不影响使用。在实际项目应用中,应该以匹配的组件版本运行。
具体测试步骤如下:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
这里我们为方便测试,只启动了单节点Redis。
连接器为我们提供了一个RedisSink,它继承了抽象类RichSinkFunction,这就是已经实现好的向Redis写入数据的SinkFunction。我们可以直接将Event数据输出到Redis:
public class SinkRedis {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop200").build();
env.addSource(new ClickSource())
.addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
env.execute();
}
}
这里RedisSink的构造方法需要传入两个参数:
JFlinkJedisConfigBase:Jedis的连接配置
RedisMapper:Redis映射类接口,说明怎样将数据转换成可以写入Redis的类型
接下来主要就是定义一个Redis的映射类,实现RedisMapper接口。
public static class MyRedisMapper implements RedisMapper<Event> {
@Override
public String getKeyFromData(Event e) {
return e.user;
}
@Override
public String getValueFromData(Event e) {
return e.url;
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "clicks");
}
}
在这里我们可以看到,保存到Redis时调用的命令是HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以user为key,以url为value,每来一条数据就会做一次转换。
Flink为ElasticSearch专门提供了官方的Sink 连接器。
写入数据的ElasticSearch的测试步骤如下。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
public class SinkToEs {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("hadoop102", 9200, "http"));
ElasticsearchSink.Builder<Event> esBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<Event>() {
@Override
public void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
HashMap<String, String> data = new HashMap<>();
data.put(event.user, event.url);
IndexRequest indexRequest = Requests
.indexRequest()
.index("clicks")
.type("type")
.source(data);
requestIndexer.add(indexRequest);
}
}
);
DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L));
stream.addSink(esBuilder.build());
env.execute();
}
}
与RedisSink类似,连接器也为我们实现了写入到Elasticsearch的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用ElasticsearchSink的Builder内部静态类,调用它的build()方法才能创建出真正的SinkFunction。
而Builder的构造方法中又有两个参数:
httpHosts:连接到的Elasticsearch集群主机列表
elasticsearchSinkFunction:这并不是我们所说的SinkFunction,而是用来说明具体处理逻辑、准备数据向Elasticsearch发送请求的函数
具体的操作需要重写中elasticsearchSinkFunction中的process方法,我们可以将要发送的数据放在一个HashMap中,包装成IndexRequest向外部发送HTTP请求。
写入数据的MySQL的测试步骤如下。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);
public class SinkToMySQL {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
)
.addSink(
JdbcSink.sink(
"INSERT INTO clicks (user, url) VALUES (?, ?)",
(statement, r) -> {
statement.setString(1, r.user);
statement.setString(2, r.url);
},
JdbcExecutionOptions.builder()
.withBatchSize(1000)
.withBatchIntervalMs(200)
.withMaxRetries(5)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/test")
// 使用MySQL 5.7的话,没有cj
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUsername("username")
.withPassword("password")
.build()
)
);
env.execute();
}
}
如果我们想将数据存储到我们自己的存储设备中,而Flink并没有提供可以直接使用的连接器,就只能自定义Sink进行输出了。与Source类似,Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunction<String>());
在实现SinkFunction的时候,需要重写的一个关键方法invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。
这种方式比较通用,对于任何外部存储系统都有效;不过自定义Sink想要实现状态一致性并不容易,所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现,而且在不断地扩充,因此自定义的场景并不常见。
在流数据处理应用中,一个很重要、也很常见的操作就是窗口计算。所谓的“窗口”,一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。接下来我们就深入了解一下Flink中的时间语义和窗口的应用。
如图所示,在事件发生之后,生成的数据被收集起来,首先进入分布式消息队列,然后被Flink系统中的Source算子读取消费,进而向下游的转换算子(窗口算子)传递,最终由窗口算子进行计算处理。
很明显,这里有两个非常重要的时间点:一个是数据产生的时间,我们把它叫作“事件时间”(Event Time);另一个是数据真正被处理的时刻,叫作“处理时间”(Processing Time)。我们所定义的窗口操作,到底是以那种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有所滞后。
“处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。处理时间是最简单的时间语义。 ”
事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。
数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)。
在事件时间语义下,我们对于时间的衡量,就不看任何机器的系统时间了,而是依赖于数据本身。由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在Flink中把它叫作事件时间的“水位线”(Watermarks)。
如图所示,我们会发现,看电影其实就是处理影片中数据的过程,所以影片的上映时间就相当于“处理时间”;而影片的数据就是所描述的故事,它所发生的背景时间就相当于“事件时间”。两种时间语义都有各自的用途,适用于不同的场景。
在计算机系统中,考虑数据处理的“时代变化”是没什么意义的,我们更关心的,显然是数据本身产生的时间。
所以在实际应用中,事件时间语义会更为常见。一般情况下,业务日志数据中都会记录数据生成的时间戳(timestamp),它就可以作为事件时间的判断基础。
在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从1.12版本开始,Flink已经将事件时间作为了默认的时间语义。
在窗口的处理过程中,我们可以基于数据的时间戳,自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝;它的时间进展,就是靠着新到数据的时间戳来推动的。
这样的好处在于,计算的过程可以完全不依赖处理时间(系统时间),不论什么时候进行统计处理,得到的结果都是正确的。而一般实时流处理的场景中,事件时间可以基本与处理时间保持同步,只是略微有一点延迟,同时保证了窗口计算的正确性。
在Flink中,用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。
具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
在理想状态下,数据应该按照它们生成的先后顺序、排好队进入流中;也就是说,它们处理的过程会保持原先的顺序不变,遵守先来后到的原则。这样的话我们从每个数据中提取时间戳,就可以保证总是从小到大增长的,从而插入的水位线也会不断增长、事件时钟不断向前推进。
实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图6-6所示。所以这时的水位线,其实就是有序流中的一个周期性出现的时间标记。
在分布式系统中,数据在节点间传输,会因为网络传输延迟的不确定性,导致顺序发生改变,这就是所谓的“乱序数据”。
这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。如图6-7所示,一个7秒时产生的数据,生成时间自然要比9秒的数据早;但是经过数据缓存和传输之后,处理任务可能先收到了9秒的数据,之后7秒的数据才姗姗来迟。这时如果我们希望插入水位线,来指示当前的事件时间进展,又该怎么做呢?
解决思路也很简单:我们还是靠数据来驱动,每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序,所以插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,如图6-8所示。也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。
如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线,如下图所示。
但是这样做会带来一个非常大的问题:我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据,我们也可以等上一段时间,比如2秒;也就是用当前已有数据的最大时间戳减去2秒,就是要插入的水位线的时间戳,如下图所示。这样的话,9秒的数据到来之后,事件时钟不会直接推进到9秒,而是进展到了7秒;必须等到11秒的数据到来之后,事件时钟才会进展到9秒,这时迟到数据也都已收集齐,0~9秒的窗口就可以正确计算结果了。
现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。
总结一下水位线的特性:
水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
水位线是基于数据的时间戳生成的
水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
水位线可以通过设置延迟,来保证正确处理乱序数据
一个水位线Watermark(t),表示在当前流中事件时间已经达到了时间戳t, 这代表t之前的所有数据都到齐了,之后流中不会出现时间戳t’ ≤ t的数据
水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。
水位线是用来保证窗口处理结果的正确性的,如果不能正确处理所有乱序数据,可以尝试调大延迟的时间。
完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确,就必须等足够长的时间,这会带来更高的延迟。
如果我们希望计算结果能更加准确,那可以将水位线的延迟设置得更高一些,等待的时间越长,自然也就越不容易漏掉数据。不过这样做的代价是处理的实时性降低了,我们可能为极少数的迟到数据增加了很多不必要的延迟。
如果我们希望处理得更快、实时性更强,那么可以将水位线延迟设得低一些。这种情况下,可能很多迟到数据会在水位线之后才到达,就会导致窗口遗漏数据,计算结果不准确。当然,如果我们对准确性完全不考虑、一味地追求处理速度,可以直接使用处理时间语义,这在理论上可以得到最低的延迟。
所以Flink中的水位线,其实是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权力交给了程序员,我们可以在代码中定义水位线的生成策略。
在Flink的DataStream API中,有一个单独用于生成水位线的方法:.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
WatermarkStrategy<T> watermarkStrategy)
具体使用时,直接用DataStream调用该方法即可,与普通的transform方法完全一样。
DataStream<Event> stream = env.addSource(new ClickSource());
DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(*<watermark strategy>*);
.assignTimestampsAndWatermarks()方法需要传入一个WatermarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器” TimestampAssigner和一个“水位线生成器” WatermarkGenerator。
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>,
WatermarkGeneratorSupplier<T>{
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。
在WatermarkGenerator接口中,主要又有两个方法:onEvent()和onPeriodicEmit()。
onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作
onPeriodicEmit:周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
env.getConfig().setAutoWatermarkInterval(60 * 1000L);
WatermarkStrategy这个接口是一个生成水位线策略的抽象,而Flink提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,而且也为我们自定义水位线策略提供了模板。
这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。
对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
stream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Event>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
上面代码中我们调用.withTimestampAssigner()方法,将数据中的timestamp字段提取出来,作为时间戳分配给数据元素;然后用内置的有序流水位线生成器构造出了生成策略。这样,提取出的数据时间戳,就是我们处理计算的事件时间。
这里需要注意的是,时间戳和水位线的单位,必须都是毫秒。
由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
代码示例如下:
public class WatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(new ClickSource())
// 插入水位线的逻辑
.assignTimestampsAndWatermarks(
// 针对乱序流插入水位线,延迟时间设置为5s
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
// 抽取时间戳的逻辑
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
)
.print();
env.execute();
}
}
上面代码中,我们同样提取了timestamp字段作为时间戳,并且以5秒的延迟时间创建了处理乱序流的水位线生成器。
一般来说,Flink内置的水位线生成器就可以满足应用需求了。不过有时我们的业务逻辑可能非常复杂,这时对水位线生成的逻辑也有更高的要求,就必须自定义实现水位线策略WatermarkStrategy了。
在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于WatermarkGenerator的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated)。
WatermarkGenerator接口中有两个方法,onEvent()和onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。
周期性生成器一般是通过onEvent()观察判断输入的事件,而在onPeriodicEmit()里发出水位线。
下面是一段自定义周期性生成水位线的代码:
import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.addSource(new ClickSource())
.assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
.print();
env.execute();
}
public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
@Override
public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段
}
};
}
@Override
public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new CustomBoundedOutOfOrdernessGenerator();
}
}
public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {
private Long delayTime = 5000L; // 延迟时间
private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳
@Override
public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
// 每来一条数据就调用一次
maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 发射水位线,默认200ms调用一次
output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
}
}
}
我们在onPeriodicEmit()里调用output.emitWatermark(),就可以发出水位线了;这个方法由系统框架周期性地调用,默认200ms一次。
断点式生成器会不停地检测onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过onPeriodicEmit()发出水位线。
自定义的断点式水位线生成器代码如下:
public class PunctuatedGenerator implements WatermarkGenerator<Event> {
@Override
public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
// 只有在遇到特定的itemId时,才发出水位线
if (r.user.equals("Mary")) {
output.emitWatermark(new Watermark(r.timestamp - 1));
}
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 不需要做任何事情,因为我们在onEvent方法中发射了水位线
}
}
我们在onEvent()中判断当前事件的user字段,只有遇到“Mary”这个特殊的值时,才调用output.emitWatermark()发出水位线。这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;
public class EmitWatermarkInSourceFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new ClickSource()).print();
env.execute();
}
// 泛型是数据源中的类型
public static class ClickSource implements SourceFunction<Event> {
private boolean running = true;
@Override
public void run(SourceContext<Event> sourceContext) throws Exception {
Random random = new Random();
String[] userArr = {"Mary", "Bob", "Alice"};
String[] urlArr = {"./home", "./cart", "./prod?id=1"};
while (running) {
long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳
String username = userArr[random.nextInt(userArr.length)];
String url = urlArr[random.nextInt(urlArr.length)];
Event event = new Event(username, url, currTs);
// 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段
sourceContext.collectWithTimestamp(event, event.timestamp);
// 发送水位线
sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
running = false;
}
}
}
在自定义水位线中生成水位线相比assignTimestampsAndWatermarks方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写Flink的测试程序,测试Flink的各种各样的特性。
在流处理中,上游任务处理完水位线、时钟改变之后,要把当前的水位线再次发出,广播给所有的下游子任务。而当一个任务接收刀多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。
如图所示,当前任务的上游,有四个并行子任务,所以会接收到来自四个分区的水位线;而下游有三个并行子任务,所以会向三个分区发出水位线。具体过程如下:
(1)上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线”(Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
(2)当有一个新的水位线(第一分区的4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区时钟中的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟也就可以更新了。这里要注意,更新后的任务时钟,并不一定是新来的那个分区水位线,比如这里改变的是第一分区的时钟,但最小的分区时钟是第三分区的3,于是当前任务时钟就推进到了3。当时钟有进展时,当前任务就会将自己的时钟以水位线的形式,广播给下游所有子任务。
(3)再次收到新的水位线(第二分区的7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区时钟;发现最小值没有变化,那么当前任务的时钟也不变,也不会向下游任务发出水位线。
(4)同样道理,当又一次收到新的水位线(第三分区的6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一分区的4,所以当前任务的时钟推进到4,并发出时间戳为4的水位线,广播到下游各个分区任务。
水位线在上下游任务之间的传递,非常巧妙地避免了分布式系统中没有统一时钟的问题,每个任务都以“处理完之前所有数据”为标准来确定自己的时钟,就可以保证窗口处理的结果总是正确的。10.3 窗口(Window)