前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >聊聊PowerJob的JobController

聊聊PowerJob的JobController

原创
作者头像
code4it
发布2024-01-28 17:28:00
800
发布2024-01-28 17:28:00
举报
文章被收录于专栏:码匠的流水账码匠的流水账

本文主要研究一下PowerJob的JobController

JobController

tech/powerjob/server/web/controller/JobController.java

代码语言:javascript
复制
@Slf4j
@RestController
@RequestMapping("/job")
public class JobController {

    @Resource
    private JobService jobService;
    @Resource
    private JobInfoRepository jobInfoRepository;

    @PostMapping("/save")
    public ResultDTO<Void> saveJobInfo(@RequestBody SaveJobInfoRequest request) {
        jobService.saveJob(request);
        return ResultDTO.success(null);
    }

    @PostMapping("/copy")
    public ResultDTO<JobInfoVO> copyJob(String jobId) {
        return ResultDTO.success(JobInfoVO.from(jobService.copyJob(Long.valueOf(jobId))));
    }

    @GetMapping("/export")
    public ResultDTO<SaveJobInfoRequest> exportJob(String jobId) {
        return ResultDTO.success(jobService.exportJob(Long.valueOf(jobId)));
    }

    @GetMapping("/disable")
    public ResultDTO<Void> disableJob(String jobId) {
        jobService.disableJob(Long.valueOf(jobId));
        return ResultDTO.success(null);
    }

    @GetMapping("/delete")
    public ResultDTO<Void> deleteJob(String jobId) {
        jobService.deleteJob(Long.valueOf(jobId));
        return ResultDTO.success(null);
    }

    @GetMapping("/run")
    public ResultDTO<Long> runImmediately(String appId, String jobId, @RequestParam(required = false) String instanceParams) {
        return ResultDTO.success(jobService.runJob(Long.valueOf(appId), Long.valueOf(jobId), instanceParams, 0L));
    }

    @PostMapping("/list")
    public ResultDTO<PageResult<JobInfoVO>> listJobs(@RequestBody QueryJobInfoRequest request) {

        Sort sort = Sort.by(Sort.Direction.ASC, "id");
        PageRequest pageRequest = PageRequest.of(request.getIndex(), request.getPageSize(), sort);
        Page<JobInfoDO> jobInfoPage;

        // 无查询条件,查询全部
        if (request.getJobId() == null && StringUtils.isEmpty(request.getKeyword())) {
            jobInfoPage = jobInfoRepository.findByAppIdAndStatusNot(request.getAppId(), SwitchableStatus.DELETED.getV(), pageRequest);
            return ResultDTO.success(convertPage(jobInfoPage));
        }

        // 有 jobId,直接精确查询
        if (request.getJobId() != null) {

            Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(request.getJobId());

            PageResult<JobInfoVO> result = new PageResult<>();

            if (!jobInfoOpt.isPresent()) {
                result.setTotalPages(0);
                result.setTotalItems(0);
                result.setData(Lists.newLinkedList());
                return ResultDTO.success(result);
            }

            if (!jobInfoOpt.get().getAppId().equals(request.getAppId())){
                return ResultDTO.failed("请输入该app下的jobId");
            }

            result.setIndex(0);
            result.setPageSize(request.getPageSize());

            result.setTotalItems(1);
            result.setTotalPages(1);
            result.setData(Lists.newArrayList(JobInfoVO.from(jobInfoOpt.get())));

            return ResultDTO.success(result);
        }

        // 模糊查询
        String condition = "%" + request.getKeyword() + "%";
        jobInfoPage = jobInfoRepository.findByAppIdAndJobNameLikeAndStatusNot(request.getAppId(), condition, SwitchableStatus.DELETED.getV(), pageRequest);
        return ResultDTO.success(convertPage(jobInfoPage));
    }


