通过Java程序提交通用Mapreduce任务并获取Job信息

背景

我们的一个业务须要有对MR任务的提交和状态跟踪的功能,须要通过Java代码提交一个通用的MR任务(包括mr的jar、配置文件、依赖的第三方jar包),并且须要在提交后跟踪其状态和历史,所以在提交后程序须要拿到改提交的MR的JobID。

解决方案

首先 可以想到的是通过  ${HADOOP_HOME}/bin/hadoop jar 命令来提交,并从命令执行的标准输出中取到jobID,这样确实可以解决,但是这样做有几个问题:

  1. 须要通过Java程序去启动一个子进程,并且要监控它的标准输出,性能不会很好,有点重;
  2. 从标准输出的文本中去解析jobId,性能也不会很好,如果hadoop升级标准输出格式放生变化,可能会导致解析不到,解析jobId准确性不易保证;
  3. 当提交执行过程出现异常,任然须要依赖标准输出的文本去解析,原因同2;

由于上述原因决定放弃这个方案。

另外 一个方案就是采用Java程序直接提交的方式,这种提交方式也有几个问题:

  1. 用户的jar包配置文件怎么加载,如果执行任务主类, 如果解决Hadoop的依赖?用户的jar包和我们平台程序在一个java程序中是否会有冲突或其他问题?
  2. 执行提交后如何获取到JobID? 用户代码中的org.apache.hadoop.mapreduce.Job(以下简称Job)对象是其业务代码中的一个局部变量,执行提交操作后,其JobID存在于其对象内部,外部代码无妨访问到该Job对象,也就无法获取到JobID。如果通过hadoop的rest api去获取呢,yarn rest api确实可以获取所有的任务的相关信息,但是在外部程序不知道这个Job的任务信息的情况下,又怎么能准确的对应上呢?

我们来解决上述两个问题:

  1. 我们可以创建一个独立的Classloader来加载用户的jar包和配置文件(hadoop的依赖是否也由这个这个Classloader来加载稍后再讨论),每次提交一个新的MR任务就创建一个新的类加载器来加载用户MR相关的资源(jar、配置文件、等),并通过反射的方式调用其任务的主类(可以通过Manifest指定或由用户指定)的main方法,这样用户的MR相关的资源就和平台程序隔离开了,不会对平台程序产生影响,并且每个提交的任务之间也是隔离的,不会相互影响;
  1. 设想下如果Job类中有个public static的属性(field),比如名称为currentJob,其类型为Job,当有新的Job对象创建时,就将该Job对象赋值给currentJob属性,这样我们在能访问到MR任务类加载器的地方就能获取到这个currentJob属性值,也就是当前创建的Job对象,通过调用其getXXX方法就能获取到其相关的各种信息,包括JobID,状态等。这里可以通过修改Job类的字节码,去动态的注入一个这样的属性currentJob和赋值的操作。Job类在hadoop框架的jar包内,要想拦截到Job类的字节码并在类加载的时候对其修改,hadoop的依赖也就须要一个ClassLoader来加载,并在其加载的loadClass方法中去获取并修改Job类的字节码;currentJob属性是一个全局的静态属性,所以hadoop的依赖也由用户MR ClassLoader来加载,这样每个任务都有只属于自己的Job类,防止多个不同的任务提交Job类中currentJob覆盖的问题;

实现

Classloader

通过继承 java.net.URLClassLoader 类实现MR job的类加载器。类加载器有个规则:A类中如果引用了B类,则加载B类的时候会通过A类的类加载器来加载,那么MR类加载器加载MR的主类时,其引用到的所有类也都会通过MR类加载器来加载。

public class MapReduceClassLoader extends URLClassLoader {

    private static Logger LOG = LoggerFactory.getLogger(MapReduceClassLoader.class);

    public MapReduceClassLoader() {
        super(new URL[]{});
    }

