专栏首页IT杂记通过Java程序提交通用Mapreduce任务并获取Job信息

通过Java程序提交通用Mapreduce任务并获取Job信息

背景

我们的一个业务须要有对MR任务的提交和状态跟踪的功能,须要通过Java代码提交一个通用的MR任务(包括mr的jar、配置文件、依赖的第三方jar包),并且须要在提交后跟踪其状态和历史,所以在提交后程序须要拿到改提交的MR的JobID。

解决方案

首先 可以想到的是通过  ${HADOOP_HOME}/bin/hadoop jar 命令来提交,并从命令执行的标准输出中取到jobID,这样确实可以解决,但是这样做有几个问题:

  1. 须要通过Java程序去启动一个子进程,并且要监控它的标准输出,性能不会很好,有点重;
  2. 从标准输出的文本中去解析jobId,性能也不会很好,如果hadoop升级标准输出格式放生变化,可能会导致解析不到,解析jobId准确性不易保证;
  3. 当提交执行过程出现异常,任然须要依赖标准输出的文本去解析,原因同2;

由于上述原因决定放弃这个方案。

另外 一个方案就是采用Java程序直接提交的方式,这种提交方式也有几个问题:

  1. 用户的jar包配置文件怎么加载,如果执行任务主类, 如果解决Hadoop的依赖?用户的jar包和我们平台程序在一个java程序中是否会有冲突或其他问题?
  2. 执行提交后如何获取到JobID? 用户代码中的org.apache.hadoop.mapreduce.Job(以下简称Job)对象是其业务代码中的一个局部变量,执行提交操作后,其JobID存在于其对象内部,外部代码无妨访问到该Job对象,也就无法获取到JobID。如果通过hadoop的rest api去获取呢,yarn rest api确实可以获取所有的任务的相关信息,但是在外部程序不知道这个Job的任务信息的情况下,又怎么能准确的对应上呢?

我们来解决上述两个问题:

  1. 我们可以创建一个独立的Classloader来加载用户的jar包和配置文件(hadoop的依赖是否也由这个这个Classloader来加载稍后再讨论),每次提交一个新的MR任务就创建一个新的类加载器来加载用户MR相关的资源(jar、配置文件、等),并通过反射的方式调用其任务的主类(可以通过Manifest指定或由用户指定)的main方法,这样用户的MR相关的资源就和平台程序隔离开了,不会对平台程序产生影响,并且每个提交的任务之间也是隔离的,不会相互影响;
  1. 设想下如果Job类中有个public static的属性(field),比如名称为currentJob,其类型为Job,当有新的Job对象创建时,就将该Job对象赋值给currentJob属性,这样我们在能访问到MR任务类加载器的地方就能获取到这个currentJob属性值,也就是当前创建的Job对象,通过调用其getXXX方法就能获取到其相关的各种信息,包括JobID,状态等。这里可以通过修改Job类的字节码,去动态的注入一个这样的属性currentJob和赋值的操作。Job类在hadoop框架的jar包内,要想拦截到Job类的字节码并在类加载的时候对其修改,hadoop的依赖也就须要一个ClassLoader来加载,并在其加载的loadClass方法中去获取并修改Job类的字节码;currentJob属性是一个全局的静态属性,所以hadoop的依赖也由用户MR ClassLoader来加载,这样每个任务都有只属于自己的Job类,防止多个不同的任务提交Job类中currentJob覆盖的问题;

实现

Classloader

通过继承 java.net.URLClassLoader 类实现MR job的类加载器。类加载器有个规则:A类中如果引用了B类,则加载B类的时候会通过A类的类加载器来加载,那么MR类加载器加载MR的主类时,其引用到的所有类也都会通过MR类加载器来加载。

public class MapReduceClassLoader extends URLClassLoader {

    private static Logger LOG = LoggerFactory.getLogger(MapReduceClassLoader.class);

    public MapReduceClassLoader() {
        super(new URL[]{});
    }

