前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink之基础概念

Flink之基础概念

作者头像
丁D
发布2023-10-20 08:25:30
2180
发布2023-10-20 08:25:30
举报
文章被收录于专栏:老铁丁D老铁丁D

摘要本文介绍一下Flink一些基本概念并行度、slot及对应的组件

依赖

开发flink应用我们需要引入对应的maven依赖 flink-java、flink-streaming-java,以及 flink-clients(客户端,也可以省略)

代码语言:javascript
复制
<!-- 引入 Flink 相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

在属性中,我们定义了,这指代的是所依赖的 Scala 版本。这有一点 奇怪:Flink 底层是 Java,而且我们也只用 Java API,为什么还会依赖 Scala 呢?这是因为 Flink 的架构中使用了 Akka 来实现底层的分布式通信,而 Akka 是用 Scala 开发的。

编程模型

执行环境 -> source -> 算子逻辑 -> sink

1、创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); flink在1.12版本之前的流处理和批处理提供了两套api,从1.12官方推荐使用DataStream API 然后在提交任务 指定是流处理还是批处理

代码语言:javascript
复制
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
3> (world,1)
2> (hello,1)
4> (flink,1)
2> (hello,2)
2> (hello,3)
1> (java,1)

前面的数据是指本地执行的不同线程,所以是乱序的,代表1~4代表了并行线程是4,并行度4,本地环境默认并行度是运行电脑的cpu个数

Flink组件 client(客户端) jobManager(作业管理器,相当master) taskManager(任务管理器,工作者,相当于worker)

jobmanager包含3三个组件 1、jobMaster:处理单独的job,和具体的job一一对应 2、resourceManager注意:这是Flink内置的资源管理器要跟跟其他平台的区分开 3、分发器:提供一个rest接口用来提交应用,并为每个新提交的作业启动一个新的jobmaster

“资源”,主要是指TaskManager的任务槽(task slots)。任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。 slot是最小的调度单位,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 的数量限制了 TaskManager 能够并行处理的任务数量。

作业提交流程步骤: 1、客户端将程序通过分发器提供的rest接口,提交到jobmanager 2、分发器启动jobmaster,并将作业提交给jobmaster 3、jobmaster将jobGraph解析成可执行的executionGraph,得到所需的资源数量即slot的个数,然后向资源管理器请求资源 4、资源管理器判断当前是否有足够的资源,没有就启动新的taskManager 5、taskManager启动后向资源管理器注册自己的任务槽 6、资源管理器通知taskManager为新的作业提供slots 7、TaskManager 连接到对应的 JobMaster,提供 slots。 8、JobMaster 将需要执行的任务分发给 TaskManager。 9、TaskManager 执行任务,互相之间可以交换数据。

算子任务 source就是一个算子任务,sink也是,sum,map等都是

算子子任务 在 Flink 执行过程中,每一个算子(operator)可以包含一个或多个子任务(operator subtask), 这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。

同一个算子子任务只能在不同的slot执行,不同算子的任务可以共享任务槽

所以我们要算这个作业需要多少slot,只需要找到算子任务最大的并行度,即算子子任务的个数

算子链 一个数据流在算子之间传输数据的形式可以是一对一(one-to-one)的直通 (forwarding)模式入map、filter、flatMap 等算子都是这种 one-to-one,也可以是打乱的重分区(redistributing)模式,具体是哪一种形式,取决于算子的种类。 并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task) 可以合并起来形成算子链一起共享一个slot 为什么这样设计?可以减少线程之间的切换,和基于缓存器的数据交换 ,减少延时,提高吞吐量

槽位slot

任务槽就是Flink集群中的资源调配单元,包含了机器用来执行计算的一组CPU和内存资源。每一个任务(Task)都需要分配到一个slot上执行。 slot是最小的调度单位,每一个 TaskManager 都包含了一定数量的任务槽(task slots)。slot 的数量限制了 TaskManager 能够并行处理的任务数量。

设置一个taskManager的slot数量 : taskmanager.numberOfTaskSlots: 8Slot 和并行度确实都跟程序的并行执行有关,但两者是完全不同的概念。 简单来说,taskslot 是 静 态 的 概 念 , 是 指 TaskManager 具 有 的 并 发 执 行 能 力 , 可 以 通 过 参 数taskmanager.numberOfTaskSlots 进行配置; 而并行度(parallelism)是动态概念,也就是TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置。 换句话说,并行度如果小于等于集群中可用 slot 的总数,程序是可以正常执行的,因为 slot 不一定要全部占用,有十分力气可以只用八分; 而如果并行度大于可用 slot 总数,导致超出了并行能力上限,那么心有余力不足,程序就只好等待资源管理器分配更多的资源了。

