前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的ServerDeployContainerRequest

聊聊PowerJob的ServerDeployContainerRequest

作者头像
code4it
发布2024-02-21 15:47:33
810
发布2024-02-21 15:47:33
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的ServerDeployContainerRequest

ServerDeployContainerRequest

tech/powerjob/common/request/ServerDeployContainerRequest.java

代码语言:javascript
复制
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerDeployContainerRequest implements PowerSerializable {

    /**
     * 容器ID
     */
    private Long containerId;
    /**
     * 容器名称
     */
    private String containerName;
    /**
     * 文件名(MD5值),用于做版本校验和文件下载
     */
    private String version;
    /**
     * 下载地址
     */
    private String downloadURL;
}

ServerDeployContainerRequest定义了containerId、containerName、version、downloadURL属性

onReceiveServerDeployContainerRequest

tech/powerjob/worker/actors/WorkerActor.java

代码语言:javascript
复制
    @Handler(path = WORKER_HANDLER_DEPLOY_CONTAINER)
    public void onReceiveServerDeployContainerRequest(ServerDeployContainerRequest request) {
        OmsContainerFactory.deployContainer(request);
    }

WorkerActor的onReceiveServerDeployContainerRequest用于处理ServerDeployContainerRequest,它委托给了OmsContainerFactory.deployContainer

deployContainer

tech/powerjob/worker/container/OmsContainerFactory.java

代码语言:javascript
复制
    public static synchronized void deployContainer(ServerDeployContainerRequest request) {

        Long containerId = request.getContainerId();
        String containerName = request.getContainerName();
        String version = request.getVersion();

        log.info("[OmsContainer-{}] start to deploy container(name={},version={},downloadUrl={})", containerId, containerName, version, request.getDownloadURL());

        OmsContainer oldContainer = CARGO.get(containerId);
        if (oldContainer != null && version.equals(oldContainer.getVersion())) {
            log.info("[OmsContainer-{}] version={} already deployed, so skip this deploy task.", containerId, version);
            return;
        }

        String filePath = CONTAINER_DIR + containerId + "/" + version + ".jar";
        // 下载Container到本地
        File jarFile = new File(filePath);

        try {
            if (!jarFile.exists()) {
                FileUtils.forceMkdirParent(jarFile);
                FileUtils.copyURLToFile(new URL(request.getDownloadURL()), jarFile, 5000, 300000);
                log.info("[OmsContainer-{}] download jar successfully, path={}", containerId, jarFile.getPath());
            }

            // 创建新容器
            OmsContainer newContainer = new OmsJarContainer(containerId, containerName, version, jarFile);
            newContainer.init();

            // 替换容器
            CARGO.put(containerId, newContainer);
            log.info("[OmsContainer-{}] deployed new version:{} successfully!", containerId, version);

            if (oldContainer != null) {
                // 销毁旧容器
                oldContainer.destroy();
            }

        } catch (Exception e) {
            log.error("[OmsContainer-{}] deployContainer(name={},version={}) failed.", containerId, containerName, version, e);
            // 如果部署失败,则删除该 jar(本次失败可能是下载jar出错导致,不删除会导致这个版本永久无法重新部署)
            CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(jarFile));
        }
    }

deployContainer方法先找到旧的OmsContainer,然后判断version是否一样,一样就不用重新部署,否则先从本地查找jar包,找不到则根据downloadURL去下载,然后创建OmsJarContainer,执行其init方法,若存在旧的OmsContainer则执行其destroy方法

OmsContainer

tech/powerjob/worker/container/OmsContainer.java

代码语言:javascript
复制
public interface OmsContainer extends LifeCycle {

    /**
     * 获取处理器
     * @param className 全限定类名
     * @return 处理器(可以是 MR、BD等处理器)
     */
    BasicProcessor getProcessor(String className);

    /**
     * 获取容器的类加载器
     * @return 类加载器
     */
    OhMyClassLoader getContainerClassLoader();

    Long getContainerId();
    Long getDeployedTime();
    String getName();
    String getVersion();

    /**
     * 尝试释放容器资源
     */
    void tryRelease();
}

OmsContainer接口定义了getProcessor、getContainerClassLoader、getContainerId、getDeployedTime、getName、getVersion、tryRelease方法

OmsJarContainer

tech/powerjob/worker/container/OmsJarContainer.java

