Flink 1.10 Job Submission Process Analysis (Part 2)

Flink实战剖析 (WeChat public account) · originally published 2020-06-08

Flink 1.10 Job Submission Process Analysis (Part 1) walked through the flow from `flink run` up to the point where the job is handed off to the cluster. Flink picks a different PipelineExecutor for each submission mode; this article analyzes how a job is submitted to a YARN cluster in yarn-per-job mode. (Note: the analysis is based on Flink 1.10.1.)

YarnJobClusterExecutor

Continuing from the previous article: the job is ultimately submitted by a PipelineExecutor's execute method. Which PipelineExecutor is chosen depends on the submission mode, i.e. the execution.target option; for yarn-per-job the YarnJobClusterExecutor is selected.

public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
   public static final String NAME = "yarn-per-job";
   public YarnJobClusterExecutor() {
      super(new YarnClusterClientFactory());
   }
}
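
As a side note, the mapping from execution.target to a concrete executor is resolved through a factory service loader. Below is a minimal sketch of that lookup, assuming the 1.10 client classes DefaultExecutorServiceLoader, PipelineExecutorFactory and DeploymentOptions; treat these names as assumptions taken from the 1.10 code base rather than a stable public API:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.core.execution.PipelineExecutorFactory;

public class ExecutorLookupSketch {
   // resolves the PipelineExecutor registered for execution.target
   public static PipelineExecutor resolveExecutor(Configuration configuration) throws Exception {
      // "yarn-per-job" matches YarnJobClusterExecutor.NAME, so the YARN factory is selected
      configuration.setString(DeploymentOptions.TARGET, "yarn-per-job");

      // the service loader scans the PipelineExecutorFactory implementations on the classpath (SPI)
      PipelineExecutorFactory factory = new DefaultExecutorServiceLoader().getExecutorFactory(configuration);
      return factory.getExecutor(configuration);
   }
}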

Its implementation is quite simple. The important part is the YarnClusterClientFactory passed into its constructor: it is used to create the YarnClusterDescriptor, which holds everything needed for a YARN submission, such as the YarnClient, the YARN configuration and the target YARN queue. YarnJobClusterExecutor extends AbstractJobClusterExecutor, the abstract job-cluster executor, so execute is actually carried out by AbstractJobClusterExecutor:

public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

   private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
   // in the yarn-per-job case this is the YarnClusterClientFactory
   private final ClientFactory clusterClientFactory;

   public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
      this.clusterClientFactory = checkNotNull(clusterClientFactory);
   }

   // performs the actual job submission
   // pipeline is the StreamGraph
   public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
      // translate the StreamGraph into a JobGraph
      final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
      // create the deployment descriptor, here a YarnClusterDescriptor
      try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
         // wrap the configuration in an ExecutionConfigAccessor
         final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
         // resource description for the submission: memory sizes, slots
         final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
         // submit the job
         final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
               .deployJobCluster(clusterSpecification, jobGraph,
                                 // whether to submit in detached mode
                                 configAccessor.getDetachedMode());
         LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

         return CompletableFuture.completedFuture(
               new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
      }
   }
}

ClusterSpecification describes how much of the cluster's resources the job needs. To understand how these sizes are derived, it is worth reading up on Flink 1.10's memory management in the official documentation. The job is finally handed over to the YarnClusterDescriptor to deploy.
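
For illustration, the object returned by clusterClientFactory.getClusterSpecification(configuration) corresponds roughly to the following sketch; the builder and setter names are taken from the 1.10 code base but should be treated as assumptions, and in reality the values are derived from the memory configuration rather than hard-coded:

import org.apache.flink.client.deployment.ClusterSpecification;

public class ClusterSpecSketch {
   public static ClusterSpecification sample() {
      return new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(1024)       // memory of the JobManager (AppMaster) container
            .setTaskManagerMemoryMB(2048)  // memory of each TaskManager container
            .setSlotsPerTaskManager(2)     // task slots per TaskManager
            .createClusterSpecification();
   }
}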

The Deploy Process

The deploy step is where the client talks to YARN. clusterDescriptor.deployJobCluster calls the internal deployInternal method:

private ClusterClientProvider<ApplicationId> deployInternal(
      ClusterSpecification clusterSpecification,
      String applicationName,
      String yarnClusterEntrypoint,
      @Nullable JobGraph jobGraph,
      boolean detached) throws Exception {
    // ... first some validation: does the YARN queue exist, is the configuration sane,
    // are the requested resources within the cluster's limits, and so on
   ApplicationReport report = startAppMaster(
         flinkConfiguration,
         applicationName,
         yarnClusterEntrypoint,
         jobGraph,
         yarnClient,
         yarnApplication,
         validClusterSpecification);

   //....
}

The most important call is startAppMaster, which starts an AppMaster process on YARN. yarnClusterEntrypoint is the entry class of that process, i.e. the JobMaster entry point org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint. You can also see this class in the process list on the cluster machines; whenever you spot it, you know that process is the JobMaster.

startAppMaster is fairly long, so let's break it down piece by piece:

private ApplicationReport startAppMaster(
      Configuration configuration,
      String applicationName,
      String yarnClusterEntrypoint,
      JobGraph jobGraph,
      YarnClient yarnClient,
      YarnClientApplication yarnApplication,
      ClusterSpecification clusterSpecification) throws Exception {

   // ------------------ Initialize the file systems -------------------------

   org.apache.flink.core.fs.FileSystem.initialize(
         configuration,
         PluginUtils.createPluginManagerFromRootFolder(configuration));

   // homeDir is where the jars and log configs get uploaded, usually on HDFS,
   // e.g. /user/hadoop (where "hadoop" is the current user)
   final FileSystem fs = FileSystem.get(yarnConfiguration);
   final Path homeDir = fs.getHomeDirectory();
   // the submission context describing the application handed to YARN
   ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
   // files that will be uploaded to HDFS and added to the classpath
   Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
   // files that will be uploaded to HDFS but NOT added to the classpath
   Set<File> shipOnlyFiles = new HashSet<>();
   for (File file : shipFiles) {
      systemShipFiles.add(file.getAbsoluteFile());
   }

   final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
   if (logConfigFilePath != null) {
      systemShipFiles.add(new File(logConfigFilePath));
   }
   // add the files under FLINK_HOME/lib to systemShipFiles; files passed via -yt are already in shipFiles
   addLibFoldersToShipFiles(systemShipFiles);

   // add the files under FLINK_HOME/plugins to shipOnlyFiles
   addPluginsFoldersToShipFiles(shipOnlyFiles);

   final ApplicationId appId = appContext.getApplicationId();

   // ZooKeeper HA related configuration
   String zkNamespace = getZookeeperNamespace();
   // no user specified cli argument for namespace?
   if (zkNamespace == null || zkNamespace.isEmpty()) {
      // namespace defined in config? else use applicationId as default.
      zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
      setZookeeperNamespace(zkNamespace);
   }

   configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

   if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
      // activate re-execution of failed applications
      appContext.setMaxAppAttempts(
            configuration.getInteger(
                  YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                  YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

      activateHighAvailabilitySupport(appContext);
   } else {
      // set number of application retries to 1 in the default case
      appContext.setMaxAppAttempts(
            configuration.getInteger(
                  YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                  1));
   }

   // userJarFiles holds the user code jars
   final Set<File> userJarFiles = (jobGraph == null)
         // not per-job submission
         ? Collections.emptySet()
         // add user code jars from the provided JobGraph
         : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());

   // upload distributed-cache files to HDFS, typically used to share files with the tasks
   if (jobGraph != null) {
      for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
         org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
         // only upload local files
         if (!path.getFileSystem().isDistributedFS()) {
            Path localPath = new Path(path.getPath());
            Tuple2<Path, Long> remoteFileInfo =
               Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, entry.getKey());
            jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
         }
      }

      jobGraph.writeUserArtifactEntriesToConfiguration();
   }

   // local resources the AppMaster container needs; YARN downloads them from HDFS at startup
   final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
   // remote paths of the uploaded files, later used to obtain HDFS delegation tokens
   final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
   // ship-file list passed to the TaskExecutors through the environment
   StringBuilder envShipFileList = new StringBuilder();

   // several uploadAndRegisterFiles calls upload systemShipFiles, shipOnlyFiles and the user jars to HDFS

   if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
      systemClassPaths.addAll(userClassPaths);
   }

   // normalize classpath by sorting
   Collections.sort(systemClassPaths); // sort the system classpath entries
   Collections.sort(userClassPaths);   // sort the user classpath entries

   // classPathBuilder collects the classpath entries
   StringBuilder classPathBuilder = new StringBuilder();
     /*
      * build the classpath: ship-file jars, user jars, log4j and the yaml config file
      */

   final Path yarnFilesDir = getYarnFilesDir(appId);
   FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
   fs.setPermission(yarnFilesDir, permission); // set permission for path.

   /*
    * a number of security-related settings omitted here
    */

   // builds the java command of the AM container, which launches YarnJobClusterEntrypoint
   final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
         yarnClusterEntrypoint,
         hasLogback,
         hasLog4j,
         hasKrb5,
         clusterSpecification.getMasterMemoryMB());

   if (UserGroupInformation.isSecurityEnabled()) {
      // set HDFS delegation tokens when security is enabled
      LOG.info("Adding delegation token to the AM container.");
      Utils.setTokensFor(amContainer, paths, yarnConfiguration);
   }

   amContainer.setLocalResources(localResources);
   fs.close();

   // Setup CLASSPATH and environment variables for ApplicationMaster
   final Map<String, String> appMasterEnv = new HashMap<>();
   /*
    * put the environment variables into appMasterEnv; they are used when YarnJobClusterEntrypoint is launched,
    * e.g. the classpath, the hadoop user, the application id, etc.
    */

   amContainer.setEnvironment(appMasterEnv);

    // further settings follow: the target YARN queue, the application name, and so on

   // add a hook to clean up in case deployment fails
   Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
   Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
   LOG.info("Submitting application master " + appId);
   // submit the application
   yarnClient.submitApplication(appContext);

   /*
    * poll the application status
    */
}

This part of the flow is rather long; to summarize, it does the following:

  1. Upload shipFiles, plugins, the user jar, the log config file, flink-conf.yaml, the serialized job graph and so on to HDFS
  2. Build the required classpath, the ZooKeeper HA settings, the security settings, the JobMaster launch command, etc.
  3. Submit the application to YARN (a sketch of driving this step directly through the client classes follows this list)
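
To make step 3 concrete, here is a hedged sketch of driving the same deploy path directly with the classes shown above (YarnClusterClientFactory, YarnClusterDescriptor). It assumes a fully prepared Configuration (flink-conf, YARN and memory settings) and an existing JobGraph, and it omits error handling; it is an illustration of the call sequence, not production code:

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClientProvider;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;

public class DirectYarnDeploySketch {
   public static void deploy(Configuration configuration, JobGraph jobGraph) throws Exception {
      YarnClusterClientFactory clientFactory = new YarnClusterClientFactory();
      try (YarnClusterDescriptor clusterDescriptor = clientFactory.createClusterDescriptor(configuration)) {
         // derive the resource description (master/taskmanager memory, slots) from the configuration
         ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration);
         // this call runs deployInternal/startAppMaster under the hood
         ClusterClientProvider<ApplicationId> provider =
               clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true /* detached */);
         ApplicationId applicationId = provider.getClusterClient().getClusterId();
         System.out.println("Job " + jobGraph.getJobID() + " running in YARN application " + applicationId);
      }
   }
}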

Once the application has started on YARN, you can find a launch_container.sh file in the JobMaster's working directory; it contains all of the environment variable settings and the launch command that were prepared in startAppMaster.

Summary

This article covered the yarn-per-job submission flow. Combined with the previous two articles, you should now have a good picture of how to submit a job programmatically through the API. In my view two things matter most: first, get the parameter parsing and configuration right; second, choose a suitable PipelineExecutor to submit the job.
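
To close with a concrete (and deliberately simplified) picture of those two points, here is a hypothetical end-to-end sketch that assembles the configuration itself, builds a StreamGraph, and hands it to the PipelineExecutor resolved for execution.target. The class and option names (DeploymentOptions, DefaultExecutorServiceLoader, the pipeline.jars key) are assumptions based on the 1.10 client, and a real submission also needs memory, YARN and dependency settings that are omitted here:

import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.PipelineExecutor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class YarnPerJobSubmitSketch {
   public static void main(String[] args) throws Exception {
      // 1. parameter parsing / configuration (memory and YARN settings omitted in this sketch)
      Configuration configuration = new Configuration();
      configuration.setString(DeploymentOptions.TARGET, "yarn-per-job");
      configuration.setBoolean(DeploymentOptions.ATTACHED, false);       // detached submission
      // the user jar registered here is what later ends up in jobGraph.getUserJars()
      configuration.setString("pipeline.jars", "file:///path/to/user-job.jar");

      // 2. build the pipeline (StreamGraph) with the regular DataStream API
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.fromElements("a", "b", "c").print();
      StreamGraph streamGraph = env.getStreamGraph();

      // 3. pick the PipelineExecutor that matches execution.target and submit
      PipelineExecutor executor = new DefaultExecutorServiceLoader()
            .getExecutorFactory(configuration)
            .getExecutor(configuration);
      CompletableFuture<JobClient> jobClient = executor.execute(streamGraph, configuration);
      System.out.println("Submitted job " + jobClient.get().getJobID());
   }
}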
