前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink(二)

Flink(二)

作者头像
matt
发布2022-10-25 16:02:27
5100
发布2022-10-25 16:02:27
举报
文章被收录于专栏:CSDN迁移

Flink

流处理系统由于需要支持无限数据集的处理,一般采用一种数据驱动的处理方式。它会提前设置一些算子,然后等到数据到达后对数据进行处理。

为了表达复杂的逻辑,flink在内的分布式流处理引擎,一般采用 DAG(有向无环图) 图来表示整个计算逻辑,其中 DAG 图中的每一个点就代表一个基本的逻辑单元,也就是前面说的算子,由于计算逻辑被组织成有向图,数据会按照边的方向,从一些特殊的 Source 节点流入系统,然后通过网络传输、本地传输等不同的数据传输方式在算子之间进行发送和处理,最后会通过另外一些特殊的 Sink 节点将计算结果发送到某个外部系统或数据库中。

一、Flink流处理API

Environment -> Source -> Transform -> Sink 懒加载模式,需要手动执行。

1. Environment

1.1 getExecutionEnvironment 创建一个执行环境,表示当前执行程序的上下文。自动查询当前运行的方式,返回Local或Remote,调用底层方法。

1.2 createLocalEnvironment 返回本地执行环境,需要在调用时指定默认的并行度。

1.3 createRemoteEnvironment 返回集群执行环境,将Jar包提交到远程服务器。需要在调用时制定JM的IP和端口号,并指定要在集群中运行的Jar包(有变动需要修改源码)。

2. Source

2.1 fromCollection 有界流:从自定义的集合中读取、从文件中读取

无界流:从Kafka中读取数据

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-${kafka.version}_${scala.version}</artifactId>
  <version>${flink.version}</version>
</dependency>

先定义Kafka的Properties,env.addSource(new FlinkKafkaConsumer${version}[String](,,));。Flink会将Kafka的Offset作为状态保存,并保证状态一致性。

自定义Source:自定义一个继承SourceFunction类

3. Transform

常见的转换算子:map、flatMap、Filter、KeyBy、(基本)滚动聚合算子、Reduce、(聚合)Split、Select、Connect、CoMap、Union(多流转换)。并行度可以在每个算子后设置。

  • 基本转换算子 (1)map 映射,对每个元素进行一定的变换后,映射为另一个元素。输出泛型可以变化,常用作分词操作。

(2)flatMap 将元素摊平,每个元素可以变为0个、1个、或者多个元素。

(3)Filter 过滤元素。

(4)KeyBy DataStream转换为KeyedStream,逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素(内部hash),分区不分流。

  • 聚合算子 (5)滚动聚合算子(Rolling Aggregation) 针对KeyedStream的每一个支流做聚合。 sum()\min()\max()\minBy()\maxBy()

(6)Reduce 归并操作,它可以将KeyedStream 转变为 DataStream,实质是按照key做叠加计算。

  • 多流转换算子 (7)Split DataStream转换为SplitStream,根据某些特征将一个DataStream拆分成两个或多个DataStream(结合Select提取数据)。 分流作用:处理kafka复杂数据中有效的数据,盖戳分流消费。

(8)Select(@deprecated:side output) 结合Split,将SplitStream数据提取出来,变为DataStream。

(9)Connect 两个DataStream(可以是不同类型流)合并为一个ConnectedStreams,但内部仍属于各自独立的DataStream。

(10)CoMap,CoFlatMap 结合Connect,将ConnectedStreams(可以是不同类型流)合并为一个DataStream。

(11)Union 一个或多个DataStream(是相同类型流)合并为一个DataStream。

3* 支持的数据类型

(1)Java和Scala基础数据类型; (2)Java和Scala元组(Tuples); (3)Scala样例类(case classes) (4)Java简单对象(POJO); (5)其他(ArrayList、HashMap、Enums)。