代码语言:javascript
复制
@Slf4j
public class OmsJarContainer implements OmsContainer {

    private final Long containerId;
    private final String name;
    private final String version;
    private final File localJarFile;
    private final Long deployedTime;

    // 引用计数器
    private final AtomicInteger referenceCount = new AtomicInteger(0);

    private OhMyClassLoader containerClassLoader;
    private ClassPathXmlApplicationContext container;

    private final Map<String, BasicProcessor> processorCache = Maps.newConcurrentMap();

    public OmsJarContainer(Long containerId, String name, String version, File localJarFile) {
        this.containerId = containerId;
        this.name = name;
        this.version = version;
        this.localJarFile = localJarFile;
        this.deployedTime = System.currentTimeMillis();
    }

    //......
}    

OmsJarContainer实现了OmsContainer接口

getProcessor

代码语言:javascript
复制
    public BasicProcessor getProcessor(String className) {

        BasicProcessor basicProcessor = processorCache.computeIfAbsent(className, ignore -> {
            Class<?> targetClass;
            try {
                targetClass = containerClassLoader.loadClass(className);
            } catch (ClassNotFoundException cnf) {
                log.error("[OmsJarContainer-{}] can't find class: {} in container.", containerId, className);
                return null;
            }

            // 先尝试从 Spring IOC 容器加载
            try {
                return (BasicProcessor) container.getBean(targetClass);
            } catch (BeansException be) {
                log.warn("[OmsJarContainer-{}] load instance from spring container failed, try to build instance directly.", containerId);
            } catch (ClassCastException cce) {
                log.error("[OmsJarContainer-{}] {} should implements the Processor interface!", containerId, className);
                return null;
            } catch (Exception e) {
                log.error("[OmsJarContainer-{}] get bean failed for {}.", containerId, className, e);
                return null;
            }

            // 直接实例化
            try {
                Object obj = targetClass.getDeclaredConstructor().newInstance();
                return (BasicProcessor) obj;
            } catch (Exception e) {
                log.error("[OmsJarContainer-{}] load {} failed", containerId, className, e);
            }
            return null;
        });

        if (basicProcessor != null) {
            // 引用计数 + 1
            referenceCount.getAndIncrement();
        }
        return basicProcessor;
    }

getProcessor方法会先通过containerClassLoader.loadClass去加载对应的processor类,加载不到则直接返回,之后根据targetClass去spring容器查找,若查找不到则直接通过targetClass.getDeclaredConstructor().newInstance()尝试实例化

init

代码语言:javascript
复制
    public void init() throws Exception {

        log.info("[OmsJarContainer-{}] start to init container(name={},jarPath={})", containerId, name, localJarFile.getPath());

        URL jarURL = localJarFile.toURI().toURL();

        // 创建类加载器(父类加载为 Worker 的类加载)
        this.containerClassLoader = new OhMyClassLoader(new URL[]{jarURL}, this.getClass().getClassLoader());

        // 解析 Properties
        Properties properties = new Properties();
        try (InputStream propertiesURLStream = containerClassLoader.getResourceAsStream(ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME)) {

            if (propertiesURLStream == null) {
                log.error("[OmsJarContainer-{}] can't find {} in jar {}.", containerId, ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME, localJarFile.getPath());
                throw new PowerJobException("invalid jar file because of no " + ContainerConstant.CONTAINER_PROPERTIES_FILE_NAME);
            }

            properties.load(propertiesURLStream);
            log.info("[OmsJarContainer-{}] load container properties successfully: {}", containerId, properties);
        }
        String packageName = properties.getProperty(ContainerConstant.CONTAINER_PACKAGE_NAME_KEY);
        if (StringUtils.isEmpty(packageName)) {
            log.error("[OmsJarContainer-{}] get package name failed, developer should't modify the properties file!", containerId);
            throw new PowerJobException("invalid jar file");
        }

        // 加载用户类
        containerClassLoader.load(packageName);

        // 创建 Spring IOC 容器(Spring配置文件需要填相对路径)
        // 需要切换线程上下文类加载器以加载 JDBC 类驱动(SPI)
        ClassLoader oldCL = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(containerClassLoader);
        try {
            this.container = new ClassPathXmlApplicationContext(new String[]{ContainerConstant.SPRING_CONTEXT_FILE_NAME}, false);
            this.container.setClassLoader(containerClassLoader);
            this.container.refresh();
        }finally {
            Thread.currentThread().setContextClassLoader(oldCL);
        }

        log.info("[OmsJarContainer-{}] init container(name={},jarPath={}) successfully", containerId, name, localJarFile.getPath());
    }

