A Look at PowerJob's InstanceController

code4it
Published 2024-01-30 09:17:27
Included in the column: 码匠的流水账

This article takes a look at PowerJob's InstanceController.

InstanceController

tech/powerjob/server/web/controller/InstanceController.java

@Slf4j
@RestController
@RequestMapping("/instance")
public class InstanceController {



    @Resource
    private InstanceService instanceService;
    @Resource
    private InstanceLogService instanceLogService;

    @Resource
    private CacheService cacheService;
    @Resource
    private InstanceInfoRepository instanceInfoRepository;

    @GetMapping("/stop")
    public ResultDTO<Void> stopInstance(Long appId,Long instanceId) {
        instanceService.stopInstance(appId,instanceId);
        return ResultDTO.success(null);
    }

    @GetMapping("/retry")
    public ResultDTO<Void> retryInstance(String appId, Long instanceId) {
        instanceService.retryInstance(Long.valueOf(appId), instanceId);
        return ResultDTO.success(null);
    }

    @GetMapping("/detail")
    public ResultDTO<InstanceDetailVO> getInstanceDetail(Long instanceId) {
        return ResultDTO.success(InstanceDetailVO.from(instanceService.getInstanceDetail(instanceId)));
    }

    @GetMapping("/log")
    public ResultDTO<StringPage> getInstanceLog(Long appId, Long instanceId, Long index) {
        return ResultDTO.success(instanceLogService.fetchInstanceLog(appId, instanceId, index));
    }

    @GetMapping("/downloadLogUrl")
    public ResultDTO<String> getDownloadUrl(Long appId, Long instanceId) {
        return ResultDTO.success(instanceLogService.fetchDownloadUrl(appId, instanceId));
    }

    @GetMapping("/downloadLog")
    public void downloadLogFile(Long instanceId , HttpServletResponse response) throws Exception {

        File file = instanceLogService.downloadInstanceLog(instanceId);
        OmsFileUtils.file2HttpResponse(file, response);
    }

    @GetMapping("/downloadLog4Console")
    @SneakyThrows
    public void downloadLog4Console(Long appId, Long instanceId , HttpServletResponse response) {
        // Get the internal download link
        String downloadUrl = instanceLogService.fetchDownloadUrl(appId, instanceId);
        // Download it to the local machine first
        String logFilePath = OmsFileUtils.genTemporaryWorkPath() + String.format("powerjob-%s-%s.log", appId, instanceId);
        File logFile = new File(logFilePath);

        try {
            FileUtils.copyURLToFile(new URL(downloadUrl), logFile);

            // Then push it to the browser
            OmsFileUtils.file2HttpResponse(logFile, response);
        } finally {
            FileUtils.forceDelete(logFile);
        }
    }

    @PostMapping("/list")
    public ResultDTO<PageResult<InstanceInfoVO>> list(@RequestBody QueryInstanceRequest request) {

        Sort sort = Sort.by(Sort.Direction.DESC, "gmtModified");
        PageRequest pageable = PageRequest.of(request.getIndex(), request.getPageSize(), sort);

        InstanceInfoDO queryEntity = new InstanceInfoDO();
        BeanUtils.copyProperties(request, queryEntity);
        queryEntity.setType(request.getType().getV());

        if (!StringUtils.isEmpty(request.getStatus())) {
            queryEntity.setStatus(InstanceStatus.valueOf(request.getStatus()).getV());
        }

        Page<InstanceInfoDO> pageResult = instanceInfoRepository.findAll(Example.of(queryEntity), pageable);
        return ResultDTO.success(convertPage(pageResult));
    }

    private PageResult<InstanceInfoVO> convertPage(Page<InstanceInfoDO> page) {
        List<InstanceInfoVO> content = page.getContent().stream()
                .map(x -> InstanceInfoVO.from(x, cacheService.getJobName(x.getJobId()))).collect(Collectors.toList());

        PageResult<InstanceInfoVO> pageResult = new PageResult<>(page);
        pageResult.setData(content);
        return pageResult;
    }

}

InstanceController exposes the stop, retry, detail, log, downloadLogUrl, downloadLog, downloadLog4Console and list endpoints. stop, retry and detail delegate to instanceService; log, downloadLogUrl, downloadLog and downloadLog4Console delegate to instanceLogService; list delegates to instanceInfoRepository.
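To make the request shapes concrete, here is a minimal sketch of how a client might build the URLs for a few of these GET endpoints. The class `InstanceEndpointUris` and the base URL `http://localhost:7700` are assumptions made for illustration only; the paths and parameter names are taken from the controller method signatures above.

```java
import java.net.URI;

// Hypothetical helper, not part of PowerJob: builds request URIs for some of
// the GET endpoints declared in InstanceController.
class InstanceEndpointUris {

    // Assumed server address, e.g. http://localhost:7700
    private final String baseUrl;

    InstanceEndpointUris(String baseUrl) {
        this.baseUrl = baseUrl;
    }

    // GET /instance/stop?appId=..&instanceId=..
    URI stop(long appId, long instanceId) {
        return URI.create(baseUrl + "/instance/stop?appId=" + appId + "&instanceId=" + instanceId);
    }

    // GET /instance/retry?appId=..&instanceId=..
    URI retry(long appId, long instanceId) {
        return URI.create(baseUrl + "/instance/retry?appId=" + appId + "&instanceId=" + instanceId);
    }

    // GET /instance/log?appId=..&instanceId=..&index=.. (index is the page number)
    URI log(long appId, long instanceId, long index) {
        return URI.create(baseUrl + "/instance/log?appId=" + appId
                + "&instanceId=" + instanceId + "&index=" + index);
    }

    public static void main(String[] args) {
        InstanceEndpointUris uris = new InstanceEndpointUris("http://localhost:7700");
        System.out.println(uris.stop(1L, 42L));
        System.out.println(uris.retry(1L, 42L));
        System.out.println(uris.log(1L, 42L, 0L));
    }
}
```

The list endpoint is different: it is a POST that takes a JSON body mapped to QueryInstanceRequest, so it is left out of this sketch.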

InstanceService

tech/powerjob/server/core/instance/InstanceService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceService {

    private final TransportService transportService;

    private final DispatchService dispatchService;

    private final IdGenerateService idGenerateService;

    private final InstanceManager instanceManager;

    private final JobInfoRepository jobInfoRepository;

    private final InstanceInfoRepository instanceInfoRepository;

    private final WorkerClusterQueryService workerClusterQueryService;

    //......
}    

InstanceService provides methods such as stopInstance, retryInstance and cancelInstance.

stopInstance

    @DesignateServer
    public void stopInstance(Long appId,Long instanceId) {

        log.info("[Instance-{}] try to stop the instance instance in appId: {}", instanceId,appId);
        try {

            InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
            // Check the status: only a running instance can be stopped
            if (!InstanceStatus.GENERALIZED_RUNNING_STATUS.contains(instanceInfo.getStatus())) {
                throw new IllegalArgumentException("can't stop finished instance!");
            }

            // Update the database and set the status to STOPPED
            instanceInfo.setStatus(STOPPED.getV());
            instanceInfo.setGmtModified(new Date());
            instanceInfo.setFinishedTime(System.currentTimeMillis());
            instanceInfo.setResult(SystemInstanceResult.STOPPED_BY_USER);
            instanceInfoRepository.saveAndFlush(instanceInfo);

            instanceManager.processFinishedInstance(instanceId, instanceInfo.getWfInstanceId(), STOPPED, SystemInstanceResult.STOPPED_BY_USER);

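            /*
            Notify the TaskTracker to stop (unreliable, best effort)
            If the stop does not take effect, the TaskTracker will reportStatus again later; following the normal flow, instanceLog is updated back to RUNNING, and the developer can stop it manually again
             */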
            Optional<WorkerInfo> workerInfoOpt = workerClusterQueryService.getWorkerInfoByAddress(instanceInfo.getAppId(), instanceInfo.getTaskTrackerAddress());
            if (workerInfoOpt.isPresent()) {
                ServerStopInstanceReq req = new ServerStopInstanceReq(instanceId);
                WorkerInfo workerInfo = workerInfoOpt.get();
                transportService.tell(workerInfo.getProtocol(), ServerURLFactory.stopInstance2Worker(workerInfo.getAddress()), req);
                log.info("[Instance-{}] update instanceInfo and send 'stopInstance' request succeed.", instanceId);
            } else {
                log.warn("[Instance-{}] update instanceInfo successfully but can't find TaskTracker to stop instance", instanceId);
            }

        } catch (IllegalArgumentException ie) {
            throw ie;
        } catch (Exception e) {
            log.error("[Instance-{}] stopInstance failed.", instanceId, e);
            throw e;
        }
    }

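stopInstance first loads the instanceInfo via fetchInstanceInfo and checks that its status is in GENERALIZED_RUNNING_STATUS; if not, it throws an IllegalArgumentException. It then sets the status to STOPPED, saves the record, and lets instanceManager.processFinishedInstance do the cleanup. If the WorkerInfo of the TaskTracker can be found, it sends a ServerStopInstanceReq; this notification is best-effort and not guaranteed to arrive.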
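The status guard in stopInstance can be sketched in isolation. This is a simplified model, not PowerJob's code: the `Status` enum below stands in for InstanceStatus (which carries integer codes in the real project), and the membership of the running set is an assumption.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of the "only running instances can be stopped" rule.
class StopGuardSketch {

    // Stand-in for PowerJob's InstanceStatus; names are illustrative.
    enum Status { WAITING_DISPATCH, WAITING_WORKER_RECEIVE, RUNNING, FAILED, SUCCEED, CANCELED, STOPPED }

    // Assumed contents of GENERALIZED_RUNNING_STATUS.
    static final Set<Status> GENERALIZED_RUNNING =
            EnumSet.of(Status.WAITING_DISPATCH, Status.WAITING_WORKER_RECEIVE, Status.RUNNING);

    // Returns the new status, or throws if the instance is not in a running-like state.
    static Status stop(Status current) {
        if (!GENERALIZED_RUNNING.contains(current)) {
            throw new IllegalArgumentException("can't stop finished instance!");
        }
        return Status.STOPPED;
    }

    public static void main(String[] args) {
        System.out.println(stop(Status.RUNNING));
        try {
            stop(Status.SUCCEED);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```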

retryInstance

    @DesignateServer
    public void retryInstance(Long appId, Long instanceId) {

        log.info("[Instance-{}] retry instance in appId: {}", instanceId, appId);

        InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
        if (!InstanceStatus.FINISHED_STATUS.contains(instanceInfo.getStatus())) {
            throw new PowerJobException("Only stopped instance can be retry!");
        }
        // Retrying workflow instances is not supported for now
        if (instanceInfo.getWfInstanceId() != null) {
            throw new PowerJobException("Workflow's instance do not support retry!");
        }

        instanceInfo.setStatus(InstanceStatus.WAITING_DISPATCH.getV());
        instanceInfo.setExpectedTriggerTime(System.currentTimeMillis());
        instanceInfo.setFinishedTime(null);
        instanceInfo.setActualTriggerTime(null);
        instanceInfo.setTaskTrackerAddress(null);
        instanceInfo.setResult(null);
        instanceInfoRepository.saveAndFlush(instanceInfo);

        // Dispatch the task
        Long jobId = instanceInfo.getJobId();
        JobInfoDO jobInfo = jobInfoRepository.findById(jobId).orElseThrow(() -> new PowerJobException("can't find job info by jobId: " + jobId));
        dispatchService.dispatch(jobInfo, instanceId,Optional.of(instanceInfo),Optional.empty());
    }

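retryInstance loads the instanceInfo and checks that its status is in FINISHED_STATUS (the error message says "stopped", but the check is against the whole finished set). Instances that belong to a workflow, i.e. with a non-null wfInstanceId, are rejected. It then sets the status to WAITING_DISPATCH, sets expectedTriggerTime to the current time, clears finishedTime, actualTriggerTime, taskTrackerAddress and result, saves the record, and dispatches it via dispatchService.dispatch.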
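The two preconditions and the field reset can be modelled as follows. This is a sketch under stated assumptions: `Instance` and `Status` are hypothetical stand-ins for InstanceInfoDO and InstanceStatus, the finished set is assumed, and IllegalStateException replaces PowerJobException to keep the example self-contained.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of the retry preconditions and field reset.
class RetryResetSketch {

    enum Status { WAITING_DISPATCH, RUNNING, FAILED, SUCCEED, CANCELED, STOPPED }

    // Assumed contents of FINISHED_STATUS.
    static final Set<Status> FINISHED =
            EnumSet.of(Status.FAILED, Status.SUCCEED, Status.CANCELED, Status.STOPPED);

    // Stand-in for the fields of InstanceInfoDO that retryInstance touches.
    static class Instance {
        Status status;
        Long wfInstanceId;
        Long expectedTriggerTime;
        Long actualTriggerTime;
        Long finishedTime;
        String taskTrackerAddress;
        String result;
    }

    // Validates the instance and resets it so that it can be dispatched again.
    static void prepareRetry(Instance instance, long now) {
        if (!FINISHED.contains(instance.status)) {
            throw new IllegalStateException("only a finished instance can be retried");
        }
        if (instance.wfInstanceId != null) {
            throw new IllegalStateException("workflow instances do not support retry");
        }
        instance.status = Status.WAITING_DISPATCH;
        instance.expectedTriggerTime = now;
        instance.finishedTime = null;
        instance.actualTriggerTime = null;
        instance.taskTrackerAddress = null;
        instance.result = null;
    }

    public static void main(String[] args) {
        Instance failed = new Instance();
        failed.status = Status.FAILED;
        failed.result = "worker lost";
        prepareRetry(failed, System.currentTimeMillis());
        System.out.println(failed.status + ", result=" + failed.result);
    }
}
```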

cancelInstance

    @DesignateServer
    public void cancelInstance(Long appId, Long instanceId) {
        log.info("[Instance-{}] try to cancel the instance with appId {}.", instanceId, appId);

        try {
            InstanceInfoDO instanceInfo = fetchInstanceInfo(instanceId);
            TimerFuture timerFuture = InstanceTimeWheelService.fetchTimerFuture(instanceId);

            boolean success;
            // The task exists in this server's time wheel and was cancelled successfully: rescued!
            if (timerFuture != null) {
                success = timerFuture.cancel();
            } else {
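                // If this API is called close to the expected trigger time, a race is theoretically possible: the other side may finish dispatch before the cancel status is written, and the status is then overwritten
                // Solving this is very expensive (distributed lock), so it is deliberately left unsolved
                // Usage condition: there must be a reasonable gap between the call and the expected execution time of the task to cancel; otherwise reliability is not guaranteed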
                success = InstanceStatus.WAITING_DISPATCH.getV() == instanceInfo.getStatus();
            }

            if (success) {
                instanceInfo.setStatus(InstanceStatus.CANCELED.getV());
                instanceInfo.setResult(SystemInstanceResult.CANCELED_BY_USER);
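                // If the DB write fails, an exception is thrown and the API returns false, i.e. the cancel failed; the task will be rescheduled by the HA mechanism, so nothing needs to be handled here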
                instanceInfoRepository.saveAndFlush(instanceInfo);
                log.info("[Instance-{}] cancel the instance successfully.", instanceId);
            } else {
                log.warn("[Instance-{}] cancel the instance failed.", instanceId);
                throw new PowerJobException("instance already up and running");
            }

        } catch (Exception e) {
            log.error("[Instance-{}] cancelInstance failed.", instanceId, e);
            throw e;
        }
    }

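cancelInstance fetches the timerFuture via InstanceTimeWheelService.fetchTimerFuture. If it is not null, the return value of timerFuture.cancel() decides whether the cancel succeeded; otherwise the cancel only counts as successful when the instance is still in WAITING_DISPATCH. On success, the status is set to CANCELED, the result to CANCELED_BY_USER, and the record is saved; on failure a PowerJobException is thrown.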
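The success decision can be written as a small pure function. In this sketch, a `BooleanSupplier` stands in for `TimerFuture.cancel()` and `null` means no timer was found on this server; both the class name and that modelling choice are assumptions for illustration.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of how cancelInstance decides whether the cancel succeeded.
class CancelDecisionSketch {

    enum Status { WAITING_DISPATCH, RUNNING, SUCCEED }

    // timerCancel models TimerFuture.cancel(); pass null when no timer exists locally.
    static boolean canCancel(BooleanSupplier timerCancel, Status status) {
        if (timerCancel != null) {
            // The local time wheel holds the task: trust the result of cancel().
            return timerCancel.getAsBoolean();
        }
        // No local timer: only an instance that has not been dispatched yet can be cancelled.
        return status == Status.WAITING_DISPATCH;
    }

    public static void main(String[] args) {
        System.out.println(canCancel(() -> true, Status.WAITING_DISPATCH));
        System.out.println(canCancel(null, Status.WAITING_DISPATCH));
        System.out.println(canCancel(null, Status.RUNNING));
    }
}
```

As the source comments point out, the status-based branch is racy when the call lands close to the expected trigger time, so this check is a heuristic rather than a guarantee.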

InstanceLogService

tech/powerjob/server/core/instance/InstanceLogService.java

@Slf4j
@Service
public class InstanceLogService {

    @Value("${server.port}")
    private int port;

    @Resource
    private InstanceMetadataService instanceMetadataService;

    @Resource
    private GridFsManager gridFsManager;
    /**
     * Bean for local database operations
     */
    @Resource(name = "localTransactionTemplate")
    private TransactionTemplate localTransactionTemplate;

    @Resource
    private LocalInstanceLogRepository localInstanceLogRepository;

    /**
     * IDs of the task instances whose online logs are maintained locally
     */
    private final Map<Long, Long> instanceId2LastReportTime = Maps.newConcurrentMap();

    @Resource(name = PJThreadPool.BACKGROUND_POOL)
    private AsyncTaskExecutor powerJobBackgroundPool;

    /**
     *  Segment lock
     */
    private final SegmentLock segmentLock = new SegmentLock(8);

    /**
     * Timestamp formatter
     */
    private static final FastDateFormat DATE_FORMAT = FastDateFormat.getInstance(OmsConstant.TIME_PATTERN_PLUS);
    /**
     * Number of lines shown per page
     */
    private static final int MAX_LINE_COUNT = 100;
    /**
     * Expiration interval
     */
    private static final long EXPIRE_INTERVAL_MS = 60000;

    //......
}    

InstanceLogService provides methods such as fetchInstanceLog, fetchDownloadUrl and downloadInstanceLog.

fetchInstanceLog

    @DesignateServer
    public StringPage fetchInstanceLog(Long appId, Long instanceId, Long index) {
        try {
            Future<File> fileFuture = prepareLogFile(instanceId);
            // A timeout does not interrupt the task that is still running
            File logFile = fileFuture.get(5, TimeUnit.SECONDS);

            // Show the data page by page
            long lines = 0;
            StringBuilder sb = new StringBuilder();
            String lineStr;
            long left = index * MAX_LINE_COUNT;
            long right = left + MAX_LINE_COUNT;
            try (LineNumberReader lr = new LineNumberReader(new FileReader(logFile))) {
                while ((lineStr = lr.readLine()) != null) {

                    // Within the requested range: read the line out
                    if (lines >= left && lines < right) {
                        sb.append(lineStr).append(System.lineSeparator());
                    }
                    ++lines;
                }
            }catch (Exception e) {
                log.warn("[InstanceLog-{}] read logFile from disk failed for app: {}.", instanceId, appId, e);
                return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
            }

            double totalPage = Math.ceil(1.0 * lines / MAX_LINE_COUNT);
            return new StringPage(index, (long) totalPage, sb.toString());

        }catch (TimeoutException te) {
            return StringPage.simple("log file is being prepared, please try again later.");
        }catch (Exception e) {
            log.warn("[InstanceLog-{}] fetch instance log failed.", instanceId, e);
            return StringPage.simple("oms-server execution exception, caused by " + ExceptionUtils.getRootCauseMessage(e));
        }
    }

    private Future<File> prepareLogFile(long instanceId) {
        return powerJobBackgroundPool.submit(() -> {
            // The online log is still being updated, so the data in the local database must be used
            if (instanceId2LastReportTime.containsKey(instanceId)) {
                return genTemporaryLogFile(instanceId);
            }
            return genStableLogFile(instanceId);
        });
    }    

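fetchInstanceLog first prepares the log file via prepareLogFile: for instances whose log is still being updated it calls genTemporaryLogFile, otherwise genStableLogFile. If the log exists locally it is used directly; otherwise, if gridFsManager is available, the log is fetched from gridFsManager. The file is then read line by line, and only the lines of the requested page (MAX_LINE_COUNT, i.e. 100 lines per page) are returned together with the total page count.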
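The paging arithmetic in fetchInstanceLog can be sketched on its own. The example below reads from any `Reader` rather than a log file; `LogPagingSketch` and its `Page` class are hypothetical stand-ins (the latter for StringPage), and integer ceiling division replaces the `Math.ceil` call of the original.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical sketch of line-based paging over a log source.
class LogPagingSketch {

    // Stand-in for StringPage: page index, total pages and the page content.
    static final class Page {
        final long index;
        final long totalPages;
        final String data;

        Page(long index, long totalPages, String data) {
            this.index = index;
            this.totalPages = totalPages;
            this.data = data;
        }
    }

    // Collects lines [index * pageSize, index * pageSize + pageSize) and counts all lines.
    static Page readPage(Reader source, long index, int pageSize) {
        long left = index * pageSize;
        long right = left + pageSize;
        long lines = 0;
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (lines >= left && lines < right) {
                    sb.append(line).append('\n');
                }
                lines++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Ceiling division: 250 lines with a page size of 100 give 3 pages.
        long totalPages = (lines + pageSize - 1) / pageSize;
        return new Page(index, totalPages, sb.toString());
    }

    public static void main(String[] args) {
        StringBuilder text = new StringBuilder();
        for (int i = 0; i < 250; i++) {
            text.append("line-").append(i).append('\n');
        }
        Page page = readPage(new StringReader(text.toString()), 2, 100);
        System.out.println("total pages: " + page.totalPages);
    }
}
```

Note that the whole source is scanned even for an early page, because the total page count depends on the total number of lines; the original method behaves the same way.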

fetchDownloadUrl

    @DesignateServer
    public String fetchDownloadUrl(Long appId, Long instanceId) {
        String url = "http://" + NetUtils.getLocalHost() + ":" + port + "/instance/downloadLog?instanceId=" + instanceId;
        log.info("[InstanceLog-{}] downloadURL for appId[{}]: {}", instanceId, appId, url);
        return url;
    }

fetchDownloadUrl returns the download URL, which points at /instance/downloadLog on the local host and the configured server port.

downloadInstanceLog

    public File downloadInstanceLog(long instanceId) throws Exception {
        Future<File> fileFuture = prepareLogFile(instanceId);
        return fileFuture.get(1, TimeUnit.MINUTES);
    }

downloadInstanceLog prepares the file via prepareLogFile and waits up to 1 minute for the result.
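Both fetchInstanceLog (5 seconds) and downloadInstanceLog (1 minute) rely on `Future.get` with a timeout, and the source comments note that a timeout does not interrupt the running task. The sketch below demonstrates that behaviour with a plain `ExecutorService`; the class name, the sleep durations and the returned strings are all made up for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: a timed-out Future.get leaves the submitted task running.
class FutureTimeoutSketch {

    static String waitTwice(long taskMillis, long firstWaitMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                Thread.sleep(taskMillis); // simulates preparing the log file
                return "log-file-ready";
            });
            try {
                return "finished-within-first-wait:" + future.get(firstWaitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException te) {
                // The task was not cancelled, so a later get() still returns its result.
                return "timed-out-then:" + future.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(waitTwice(300, 50));
        System.out.println(waitTwice(10, 2000));
    }
}
```

This is why a caller that sees "log file is being prepared, please try again later." can simply retry: the preparation keeps going in the background pool.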

Summary

InstanceController exposes the stop, retry, detail, log, downloadLogUrl, downloadLog, downloadLog4Console and list endpoints. stop, retry and detail delegate to instanceService; log, downloadLogUrl, downloadLog and downloadLog4Console delegate to instanceLogService; list delegates to instanceInfoRepository.

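Original-content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.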
