前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过Java程序提交通用Mapreduce任务并获取Job信息

通过Java程序提交通用Mapreduce任务并获取Job信息

作者头像
囚兔
发布2018-02-08 11:03:09
2.3K1
发布2018-02-08 11:03:09
举报
文章被收录于专栏:IT杂记IT杂记

背景

我们的一个业务须要有对MR任务的提交和状态跟踪的功能,须要通过Java代码提交一个通用的MR任务(包括mr的jar、配置文件、依赖的第三方jar包),并且须要在提交后跟踪其状态和历史,所以在提交后程序须要拿到改提交的MR的JobID。

解决方案

首先 可以想到的是通过  ${HADOOP_HOME}/bin/hadoop jar 命令来提交,并从命令执行的标准输出中取到jobID,这样确实可以解决,但是这样做有几个问题:

  1. 须要通过Java程序去启动一个子进程,并且要监控它的标准输出,性能不会很好,有点重;
  2. 从标准输出的文本中去解析jobId,性能也不会很好,如果hadoop升级标准输出格式放生变化,可能会导致解析不到,解析jobId准确性不易保证;
  3. 当提交执行过程出现异常,任然须要依赖标准输出的文本去解析,原因同2;

由于上述原因决定放弃这个方案。

另外 一个方案就是采用Java程序直接提交的方式,这种提交方式也有几个问题:

  1. 用户的jar包配置文件怎么加载,如果执行任务主类, 如果解决Hadoop的依赖?用户的jar包和我们平台程序在一个java程序中是否会有冲突或其他问题?
  2. 执行提交后如何获取到JobID? 用户代码中的org.apache.hadoop.mapreduce.Job(以下简称Job)对象是其业务代码中的一个局部变量,执行提交操作后,其JobID存在于其对象内部,外部代码无妨访问到该Job对象,也就无法获取到JobID。如果通过hadoop的rest api去获取呢,yarn rest api确实可以获取所有的任务的相关信息,但是在外部程序不知道这个Job的任务信息的情况下,又怎么能准确的对应上呢?

我们来解决上述两个问题:

  1. 我们可以创建一个独立的Classloader来加载用户的jar包和配置文件(hadoop的依赖是否也由这个这个Classloader来加载稍后再讨论),每次提交一个新的MR任务就创建一个新的类加载器来加载用户MR相关的资源(jar、配置文件、等),并通过反射的方式调用其任务的主类(可以通过Manifest指定或由用户指定)的main方法,这样用户的MR相关的资源就和平台程序隔离开了,不会对平台程序产生影响,并且每个提交的任务之间也是隔离的,不会相互影响;
Classloader模型
Classloader模型
  1. 设想下如果Job类中有个public static的属性(field),比如名称为currentJob,其类型为Job,当有新的Job对象创建时,就将该Job对象赋值给currentJob属性,这样我们在能访问到MR任务类加载器的地方就能获取到这个currentJob属性值,也就是当前创建的Job对象,通过调用其getXXX方法就能获取到其相关的各种信息,包括JobID,状态等。这里可以通过修改Job类的字节码,去动态的注入一个这样的属性currentJob和赋值的操作。Job类在hadoop框架的jar包内,要想拦截到Job类的字节码并在类加载的时候对其修改,hadoop的依赖也就须要一个ClassLoader来加载,并在其加载的loadClass方法中去获取并修改Job类的字节码;currentJob属性是一个全局的静态属性,所以hadoop的依赖也由用户MR ClassLoader来加载,这样每个任务都有只属于自己的Job类,防止多个不同的任务提交Job类中currentJob覆盖的问题;

实现

Classloader

通过继承 java.net.URLClassLoader 类实现MR job的类加载器。类加载器有个规则:A类中如果引用了B类,则加载B类的时候会通过A类的类加载器来加载,那么MR类加载器加载MR的主类时,其引用到的所有类也都会通过MR类加载器来加载。

public class MapReduceClassLoader extends URLClassLoader {

    private static Logger LOG = LoggerFactory.getLogger(MapReduceClassLoader.class);

    public MapReduceClassLoader() {
        super(new URL[]{});
    }

