A Look at PowerJob's InstanceStatusCheckService

Author: code4it
Published 2024-02-10 22:11:35
Included in the column: 码匠的流水账

This article takes a look at PowerJob's InstanceStatusCheckService.

InstanceStatus

tech/powerjob/common/enums/InstanceStatus.java

@Getter
@AllArgsConstructor
public enum InstanceStatus {
    /**
     *
     */
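    WAITING_DISPATCH(1, "等待派发"),              // waiting to be dispatched
    WAITING_WORKER_RECEIVE(2, "等待Worker接收"),  // waiting for a worker to receive it
    RUNNING(3, "运行中"),                         // running
    FAILED(4, "失败"),                            // failed
    SUCCEED(5, "成功"),                           // succeeded
    CANCELED(9, "取消"),                          // canceled
    STOPPED(10, "手动停止");                      // stopped manually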

    private final int v;
    private final String des;

    /**
     * Running status in the broad sense
     */
    public static final List<Integer> GENERALIZED_RUNNING_STATUS = Lists.newArrayList(WAITING_DISPATCH.v, WAITING_WORKER_RECEIVE.v, RUNNING.v);
    /**
     * Finished (terminal) status
     */
    public static final List<Integer> FINISHED_STATUS = Lists.newArrayList(FAILED.v, SUCCEED.v, CANCELED.v, STOPPED.v);

    public static InstanceStatus of(int v) {
        for (InstanceStatus is : values()) {
            if (v == is.v) {
                return is;
            }
        }
        throw new IllegalArgumentException("InstanceStatus has no item for value " + v);
    }
}

InstanceStatus defines the states of a job instance. The broadly "running" states are WAITING_DISPATCH, WAITING_WORKER_RECEIVE and RUNNING; the terminal states are FAILED, SUCCEED, CANCELED and STOPPED.
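To make the two status lists concrete, here is a minimal standalone sketch. It re-declares a trimmed-down copy of the enum without Lombok or Guava; the wrapper class `InstanceStatusDemo` and the helper `isFinished` are illustrative assumptions and are not part of PowerJob.

```java
import java.util.List;

// Simplified stand-in for tech.powerjob.common.enums.InstanceStatus.
// InstanceStatusDemo and isFinished() are hypothetical, added only for illustration.
class InstanceStatusDemo {

    enum Status {
        WAITING_DISPATCH(1), WAITING_WORKER_RECEIVE(2), RUNNING(3),
        FAILED(4), SUCCEED(5), CANCELED(9), STOPPED(10);

        final int v;

        Status(int v) {
            this.v = v;
        }

        // Same lookup-by-value contract as InstanceStatus.of in the listing above.
        static Status of(int v) {
            for (Status s : values()) {
                if (s.v == v) {
                    return s;
                }
            }
            throw new IllegalArgumentException("InstanceStatus has no item for value " + v);
        }
    }

    static final List<Integer> GENERALIZED_RUNNING_STATUS =
            List.of(Status.WAITING_DISPATCH.v, Status.WAITING_WORKER_RECEIVE.v, Status.RUNNING.v);

    static final List<Integer> FINISHED_STATUS =
            List.of(Status.FAILED.v, Status.SUCCEED.v, Status.CANCELED.v, Status.STOPPED.v);

    // Hypothetical helper: true when the numeric status is a terminal one.
    static boolean isFinished(int v) {
        return FINISHED_STATUS.contains(v);
    }

    public static void main(String[] args) {
        System.out.println(Status.of(3));                              // RUNNING
        System.out.println(isFinished(Status.CANCELED.v));             // true
        System.out.println(GENERALIZED_RUNNING_STATUS.contains(2));    // true
    }
}
```

Note that values 6 to 8 are not mapped, so `Status.of(6)` throws IllegalArgumentException, just as the original `of` would.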

InstanceStatusCheckService

tech/powerjob/server/core/scheduler/InstanceStatusCheckService.java

@Slf4j
@Service
@RequiredArgsConstructor
public class InstanceStatusCheckService {

    private static final int MAX_BATCH_NUM_APP = 10;
    private static final int MAX_BATCH_NUM_INSTANCE = 3000;
    private static final int MAX_BATCH_UPDATE_NUM = 500;
    private static final long DISPATCH_TIMEOUT_MS = 30000;
    private static final long RECEIVE_TIMEOUT_MS = 60000;
    private static final long RUNNING_TIMEOUT_MS = 60000;
    private static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000;

    public static final long CHECK_INTERVAL = 10000;

    private final TransportService transportService;

    private final DispatchService dispatchService;

    private final InstanceManager instanceManager;

    private final WorkflowInstanceManager workflowInstanceManager;

    private final AppInfoRepository appInfoRepository;

    private final JobInfoRepository jobInfoRepository;

    private final InstanceInfoRepository instanceInfoRepository;

    private final WorkflowInfoRepository workflowInfoRepository;

    private final WorkflowInstanceInfoRepository workflowInstanceInfoRepository;

    //......
}    

InstanceStatusCheckService provides the methods checkRunningInstance, checkWaitingDispatchInstance, checkWaitingWorkerReceiveInstance and checkWorkflowInstance.

checkRunningInstance

    public void checkRunningInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // Query the DB for the AppGroups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            // Check instances in RUNNING status (no status report from the TaskTracker for a while is treated as a failure)
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleRunningInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] RunningInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] RunningInstance status check using {}.", stopwatch.stop());
    }

    private void handleRunningInstance(List<Long> partAppIds) {
        // 3. Check instances in RUNNING status (no status report from the TaskTracker for a while is treated as a failure)
        long threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
        List<BriefInstanceInfo> failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        while (!failedInstances.isEmpty()) {
            // collect job id
            Set<Long> jobIds = failedInstances.stream().map(BriefInstanceInfo::getJobId).collect(Collectors.toSet());
            // query job info and map
            Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
            log.warn("[InstanceStatusCheckService] find some instances have not received status report for a long time : {}", failedInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            failedInstances.forEach(instance -> {
                Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
                if (!jobInfoOpt.isPresent()) {
                    final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                    opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                    return;
                }
                TimeExpressionType timeExpressionType = TimeExpressionType.of(jobInfoOpt.get().getTimeExpressionType());
                SwitchableStatus switchableStatus = SwitchableStatus.of(jobInfoOpt.get().getStatus());
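                // If the job has been disabled, do not retry; just mark the instance as failed. Frequent (second-level) jobs are also marked as failed directly and rescheduled by the dispatcher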
                if (switchableStatus != SwitchableStatus.ENABLE || TimeExpressionType.FREQUENT_TYPES.contains(timeExpressionType.getV())) {
                    final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                    opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                    return;
                }
                // CRON and API jobs are handled the same way: failure count + 1, then retry according to the retry configuration
                if (instance.getRunningTimes() < jobInfoOpt.get().getInstanceRetryNum()) {
                    dispatchService.redispatchAsync(instance.getInstanceId(), InstanceStatus.RUNNING.getV());
                } else {
                    final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                    opt.ifPresent(e -> updateFailedInstance(e, SystemInstanceResult.REPORT_TIMEOUT));
                }
            });
            threshold = System.currentTimeMillis() - RUNNING_TIMEOUT_MS;
            failedInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndGmtModifiedBefore(partAppIds, InstanceStatus.RUNNING.getV(), new Date(threshold), PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        }

    }    

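checkRunningInstance looks up the appIds this server is responsible for, splits them into batches of MAX_BATCH_NUM_APP, and runs handleRunningInstance on each batch. handleRunningInstance finds RUNNING instances whose gmtModified is older than RUNNING_TIMEOUT_MS, i.e. instances that have not received a TaskTracker status report for a while. If the job record cannot be found, the job is no longer enabled, or the job is a frequent (second-level) job, the instance is marked as failed with REPORT_TIMEOUT. Otherwise it checks whether the running times are still below the configured retry count: if so, it retries through dispatchService.redispatchAsync; if not, the instance is marked as failed.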
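The per-instance branching in handleRunningInstance can be condensed into a small pure function. The sketch below is a simplified model, not PowerJob code: the class `RunningTimeoutDecision`, the `Action` enum and the boolean parameters are hypothetical names that stand in for the lookups on JobInfoDO, SwitchableStatus and TimeExpressionType.

```java
// Hypothetical, simplified model of the decision made for each timed-out RUNNING instance.
class RunningTimeoutDecision {

    enum Action { REDISPATCH, MARK_FAILED }

    /**
     * @param jobFound     whether the job record still exists
     * @param jobEnabled   whether the job's switchable status is ENABLE
     * @param frequentJob  whether the job's time expression type is one of the frequent types
     * @param runningTimes how many times the instance has already run
     * @param retryNum     the job's configured instance retry count
     */
    static Action decide(boolean jobFound, boolean jobEnabled, boolean frequentJob,
                         long runningTimes, int retryNum) {
        if (!jobFound) {
            // No job record: nothing to retry against, fail with a report timeout.
            return Action.MARK_FAILED;
        }
        if (!jobEnabled || frequentJob) {
            // Disabled jobs are not retried; frequent jobs are failed and rescheduled elsewhere.
            return Action.MARK_FAILED;
        }
        // Remaining job types retry until the retry budget is used up.
        return runningTimes < retryNum ? Action.REDISPATCH : Action.MARK_FAILED;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, false, 1, 3));   // REDISPATCH
        System.out.println(decide(true, true, false, 3, 3));   // MARK_FAILED
        System.out.println(decide(true, false, false, 0, 3));  // MARK_FAILED
        System.out.println(decide(false, true, false, 0, 3));  // MARK_FAILED
    }
}
```

In the real method every MARK_FAILED branch reloads the full InstanceInfoDO by id before calling updateFailedInstance, which this sketch leaves out.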

checkWaitingDispatchInstance

    public void checkWaitingDispatchInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // Query the DB for the AppGroups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            // Check instances in WAITING_DISPATCH status
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingDispatchInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingDispatchInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingDispatchInstance status check using {}.", stopwatch.stop());
    }

    private void handleWaitingDispatchInstance(List<Long> partAppIds) {
        // 1. Check instances in WAITING_DISPATCH status
        long threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
        List<InstanceInfoDO> waitingDispatchInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        while (!waitingDispatchInstances.isEmpty()) {
            List<Long> overloadAppIdList = new ArrayList<>();
            long startTime = System.currentTimeMillis();
            // Group by appId to make the overload handling easier
            Map<Long, List<InstanceInfoDO>> waitingDispatchInstancesMap = waitingDispatchInstances.stream().collect(Collectors.groupingBy(InstanceInfoDO::getAppId));
            for (Map.Entry<Long, List<InstanceInfoDO>> entry : waitingDispatchInstancesMap.entrySet()) {
                final Long currentAppId = entry.getKey();
                final List<InstanceInfoDO> currentAppWaitingDispatchInstances = entry.getValue();
                // collect job id
                Set<Long> jobIds = currentAppWaitingDispatchInstances.stream().map(InstanceInfoDO::getJobId).collect(Collectors.toSet());
                // query job info and map
                Map<Long, JobInfoDO> jobInfoMap = jobInfoRepository.findByIdIn(jobIds).stream().collect(Collectors.toMap(JobInfoDO::getId, e -> e));
                log.warn("[InstanceStatusChecker] find some instance in app({}) which is not triggered as expected: {}", currentAppId, currentAppWaitingDispatchInstances.stream().map(InstanceInfoDO::getInstanceId).collect(Collectors.toList()));
                final Holder<Boolean> overloadFlag = new Holder<>(false);
                // This simple handling is fine for now, since this is the only place that uses parallelStream
                currentAppWaitingDispatchInstances.parallelStream().forEach(instance -> {
                    if (overloadFlag.get()) {
                        // Skip directly
                        return;
                    }
                    Optional<JobInfoDO> jobInfoOpt = Optional.ofNullable(jobInfoMap.get(instance.getJobId()));
                    if (jobInfoOpt.isPresent()) {
                        // No need to reset the status again for instances waiting to be dispatched; this reduces IO
                        dispatchService.dispatch(jobInfoOpt.get(), instance.getInstanceId(), Optional.of(instance), Optional.of(overloadFlag));
                    } else {
                        log.warn("[InstanceStatusChecker] can't find job by jobId[{}], so redispatch failed, failed instance: {}", instance.getJobId(), instance);
                        final Optional<InstanceInfoDO> opt = instanceInfoRepository.findById(instance.getId());
                        opt.ifPresent(instanceInfoDO -> updateFailedInstance(instanceInfoDO, SystemInstanceResult.CAN_NOT_FIND_JOB_INFO));
                    }
                });
                threshold = System.currentTimeMillis() - DISPATCH_TIMEOUT_MS;
                if (overloadFlag.get()) {
                    overloadAppIdList.add(currentAppId);
                }
            }
            log.info("[InstanceStatusChecker] process {} task,use {} ms", waitingDispatchInstances.size(), System.currentTimeMillis() - startTime);
            if (!overloadAppIdList.isEmpty()) {
                log.warn("[InstanceStatusChecker] app[{}] is overload, so skip check waiting dispatch instance", overloadAppIdList);
                partAppIds.removeAll(overloadAppIdList);
            }
            if (partAppIds.isEmpty()) {
                break;
            }
            waitingDispatchInstances = instanceInfoRepository.findAllByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_DISPATCH.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        }

    }    

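checkWaitingDispatchInstance finds instances still in WAITING_DISPATCH whose expected trigger time is more than DISPATCH_TIMEOUT_MS in the past, groups them by appId, and dispatches them through dispatchService.dispatch. If the job record cannot be found, the instance is marked as failed with CAN_NOT_FIND_JOB_INFO. Apps flagged as overloaded during dispatch are removed from partAppIds, so the following query rounds skip them.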
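The overload handling is easier to follow in isolation. The following sketch is a sequential simplification under stated assumptions: `OverloadSkipSketch`, `dispatchAll` and the `capacityPerApp` limit are invented for illustration, and the fixed capacity merely stands in for whatever condition makes the real dispatcher set the overload flag. The original uses parallelStream with a Holder<Boolean>; here a plain loop with an AtomicBoolean plays that role.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model: once an app is flagged as overloaded, its remaining instances are skipped.
class OverloadSkipSketch {

    /**
     * Dispatches instance ids app by app and records the dispatched ids in {@code dispatched}.
     * {@code capacityPerApp} is a made-up limit used only to trigger the overload flag.
     *
     * @return the app ids that were flagged as overloaded
     */
    static List<Long> dispatchAll(Map<Long, List<Long>> instancesByApp, int capacityPerApp,
                                  List<Long> dispatched) {
        List<Long> overloadedApps = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> entry : instancesByApp.entrySet()) {
            // Plays the role of the Holder<Boolean> overloadFlag in the listing above.
            AtomicBoolean overloadFlag = new AtomicBoolean(false);
            int used = 0;
            for (Long instanceId : entry.getValue()) {
                if (overloadFlag.get()) {
                    continue; // app already overloaded: skip the rest
                }
                if (used >= capacityPerApp) {
                    overloadFlag.set(true); // the "dispatcher" reports overload
                    continue;
                }
                dispatched.add(instanceId);
                used++;
            }
            if (overloadFlag.get()) {
                overloadedApps.add(entry.getKey());
            }
        }
        return overloadedApps;
    }

    public static void main(String[] args) {
        Map<Long, List<Long>> instancesByApp = new LinkedHashMap<>();
        instancesByApp.put(1L, List.of(11L, 12L, 13L));
        instancesByApp.put(2L, List.of(21L));

        List<Long> dispatched = new ArrayList<>();
        List<Long> overloaded = dispatchAll(instancesByApp, 2, dispatched);

        // Overloaded apps are dropped from the next query round, as partAppIds.removeAll does.
        List<Long> partAppIds = new ArrayList<>(instancesByApp.keySet());
        partAppIds.removeAll(overloaded);

        System.out.println(dispatched);  // [11, 12, 21]
        System.out.println(overloaded);  // [1]
        System.out.println(partAppIds);  // [2]
    }
}
```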

checkWaitingWorkerReceiveInstance

    public void checkWaitingWorkerReceiveInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // Query the DB for the AppGroups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            // Check instances in WAITING_WORKER_RECEIVE status
            Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(this::handleWaitingWorkerReceiveInstance);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WaitingWorkerReceiveInstance status check using {}.", stopwatch.stop());
    }

    private void handleWaitingWorkerReceiveInstance(List<Long> partAppIds) {
        // 2. Check instances in WAITING_WORKER_RECEIVE status
        long threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
        List<BriefInstanceInfo> waitingWorkerReceiveInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        while (!waitingWorkerReceiveInstances.isEmpty()) {
            log.warn("[InstanceStatusChecker] find some instance didn't receive any reply from worker, try to redispatch: {}", waitingWorkerReceiveInstances.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()));
            final List<List<BriefInstanceInfo>> partitions = Lists.partition(waitingWorkerReceiveInstances, MAX_BATCH_UPDATE_NUM);
            for (List<BriefInstanceInfo> partition : partitions) {
                dispatchService.redispatchBatchAsyncLockFree(partition.stream().map(BriefInstanceInfo::getInstanceId).collect(Collectors.toList()), InstanceStatus.WAITING_WORKER_RECEIVE.getV());
            }
            // Query again
            threshold = System.currentTimeMillis() - RECEIVE_TIMEOUT_MS;
            waitingWorkerReceiveInstances = instanceInfoRepository.selectBriefInfoByAppIdInAndStatusAndActualTriggerTimeLessThan(partAppIds, InstanceStatus.WAITING_WORKER_RECEIVE.getV(), threshold, PageRequest.of(0, MAX_BATCH_NUM_INSTANCE));
        }
    }    

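checkWaitingWorkerReceiveInstance checks instances that are still waiting for a worker to receive them after RECEIVE_TIMEOUT_MS. It splits them into batches of at most MAX_BATCH_UPDATE_NUM and calls dispatchService.redispatchBatchAsyncLockFree on each batch.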
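The batching step relies on Guava's Lists.partition. As a rough illustration of what that splitting does, here is a plain-Java equivalent; `PartitionSketch` and its `partition` method are hypothetical and copy the sublists, whereas Guava returns views over the original list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java counterpart to the Lists.partition call used for batching.
class PartitionSketch {

    // Splits a list into consecutive chunks of at most `size` elements.
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            chunks.add(new ArrayList<>(list.subList(i, Math.min(i + size, list.size()))));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Long> instanceIds = new ArrayList<>();
        for (long id = 1; id <= 1200; id++) {
            instanceIds.add(id);
        }
        // With MAX_BATCH_UPDATE_NUM = 500, 1200 ids become batches of 500, 500 and 200.
        for (List<Long> batch : partition(instanceIds, 500)) {
            System.out.println(batch.size());
        }
    }
}
```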

checkWorkflowInstance

    public void checkWorkflowInstance() {
        Stopwatch stopwatch = Stopwatch.createStarted();
        // Query the DB for the AppGroups this server is responsible for
        List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
        if (CollectionUtils.isEmpty(allAppIds)) {
            log.info("[InstanceStatusChecker] current server has no app's job to check");
            return;
        }
        try {
            checkWorkflowInstance(allAppIds);
        } catch (Exception e) {
            log.error("[InstanceStatusChecker] WorkflowInstance status check failed.", e);
        }
        log.info("[InstanceStatusChecker] WorkflowInstance status check using {}.", stopwatch.stop());
    }

    private void checkWorkflowInstance(List<Long> allAppIds) {

        // Retry workflow instances that have been in WAITING status for a long time
        long threshold = System.currentTimeMillis() - WORKFLOW_WAITING_TIMEOUT_MS;
        Lists.partition(allAppIds, MAX_BATCH_NUM_APP).forEach(partAppIds -> {
            List<WorkflowInstanceInfoDO> waitingWfInstanceList = workflowInstanceInfoRepository.findByAppIdInAndStatusAndExpectedTriggerTimeLessThan(partAppIds, WorkflowInstanceStatus.WAITING.getV(), threshold);
            if (!CollectionUtils.isEmpty(waitingWfInstanceList)) {

                List<Long> wfInstanceIds = waitingWfInstanceList.stream().map(WorkflowInstanceInfoDO::getWfInstanceId).collect(Collectors.toList());
                log.warn("[WorkflowInstanceChecker] wfInstance({}) is not started as expected, oms try to restart these workflowInstance.", wfInstanceIds);

                waitingWfInstanceList.forEach(wfInstance -> {
                    Optional<WorkflowInfoDO> workflowOpt = workflowInfoRepository.findById(wfInstance.getWorkflowId());
                    workflowOpt.ifPresent(workflowInfo -> {
                        workflowInstanceManager.start(workflowInfo, wfInstance.getWfInstanceId());
                        log.info("[Workflow-{}|{}] restart workflowInstance successfully~", workflowInfo.getId(), wfInstance.getWfInstanceId());
                    });
                });
            }
        });
    }    

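checkWorkflowInstance checks the status of workflow instances: for each instance still in WAITING whose expected trigger time is more than WORKFLOW_WAITING_TIMEOUT_MS in the past, it looks up the workflow and, if it exists, calls workflowInstanceManager.start.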
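All four checks share the same timeout test: compute a threshold as "now minus timeout" and select rows whose timestamp is strictly less than it. A minimal sketch of that comparison follows; `TimeoutThresholdSketch` and `isOverdue` are hypothetical names, and in the service the comparison happens inside the repository query rather than in Java.

```java
// Hypothetical illustration of the "threshold = now - timeout" test used by the checks.
class TimeoutThresholdSketch {

    static final long WORKFLOW_WAITING_TIMEOUT_MS = 60000;

    // An instance is overdue when its expected trigger time lies strictly before the threshold.
    static boolean isOverdue(long expectedTriggerTime, long now, long timeoutMs) {
        long threshold = now - timeoutMs;
        return expectedTriggerTime < threshold;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Expected to start 90 seconds ago: past the 60 second timeout.
        System.out.println(isOverdue(now - 90_000, now, WORKFLOW_WAITING_TIMEOUT_MS)); // true
        // Expected to start 10 seconds ago: still within the timeout.
        System.out.println(isOverdue(now - 10_000, now, WORKFLOW_WAITING_TIMEOUT_MS)); // false
    }
}
```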

Summary

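InstanceStatusCheckService provides checkRunningInstance, checkWaitingDispatchInstance, checkWaitingWorkerReceiveInstance and checkWorkflowInstance. They check, respectively, instances that are RUNNING but whose status report has timed out, instances still waiting to be dispatched, instances waiting for a worker to receive them, and workflow instances waiting to be scheduled.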

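Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.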