Flink整体由JobManager和TaskManager组成,遵循主从设计原则,JobManager为Master节点,TaskManager为worker节点,组件之间通信是借助Akka Framework;
集群部署这里指的是Flink standalone模式,因为在Yarn模式(包括session、single job模式也成Per-job模式)是可以仅通过Flink client提交任务到Yarn上,所以是否手动部署Flink集群对任务的执行是没有影响的。下图[1]是简单的Flink的集群构成情况,包括一个master(JobManager)、两个worker(TaskManager)。至于Flink standalone模式的HA(一般有两个JobManager加上若干个TaskManager组成)是通过zookeeper实现的,其主要思想是通过zookeeper选举出JobManager的active节点,该结点负责资源分配等,另一个节点为standby。Flink的Yarn模式的高可用的通过在container中对JobManager的重启实现的,其具体过程在此不详细说明。 部署阶段主要有以下两个参数:
#每个TaskManager中slot的个数,在conf/flink-conf.yaml中,默认为1
2 taskmanager.numberOfTaskSlots
3 #任务的并行度,默认为1
4 parallelism.default
Flink集群资源的使用情况除了和已分配的资源有关外,还与并行度有关,已分配的资源表示Flink可以利用这么多资源,而实际能利用多少资源还和并行度有关。任务并行度的最大值由TaskManager集群的Slot总数决定,如:slot总数为10,则任务最大的并行度为10. 在standalone模式中,Flink任务能利用的总资源已在启动集群时确定,其并行通过在执行./flink run 时,通过可选参数[-p]确定(不指定则为默认值1)。 在Yarn模式中,均是先Yarn集群中分配资源给新建的Flink集群,如下图,其详细过程见文档[2]。
Yarn资源分配
Flink的Yarn模式session、single job模式在实现方法上还是存在一定的区别: 1)session模式是先利用yarn-session.sh在yarn新建一个Flink集群,然后利用./flink run flinkExample.jar提交任务,其资源分配的方式和standalone模式一致;
./yarn-session.sh -n 4 -s 8 -jm 3072 -tm 32768
-n:在yarn上分配了4个container,即分配了4个TaskManager; -s:每个TaskManager有8个slot; -jm:每个JobManager中的内存数; -tm:每个TaskManager中的内存数; 2)single job模式在命令中在申请资源的同时,提交任务,常用命令如下:
./flink run -m yarn-cluster -yn 7 -ys 8 -yjm 1024 -ytm 1024 example.jar
-yn:yarn上分配的container个数,即TaskManager的个数; -ys:每个TaskManager中slot的个数 其他参数的具体使用方法可以在控制台中执行./flink、./yarn-session.sh得到说明。 说明:session模式和single job的区别: 1)session模式:Flink任务运行结束后,从yarn上申请到的资源是不被释放的,等待下一个Flink的提交,类似一个常住在yarn上永远不会结束的yarn任务。因为,yarn的资源分配模式中比如fair策略还是存在资源的竞争的,session模式资源的不释放性,这样可以在Yarn提供资源分配上的基础上进行实现资源隔离,也实现了对集群物理环境的屏蔽,但在一定的程度上造成了资源的浪费; 2)single job模式和一般的yarn任务一样,任务结束后就会释放申请到的资源,使资源得到重复利用。
【说明】这里所说的ResourceManager不是Yarn的组件,是Flink本身的,关于深层的分析,见后续博客。
假定一个3个TaskManager的集群,每个TaskManager有3个slot,集群一个9个slot,从下图中可以得到[3]:任务的并行度为1时,只会用一个slot,并行度为2会用2个slot,依次类推,当并行度为9时,9个slot都会被利用,当然从example4中可知,可以针对不同的算子(operator)定义并行度。
taskmanager和slot关系.png
一、Task和Operator Chains Flink会在生成JobGraph阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个task(一个线程)中执行,以减少线程之间的切换和缓冲的开销,提高整体的吞吐量和延迟。
Operator Chain.png
算子之间是否可以组成一个Operator Chains,看是否满足以下条件:
算子被定义后,先根据条件优化算子链 ,然后组成一个个subtask,最后根据是否可以共享slot分布在taskManager的slot中执行。
Operator Chain参考:Operator Chains 未完待续!!!