并行度 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。

1、代码中设置,算子后面跟上并行度设置,优先级最高 stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2); 2、代码中设置,执行环境设置,这样所有的算子并行度都一样,优先级中 env.setParallelism(2); 3、如果代码中没设置,可以在提交作业的时候使用“-p”参数来设置,优先级低于代码设置,高于配置文件 3、配置文件设置,优先级最低 parallelism.default: 2

统计单词demo批处理

代码语言:javascript
复制
package _1wordcount;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 批处理 单词统计
*/
public class WordCount {
public static void main(String[] args) throws Exception {
//创建执行环境 这里是批处理的执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//流处理执行环境
//StreamExecutionEnvironment env = StreamExecutionEnvironment
// .getExecutionEnvironment();
DataSource<String> dataSource = env
.readTextFile("D:\\java\\code\\flink\\flinkdemo\\src\\main\\resources\\_1wordcount.txt");
//map 做数据转换,输入1个返回1个,就是做类型转换
//flatMap 打散,平坦,输入1个,可以返回0个、1个、N个,(如下面按空格分隔,返回多个单词)
//keyby用于流处理,groupBy用在批处理
//这里返回的是一个元祖是因为groupBy只能返回元祖,不然会报错
DataSet<Tuple2<String, Long>> dataSet = dataSource
.flatMap(new WordCountFlatMap()).groupBy(0).sum(1);
dataSet.print();
//输出结果
/*(flink,1)
(world,1)
(hello,3)
(java,1)*/
}
public static class WordCountFlatMap implements FlatMapFunction<String, Tuple2<String, Long>> {
//input是输入的字符串
//collector 用来输出
public void flatMap(String input, Collector<Tuple2<String, Long>> collector) throws Exception {
if (StringUtils.isNotBlank(input)) {
for (String word : input.split(" ")) {
if (word.length() < 1) {
continue;
}
collector.collect(new Tuple2<>(word, 1L));
}
}
}
}
}

统计单词demo流处理

代码语言:javascript
复制
package _1wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
* 流处理单词统计
*/
public class WordCountStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
//使用nc测试
DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 10008);
streamSource.setParallelism(1);//设置并行度
/*DataStream里没有reduce和sum这类聚合操作的方法,因为Flink设计中,所有数据必须先分组才能做聚合操作。
先keyBy得到KeyedStream,然后调用其reduce、sum等聚合操作方法。(先分组后聚合)*/
DataStream<WordWithCount> dataStream = streamSource
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String input, Collector<WordWithCount> collector) throws Exception {
for (String word : input.split(" ")) {
if (word.length() < 1) {
continue;
}
WordWithCount wordWithCount = new WordWithCount();
wordWithCount.setCount(1L);
wordWithCount.setWord(word);
collector.collect(wordWithCount);
}
}
}).keyBy("word").window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum("count");
dataStream.print();
//DataStream流式应用需要显示指定execute()方法运行程序,如果不调用则Flink流式程序不会执行
//对于DataSet API输出算子中已经包含了对execute()方法的调用,不需要显式调用execute()方法,否则程序会出异常。
env.execute("streaming word count");
/*3> WordWithCount{word='java', count=1}
5> WordWithCount{word='hello', count=1}
13> WordWithCount{word='flink', count=1}
5> WordWithCount{word='hello', count=1}
5> WordWithCount{word='hello', count=1}
13> WordWithCount{word='flink', count=1}
5> WordWithCount{word='hello', count=2}
13> WordWithCount{word='flink', count=2}
5> WordWithCount{word='hello', count=1}
13> WordWithCount{word='flink', count=1}
3> WordWithCount{word='java', count=1}
4> WordWithCount{word='f', count=1}
5> WordWithCount{word='hello', count=2}*/
}
public static class WordWithCount {
private String word;
private Long count;
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Long getCount() {
return count;
}
public void setCount(Long count) {
this.count = count;
}
}
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-05-05 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 依赖
  • 编程模型
  • 统计单词demo批处理
  • 统计单词demo流处理
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档