    private static PageResult<JobInfoVO> convertPage(Page<JobInfoDO> jobInfoPage) {
        List<JobInfoVO> jobInfoVOList = jobInfoPage.getContent().stream().map(JobInfoVO::from).collect(Collectors.toList());

        PageResult<JobInfoVO> pageResult = new PageResult<>(jobInfoPage);
        pageResult.setData(jobInfoVOList);
        return pageResult;
    }

}

JobController提供了save、copy、export、disable、delete、run、list方法;除了list外其他均委托给了jobService

JobService

tech/powerjob/server/core/service/JobService.java

代码语言:javascript
复制
public interface JobService {

    Long saveJob(SaveJobInfoRequest request);

    JobInfoDO copyJob(Long jobId);

    JobInfoDTO fetchJob(Long jobId);

    List<JobInfoDTO> fetchAllJob(Long appId);

    List<JobInfoDTO> queryJob(PowerQuery powerQuery);

    long runJob(Long appId, Long jobId, String instanceParams, Long delay);

    void deleteJob(Long jobId);

    void disableJob(Long jobId);

    void enableJob(Long jobId);

    SaveJobInfoRequest exportJob(Long jobId);
}

JobService定义了saveJob、copyJob、fetchJob、fetchAllJob、queryJob、runJob、deleteJob、disableJob、enableJob、exportJob方法

JobServiceImpl

tech/powerjob/server/core/service/impl/job/JobServiceImpl.java

代码语言:javascript
复制
@Slf4j
@Service
@RequiredArgsConstructor
public class JobServiceImpl implements JobService {

    private final InstanceService instanceService;

    private final DispatchService dispatchService;

    private final JobInfoRepository jobInfoRepository;

    private final InstanceInfoRepository instanceInfoRepository;

    private final TimingStrategyService timingStrategyService;

    //......
}    

JobServiceImpl实现了JobService接口

save

代码语言:javascript
复制
    @Override
    public Long saveJob(SaveJobInfoRequest request) {

        request.valid();

        JobInfoDO jobInfoDO;
        if (request.getId() != null) {
            jobInfoDO = jobInfoRepository.findById(request.getId()).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + request.getId()));
        } else {
            jobInfoDO = new JobInfoDO();
        }

        // 值拷贝
        BeanUtils.copyProperties(request, jobInfoDO);

        // 拷贝枚举值
        jobInfoDO.setExecuteType(request.getExecuteType().getV());
        jobInfoDO.setProcessorType(request.getProcessorType().getV());
        jobInfoDO.setTimeExpressionType(request.getTimeExpressionType().getV());
        jobInfoDO.setStatus(request.isEnable() ? SwitchableStatus.ENABLE.getV() : SwitchableStatus.DISABLE.getV());
        jobInfoDO.setDispatchStrategy(request.getDispatchStrategy().getV());

        // 填充默认值,非空保护防止 NPE
        fillDefaultValue(jobInfoDO);

        // 转化报警用户列表
        if (request.getNotifyUserIds() != null) {
            if (request.getNotifyUserIds().size() == 0) {
                jobInfoDO.setNotifyUserIds(null);
            } else {
                jobInfoDO.setNotifyUserIds(SJ.COMMA_JOINER.join(request.getNotifyUserIds()));
            }
        }
        LifeCycle lifecycle = Optional.ofNullable(request.getLifeCycle()).orElse(LifeCycle.EMPTY_LIFE_CYCLE);
        jobInfoDO.setLifecycle(JSON.toJSONString(lifecycle));
        // 检查定时策略
        timingStrategyService.validate(request.getTimeExpressionType(), request.getTimeExpression(), lifecycle.getStart(), lifecycle.getEnd());
        calculateNextTriggerTime(jobInfoDO);
        if (request.getId() == null) {
            jobInfoDO.setGmtCreate(new Date());
        }
        // 检查告警配置
        if (request.getAlarmConfig() != null) {
            AlarmConfig config = request.getAlarmConfig();
            if (config.getStatisticWindowLen() == null || config.getAlertThreshold() == null || config.getSilenceWindowLen() == null) {
                throw new PowerJobException("illegal alarm config!");
            }
            jobInfoDO.setAlarmConfig(JSON.toJSONString(request.getAlarmConfig()));
        }
        // 日志配置
        if (request.getLogConfig() != null) {
            jobInfoDO.setLogConfig(JSONObject.toJSONString(request.getLogConfig()));
        }
        JobInfoDO res = jobInfoRepository.saveAndFlush(jobInfoDO);
        return res.getId();
    }

