首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hadoop总结篇之三---一个Job到底被提交到哪去了

Hadoop总结篇之三---一个Job到底被提交到哪去了

作者头像
小端
发布2018-04-16 10:46:18
6050
发布2018-04-16 10:46:18
举报
文章被收录于专栏:java架构师java架构师

我们会定义Job,我们会定义map和reduce程序。那么,这个Job到底是怎么提交的?提交到哪去了?它到底和集群怎么进行交互的呢?

这篇文章将从头讲起。

开发hadoop的程序时,一共有三大块,也就是Driver、map、reduce,在Driver中,我们要定义Configuration,定义Job,在mian方法最后,往往会以这么一段代码结尾:

if (!job.waitForCompletion(true))
			return;

而这句的作用,就是提交了我们的Job。进入代码里(其实就是Job类)我们可以看到具体实现:

public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
//这句是重点,提交。那么从代码里看出这个似乎是异步提交啊,否则后面的监测怎么执行呢?我们拭目以待
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
     //从配置里取得轮训的间隔时间,来分析当前job是否执行完毕
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

  依然在Job.class里,这个方法主要动作有二,一是找到集群,二是讲Job提交到集群

 public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
   //连接集群/master
    connect();
  //构造提交器
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
      //提交
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

  我们继续往下看,看下提交的时候都做了什么?

JobStatus submitJobInternal(Job job, Cluster cluster)
			throws ClassNotFoundException, InterruptedException, IOException {

		// 检查输出目录合法性(已存在?没指定?),这就是为什么每次提交作业,总是这个 错比较靠前的报出来
		checkSpecs(job);

		Configuration conf = job.getConfiguration();
		// 将框架提交到集群缓存(具体左右还未知?)
		addMRFrameworkToDistributedCache(conf);

		// 获得登录区,用以存放作业执行过程中用到的文件,默认位置/tmp/hadoop-yarn/staging/root/.staging
		// ,可通过yarn.app.mapreduce.am.staging-dir修改
		Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
		// configure the command line options correctly on the submitting dfs
		// 这是获取和设置提交job机器的地址和主机名
		InetAddress ip = InetAddress.getLocalHost();
		if (ip != null) {
			submitHostAddress = ip.getHostAddress();
			submitHostName = ip.getHostName();
			conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
			conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
		}
		// 取得当前Job的ID(后面详细关注此处)
		JobID jobId = submitClient.getNewJobID();
		job.setJobID(jobId);
		// 作业提交目录
		Path submitJobDir = new Path(jobStagingArea, jobId.toString());
		JobStatus status = null;
		try {
			conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName());
			conf.set("hadoop.http.filter.initializers",
					"org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
			conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
			LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir");
			// get delegation token for the dir
			TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf);

			populateTokenCache(conf, job.getCredentials());

			// generate a secret to authenticate shuffle transfers
			if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
				KeyGenerator keyGen;
				try {

					int keyLen = CryptoUtils.isShuffleEncrypted(conf)
							? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
									MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
							: SHUFFLE_KEY_LENGTH;
					keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
					keyGen.init(keyLen);
				} catch (NoSuchAlgorithmException e) {
					throw new IOException("Error generating shuffle secret key", e);
				}
				SecretKey shuffleKey = keyGen.generateKey();
				TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials());
			}
			// 从本地copy文件到hdfs,比如我们提交的wordcount.jar
			copyAndConfigureFiles(job, submitJobDir);

			Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

			// Create the splits for the job,其实也就是确定了map的数量
			LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
			int maps = writeSplits(job, submitJobDir);
			conf.setInt(MRJobConfig.NUM_MAPS, maps);
			LOG.info("number of splits:" + maps);

			// write "queue admins of the queue to which job is being submitted"
			// to job file.
			String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME);
			AccessControlList acl = submitClient.getQueueAdmins(queue);
			conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

			// removing jobtoken referrals before copying the jobconf to HDFS
			// as the tasks don't need this setting, actually they may break
			// because of it if present as the referral will point to a
			// different job.
			TokenCache.cleanUpTokenReferral(conf);

			if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
					MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
				// Add HDFS tracking ids
				ArrayList<String> trackingIds = new ArrayList<String>();
				for (Token<? extends TokenIdentifier> t : job.getCredentials().getAllTokens()) {
					trackingIds.add(t.decodeIdentifier().getTrackingId());
				}
				conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
						trackingIds.toArray(new String[trackingIds.size()]));
			}

			// Set reservation info if it exists
			ReservationId reservationId = job.getReservationId();
			if (reservationId != null) {
				conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
			}
			// Write job file to submit dir
			writeConf(conf, submitJobFile);

			//
			// Now, actually submit the job (using the submit name)
			//
			printTokens(jobId, job.getCredentials());
              //提交!!!!!!!!
			status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
			if (status != null) {
				return status;
			} else {
				throw new IOException("Could not launch job");
			}
		} finally {
			if (status == null) {
				LOG.info("Cleaning up the staging area " + submitJobDir);
				if (jtFs != null && submitJobDir != null)
					jtFs.delete(submitJobDir, true);

			}
		}
	}

  那么这个最终提交用到的submitClient是哪来的?他是怎么定义的?

它是上文提到的,连接集群的时候创建的。这个集群定义了很多信息:客户端信息、用户组信息、文件系统信息,配置信息,历史job目录,系统目录等。其中客户端信息,提供了初始化方法,如下:

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
//初始化是重点
    initialize(jobTrackAddr, conf);
  }

  具体看下初始化过程:

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
  //根据配置,创建客户端协议提供者
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
  //提供者返回的是一个具体的协议
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
  //看到没?协议是什么?协议其实就是个类,里面封装了一些约定好的属性,以及操作这些属性的方法。实例化为对象后,就是一个可用于通信的客户端
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

  创建客户端协议提供者,用java.util.ServiceLoader,目前包含两个具体实现,LocalClientProtocolProvider(本地作业) YarnClientProtocolProvider(Yarn作业),此处会根据mapreduce.framework.name的配置选择使用哪个创建相应的客户端。

而YarnClientProtocolProvider的本质是创建了一个YarnRunner对象

 public ClientProtocol create(Configuration conf) throws IOException {
    if (MRConfig.YARN_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return new YARNRunner(conf);
    }
    return null;
  }

  YarnRunner对象是干什么的?根据注释解释,是让当前JobClient在yarn上运行的。提供一些提交Job啊,杀死Job之类的方法。它实现了ClientProtocol接口,上面讲的提交的最后一步,其实最终就是调用了YarnRunner的submitJob方法。

它里面封装了ResourceMgrDelegate委托,委托的方法正是YarnClient类里的提交方法submitApplication。这样,当前作业(Application)提交过程,走到了YarnClient阶段。

总结:Job目前提交到了YarnClient实例中。那么YarnClient接下来怎么处理呢?

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-06-01 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档