    public MapReduceClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    public synchronized Class<?> loadClass(String name) throws ClassNotFoundException {

        // 判断类是否已被加载,如果已加载则直接返回
        Class c = this.findLoadedClass(name);
        if (c != null) {
            return c;
        }

        ClassNotFoundException ex = null;

        // 如果待加载的类为Job,则进行字节码转换后再加载类
        if (name.equals("org.apache.hadoop.mapreduce.Job")) {
            byte[] bytes = transformJobBytecode(name);
            if (bytes == null) {
                ex = new ClassNotFoundException("Transform job bytecode failed.");
            } else {
                c = defineClass(name, bytes, 0, bytes.length);
            }
        }

        if (c == null) {
            // 如果待加载的是JDK提供的系统类,则由父类加载器去完成,这里的父类加载器是sun.misc.Launcher.AppClassLoader
            if (ClassPathUtils.isSystemClass(name)) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }

            // 当前类加载器来进行加载
            if (c == null) {
                try {
                    c = findClass(name);
                } catch (Exception e) {
                    ex = new ClassNotFoundException(e.getMessage());
                }
            }

            // 当前类加载器加载不到,尝试由父类加载器来完成
            if (c == null && this.getParent() != null) {
                try {
                    c = this.getParent().loadClass(name);
                } catch (ClassNotFoundException e) {
                    ex = e;
                }
            }
        }

        if (c == null) {
            throw ex;
        } else {
            LOG.info("loaded " + c + " from " + c.getClassLoader());
            return c;
        }
    }

    /**
     * 添加由该类加载的classpath
     * @param classPath
     */
    public void addClassPath(String classPath) {
        URL[] cpUrls = ClassPathUtils.getClassPathURLs(classPath);
        for (URL cpUrl : cpUrls) {
            addURL(cpUrl);
        }
    }

    /**
     * 转换Job类的字节码
     * @param jobClassName
     * @return
     */
    private byte[] transformJobBytecode(String jobClassName) {
        String path = jobClassName.replace('.', '/').concat(".class");
        InputStream is = getResourceAsStream(path);
        if (is == null) {
            return null;
        }

        try {
            byte[] b = getBytes(is);

            ClassReader cr = new ClassReader(b);
            ClassWriter cw = new ClassWriter(cr, 0);
            cr.accept(new JobAdapter(cw), 0);
            return cw.toByteArray();
        } catch (IOException e) {
        }

        return null;
    }

    /**
     * 从流对象中过去字节码
     * @param is
     * @return
     * @throws IOException
     */
    private byte[] getBytes(InputStream is) throws IOException {
        try {
            int available = is.available();
            byte[] bytes = new byte[available];
            int pos = 0;

            byte[] buf = new byte[1024];
            int len;
            while ((len = is.read(buf)) != -1) {
                System.arraycopy(buf, 0, bytes, pos, len);
                pos += len;
                if (pos >= available) {
                    break;
                }
            }

            return bytes;
        } finally {
            if (is != null) {
                try {
                    is.close();
                } catch (IOException e) {
                }
            }
        }
    }
}

ASM

通过ASM来实现对Job类的字节码的修改,ASM的使用可以参考文档:asm4-guide

实现的ClassVisitor代码如下:

public class JobAdapter extends ClassVisitor {

    public static final String JOB_FIELD_NAME = "currentJob";
    public static final String JOB_FIELD_DESC = "Lorg/apache/hadoop/mapreduce/Job;";

    private String owner;
    private boolean isInterface;

    public JobAdapter(ClassVisitor cv) {
        super(ASM5, cv);
    }

    @Override
    public void visit(int version, int access, String name, String signature,
                      String superName, String[] interfaces) {
        super.visit(version, access, name, signature, superName, interfaces);
        owner = name;
        isInterface = (access & ACC_INTERFACE) == 1;
    }

    @Override
    public MethodVisitor visitMethod(int access, String name, String desc, String signature, String[] exceptions) {
        MethodVisitor mv = super.visitMethod(access, name, desc, signature, exceptions);

        if (isInterface || mv == null) {
            return mv;
        }

        // 选取Job(JobConf conf)类构造方法注入赋值当前对象给currentJob属性的操作
        if (access == 0
                && name.equals("<init>")
                && desc.equals("(Lorg/apache/hadoop/mapred/JobConf;)V")) {
            return new JobMethodAdapter(mv);
        }

        return mv;
    }

    @Override
    public void visitEnd() {
        if (!isInterface) {
            // 添加全局静态属性currentJob
            FieldVisitor fv = super.visitField(
                    (ACC_PUBLIC | ACC_STATIC),
                    JOB_FIELD_NAME,
                    JOB_FIELD_DESC,
                    null, null);
            if (fv != null) {
                fv.visitEnd();
            }
        }
        super.visitEnd();
    }

    // 类构造方法修改类
    class JobMethodAdapter extends MethodVisitor {

        public JobMethodAdapter(MethodVisitor mv) {
            super(ASM5, mv);
        }