save先执行request.valid()进行参数校验,通过timingStrategyService.validate校验定时策略,设置告警配置、日志配置、最后通过jobInfoRepository.saveAndFlush保存或者更新

copyJob

代码语言:javascript
复制
    public JobInfoDO copyJob(Long jobId) {

        JobInfoDO origin = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId: " + jobId));
        if (origin.getStatus() == SwitchableStatus.DELETED.getV()) {
            throw new IllegalStateException("can't copy the job which has been deleted!");
        }
        JobInfoDO copyJob = new JobInfoDO();
        // 值拷贝
        BeanUtils.copyProperties(origin, copyJob);
        // 填充默认值,理论上应该不需要
        fillDefaultValue(copyJob);
        // 修正创建时间以及更新时间
        copyJob.setId(null);
        copyJob.setJobName(copyJob.getJobName() + "_COPY");
        copyJob.setGmtCreate(new Date());
        copyJob.setGmtModified(new Date());

        copyJob = jobInfoRepository.saveAndFlush(copyJob);
        return copyJob;

    }

copyJob先找到源JobInfoDO,然后拷贝一份新的JobInfoDO,重置id、gmtCreate、gmtModified、设置jobName为源jobName_COPY,最后jobInfoRepository.saveAndFlush保存

runJob

代码语言:javascript
复制
    @DesignateServer
    public long runJob(Long appId, Long jobId, String instanceParams, Long delay) {

        delay = delay == null ? 0 : delay;
        JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by id:" + jobId));

        log.info("[Job-{}] try to run job in app[{}], instanceParams={},delay={} ms.", jobInfo.getId(), appId, instanceParams, delay);
        final InstanceInfoDO instanceInfo = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), instanceParams, null, System.currentTimeMillis() + Math.max(delay, 0));
        instanceInfoRepository.flush();
        if (delay <= 0) {
            dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.of(instanceInfo),Optional.empty());
        } else {
            InstanceTimeWheelService.schedule(instanceInfo.getInstanceId(), delay, () -> dispatchService.dispatch(jobInfo, instanceInfo.getInstanceId(), Optional.empty(),Optional.empty()));
        }
        log.info("[Job-{}|{}] execute 'runJob' successfully, params={}", jobInfo.getId(), instanceInfo.getInstanceId(), instanceParams);
        return instanceInfo.getInstanceId();
    }

runJob方法添加了@DesignateServer注解,会根据appId查找AppInfoDO,在AppInfoDO.getCurrentServer机器上执行,若该值为空则在本机执行;它通过instanceService.create创建InstanceInfoDO,若delay小于等于0则执行dispatchService.dispatch,否则通过InstanceTimeWheelService.schedule进行调度

deleteJob

代码语言:javascript
复制
    public void deleteJob(Long jobId) {
        shutdownOrStopJob(jobId, SwitchableStatus.DELETED);
    }

deleteJob执行shutdownOrStopJob,SwitchableStatus参数为SwitchableStatus.DELETED

disableJob

代码语言:javascript
复制
    public void disableJob(Long jobId) {
        shutdownOrStopJob(jobId, SwitchableStatus.DISABLE);
    }

disableJob执行shutdownOrStopJob,SwitchableStatus参数为SwitchableStatus.DISABLE

enableJob

代码语言:javascript
复制
    public void enableJob(Long jobId) {
        JobInfoDO jobInfoDO = jobInfoRepository.findById(jobId).orElseThrow(() -> new IllegalArgumentException("can't find job by jobId:" + jobId));

        jobInfoDO.setStatus(SwitchableStatus.ENABLE.getV());
        calculateNextTriggerTime(jobInfoDO);

        jobInfoRepository.saveAndFlush(jobInfoDO);
    }