init方法根据jar包地址创建OhMyClassLoader,然后先解析oms-worker-container.properties,执行properties.load(propertiesURLStream),接着获取配置的packageName,执行containerClassLoader.load(packageName)加载类,然后根据oms-worker-container-spring-context.xml创建spring的ClassPathXmlApplicationContext,设置其classLoader,执行其refresh方法

destroy

代码语言:javascript
复制
    public void destroy() throws Exception {

        // 没有其余引用时,才允许执行 destroy
        if (referenceCount.get() <= 0) {
            try {
                if (localJarFile.exists()) {
                    FileUtils.forceDelete(localJarFile);
                }
            }catch (Exception e) {
                log.warn("[OmsJarContainer-{}] delete jarFile({}) failed.", containerId, localJarFile.getPath(), e);
            }
            try {
                processorCache.clear();
                container.close();
                containerClassLoader.close();
                log.info("[OmsJarContainer-{}] container destroyed successfully", containerId);
            }catch (Exception e) {
                log.error("[OmsJarContainer-{}] container destroyed failed", containerId, e);
            }
            return;
        }

        log.warn("[OmsJarContainer-{}] container's reference count is {}, won't destroy now!", containerId, referenceCount.get());
    }

destroy方法在referenceCount小于等于0时会先删除localJarFile,然后执行processorCache.clear()、ClassPathXmlApplicationContext的close、OhMyClassLoader的close

JarContainerProcessorFactory

tech/powerjob/worker/processor/impl/JarContainerProcessorFactory.java

代码语言:javascript
复制
@Slf4j
public class JarContainerProcessorFactory implements ProcessorFactory {

    private final WorkerRuntime workerRuntime;

    public JarContainerProcessorFactory(WorkerRuntime workerRuntime) {
        this.workerRuntime = workerRuntime;
    }

    @Override
    public Set<String> supportTypes() {
        return Sets.newHashSet(ProcessorType.EXTERNAL.name());
    }

    @Override
    public ProcessorBean build(ProcessorDefinition processorDefinition) {

        String processorInfo = processorDefinition.getProcessorInfo();
        String[] split = processorInfo.split("#");
        String containerName = split[0];
        String className = split[1];

        log.info("[ProcessorFactory] try to load processor({}) in container({})", className, containerName);

        OmsContainer omsContainer = OmsContainerFactory.fetchContainer(Long.valueOf(containerName), workerRuntime);
        if (omsContainer != null) {
            return new ProcessorBean()
                    .setProcessor(omsContainer.getProcessor(className))
                    .setClassLoader(omsContainer.getContainerClassLoader());
        } else {
            log.warn("[ProcessorFactory] load container failed. processor info : {}", processorInfo);
        }
        return null;
    }
}

JarContainerProcessorFactory的build方法它根据#来解析出containerId及className,然后通过OmsContainerFactory.fetchContainer去查找容器,然后通过omsContainer.getProcessor(className)获取对应的processor;JarContainerProcessorFactory的supportTypes为EXTERNAL(外部处理器(动态加载))

小结

WorkerActor的onReceiveServerDeployContainerRequest用于处理ServerDeployContainerRequest,它委托给了OmsContainerFactory.deployContainer;deployContainer方法先找到旧的OmsContainer,然后判断version是否一样,一样就不用重新部署,否则先从本地查找jar包,找不到则根据downloadURL去下载,然后创建OmsJarContainer,执行其init方法,若存在旧的OmsContainer则执行其destroy方法;init方法根据jar包地址创建OhMyClassLoader,创建ClassPathXmlApplicationContext,设置其classLoader,执行其refresh方法;destroy方法在referenceCount小于等于0时会先删除localJarFile,然后执行processorCache.clear()、ClassPathXmlApplicationContext的close、OhMyClassLoader的close。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2024-02-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 码匠的流水账 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ServerDeployContainerRequest
  • onReceiveServerDeployContainerRequest
  • deployContainer
  • OmsContainer
  • OmsJarContainer
    • getProcessor
      • init
        • destroy
        • JarContainerProcessorFactory
        • 小结
        相关产品与服务
        容器服务
        腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档