        @Override
        public void visitInsn(int opcode) {
            // 在构造方法返回之前注入赋值操作
            if (opcode >= IRETURN && opcode <= RETURN) {
                super.visitVarInsn(ALOAD, 0);
                super.visitFieldInsn(PUTSTATIC, owner, JOB_FIELD_NAME, JOB_FIELD_DESC);
            }
            super.visitInsn(opcode);
        }

        @Override
        public void visitMaxs(int maxStack, int maxLocals) {
            super.visitMaxs(maxStack + 1, maxLocals);
        }
    }
}

测试

下面是测试代码

arg[0] 为hadoop依赖(包括hadoop配置文件,jar等)+ MR主类jar包 + MR配置文件等 Classpath

arg[1] 为MR依赖的第三方Jar包classpath

public static void main(String[] args) {

        String classPath = args[0];
        String libJars = args[1];

        try {
            MapReduceClassLoader cl = new MapReduceClassLoader();
            cl.addClassPath(classPath);
            cl.addClassPath(libJars);

            System.out.println("URLS:" + Arrays.toString(cl.getURLs()));

            Thread.currentThread().setContextClassLoader(cl);

            // 加载MR主类
            Class mainClass = cl.loadClass("xxx.Main");

            Method mainMethod = mainClass.getMethod("main", new Class[] { String[].class });

            String libJarsParam = ClassPathUtils.getClassPathWithDotSep(libJars);

            // 设置MR依赖的Jar包
            String[] firstParams = new String[] { "-libjars", libJarsParam };
            Object[] params = new Object[] {firstParams};

            // 调用主类main方法
            mainMethod.invoke(null, params);

            // 测试获取Job的全局属性currentJob的值
            Class jobClass = cl.loadClass("org.apache.hadoop.mapreduce.Job");
            System.out.println(jobClass.getClassLoader());
            Field field = jobClass.getField(JobAdapter.JOB_FIELD_NAME);
            System.out.println(field.get(null));
            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

源码

https://gitee.com/hsp8712/mr_submitor

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用Kafka的High Level Consumer

    ##为什么使用High Level Consumer 在某些应用场景,我们希望通过多线程读取消息,而我们并不关心从Kafka消费消息的顺序,我们仅仅关心数据能...

    囚兔
  • 小顶堆Java实现

    参考文章: 漫谈经典排序算法:一、从简单选择排序到堆排序的深度解析 http://blog.csdn.net/touch_2011/article/detai...

    囚兔
  • 关于JVM CPU资源占用过高的问题排查

    一、背景:     先执行一个java程序里面开了两个线程分别都在while循环做打印操作。 # java -cp ./test-threads.jar...

    囚兔
  • 《Kotlin 极简教程 》第4章 基本数据类型与类型系统

    到目前为止,我们已经了解了Kotlin的基本符号以及基础语法。我们可以看出,使用Kotlin写的代码更简洁、可读性更好、更富有生产力。

    一个会写诗的程序员
  • .Net中的反射(反射特性) - Part.3

    可能很多人还不了解特性,所以我们先了解一下什么是特性。想想看如果有一个消息系统,它存在这样一个方法,用来将一则短消息发送给某人:

    张子阳
  • 设计模式之行为型模式

    将一个请求封装成一个对象 ,从而使我们可用不同请求对客户进行参数化 :对请求排队或记录请求日志 ,以及支持可撤销的操作 .也叫: 动作Action模式 ,事务t...

    时间静止不是简史
  • Python 字符串操作方法大全

    python字符串操作实方法大合集,包括了几乎所有常用的python字符串操作,如字符串的替换、删除、截取、复制、连接、比较、查找、分割等,需要的朋友可以参考下

    py3study
  • CRM WebClient UI的浏览器打印实现

    WebClient UI上自带了一个打印按钮,按Ctrl + P后可以生成一个新的页面供打印。

    Jerry Wang
  • Python爬虫扩展库BeautifulSoup4用法精要

    BeautifulSoup是一个非常优秀的Python扩展库,可以用来从HTML或XML文件中提取我们感兴趣的数据,并且允许指定使用不同的解析器。由于beaut...

    Python小屋屋主
  • Head First设计模式——生成器模式和责任链模式

    现又一个度假计划指定,需要指定度假每一天的活动、旅馆、用餐、门票等等事情,但是每个客人的度假计划可能不太一样。例如天数、活动类型、用餐等等。

    SpringSun

扫码关注云+社区

领取腾讯云代金券