Mapreduce 任务提交源码分析1

提交过程

一般我们mapreduce任务是通过如下命令进行提交的

$HADOOP_HOME/bin/hadoop jar $MR_JAR $MAIN_CLASS

hadoop脚本中有如下代码

    elif [ "$COMMAND" = "jar" ] ; then
      CLASS=org.apache.hadoop.util.RunJar

//... 略
    exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"

可以看到hadoop命令提交mapreduce其实就是执行了org.apache.hadoop.util.RunJar类的main方法,接下来我们来看下这个main方法,只关注最核心的逻辑,其他不重要的部分略去。

public static void main(String[] args) throws Throwable {
  String usage = "RunJar jarFile [mainClass] args...";
  // 第一个参数是jar文件路径,第二个参数是主类名(可选),后续跟其他参数
  // ...
  

  int firstArg = 0;
  String fileName = args[firstArg++];
  File file = new File(fileName);
  // 构建jar文件对象
  // ...

  // --: 这部分逻辑是获取主类名
  //     优先从jar文件的Manifest信息中获取主类名; (只有当打包jar时采用可运行的jar文件的方式才有这个信息,否则普通的jar文件中不包含该信息)
  //     如果无法获取到,则采用第二参数值作为主类名;
  // ---------------------------------------------------------------------------
  String mainClassName = null;

  JarFile jarFile;
  try {
    jarFile = new JarFile(fileName);
  } catch(IOException io) {
    throw new IOException("Error opening job jar: " + fileName)
      .initCause(io);
  }

  Manifest manifest = jarFile.getManifest();
  if (manifest != null) {
    mainClassName = manifest.getMainAttributes().getValue("Main-Class");
  }
  jarFile.close();

  if (mainClassName == null) {
    if (args.length < 2) {
      System.err.println(usage);
      System.exit(-1);
    }
    mainClassName = args[firstArg++];
  }
  mainClassName = mainClassName.replaceAll("/", ".");
  // ---------------------------------------------------------------------------

  // 获取Hadoop临时目录,指的是本地操作系统的一个目录
  // hadoop.tmp.dir配置可在core-site.xml配置文件中配置
  File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
  ensureDirectory(tmpDir);

  // --:为这个任务在临时目录下面创建一个临时的工作目录,目录名的格式为:“hadoop-unjar + Long型随机数”
  //------------------------------------------------------------------------------
  final File workDir;
  try { 
    workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
  } catch (IOException ioe) {
    System.err.println("Error creating temp dir in hadoop.tmp.dir "
                       + tmpDir + " due to " + ioe.getMessage());
    System.exit(-1);
    return;
  }

  if (!workDir.delete()) {
    System.err.println("Delete failed for " + workDir);
    System.exit(-1);
  }
  ensureDirectory(workDir);
  //------------------------------------------------------------------------------

  // 为Java进程添加一个钩子程序,当程序shutdown时清楚临时工作目录
  ShutdownHookManager.get().addShutdownHook(
    new Runnable() {
      @Override
      public void run() {
        FileUtil.fullyDelete(workDir);
      }
    }, SHUTDOWN_HOOK_PRIORITY);

  // 将jar文件里面的内容解压到这个临时的工作目录
  unJar(file, workDir);

  // -- 将:
  // workDir/, workDir/classes/, workDir/lib/${allfiles} 添加到classpath
  //------------------------------------------------------------------------------
  ArrayList<URL> classPath = new ArrayList<URL>();
  classPath.add(new File(workDir+"/").toURI().toURL());
  classPath.add(file.toURI().toURL());
  classPath.add(new File(workDir, "classes/").toURI().toURL());
  File[] libs = new File(workDir, "lib").listFiles();
  if (libs != null) {
    for (int i = 0; i < libs.length; i++) {
      classPath.add(libs[i].toURI().toURL());
    }
  }
  //------------------------------------------------------------------------------
  

  // --: 以上面的classpath创建一个URLClassLoader,作为主线程的classLoader
  ClassLoader loader =
    new URLClassLoader(classPath.toArray(new URL[0]));
  Thread.currentThread().setContextClassLoader(loader);

  // --:通过反射的机制去调用主类的main方法,并将除jar文件和[mainClass]以外的所有参数传递给该main方法
  Class<?> mainClass = Class.forName(mainClassName, true, loader);
  Method main = mainClass.getMethod("main", new Class[] {
    Array.newInstance(String.class, 0).getClass()
  });
  String[] newArgs = Arrays.asList(args)
    .subList(firstArg, args.length).toArray(new String[0]);
  try {
    main.invoke(null, new Object[] { newArgs });
  } catch (InvocationTargetException e) {
    throw e.getTargetException();
  }
}

总结

hadoop命令做的事情就是将jar文件的内容解压到临时工作目录,并将解压后的workDir/, workDir/classes/, workDir/lib/${allfiles} 一系列路径加入到自定义的ClassLoader中,并通过反射的机制去执行jar文件中Manifest中的主类或是用户指定的主类。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android常用基础

Android中的ClassLoader分析

Dalvik是Google公司自己设计用于Android平台的Java虚拟机。它可以支持已转换为.dex(即Dalvik Executable)格式的Java应...

37610
来自专栏Golang语言社区

GO语言标准库概览

在Go语言五周系列教程的最后一部分中,我们将带领大家一起来浏览一下Go语言丰富的标准库。 Go标准库包含了大量包,提供了丰富广泛的功能特性。这里提供了概览仅仅是...

29040
来自专栏IT笔记

聊一聊生产环境中如何动态监听配置文件变化并重载

上一篇,我们谈到Java中的几种读取properties配置文件的方式,但是在生产环境中,最忌讳的就是重启应用了。比如某个系统的路径常量或者接口变更,需要线上及...

474110
来自专栏Danny的专栏

探秘BOF 和EOF

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/huyuyang6688/article/...

12530
来自专栏Golang语言社区

GO语言标准库概览

在Go语言五周系列教程的最后一部分中,我们将带领大家一起来浏览一下Go语言丰富的标准库。 Go标准库包含了大量包,提供了丰富广泛的功能特性。这里提供了概览仅仅是...

391100
来自专栏LanceToBigData

struts2(二)之配置文件详解与结果视图

前言   前面介绍了struts2的一个程序的大概流程,还有它的配置文件。 一、struts.xml文件元素详解 1.1、package元素   1)作用   ...

20860
来自专栏WindCoder

Logstash6中grok插件的常用正则表达式

Logstash 内置了120种默认表达式,可以查看patterns,里面对表达式做了分组,每个文件为一组,文件内部有对应的表达式模式。下面只是部分常用的。

1.8K20
来自专栏逸鹏说道

C# 温故而知新:Stream篇(六)

BufferedStream 目录: 简单介绍一下BufferedStream 如何理解缓冲区? BufferedStream的优势 从BufferedStre...

34550
来自专栏DOTNET

.Net多线程编程—并发集合

并发集合 1 为什么使用并发集合? 原因主要有以下几点: System.Collections和System.Collections.Generic名称空间中所...

35370
来自专栏Golang语言社区

GO语言标准库概览

在Go语言五周系列教程的最后一部分中,我们将带领大家一起来浏览一下Go语言丰富的标准库。 Go标准库包含了大量包,提供了丰富广泛的功能特性。这里提供了概览仅仅是...

75960

扫码关注云+社区

领取腾讯云代金券