    public MapReduceClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    public synchronized Class<?> loadClass(String name) throws ClassNotFoundException {

        // 判断类是否已被加载,如果已加载则直接返回
        Class c = this.findLoadedClass(name);
        if (c != null) {
            return c;
        }

        ClassNotFoundException ex = null;

        // 如果待加载的类为Job,则进行字节码转换后再加载类
        if (name.equals("org.apache.hadoop.mapreduce.Job")) {
            byte[] bytes = transformJobBytecode(name);
            if (bytes == null) {
                ex = new ClassNotFoundException("Transform job bytecode failed.");
            } else {
                c = defineClass(name, bytes, 0, bytes.length);
            }
        }

        if (c == null) {
            // 如果待加载的是JDK提供的系统类,则由父类加载器去完成,这里的父类加载器是sun.misc.Launcher.AppClassLoader
            if (ClassPathUtils.isSystemClass(name)) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }

            // 当前类加载器来进行加载
            if (c == null) {
                try {
                    c = findClass(name);
                } catch (Exception e) {
                    ex = new ClassNotFoundException(e.getMessage());
                }
            }

            // 当前类加载器加载不到,尝试由父类加载器来完成
            if (c == null && this.getParent() != null) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }
        }

        if (c == null) {
            throw ex;
        } else {
            LOG.info("loaded " + c + " from " + c.getClassLoader());
            return c;
        }
    }

    /**
     * 添加由该类加载的classpath
     * @param classPath
     */
    public void addClassPath(String classPath) {
        URL[] cpUrls = ClassPathUtils.getClassPathURLs(classPath);
        for (URL cpUrl : cpUrls) {
            addURL(cpUrl);
        }
    }

    /**
     * 转换Job类的字节码
     * @param jobClassName
     * @return
     */
    private byte[] transformJobBytecode(String jobClassName) {
        String path = jobClassName.replace('.', '/').concat(".class");
        InputStream is = getResourceAsStream(path);
        if (is == null) {
            return null;
        }

        try {
            byte[] b = getBytes(is);

            ClassReader cr = new ClassReader(b);
            ClassWriter cw = new ClassWriter(cr, 0);
            cr.accept(new JobAdapter(cw), 0);
            return cw.toByteArray();
        } catch (IOException e) {
        }

        return null;
    }

    /**
     * 从流对象中过去字节码
     * @param is
     * @return
     * @throws IOException
     */
    private byte[] getBytes(InputStream is) throws IOException {
        try {
            int available = is.available();
            byte[] bytes = new byte[available];
            int pos = 0;

            byte[] buf = new byte[1024];
            int len;
            while ((len = is.read(buf)) != -1) {
                System.arraycopy(buf, 0, bytes, pos, len);
                pos += len;
                if (pos >= available) {
                    break;
                }
            }

            return bytes;
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                }
            }
        }
    }
}

ASM

通过ASM来实现对Job类的字节码的修改,ASM的使用可以参考文档:asm4-guide

实现的ClassVisitor代码如下:

public class JobAdapter extends ClassVisitor {

    public static final String JOB_FIELD_NAME = "currentJob";
    public static final String JOB_FIELD_DESC = "Lorg/apache/hadoop/mapreduce/Job;";

    private String owner;
    private boolean isInterface;

    public JobAdapter(ClassVisitor cv) {
        super(ASM5, cv);
    }

    @Override
    public void visit(int version, int access, String name, String signature,
                      String superName, String[] interfaces) {
        super.visit(version, access, name, signature, superName, interfaces);
        owner = name;
        isInterface = (access & ACC_INTERFACE) == 1;
    }

    @Override
    public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
        MethodVisitor mv = super.visitMethod(access, name, desc, signature, exceptions);

        if (isInterface || mv == null) {
            return mv;
        }

        // 选取Job(JobConf conf)类构造方法注入赋值当前对象给currentJob属性的操作
        if (access == 0
                && name.equals("<init>")
                && desc.equals("(Lorg/apache/hadoop/mapred/JobConf;)V")) {
            return new JobMethodAdapter(mv);
        }

        return mv;
    }

    @Override
    public void visitEnd() {
        if (!isInterface) {
            // 添加全局静态属性currentJob
            FieldVisitor fv = super.visitField(
                    (ACC_PUBLIC | ACC_STATIC),
                    JOB_FIELD_NAME,
                    JOB_FIELD_DESC,
                    null, null);
            if (fv != null) {
                fv.visitEnd();
            }
        }
        super.visitEnd();
    }

    // 类构造方法修改类
    class JobMethodAdapter extends MethodVisitor {

        public JobMethodAdapter(MethodVisitor mv) {
            super(ASM5, mv);
        }

        @Override
        public void visitInsn(int opcode) {
            // 在构造方法返回之前注入赋值操作
            if (opcode >= IRETURN && opcode <= RETURN) {
                super.visitVarInsn(ALOAD, 0);
                super.visitFieldInsn(PUTSTATIC, owner, JOB_FIELD_NAME, JOB_FIELD_DESC);
            }
            super.visitInsn(opcode);
        }

        @Override
        public void visitMaxs(int maxStack, int maxLocals) {
            super.visitMaxs(maxStack + 1, maxLocals);
        }
    }
}

测试

下面是测试代码

arg[0] 为hadoop依赖(包括hadoop配置文件,jar等)+ MR主类jar包 + MR配置文件等 Classpath

arg[1] 为MR依赖的第三方Jar包classpath

public static void main(String[] args) {

        String classPath = args[0];
        String libJars = args[1];

        try {
            MapReduceClassLoader cl = new MapReduceClassLoader();
            cl.addClassPath(classPath);
            cl.addClassPath(libJars);

            System.out.println("URLS:" + Arrays.toString(cl.getURLs()));

            Thread.currentThread().setContextClassLoader(cl);

            // 加载MR主类
            Class mainClass = cl.loadClass("xxx.Main");

            Method mainMethod = mainClass.getMethod("main", new Class[] { String[].class });

            String libJarsParam = ClassPathUtils.getClassPathWithDotSep(libJars);

            // 设置MR依赖的Jar包
            String[] firstParams = new String[] { "-libjars", libJarsParam };
            Object[] params = new Object[] {firstParams};

            // 调用主类main方法
            mainMethod.invoke(null, params);

            // 测试获取Job的全局属性currentJob的值
            Class jobClass = cl.loadClass("org.apache.hadoop.mapreduce.Job");
            System.out.println(jobClass.getClassLoader());
            Field field = jobClass.getField(JobAdapter.JOB_FIELD_NAME);
            System.out.println(field.get(null));
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

源码

https://gitee.com/hsp8712/mr_submitor

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 背景
  • 解决方案
  • 实现
    • Classloader
      • ASM
        • 测试
          • 源码
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档