前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink Yarn Cluster & HA

Flink Yarn Cluster & HA

作者头像
编程那点事
发布2023-02-25 16:08:52
8470
发布2023-02-25 16:08:52
举报
文章被收录于专栏:java编程那点事

在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先,让我们了解下 Yarn 和 Flink 的关系。

1.png

在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用自己的 Container 来启动 Flink 的 JobManager(也就是 App Master)和 TaskManager。

启动新的Flink YARN会话时,客户端首先检查所请求的资源(容器和内存)是否可用。之后,它将包含Flink和配置的jar上传到HDFS(步骤1)。

客户端的下一步是请求(步骤2)YARN容器以启动ApplicationMaster(步骤3)。由于客户端将配置和jar文件注册为容器的资源,因此在该特定机器上运行的YARN的NodeManager将负责准备容器(例如,下载文件)。完成后,将启动ApplicationMaster(AM)。

该JobManager和AM在同一容器中运行。一旦它们成功启动,AM就知道JobManager(它自己的主机)的地址。它正在为TaskManagers生成一个新的Flink配置文件(以便它们可以连接到JobManager)。该文件也上传到HDFS。此外,AM容器还提供Flink的Web界面。YARN代码分配的所有端口都是临时端口。这允许用户并行执行多个Flink YARN会话。

之后,AM开始为Flink的TaskManagers分配容器,这将从HDFS下载jar文件和修改后的配置。完成这些步骤后,即可建立Flink并准备接受作业。

修改环境变量
代码语言:javascript
复制
export  HADOOP_CONF_DIR= /opt/module/hadoop-2.7.6/etc/hadoop
部署启动
代码语言:javascript
复制
$ yarn-session.sh -d -s 1 -tm 800 -n 2

-n : TaskManager的数量,相当于executor的数量

-s : 每个JobManager的core的数量,executor-cores。建议将slot的数量设置每台机器的处理器数量

-tm : 每个TaskManager的内存大小,executor-memory

-jm : JobManager的内存大小,driver-memory

上面的命令的意思是,同时向Yarn申请3个container,其中 2 个 Container 启动 TaskManager(-n 2),每个 TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个ApplicationMaster(Job Manager)。

Flink部署到Yarn Cluster后,会显示Job Manager的连接细节信息。

Flink on Yarn会覆盖下面几个参数,如果不希望改变配置文件中的参数,可以动态的通过-D选项指定,如

代码语言:javascript
复制
-Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368

jobmanager.rpc.address:因为JobManager会经常分配到不同的机器上

taskmanager.tmp.dirs:使用Yarn提供的tmp目录

parallelism.default:如果有指定slot个数的情况下

yarn-session.sh会挂起进程,所以可以通过在终端使用CTRL+C或输入stop停止yarn-session。

如果不希望Flink Yarn client长期运行,Flink提供了一种detached YARN session,启动时候加上参数-d或—detached

在上面的命令成功后,我们就可以在 Yarn Application 页面看到 Flink 的纪录。

如果在虚拟机中测试,可能会遇到错误。这里需要注意内存的大小,Flink 向 Yarn 会申请多个 Container,但是 Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager,尤其当指定多个 TaskManager 的时候。因此,在启动 Flink 之后,需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中,直接跳转(点击 Tracking UI)。

yarn-session.sh启动命令参数如下:

代码语言:javascript
复制
$ yarn-session.sh --help

Usage:

Required

-n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)

Optional

-D <property=value>             use value for given property

-d,--detached                   If present, runs the job in detached mode

-h,--help                       Help for the Yarn session CLI.

-id,--applicationId <arg>       Attach to running YARN session

-j,--jar <arg>                  Path to Flink jar file

-jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)

-m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.

-n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)

-nl,--nodeLabel <arg>           Specify YARN node label for the YARN application

-nm,--name <arg>                Set a custom name for the application on YARN

-q,--query                      Display available YARN resources (memory, cores)

-qu,--queue <arg>               Specify YARN queue.

-s,--slots <arg>                Number of slots per TaskManager

-st,--streaming                 Start Flink in streaming mode

-t,--ship <arg>                 Ship files in the specified directory (t for transfer)

-tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)

-yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)

-z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
提交任务

之后,我们可以通过这种方式提交我们的任务

代码语言:javascript
复制
$ ./bin/flink run -m yarn-cluster -yn2 ./examples/batch/WordCount.jar --input /opt/wcinput/wc.txt --output /opt/wcoutput/

bin/flink run -m yarn-cluster -yn2 examples/batch/WordCount.jar --input /input/ --output /Andy

以上命令在参数前加上y前缀,-yn表示TaskManager个数。

在这个模式下,同样可以使用-m yarn-cluster提交一个"运行后即焚"的detached yarn(-yd)作业到yarn cluster。

停止yarn cluster
代码语言:javascript
复制
yarn application -kill application_1539058959130_0001
Yarn模式的HA

应用最大尝试次数(yarn-site.xml),您必须配置为尝试应用的最大数量的设置yarn-site.xml,当前YARN版本的默认值为2(表示允许单个JobManager失败)。

代码语言:javascript
复制
<property>

 <name>yarn.resourcemanager.am.max-attempts</name>

 <value>4</value>

 <description>The maximum number of application master execution attempts</description>

</property>

申请尝试(flink-conf.yaml),您还必须配置最大尝试次数 conf/flink-conf.yaml

代码语言:javascript
复制
yarn.application-attempts:10

示例:高度可用的YARN会话

配置HA模式和zookeeper法定人数在conf/flink-conf.yaml

代码语言:javascript
复制
high-availability: zookeeper

high-availability.zookeeper.quorum: bigdata11:2181,bigdata12:2181,bigdata13:2181

high-availability.storageDir: hdfs:///flink/recovery

high-availability.zookeeper.path.root: /flink

yarn.application-attempts: 10

配置ZooKeeper的服务器中conf/zoo.cfg(目前它只是可以运行每台机器的单一的ZooKeeper服务器):

代码语言:javascript
复制
server.1=bigdata11:2888:3888
server.2=bigdata12:2888:3888
server.3=bigdata13:2888:3888

启动ZooKeeper仲裁:

代码语言:javascript
复制
$ bin / start-zookeeper-quorum.sh

启动HA群集:

代码语言:javascript
复制
$ bin / yarn-session.sh -n 2

错误异常

1.身份认证失败

代码语言:javascript
复制
flink run examples/streaming/SocketWindowWordCount.jar --port 9000

Starting execution of program

------------------------------------------------------------

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: b7a99ac5db242290413dbebe32ba52b0) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:267) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486) at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) at org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804) at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:215)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1044)

   at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)

 at java.security.AccessController.doPrivileged(Native Method)

 at javax.security.auth.Subject.doAs(Subject.java:422)

 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)

Caused by: java.net.ConnectException: Connection refused (Connection refused)

  at java.net.PlainSocketImpl.socketConnect(Native Method)

   at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)

   at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)

   at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)

   at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

   at java.net.Socket.connect(Socket.java:589)

通过查看日志,发现有如下报错

代码语言:javascript
复制
ERRORorg.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed

解决法案:添加定时任务认证kerberos

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019-03-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 修改环境变量
  • 部署启动
  • Flink部署到Yarn Cluster后,会显示Job Manager的连接细节信息。
  • 提交任务
  • 停止yarn cluster
  • Yarn模式的HA
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档