enableJob更新jobInfoDO的status为SwitchableStatus.ENABLE,然后执行calculateNextTriggerTime(jobInfoDO),最后jobInfoRepository.saveAndFlush保存

exportJob

代码语言:javascript
复制
    public SaveJobInfoRequest exportJob(Long jobId) {
        Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
        if (!jobInfoOpt.isPresent()) {
            throw new IllegalArgumentException("can't find job by jobId: " + jobId);
        }
        final JobInfoDO jobInfoDO = jobInfoOpt.get();
        final SaveJobInfoRequest saveJobInfoRequest = JobConverter.convertJobInfoDO2SaveJobInfoRequest(jobInfoDO);
        saveJobInfoRequest.setId(null);
        saveJobInfoRequest.setJobName(saveJobInfoRequest.getJobName() + "_EXPORT_" + System.currentTimeMillis());
        log.info("[Job-{}] [exportJob] jobInfoDO: {}, saveJobInfoRequest: {}", jobId, JsonUtils.toJSONString(jobInfoDO), JsonUtils.toJSONString(saveJobInfoRequest));
        return saveJobInfoRequest;
    }

exportJob创建SaveJobInfoRequest

shutdownOrStopJob

代码语言:javascript
复制
    private void shutdownOrStopJob(Long jobId, SwitchableStatus status) {

        // 1. 先更新 job_info 表
        Optional<JobInfoDO> jobInfoOPT = jobInfoRepository.findById(jobId);
        if (!jobInfoOPT.isPresent()) {
            throw new IllegalArgumentException("can't find job by jobId:" + jobId);
        }
        JobInfoDO jobInfoDO = jobInfoOPT.get();
        jobInfoDO.setStatus(status.getV());
        jobInfoDO.setGmtModified(new Date());
        jobInfoRepository.saveAndFlush(jobInfoDO);

        // 2. 关闭秒级任务
        if (!TimeExpressionType.FREQUENT_TYPES.contains(jobInfoDO.getTimeExpressionType())) {
            return;
        }
        List<InstanceInfoDO> executeLogs = instanceInfoRepository.findByJobIdAndStatusIn(jobId, InstanceStatus.GENERALIZED_RUNNING_STATUS);
        if (CollectionUtils.isEmpty(executeLogs)) {
            return;
        }
        if (executeLogs.size() > 1) {
            log.warn("[Job-{}] frequent job should just have one running instance, there must have some bug.", jobId);
        }
        executeLogs.forEach(instance -> {
            try {
                // 重复查询了数据库,不过问题不大,这个调用量很小
                instanceService.stopInstance(instance.getAppId(), instance.getInstanceId());
            } catch (Exception ignore) {
                // ignore exception
            }
        });
    }

shutdownOrStopJob先更新JobInfoDO的status,然后对于秒级任务还要通过instanceService.stopInstance停止正在运行的任务实例

小结

JobController提供了save、copy、export、disable、delete、run、list方法;除了list外其他均委托给了jobService;save先执行request.valid()进行参数校验,通过timingStrategyService.validate校验定时策略,设置告警配置、日志配置、最后通过jobInfoRepository.saveAndFlush保存或者更新;deleteJob执行shutdownOrStopJob,SwitchableStatus参数为SwitchableStatus.DELETED;disableJob执行shutdownOrStopJob,SwitchableStatus参数为SwitchableStatus.DISABLE;enableJob更新jobInfoDO的status为SwitchableStatus.ENABLE,然后执行calculateNextTriggerTime(jobInfoDO),最后jobInfoRepository.saveAndFlush保存。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • JobController
  • JobService
  • JobServiceImpl
    • save
      • copyJob
        • runJob
          • deleteJob
            • disableJob
              • enableJob
                • exportJob
                  • shutdownOrStopJob
                  • 小结
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档