A Look at PowerJob's ContainerController

code4it
Published 2024-01-31 12:55:13
Collected in the column: 码匠的流水账

This article takes a look at PowerJob's ContainerController.

ContainerController

tech/powerjob/server/web/controller/ContainerController.java

@Slf4j
@RestController
@RequestMapping("/container")
public class ContainerController {


    private final int port;

    private final ContainerService containerService;

    private final AppInfoRepository appInfoRepository;

    private final ContainerInfoRepository containerInfoRepository;

    public ContainerController(@Value("${server.port}") int port, ContainerService containerService, AppInfoRepository appInfoRepository, ContainerInfoRepository containerInfoRepository) {
        this.port = port;
        this.containerService = containerService;
        this.appInfoRepository = appInfoRepository;
        this.containerInfoRepository = containerInfoRepository;
    }

    @GetMapping("/downloadJar")
    public void downloadJar(String version, HttpServletResponse response) throws IOException {
        File file = containerService.fetchContainerJarFile(version);
        if (file.exists()) {
            OmsFileUtils.file2HttpResponse(file, response);
        } else {
            log.error("[Container] can't find container by version[{}], please deploy first!", version);
        }
    }

    @PostMapping("/downloadContainerTemplate")
    public void downloadContainerTemplate(@RequestBody GenerateContainerTemplateRequest req, HttpServletResponse response) throws IOException {
        File zipFile = ContainerTemplateGenerator.generate(req.getGroup(), req.getArtifact(), req.getName(), req.getPackageName(), req.getJavaVersion());
        OmsFileUtils.file2HttpResponse(zipFile, response);
    }

    @PostMapping("/jarUpload")
    public ResultDTO<String> fileUpload(@RequestParam("file") MultipartFile file) throws Exception {
        if (file == null || file.isEmpty()) {
            return ResultDTO.failed("empty file");
        }
        return ResultDTO.success(containerService.uploadContainerJarFile(file));
    }

    @PostMapping("/save")
    public ResultDTO<Void> saveContainer(@RequestBody SaveContainerInfoRequest request) {
        request.valid();

        ContainerInfoDO container = new ContainerInfoDO();
        BeanUtils.copyProperties(request, container);
        container.setSourceType(request.getSourceType().getV());
        container.setStatus(request.getStatus().getV());

        containerService.save(container);
        return ResultDTO.success(null);
    }

    @GetMapping("/delete")
    public ResultDTO<Void> deleteContainer(Long appId, Long containerId) {
        containerService.delete(appId, containerId);
        return ResultDTO.success(null);
    }

    @GetMapping("/list")
    public ResultDTO<List<ContainerInfoVO>> listContainers(Long appId) {
        List<ContainerInfoVO> res = containerInfoRepository.findByAppIdAndStatusNot(appId, SwitchableStatus.DELETED.getV())
                .stream().map(ContainerController::convert).collect(Collectors.toList());
        return ResultDTO.success(res);
    }

    @GetMapping("/listDeployedWorker")
    public ResultDTO<String> listDeployedWorker(Long appId, Long containerId, HttpServletResponse response) {
        AppInfoDO appInfoDO = appInfoRepository.findById(appId).orElseThrow(() -> new IllegalArgumentException("can't find app by id:" + appId));
        String targetServer = appInfoDO.getCurrentServer();

        if (StringUtils.isEmpty(targetServer)) {
            return ResultDTO.failed("No workers have even registered!");
        }

        return ResultDTO.success(containerService.fetchDeployedInfo(appId, containerId));
    }

    private static ContainerInfoVO convert(ContainerInfoDO containerInfoDO) {
        ContainerInfoVO vo = new ContainerInfoVO();
        BeanUtils.copyProperties(containerInfoDO, vo);
        if (containerInfoDO.getLastDeployTime() == null) {
            vo.setLastDeployTime("N/A");
        }else {
            vo.setLastDeployTime(DateFormatUtils.format(containerInfoDO.getLastDeployTime(), OmsConstant.TIME_PATTERN));
        }
        SwitchableStatus status = SwitchableStatus.of(containerInfoDO.getStatus());
        vo.setStatus(status.name());
        ContainerSourceType sourceType = ContainerSourceType.of(containerInfoDO.getSourceType());
        vo.setSourceType(sourceType.name());
        return vo;
    }
}

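ContainerController exposes the downloadJar, downloadContainerTemplate, jarUpload, save, delete, list, and listDeployedWorker endpoints. Of these, downloadJar, jarUpload, save, delete, and listDeployedWorker delegate to containerService; list queries containerInfoRepository directly, and downloadContainerTemplate uses ContainerTemplateGenerator. Note that downloadJar only logs an error when the jar file cannot be found.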

ContainerService

tech/powerjob/server/core/container/ContainerService.java

@Slf4j
@Service
public class ContainerService {

    @Resource
    private Environment environment;
    @Resource
    private LockService lockService;
    @Resource
    private ContainerInfoRepository containerInfoRepository;
    @Resource
    private GridFsManager gridFsManager;
    @Resource
    private TransportService transportService;

    @Resource
    private WorkerClusterQueryService workerClusterQueryService;

    // segment lock used for downloads
    private final SegmentLock segmentLock = new SegmentLock(4);
    // number of machines deployed concurrently
    private static final int DEPLOY_BATCH_NUM = 50;
    // minimum interval between deployments
    private static final long DEPLOY_MIN_INTERVAL = 10 * 60 * 1000L;
    // maximum deployment time
    private static final long DEPLOY_MAX_COST_TIME = 10 * 60 * 1000L;

    //......
}    

ContainerService provides the save, delete, uploadContainerJarFile, fetchContainerJarFile, deploy, and fetchDeployedInfo methods.

save

    public void save(ContainerInfoDO container) {
        Long originId = container.getId();
        if (originId != null) {
            // just validate
            containerInfoRepository.findById(originId).orElseThrow(() -> new IllegalArgumentException("can't find container by id: " + originId));
        } else {
            container.setGmtCreate(new Date());
        }
        container.setGmtModified(new Date());

        // for an uploaded file, sourceInfo is the file's MD5; for a Git source, the MD5 is generated at deploy time
        if (container.getSourceType() == ContainerSourceType.FatJar.getV()) {
            container.setVersion(container.getSourceInfo());
        }else {
            container.setVersion("init");
        }
        containerInfoRepository.saveAndFlush(container);
    }

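save first checks that the container exists when an id is supplied (for a new container it sets gmtCreate instead). It then sets the version: the MD5 held in sourceInfo for a FatJar source, or "init" for any other source. Finally it persists the record through containerInfoRepository.saveAndFlush.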
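The version rule in save can be isolated as a small pure function. This is a minimal sketch, not PowerJob code: the class name and the numeric source-type codes are assumptions made for illustration (the real codes come from ContainerSourceType).

```java
import java.util.Objects;

/** Hypothetical stand-in for the version rule applied in save(). */
final class ContainerVersionRule {

    // Assumed numeric codes; the real values are defined by ContainerSourceType.
    static final int FAT_JAR = 1;
    static final int GIT = 2;

    /**
     * For an uploaded fat jar the version is the MD5 carried in sourceInfo;
     * for any other source a placeholder is used until the first deploy.
     */
    static String initialVersion(int sourceType, String sourceInfo) {
        if (sourceType == FAT_JAR) {
            return Objects.requireNonNull(sourceInfo, "sourceInfo must hold the jar's MD5");
        }
        return "init";
    }
}
```

Keeping the rule in one place makes it clear that a Git container has no meaningful version until prepareJarFile has resolved a commit id.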

delete

    public void delete(Long appId, Long containerId) {
        ContainerInfoDO container = containerInfoRepository.findById(containerId).orElseThrow(() -> new IllegalArgumentException("can't find container by id: " + containerId));

        if (!Objects.equals(appId, container.getAppId())) {
            throw new RuntimeException("Permission Denied!");
        }

        ServerDestroyContainerRequest destroyRequest = new ServerDestroyContainerRequest(container.getId());
        workerClusterQueryService.getAllAliveWorkers(container.getAppId()).forEach(workerInfo -> {
            final URL url = ServerURLFactory.destroyContainer2Worker(workerInfo.getAddress());
            transportService.tell(workerInfo.getProtocol(), url, destroyRequest);
        });

        log.info("[ContainerService] delete container: {}.", container);
        // soft delete
        container.setStatus(SwitchableStatus.DELETED.getV());
        container.setGmtModified(new Date());
        containerInfoRepository.saveAndFlush(container);
    }

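delete loads the container and rejects the call if the given appId does not match the container's appId. It then builds a ServerDestroyContainerRequest, iterates over workerClusterQueryService.getAllAliveWorkers and sends the request to each worker, and finally marks the container as DELETED (a soft delete) and saves it to the database.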
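The ownership check plus soft delete can be sketched with an in-memory map standing in for the repository. Everything here is hypothetical: the class, the Container type, and the status codes are illustrative assumptions, and the notification of workers is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory sketch of the check-then-soft-delete flow. */
final class SoftDeleteSketch {

    // Assumed status codes; the real values are defined by SwitchableStatus.
    static final int ENABLED = 1;
    static final int DELETED = 99;

    static final class Container {
        final long id;
        final long appId;
        int status = ENABLED;

        Container(long id, long appId) {
            this.id = id;
            this.appId = appId;
        }
    }

    private final Map<Long, Container> store = new HashMap<>();

    void add(Container c) {
        store.put(c.id, c);
    }

    /** Flips the status to DELETED instead of removing the row. */
    void delete(Long appId, Long containerId) {
        Container c = store.get(containerId);
        if (c == null) {
            throw new IllegalArgumentException("can't find container by id: " + containerId);
        }
        // Only the owning app may delete the container.
        if (!Objects.equals(appId, c.appId)) {
            throw new RuntimeException("Permission Denied!");
        }
        c.status = DELETED;
    }

    int statusOf(long id) {
        return store.get(id).status;
    }
}
```

Because the row survives, the list endpoint has to filter on status, which matches the findByAppIdAndStatusNot query used by the controller.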

uploadContainerJarFile

    public String uploadContainerJarFile(MultipartFile file) throws IOException {

        String workerDirStr = OmsFileUtils.genTemporaryWorkPath();
        String tmpFileStr = workerDirStr + "tmp.jar";

        File workerDir = new File(workerDirStr);
        File tmpFile = new File(tmpFileStr);

        try {
            // save to local disk
            FileUtils.forceMkdirParent(tmpFile);
            file.transferTo(tmpFile);

            // compute the MD5; this step is somewhat time-consuming
            String md5 = OmsFileUtils.md5(tmpFile);
            String fileName = genContainerJarName(md5);

            // upload to MongoDB; this is also somewhat slow and makes the whole endpoint slow... but a separate thread is not worth it
            gridFsManager.store(tmpFile, GridFsManager.CONTAINER_BUCKET, fileName);

            // move the file to its final path
            String finalFileStr = OmsFileUtils.genContainerJarPath() + fileName;
            File finalFile = new File(finalFileStr);
            if (finalFile.exists()) {
                FileUtils.forceDelete(finalFile);
            }
            FileUtils.moveFile(tmpFile, finalFile);

            return md5;

        }finally {
            CommonUtils.executeIgnoreException(() -> FileUtils.forceDelete(workerDir));
        }
    }

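uploadContainerJarFile first writes the MultipartFile to a local temporary file and computes its MD5. It then stores the file in GridFS through gridFsManager.store, moves it to the container jar directory, and finally deletes the temporary directory. The return value is the MD5.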
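The MD5 step is what turns an uploaded jar into a version string. The sketch below shows one way to compute it with the JDK only; it is not the implementation of OmsFileUtils.md5, which is not shown in the article, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: streams a file through MD5 and returns lowercase hex. */
final class Md5Sketch {

    static String md5Hex(Path file) {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
        // Read in chunks so a large jar is never fully loaded into memory.
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }
}
```

Hashing the content means that uploading the same jar twice yields the same version, so the existing file is simply replaced.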

fetchContainerJarFile

    public File fetchContainerJarFile(String version) {

        String fileName = genContainerJarName(version);
        String filePath = OmsFileUtils.genContainerJarPath() + fileName;
        File localFile = new File(filePath);

        if (localFile.exists()) {
            return localFile;
        }
        if (gridFsManager.available()) {
            downloadJarFromGridFS(fileName, localFile);
        }
        return localFile;
    }

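fetchContainerJarFile derives the file name from the version and looks for the file locally. If it is found, it is returned; otherwise, if gridFsManager is available, downloadJarFromGridFS is called. The returned File may still not exist, which is why downloadJar in the controller checks file.exists().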
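The local-first lookup with a remote fallback can be sketched as follows. The class name, the boolean flag, and the downloader callback are assumptions standing in for gridFsManager.available() and downloadJarFromGridFS, whose bodies are not shown in the article.

```java
import java.io.File;
import java.util.function.BiConsumer;

/** Hypothetical sketch of "check local disk first, then fall back to remote storage". */
final class JarLookupSketch {

    /**
     * Returns the local file handle. The downloader is invoked only when the
     * file is missing locally and the remote store is available.
     */
    static File fetch(File dir, String fileName, boolean remoteAvailable,
                      BiConsumer<String, File> downloader) {
        File local = new File(dir, fileName);
        if (local.exists()) {
            return local;
        }
        if (remoteAvailable) {
            downloader.accept(fileName, local);
        }
        // The file may still be absent here; callers are expected to check exists().
        return local;
    }
}
```

Returning the handle even on a miss keeps the method simple, at the cost of pushing the existence check to every caller.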

deploy

    public void deploy(Long containerId, Session session) throws Exception {

        String deployLock = "containerDeployLock-" + containerId;
        RemoteEndpoint.Async remote = session.getAsyncRemote();
        // maximum deployment time: 10 minutes
        boolean lock = lockService.tryLock(deployLock, DEPLOY_MAX_COST_TIME);
        if (!lock) {
            remote.sendText("SYSTEM: acquire deploy lock failed, maybe other user is deploying, please wait until the running deploy task finished.");
            return;
        }

        try {
            Optional<ContainerInfoDO> containerInfoOpt = containerInfoRepository.findById(containerId);
            if (!containerInfoOpt.isPresent()) {
                remote.sendText("SYSTEM: can't find container by id: " + containerId);
                return;
            }
            ContainerInfoDO container = containerInfoOpt.get();

            Date lastDeployTime = container.getLastDeployTime();
            if (lastDeployTime != null) {
                if ((System.currentTimeMillis() - lastDeployTime.getTime()) < DEPLOY_MIN_INTERVAL) {
                    remote.sendText("SYSTEM: [warn] deploy too frequent, last deploy time is: " + DateFormatUtils.format(lastDeployTime, OmsConstant.TIME_PATTERN));
                }
            }

            // prepare the file
            File jarFile = prepareJarFile(container, session);
            if (jarFile == null) {
                return;
            }

            double sizeMB = 1.0 * jarFile.length() / FileUtils.ONE_MB;
            remote.sendText(String.format("SYSTEM: the jarFile(size=%fMB) is prepared and ready to be deployed to the worker.", sizeMB));

            // update the database: refresh the MD5 and the last deploy time
            Date now = new Date();
            container.setGmtModified(now);
            container.setLastDeployTime(now);
            containerInfoRepository.saveAndFlush(container);
            remote.sendText(String.format("SYSTEM: update current container version=%s successfully!", container.getVersion()));

            // start deploying (must be done in batches)
            final List<WorkerInfo> allAliveWorkers = workerClusterQueryService.getAllAliveWorkers(container.getAppId());
            if (allAliveWorkers.isEmpty()) {
                remote.sendText("SYSTEM: there is no worker available now, deploy failed!");
                return;
            }

            String port = environment.getProperty("local.server.port");
            String downloadURL = String.format("http://%s:%s/container/downloadJar?version=%s", NetUtils.getLocalHost(), port, container.getVersion());
            ServerDeployContainerRequest req = new ServerDeployContainerRequest(containerId, container.getContainerName(), container.getVersion(), downloadURL);
            long sleepTime = calculateSleepTime(jarFile.length());

            AtomicInteger count = new AtomicInteger();
            allAliveWorkers.forEach(workerInfo -> {

                final URL url = ServerURLFactory.deployContainer2Worker(workerInfo.getAddress());
                transportService.tell(workerInfo.getProtocol(), url, req);

                remote.sendText("SYSTEM: send deploy request to " + url.getAddress());

                if (count.incrementAndGet() % DEPLOY_BATCH_NUM == 0) {
                    CommonUtils.executeIgnoreException(() -> Thread.sleep(sleepTime));
                }
            });

            remote.sendText("SYSTEM: deploy finished, congratulations!");

        }finally {
            lockService.unlock(deployLock);
        }
    }

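deploy first acquires a lock, passing DEPLOY_MAX_COST_TIME (10 minutes) as the maximum lock time, and sends a warning if the previous deployment was less than DEPLOY_MIN_INTERVAL ago. It then prepares the jar via prepareJarFile and updates the container's last deploy time. Next it gets the alive workers via workerClusterQueryService.getAllAliveWorkers(container.getAppId()) and sends a ServerDeployContainerRequest to each of them, sleeping after every DEPLOY_BATCH_NUM (50) requests. Finally it releases the lock.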
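The batching in the forEach loop boils down to "send to everyone, pause after every full batch". A minimal sketch of that pattern follows; the class and method names are hypothetical, and the pause is passed in as a Runnable because calculateSleepTime is not shown in the article.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of sending requests in batches with a pause between batches. */
final class BatchDeploySketch {

    /**
     * Calls sender for every worker and runs pause after each full batch.
     * Returns the number of pauses taken.
     */
    static <T> int sendInBatches(List<T> workers, int batchSize,
                                 Consumer<T> sender, Runnable pause) {
        int sent = 0;
        int pauses = 0;
        for (T worker : workers) {
            sender.accept(worker);
            // Same modulo check as in deploy: pause whenever the counter hits a multiple of batchSize.
            if (++sent % batchSize == 0) {
                pause.run();
                pauses++;
            }
        }
        return pauses;
    }
}
```

With 120 workers and a batch size of 50 this pauses twice, after the 50th and the 100th request. As in the original loop, a pause also happens after the very last worker when the total is an exact multiple of the batch size.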

fetchDeployedInfo

    @DesignateServer
    public String fetchDeployedInfo(Long appId, Long containerId) {
        List<DeployedContainerInfo> infoList = workerClusterQueryService.getDeployedContainerInfos(appId, containerId);

        Set<String> aliveWorkers = workerClusterQueryService.getAllAliveWorkers(appId)
                .stream()
                .map(WorkerInfo::getAddress)
                .collect(Collectors.toSet());

        Set<String> deployedList = Sets.newLinkedHashSet();
        List<String> unDeployedList = Lists.newLinkedList();
        Multimap<String, String> version2Address = ArrayListMultimap.create();
        infoList.forEach(info -> {
            String targetWorkerAddress = info.getWorkerAddress();
            if (aliveWorkers.contains(targetWorkerAddress)) {
                deployedList.add(targetWorkerAddress);
                version2Address.put(info.getVersion(), targetWorkerAddress);
            }else {
                unDeployedList.add(targetWorkerAddress);
            }
        });

        StringBuilder sb = new StringBuilder("========== DeployedInfo ==========").append(System.lineSeparator());
        // cluster split: workers run different versions, a serious problem
        if (version2Address.keySet().size() > 1) {
            sb.append("WARN: there exists multi version container now, please redeploy to fix this problem").append(System.lineSeparator());
            sb.append("divisive version ==> ").append(System.lineSeparator());
            version2Address.forEach((v, addressList) -> {
                sb.append("version: ").append(v).append(System.lineSeparator());
                sb.append(addressList);
            });
            sb.append(System.lineSeparator());
        }
        // machines currently online but not deployed
        if (!CollectionUtils.isEmpty(unDeployedList)) {
            sb.append("WARN: there exists unDeployed worker(OhMyScheduler will auto fix when some job need to process)").append(System.lineSeparator());
            sb.append("unDeployed worker list ==> ").append(System.lineSeparator());
        }
        // machines currently deployed
        sb.append("deployed worker list ==> ").append(System.lineSeparator());
        if (CollectionUtils.isEmpty(deployedList)) {
            sb.append("no worker deployed now~");
        }else {
            sb.append(deployedList);
        }

        return sb.toString();
    }

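fetchDeployedInfo first gets the DeployedContainerInfo list via workerClusterQueryService.getDeployedContainerInfos, then the alive workers via workerClusterQueryService.getAllAliveWorkers(appId). It iterates over the DeployedContainerInfo entries, checks whether each reporting worker is still alive, and builds the DeployedInfo text, adding a warning when more than one version is deployed. Note that the code shown prints the "unDeployed worker list" header without appending the list itself.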
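The version-split check can be sketched with plain JDK collections in place of Guava's Multimap. This is an illustration only: the class name is hypothetical and each report is modeled as a two-element array {workerAddress, version} rather than a DeployedContainerInfo.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of grouping alive workers by the container version they report. */
final class DeployedInfoSketch {

    /** Each report is {workerAddress, version}; reports from workers that are not alive are skipped. */
    static Map<String, List<String>> groupAliveByVersion(List<String[]> reports,
                                                         Set<String> aliveWorkers) {
        Map<String, List<String>> byVersion = new LinkedHashMap<>();
        for (String[] report : reports) {
            String address = report[0];
            String version = report[1];
            if (aliveWorkers.contains(address)) {
                byVersion.computeIfAbsent(version, v -> new ArrayList<>()).add(address);
            }
        }
        return byVersion;
    }

    /** More than one distinct version among alive workers means the cluster is split. */
    static boolean hasVersionSplit(Map<String, List<String>> byVersion) {
        return byVersion.size() > 1;
    }
}
```

Grouping by version first makes the warning cheap to compute and also gives the per-version address lists that the report prints.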

prepareJarFile

    private File prepareJarFile(ContainerInfoDO container, Session session) throws Exception {

        RemoteEndpoint.Async remote = session.getAsyncRemote();
        // obtain the jar: a Git source must first be cloned and built into a jar to compute the MD5; a JarFile is downloaded directly
        ContainerSourceType sourceType = ContainerSourceType.of(container.getSourceType());
        if (sourceType == ContainerSourceType.Git) {

            String workerDirStr = OmsFileUtils.genTemporaryWorkPath();
            File workerDir = new File(workerDirStr);
            FileUtils.forceMkdir(workerDir);

            try {
                // git clone
                remote.sendText("SYSTEM: start to git clone the code repo, using config: " + container.getSourceInfo());
                GitRepoInfo gitRepoInfo = JsonUtils.parseObject(container.getSourceInfo(), GitRepoInfo.class);

                CloneCommand cloneCommand = Git.cloneRepository()
                        .setDirectory(workerDir)
                        .setURI(gitRepoInfo.getRepo())
                        .setBranch(gitRepoInfo.getBranch());
                if (!StringUtils.isEmpty(gitRepoInfo.getUsername())) {
                    CredentialsProvider credentialsProvider = new UsernamePasswordCredentialsProvider(gitRepoInfo.getUsername(), gitRepoInfo.getPassword());
                    cloneCommand.setCredentialsProvider(credentialsProvider);
                }
                cloneCommand.call();

                // use the latest commitId as the version
                String oldVersion = container.getVersion();
                try (Repository repository = Git.open(workerDir).getRepository()) {
                    Ref head = repository.getRefDatabase().findRef("HEAD");
                    container.setVersion(head.getObjectId().getName());
                }

                if (container.getVersion().equals(oldVersion)) {
                    remote.sendText(String.format("SYSTEM: this commitId(%s) is the same as the last.", oldVersion));
                }else {
                    remote.sendText(String.format("SYSTEM: new version detected, from %s to %s.", oldVersion, container.getVersion()));
                }
                remote.sendText("SYSTEM: git clone successfully, star to compile the project.");

                // mvn clean package -DskipTests -U
                Invoker mvnInvoker = new DefaultInvoker();
                InvocationRequest ivkReq = new DefaultInvocationRequest();
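                // -U: force Maven to check for updates of all SNAPSHOT dependencies so the build is based on the latest state
                // -e: make Maven print the full stack trace if the build fails
                // -B: run Maven in batch mode, avoiding hangs caused by prompts that need user interaction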
                ivkReq.setGoals(Lists.newArrayList("clean", "package", "-DskipTests", "-U", "-e", "-B"));
                ivkReq.setBaseDirectory(workerDir);
                ivkReq.setOutputHandler(remote::sendText);
                ivkReq.setBatchMode(true);

                mvnInvoker.execute(ivkReq);

                String targetDirStr = workerDirStr + "/target";
                File targetDir = new File(targetDirStr);
                IOFileFilter fileFilter = FileFilterUtils.asFileFilter((dir, name) -> name.endsWith("jar-with-dependencies.jar"));
                Collection<File> jarFile = FileUtils.listFiles(targetDir, fileFilter, null);

                if (CollectionUtils.isEmpty(jarFile)) {
                    remote.sendText("SYSTEM: can't find packaged jar(maybe maven build failed), so deploy failed.");
                    return null;
                }

                File jarWithDependency = jarFile.iterator().next();

                String jarFileName = genContainerJarName(container.getVersion());

                if (!gridFsManager.exists(GridFsManager.CONTAINER_BUCKET, jarFileName)) {
                    remote.sendText("SYSTEM: can't find the jar resource in remote, maybe this is a new version, start to upload new version.");
                    gridFsManager.store(jarWithDependency, GridFsManager.CONTAINER_BUCKET, jarFileName);
                    remote.sendText("SYSTEM: upload to GridFS successfully~");
                }else {
                    remote.sendText("SYSTEM: find the jar resource in remote successfully, so it's no need to upload anymore.");
                }

                // move the file from the temporary work directory to the official directory
                String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName;
                File localFile = new File(localFileStr);
                if (localFile.exists()) {
                    FileUtils.forceDelete(localFile);
                }
                FileUtils.copyFile(jarWithDependency, localFile);

                return localFile;
            } catch (Throwable  t) {
                log.error("[ContainerService] prepareJarFile failed for container: {}", container, t);
                remote.sendText("SYSTEM: [ERROR] prepare jar file failed: " + ExceptionUtils.getStackTrace(t));
            } finally {
                // delete the workspace data
                FileUtils.forceDelete(workerDir);
            }
        }

        // first check whether the target jar exists locally
        String jarFileName = genContainerJarName(container.getVersion());
        String localFileStr = OmsFileUtils.genContainerJarPath() + jarFileName;
        File localFile = new File(localFileStr);
        if (localFile.exists()) {
            remote.sendText("SYSTEM: find the jar file in local disk.");
            return localFile;
        }

        // download from MongoDB
        remote.sendText(String.format("SYSTEM: try to find the jarFile(%s) in GridFS", jarFileName));
        downloadJarFromGridFS(jarFileName, localFile);
        remote.sendText("SYSTEM: download jar file from GridFS successfully~");
        return localFile;
    }

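For a Git source, prepareJarFile clones the repo, takes the HEAD commitId as the version, runs the Maven build, and picks the file whose name ends with jar-with-dependencies.jar. That jar is uploaded to GridFS if it is not there yet and copied to the container jar directory. For a non-Git source, it checks whether the target jar exists locally and, if not, downloads it from GridFS. If the Git branch throws, the exception is logged and execution falls through to the same local/GridFS lookup.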
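Locating the build output is a simple suffix match on the target directory. The sketch below does the same thing with java.io.File instead of commons-io; the class name is hypothetical, and like the original call it does not descend into subdirectories.

```java
import java.io.File;
import java.util.Optional;

/** Hypothetical sketch of finding the assembled fat jar in a Maven target directory. */
final class PackagedJarFinder {

    /** Returns a file in targetDir whose name ends with jar-with-dependencies.jar, if any. */
    static Optional<File> find(File targetDir) {
        File[] matches = targetDir.listFiles(
                (dir, name) -> name.endsWith("jar-with-dependencies.jar"));
        // listFiles returns null when targetDir is not a directory, e.g. after a failed build.
        if (matches == null || matches.length == 0) {
            return Optional.empty();
        }
        return Optional.of(matches[0]);
    }
}
```

An empty result corresponds to the "can't find packaged jar" message in prepareJarFile, which usually points at a failed build or a project that does not produce a jar-with-dependencies artifact.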

Summary

ContainerController exposes the downloadJar, downloadContainerTemplate, jarUpload, save, delete, list, and listDeployedWorker endpoints. Of these, downloadJar, jarUpload, save, delete, and listDeployedWorker delegate to containerService.

Originally published 2024-01-31 on the WeChat official account 码匠的流水账 and shared through the Tencent Cloud self-media sharing program.