3** 实现UDF函数(更细粒度的控制流)

  1. 函数类(Function Classes) 自定义类继承对应的函数类,可以传参。
  2. 匿名函数(Lambda Function)
  3. 富函数(Rich Function) DataStream API提供的一个函数类的接口,所有Flink函数类都有其Rich版本。与常规函数的区别是,可以获取运行环境的上下文,并拥有一些生命周期方法(open、close、invoke)。 如MapFunction对应RichMapFunction。

4. Sink

Flink对外输出操作必须利用Sink完成(addSink(new SinkFunction(){})),print()实际调用的也是DataStreamSink方法,此外,官方提供了一部分框架的Sink。(Kafka提供了Source和Sink) (1)Kafka

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>
代码语言:javascript
复制
env.addSink(new FlinkKafkaConsumer011[String]("${id:port}", "brokerList", new SimpleStringSchema()))

// 到这里就实现了Kafka进,Kafka出

(2)Redis

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>flink-connector-redis-2.11</artifactId>
  <version>${bahir.version}</version>
</dependency>

(3)ES

代码语言:javascript
复制
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
  <version>${flink.version}</version>
</dependency>

(4)MySQL(JDBC连接)

代码语言:javascript
复制
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>${mysql.version}</version>
</dependency>

需要自定义RichSinkFunction(),仅在初始化时调用连接

二、Flink Window API

1. Window概念

将无界数据流切分为有界数据流集进行处理,窗口(window)就是切分无界流的一种方式,将流数据分发到有限大小的桶(bucket)中进行分析。

(1)类型 Time Window:

  • 滚动时间窗口(Tumbling Windows) 将数据依据固定的窗口长度 windows size 1个参数对数据进行切分,时间对齐,窗口长度固定,没有重叠。
  • 滑动时间窗口(Sliding Windows) 由固定的窗口长度 windows size 和滑动间隔 slice 2个参数组成 ,窗口长度固定,可以有重叠。当滑动间距等于窗口长度时为滚动时间窗口。(同一个数据可能属于不同的窗口)
  • 会话窗口(Session Windows) 由一系列事件组合一个指定时间长度的timeout间隙组成,即一段时间没有接收到新的数据就会生成新的窗口。时间无对齐。(无计数窗口,因为不能舍弃一段数据)

Count Window

  • 滚动计数窗口
  • 滑动计数窗口

2. Window API

窗口分配器window()方法,必须在keyBy之后才能用,再做聚合操作。flink还提供了.timeWindow.countWindow方法。

(1)WindowAssigner window()方法接收的参数是一个WindowAssigner。 Flink提供了: 滚动窗口(.timeWindow(Time.secounds(15))); 滑动窗口(.timeWindow(Time.secounds(15), Time.secounds(15))); 会话窗口(.window(EventTimeSessionWindows.withGap(Time.minutes(10)))); 全局窗口(一个无界流); 滚动计数窗口(.countWindow(5)); 滑动计数窗口(.countWindow(10, 2))。

(2)WindowFunction 定义了要对窗口中收集的数据做的计算操作。

  • 增量聚合函数(incremental aggregation functions) 每条数据到来就进行计算,保持一个简单的状态,计算内容简单。ReduceFunction、AggregateFunction。
  • 全窗口函数(full window functions) 先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。ProcessWindowFunction。

(3)其他可选API .trigger():触发器,定义window什么时候关闭,触发计算并输出结果。 .evitor():移除器,定义移除某些数据的逻辑。 .allowedLateness():允许处理迟到(窗口关闭后)的数据。 .sideOutputLateData():将迟到的数据放入侧输出流。 .getSideOutPut():获取侧输出流。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-05-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Flink
  • 一、Flink流处理API
    • 1. Environment
      • 2. Source
        • 3. Transform
          • 3* 支持的数据类型
            • 3** 实现UDF函数(更细粒度的控制流)
              • 4. Sink
              • 二、Flink Window API
                • 1. Window概念
                  • 2. Window API
                  相关产品与服务
                  云数据库 MySQL
                  腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档