    public MapReduceClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    public synchronized Class<?> loadClass(String name) throws ClassNotFoundException {

        // 判断类是否已被加载,如果已加载则直接返回
        Class c = this.findLoadedClass(name);
        if (c != null) {
            return c;
        }

        ClassNotFoundException ex = null;

        // 如果待加载的类为Job,则进行字节码转换后再加载类
        if (name.equals("org.apache.hadoop.mapreduce.Job")) {
            byte[] bytes = transformJobBytecode(name);
            if (bytes == null) {
                ex = new ClassNotFoundException("Transform job bytecode failed.");
            } else {
                c = defineClass(name, bytes, 0, bytes.length);
            }
        }

        if (c == null) {
            // 如果待加载的是JDK提供的系统类,则由父类加载器去完成,这里的父类加载器是sun.misc.Launcher.AppClassLoader
            if (ClassPathUtils.isSystemClass(name)) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }

            // 当前类加载器来进行加载
            if (c == null) {
                try {
                    c = findClass(name);
                } catch (Exception e) {
                    ex = new ClassNotFoundException(e.getMessage());
                }
            }

            // 当前类加载器加载不到,尝试由父类加载器来完成
            if (c == null && this.getParent() != null) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }
        }

        if (c == null) {
            throw ex;
        } else {
            LOG.info("loaded " + c + " from " + c.getClassLoader());
            return c;
        }
    }

    /**
     * 添加由该类加载的classpath
     * @param classPath
     */
    public void addClassPath(String classPath) {
        URL[] cpUrls = ClassPathUtils.getClassPathURLs(classPath);
        for (URL cpUrl : cpUrls) {
            addURL(cpUrl);
        }
    }

    /**
     * 转换Job类的字节码
     * @param jobClassName
     * @return
     */
    private byte[] transformJobBytecode(String jobClassName) {
        String path = jobClassName.replace('.', '/').concat(".class");
        InputStream is = getResourceAsStream(path);
        if (is == null) {
            return null;
        }

        try {
            byte[] b = getBytes(is);

            ClassReader cr = new ClassReader(b);
            ClassWriter cw = new ClassWriter(cr, 0);
            cr.accept(new JobAdapter(cw), 0);
            return cw.toByteArray();
        } catch (IOException e) {
        }

        return null;
    }

    /**
     * 从流对象中过去字节码
     * @param is
     * @return
     * @throws IOException
     */
    private byte[] getBytes(InputStream is) throws IOException {
        try {
            int available = is.available();
            byte[] bytes = new byte[available];
            int pos = 0;

            byte[] buf = new byte[1024];
            int len;
            while ((len = is.read(buf)) != -1) {
                System.arraycopy(buf, 0, bytes, pos, len);
                pos += len;
                if (pos >= available) {
                    break;
                }
            }

            return bytes;
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                }
            }
        }
    }
}

ASM

通过ASM来实现对Job类的字节码的修改,ASM的使用可以参考文档:asm4-guide

实现的ClassVisitor代码如下:

public class JobAdapter extends ClassVisitor {

    public static final String JOB_FIELD_NAME = "currentJob";
    public static final String JOB_FIELD_DESC = "Lorg/apache/hadoop/mapreduce/Job;";

    private String owner;
    private boolean isInterface;

    public JobAdapter(ClassVisitor cv) {
        super(ASM5, cv);
    }

    @Override
    public void visit(int version, int access, String name, String signature,
                      String superName, String[] interfaces) {
        super.visit(version, access, name, signature, superName, interfaces);
        owner = name;
        isInterface = (access & ACC_INTERFACE) == 1;
    }

    @Override
    public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
        MethodVisitor mv = super.visitMethod(access, name, desc, signature, exceptions);

        if (isInterface || mv == null) {
            return mv;
        }

        // 选取Job(JobConf conf)类构造方法注入赋值当前对象给currentJob属性的操作
        if (access == 0
                && name.equals("<init>")
                && desc.equals("(Lorg/apache/hadoop/mapred/JobConf;)V")) {
            return new JobMethodAdapter(mv);
        }

        return mv;
    }

    @Override
    public void visitEnd() {
        if (!isInterface) {
            // 添加全局静态属性currentJob
            FieldVisitor fv = super.visitField(
                    (ACC_PUBLIC | ACC_STATIC),
                    JOB_FIELD_NAME,
                    JOB_FIELD_DESC,
                    null, null);
            if (fv != null) {
                fv.visitEnd();
            }
        }
        super.visitEnd();
    }

    // 类构造方法修改类
    class JobMethodAdapter extends MethodVisitor {

        public JobMethodAdapter(MethodVisitor mv) {
            super(ASM5, mv);
        }

        @Override
        public void visitInsn(int opcode) {
            // 在构造方法返回之前注入赋值操作
            if (opcode >= IRETURN && opcode <= RETURN) {
                super.visitVarInsn(ALOAD, 0);
                super.visitFieldInsn(PUTSTATIC, owner, JOB_FIELD_NAME, JOB_FIELD_DESC);
            }
            super.visitInsn(opcode);
        }

        @Override
        public void visitMaxs(int maxStack, int maxLocals) {
            super.visitMaxs(maxStack + 1, maxLocals);
        }
    }
}

