流处理系统由于需要支持无限数据集的处理,一般采用一种数据驱动的处理方式。它会提前设置一些算子,然后等到数据到达后对数据进行处理。
为了表达复杂的逻辑,flink在内的分布式流处理引擎,一般采用 DAG(有向无环图) 图来表示整个计算逻辑,其中 DAG 图中的每一个点就代表一个基本的逻辑单元,也就是前面说的算子,由于计算逻辑被组织成有向图,数据会按照边的方向,从一些特殊的 Source 节点流入系统,然后通过网络传输、本地传输等不同的数据传输方式在算子之间进行发送和处理,最后会通过另外一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中。
Environment -> Source -> Transform -> Sink 懒加载模式,需要手动执行。
1.1 getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文。自动查询当前运行的方式,返回Local或Remote,调用底层方法。
1.2 createLocalEnvironment 返回本地执行环境,需要在调用时指定默认的并行度。
1.3 createRemoteEnvironment 返回集群执行环境,将Jar包提交到远程服务器。需要在调用时制定JM的IP和端口号,并指定要在集群中运行的Jar包(有变动需要修改源码)。
2.1 fromCollection 有界流:从自定义的集合中读取、从文件中读取
无界流:从Kafka中读取数据
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-${kafka.version}_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
先定义Kafka的Properties,env.addSource(new FlinkKafkaConsumer${version}[String](,,));
。Flink会将Kafka的Offset作为状态保存,并保证状态一致性。
自定义Source:自定义一个继承SourceFunction类
常见的转换算子:map、flatMap、Filter、KeyBy、(基本)滚动聚合算子、Reduce、(聚合)Split、Select、Connect、CoMap、Union(多流转换)。并行度可以在每个算子后设置。
(2)flatMap 将元素摊平,每个元素可以变为0个、1个、或者多个元素。
(3)Filter 过滤元素。
(4)KeyBy DataStream转换为KeyedStream,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素(内部hash),分区不分流。
(6)Reduce 归并操作,它可以将KeyedStream 转变为 DataStream,实质是按照key做叠加计算。
(8)Select(@deprecated:side output) 结合Split,将SplitStream数据提取出来,变为DataStream。
(9)Connect 两个DataStream(可以是不同类型流)合并为一个ConnectedStreams,但内部仍属于各自独立的DataStream。
(10)CoMap,CoFlatMap 结合Connect,将ConnectedStreams(可以是不同类型流)合并为一个DataStream。
(11)Union 一个或多个DataStream(是相同类型流)合并为一个DataStream。
(1)Java和Scala基础数据类型; (2)Java和Scala元组(Tuples); (3)Scala样例类(case classes) (4)Java简单对象(POJO); (5)其他(ArrayList、HashMap、Enums)。
Flink对外输出操作必须利用Sink完成(addSink(new SinkFunction(){})),print()实际调用的也是DataStreamSink方法,此外,官方提供了一部分框架的Sink。(Kafka提供了Source和Sink) (1)Kafka
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
env.addSink(new FlinkKafkaConsumer011[String]("${id:port}", "brokerList", new SimpleStringSchema()))
// 到这里就实现了Kafka进,Kafka出
(2)Redis
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis-2.11</artifactId>
<version>${bahir.version}</version>
</dependency>
(3)ES
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
(4)MySQL(JDBC连接)
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
需要自定义RichSinkFunction(),仅在初始化时调用连接
将无界数据流切分为有界数据流集进行处理,窗口(window)就是切分无界流的一种方式,将流数据分发到有限大小的桶(bucket)中进行分析。
(1)类型 Time Window:
Count Window
窗口分配器window()
方法,必须在keyBy之后才能用,再做聚合操作。flink还提供了.timeWindow
和.countWindow
方法。
(1)WindowAssigner window()方法接收的参数是一个WindowAssigner。 Flink提供了: 滚动窗口(.timeWindow(Time.secounds(15))); 滑动窗口(.timeWindow(Time.secounds(15), Time.secounds(15))); 会话窗口(.window(EventTimeSessionWindows.withGap(Time.minutes(10)))); 全局窗口(一个无界流); 滚动计数窗口(.countWindow(5)); 滑动计数窗口(.countWindow(10, 2))。
(2)WindowFunction 定义了要对窗口中收集的数据做的计算操作。
(3)其他可选API .trigger():触发器,定义window什么时候关闭,触发计算并输出结果。 .evitor():移除器,定义移除某些数据的逻辑。 .allowedLateness():允许处理迟到(窗口关闭后)的数据。 .sideOutputLateData():将迟到的数据放入侧输出流。 .getSideOutPut():获取侧输出流。