测试

下面是测试代码

arg[0] 为hadoop依赖(包括hadoop配置文件,jar等)+ MR主类jar包 + MR配置文件等 Classpath

arg[1] 为MR依赖的第三方Jar包classpath

public static void main(String[] args) {

        String classPath = args[0];
        String libJars = args[1];

        try {
            MapReduceClassLoader cl = new MapReduceClassLoader();
            cl.addClassPath(classPath);
            cl.addClassPath(libJars);

            System.out.println("URLS:" + Arrays.toString(cl.getURLs()));

            Thread.currentThread().setContextClassLoader(cl);

            // 加载MR主类
            Class mainClass = cl.loadClass("xxx.Main");

            Method mainMethod = mainClass.getMethod("main", new Class[] { String[].class });

            String libJarsParam = ClassPathUtils.getClassPathWithDotSep(libJars);

            // 设置MR依赖的Jar包
            String[] firstParams = new String[] { "-libjars", libJarsParam };
            Object[] params = new Object[] {firstParams};

            // 调用主类main方法
            mainMethod.invoke(null, params);

            // 测试获取Job的全局属性currentJob的值
            Class jobClass = cl.loadClass("org.apache.hadoop.mapreduce.Job");
            System.out.println(jobClass.getClassLoader());
            Field field = jobClass.getField(JobAdapter.JOB_FIELD_NAME);
            System.out.println(field.get(null));
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

源码

https://gitee.com/hsp8712/mr_submitor

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Charlie's Road

<Solidity学习系列四>使用编译器

Solidity存储库的一个构建目标是solc,solidity命令行编译器。 使用solc --help为您提供所有选项的解释。 编译器可以生成各种输出,范围...

1462
来自专栏IT杂记

通过Java程序提交通用Mapreduce无法回收类的问题

问题描述 上次发布的博客 通过Java程序提交通用Mapreduce,在实施过程中发现,每次提交一次Mapreduce任务,JVM无法回收过程中产生的MapRe...

3066
来自专栏Java 源码分析

NioEventLoopGroup 源码分析

NioEventLoopGroup 源码分析 1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看...

3406
来自专栏java 成神之路

URL 源码分析

37913
来自专栏進无尽的文章

简述OC语言

对于一门语言的学习是需要时间领悟的,而对于一些原理性的问题,我们需要清楚其核心思想,知其然而知其所以然,这样才能有利于自己的后续发展。本文只是简述,没有面面具到...

1792
来自专栏LhWorld哥陪你聊算法

【Linux篇】--awk的使用

awk是一个强大的文本分析工具。相对于grep的查找,sed的编辑,awk在其对数据分析并生成报告时,显得尤为强大。 简单来说awk就是把文件逐行的读入,(空格...

1382
来自专栏青玉伏案

iOS逆向工程之Hopper中的ARM指令

虽然前段时间ARM被日本软银收购了,但是科技是无国界的,所以呢ARM相关知识该学的学。现在看ARM指令集还是倍感亲切的,毕竟大学里开了ARM这门课,并且做了不少...

2997
来自专栏大内老A

WCF技术剖析之十八:消息契约(Message Contract)和基于消息契约的序列化

在本篇文章中,我们将讨论WCF四大契约(服务契约、数据契约、消息契约和错误契约)之一的消息契约(Message Contract)。服务契约关注于对服务操作的描...

3735
来自专栏编码小白

ofbiz中FreeMarkerWorker的makeConfiguration方法

            这个方法是说明了为什么在ftl中可以使用一些java方法             1.代码展示 public static Confi...

3687
来自专栏坚毅的PHP

my php & mysql FAQ

php中文字符串长度及定长截取问题使用str_len("中国") 结果为6,php系统默认一个中文字符长度为3,可改用mb_strlen函数获得长度,mb_su...

3796

扫码关注云+社区

领取腾讯云代金券