
A look at PowerJob's failedTaskNum

code4it
Published 2024-03-15 10:07:07
Part of the column: 码匠的流水账

This article looks at how PowerJob computes failedTaskNum.

InstanceStatisticsHolder

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    @Data
    protected static class InstanceStatisticsHolder {
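        // Waiting to be dispatched (exists only in the TaskTracker's database)
        protected long waitingDispatchNum;
        // Dispatched but not yet acknowledged by the ProcessorTracker: the request may have been lost to a network error, or the ProcessorTracker's thread pool may be full and rejecting work
        protected long workerUnreceivedNum;
        // Acknowledged by the ProcessorTracker and waiting in its thread-pool queue
        protected long receivedNum;
        // Being executed by the ProcessorTracker
        protected long runningNum;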
        protected long failedNum;
        protected long succeedNum;

        public long getTotalTaskNum() {
            return waitingDispatchNum + workerUnreceivedNum + receivedNum + runningNum + failedNum + succeedNum;
        }
    }

InstanceStatisticsHolder stores the number of tasks in each status.

fetchRunningStatus

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java

    public InstanceDetail fetchRunningStatus() {

        InstanceDetail detail = new InstanceDetail();
        // Fill in the basic information
        detail.setActualTriggerTime(createTime);
        detail.setStatus(InstanceStatus.RUNNING.getV());
        detail.setTaskTrackerAddress(workerRuntime.getWorkerAddress());

        // Fill in the task details
        InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);
        InstanceDetail.TaskDetail taskDetail = new InstanceDetail.TaskDetail();
        taskDetail.setSucceedTaskNum(holder.succeedNum);
        taskDetail.setFailedTaskNum(holder.failedNum);
        taskDetail.setTotalTaskNum(holder.getTotalTaskNum());
        detail.setTaskDetail(taskDetail);

        return detail;
    }

CommonTaskTracker's fetchRunningStatus calls getInstanceStatisticsHolder to obtain an InstanceStatisticsHolder, then fills taskDetail's failedTaskNum with holder.failedNum.

getInstanceStatisticsHolder

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    protected InstanceStatisticsHolder getInstanceStatisticsHolder(long subInstanceId) {

        Map<TaskStatus, Long> status2Num = taskPersistenceService.getTaskStatusStatistics(instanceId, subInstanceId);
        InstanceStatisticsHolder holder = new InstanceStatisticsHolder();

        holder.waitingDispatchNum = status2Num.getOrDefault(TaskStatus.WAITING_DISPATCH, 0L);
        holder.workerUnreceivedNum = status2Num.getOrDefault(TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 0L);
        holder.receivedNum = status2Num.getOrDefault(TaskStatus.WORKER_RECEIVED, 0L);
        holder.runningNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESSING, 0L);
        holder.failedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_FAILED, 0L);
        holder.succeedNum = status2Num.getOrDefault(TaskStatus.WORKER_PROCESS_SUCCESS, 0L);
        return holder;
    }

getInstanceStatisticsHolder obtains status2Num from taskPersistenceService.getTaskStatusStatistics and uses the count for TaskStatus.WORKER_PROCESS_FAILED as failedNum.
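
To make the mapping concrete, here is a minimal, self-contained sketch of the same getOrDefault pattern. The HolderSketch class and its Status enum are hypothetical stand-ins that only reuse the status names quoted above; this is not PowerJob code.

```java
import java.util.EnumMap;
import java.util.Map;

class HolderSketch {
    // Hypothetical stand-in for PowerJob's TaskStatus; only the names quoted in the article are kept.
    enum Status {
        WAITING_DISPATCH, DISPATCH_SUCCESS_WORKER_UNCHECK, WORKER_RECEIVED,
        WORKER_PROCESSING, WORKER_PROCESS_FAILED, WORKER_PROCESS_SUCCESS
    }

    // A status that is absent from the map counts as 0, just like getOrDefault(..., 0L) above.
    static long failedNum(Map<Status, Long> status2Num) {
        return status2Num.getOrDefault(Status.WORKER_PROCESS_FAILED, 0L);
    }

    // Sum over every status, mirroring getTotalTaskNum().
    static long totalNum(Map<Status, Long> status2Num) {
        long total = 0;
        for (Status s : Status.values()) {
            total += status2Num.getOrDefault(s, 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Status, Long> status2Num = new EnumMap<>(Status.class);
        status2Num.put(Status.WORKER_PROCESS_SUCCESS, 8L);
        status2Num.put(Status.WORKER_PROCESSING, 2L);
        System.out.println("failed=" + failedNum(status2Num) + ", total=" + totalNum(status2Num));
    }
}
```

A status with no rows simply does not appear in the query result, which is why every field falls back to 0L.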

getTaskStatusStatistics

powerjob-worker/src/main/java/tech/powerjob/worker/persistence/TaskPersistenceService.java

    public Map<TaskStatus, Long> getTaskStatusStatistics(Long instanceId, Long subInstanceId) {
        try {

            SimpleTaskQuery query = new SimpleTaskQuery();
            query.setInstanceId(instanceId);
            query.setSubInstanceId(subInstanceId);
            query.setQueryContent("status, count(*) as num");
            query.setOtherCondition("GROUP BY status");

            return execute(() -> {
                List<Map<String, Object>> dbRES = taskDAO.simpleQueryPlus(query);
                Map<TaskStatus, Long> result = Maps.newHashMap();
                dbRES.forEach(row -> {
                    // H2 database uses upper case everywhere...
                    int status = Integer.parseInt(String.valueOf(row.get("status")));
                    long num = Long.parseLong(String.valueOf(row.get("num")));
                    result.put(TaskStatus.of(status), num);
                });
                return result;
            });
        }catch (Exception e) {
            log.error("[TaskPersistenceService] getTaskStatusStatistics for instance(id={}) failed.", instanceId, e);
        }
        return Maps.newHashMap();
    }

TaskPersistenceService's getTaskStatusStatistics runs select status, count(*) as num from task_info where instance_id= ? and sub_instance_id=? GROUP BY status
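
As an illustration of what that GROUP BY produces, the following sketch performs the same aggregation in memory. The StatusCountSketch class and the string status labels are assumptions made for the example; the real column holds integer status codes and the real query runs against the worker's local database.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

class StatusCountSketch {
    // Counts rows per status value, like "select status, count(*) as num ... GROUP BY status".
    static Map<String, Long> countByStatus(List<String> statusColumn) {
        return statusColumn.stream()
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical status column of one instance's task rows.
        List<String> rows = Arrays.asList("SUCCESS", "SUCCESS", "FAILED", "SUCCESS", "PROCESSING");
        System.out.println(countByStatus(rows));
    }
}
```

Each distinct status becomes one key with its row count as the value, which is the shape getTaskStatusStatistics converts into a Map<TaskStatus, Long>.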

TaskTracker will have a retry

LightTaskTracker

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/light/LightTaskTracker.java

    private ProcessResult processTask() {
        executeThread.set(Thread.currentThread());
        // Record when the task starts executing
        taskStartTime = System.currentTimeMillis();
        status = TaskStatus.WORKER_PROCESSING;
        // Once execution starts, a check is submitted to decide whether the task has timed out
        ProcessResult res = null;
        do {
            Thread.currentThread().setContextClassLoader(processorBean.getClassLoader());
            if (res != null && !res.isSuccess()) {
                // Retry
                taskContext.setCurrentRetryTimes(taskContext.getCurrentRetryTimes() + 1);
                log.warn("[TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}", instanceId, taskContext.getCurrentRetryTimes());
            }
            try {
                res = processorBean.getProcessor().process(taskContext);
            } catch (InterruptedException e) {
                log.warn("[TaskTracker-{}] task has been interrupted !", instanceId, e);
                Thread.currentThread().interrupt();
                if (timeoutFlag.get()) {
                    res = new ProcessResult(false, SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT_INTERRUPTED);
                } else if (stopFlag.get()) {
                    res = new ProcessResult(false, SystemInstanceResult.USER_STOP_INSTANCE_INTERRUPTED);
                } else {
                    res = new ProcessResult(false, e.toString());
                }
            } catch (Exception e) {
                log.warn("[TaskTracker-{}] process failed !", instanceId, e);
                res = new ProcessResult(false, e.toString());
            }
            if (res == null) {
                log.warn("[TaskTracker-{}] processor return null !", instanceId);
                res = new ProcessResult(false, "Processor return null");
            }
        } while (!res.isSuccess() && taskContext.getCurrentRetryTimes() < taskContext.getMaxRetryTimes() && !timeoutFlag.get() && !stopFlag.get());
        executeThread.set(null);
        taskEndTime = System.currentTimeMillis();
        finished.set(true);
        result = res;
        status = result.isSuccess() ? TaskStatus.WORKER_PROCESS_SUCCESS : TaskStatus.WORKER_PROCESS_FAILED;
        // Cancel the timeout-check task
        if (timeoutCheckScheduledFuture != null) {
            timeoutCheckScheduledFuture.cancel(true);
        }
        log.info("[TaskTracker-{}] task complete ! create time:{},queue time:{},use time:{},result:{}", instanceId, createTime, taskStartTime - createTime, System.currentTimeMillis() - taskStartTime, result);
        // Report once immediately after completion
        checkAndReportStatus();
        return result;
    }

In processTask, when the ProcessResult is not successful, LightTaskTracker increments the retry count and logs [TaskTracker-{}] process failed, TaskTracker will have a retry,current retryTimes : {}
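
The shape of that do/while loop can be reduced to a small sketch. LightRetrySketch and its attempts method are hypothetical names, and the timeout and stop flags from the real loop are deliberately left out, so this only illustrates how the retry counter bounds the number of executions.

```java
import java.util.function.BooleanSupplier;

class LightRetrySketch {
    // Runs the processor once, then retries while it keeps failing and retries remain.
    // Returns how many times the processor was invoked.
    static int attempts(BooleanSupplier processor, int maxRetryTimes) {
        int currentRetryTimes = 0;
        int attempts = 0;
        Boolean success = null;
        do {
            if (success != null && !success) {
                // A previous attempt failed: count this pass as a retry.
                currentRetryTimes++;
            }
            success = processor.getAsBoolean();
            attempts++;
        } while (!success && currentRetryTimes < maxRetryTimes);
        return attempts;
    }

    public static void main(String[] args) {
        // A processor that always fails runs once plus maxRetryTimes retries.
        System.out.println(attempts(() -> false, 2));
        // A processor that succeeds immediately runs exactly once.
        System.out.println(attempts(() -> true, 2));
    }
}
```

Under these assumptions a processor that never succeeds is invoked 1 + maxRetryTimes times before the task is marked as failed.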

updateTaskStatus

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/HeavyTaskTracker.java

    public void updateTaskStatus(Long subInstanceId, String taskId, int newStatus, long reportTime, @Nullable String result) {

        if (finished.get()) {
            return;
        }
        TaskStatus nTaskStatus = TaskStatus.of(newStatus);

        int lockId = taskId.hashCode();
        try {

            // Block until the lock is acquired
            segmentLock.lockInterruptible(lockId);
            TaskBriefInfo taskBriefInfo = taskId2BriefInfo.getIfPresent(taskId);

            // Not in the cache, so look it up in the database
            if (taskBriefInfo == null) {
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                if (taskOpt.isPresent()) {
                    TaskDO taskDO = taskOpt.get();
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.of(taskDO.getStatus()), taskDO.getLastReportTime());
                } else {
                    // In theory this cannot happen unless the database is broken
                    log.error("[TaskTracker-{}-{}] can't find task by taskId={}.", instanceId, subInstanceId, taskId);
                    taskBriefInfo = new TaskBriefInfo(taskId, TaskStatus.WAITING_DISPATCH, -1L);
                }
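                // Write to the cache
                taskId2BriefInfo.put(taskId, taskBriefInfo);
            }

            // Drop expired requests (this implies a need for consistent clocks across the cluster: when a retry moves to another worker, clock skew may cause problems)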
            if (taskBriefInfo.getLastReportTime() > reportTime) {
                log.warn("[TaskTracker-{}-{}] receive expired(last {} > current {}) task status report(taskId={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, taskBriefInfo.getLastReportTime(), reportTime, taskId, newStatus);
                return;
            }
            // Check that the state transition is valid, fix issue 404
            if (nTaskStatus.getValue() < taskBriefInfo.getStatus().getValue()) {
                log.warn("[TaskTracker-{}-{}] receive invalid task status report(taskId={},currentStatus={},newStatus={}), TaskTracker will drop this report.",
                        instanceId, subInstanceId, taskId, taskBriefInfo.getStatus().getValue(), newStatus);
                return;
            }

            // The request is valid at this point, so update the cached info first
            taskBriefInfo.setLastReportTime(reportTime);
            taskBriefInfo.setStatus(nTaskStatus);

            // Handle the failure case
            int configTaskRetryNum = instanceInfo.getTaskRetryNum();
            if (nTaskStatus == TaskStatus.WORKER_PROCESS_FAILED && configTaskRetryNum >= 1) {

                // Failure is not the common case, so one extra DB query is fine (and the cache in front means most calls never hit the DB)
                Optional<TaskDO> taskOpt = taskPersistenceService.getTask(instanceId, taskId);
                // If the DB query fails as well, give up on retrying...
                if (taskOpt.isPresent()) {
                    int failedCnt = taskOpt.get().getFailedCnt();
                    if (failedCnt < configTaskRetryNum) {

                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setFailedCnt(failedCnt + 1);

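                        /*
                        Address rules:
                        1. The stored address is the task's dispatch target (the ProcessorTracker address)
                        2. The root task and the last task must run on the TaskTracker's own machine (so their address must not be changed)
                        3. A broadcast task must run on every machine, so workers must not be reassigned (its address must not be changed)
                         */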
                        String taskName = taskOpt.get().getTaskName();
                        ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());
                        if (!taskName.equals(TaskConstant.ROOT_TASK_NAME) && !taskName.equals(TaskConstant.LAST_TASK_NAME) && executeType != ExecuteType.BROADCAST) {
                            updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                        }

                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        updateEntity.setLastReportTime(reportTime);

                        boolean retryTask = taskPersistenceService.updateTask(instanceId, taskId, updateEntity);
                        if (retryTask) {
                            log.info("[TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry.", instanceId, subInstanceId, taskId);
                            return;
                        }
                    }
                }
            }

            // Update the status (if writing the retry to the DB failed, there is no retry either... bad luck)
            result = result == null ? "" : result;
            boolean updateResult = taskPersistenceService.updateTaskStatus(instanceId, taskId, newStatus, reportTime, result);

            if (!updateResult) {
                log.warn("[TaskTracker-{}-{}] update task status failed, this task(taskId={}) may be processed repeatedly!", instanceId, subInstanceId, taskId);
            }

        } catch (InterruptedException ignore) {
            // ignore
        } catch (Exception e) {
            log.warn("[TaskTracker-{}-{}] update task status failed.", instanceId, subInstanceId, e);
        } finally {
            segmentLock.unlock(lockId);
        }
    }

In updateTaskStatus, HeavyTaskTracker logs [TaskTracker-{}-{}] task(taskId={}) process failed, TaskTracker will have a retry. when it schedules a retry. The precondition is failedCnt < configTaskRetryNum, where configTaskRetryNum comes from instanceInfo.getTaskRetryNum().
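
The retry decision can be isolated in a few lines. In the sketch below, HeavyRetrySketch, Task and Outcome are hypothetical names; the database update, the address reset and the locking from the real method are omitted, so it only shows when a failure report turns into a retry and when it is persisted as a failure.

```java
class HeavyRetrySketch {
    enum Outcome { RETRY, FAILED }

    // Hypothetical, simplified task row: only the failure counter is modelled.
    static final class Task {
        int failedCnt;
    }

    // Decides what happens when a subtask reports a failed status.
    static Outcome onFailureReport(Task task, int configTaskRetryNum) {
        if (configTaskRetryNum >= 1 && task.failedCnt < configTaskRetryNum) {
            // Retries remain: bump the counter; the real code also resets the task to WAITING_DISPATCH.
            task.failedCnt++;
            return Outcome.RETRY;
        }
        // No retries left: the failed status is persisted and counted in failedNum.
        return Outcome.FAILED;
    }

    public static void main(String[] args) {
        Task task = new Task();
        System.out.println(onFailureReport(task, 1)); // first failure with one retry configured
        System.out.println(onFailureReport(task, 1)); // second failure, retries exhausted
    }
}
```

A task therefore only shows up in the WORKER_PROCESS_FAILED count after its retries are used up (or when the retry could not be written to the database, a case this sketch does not model).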

taskRetryNum

powerjob-worker/src/main/java/tech/powerjob/worker/pojo/model/InstanceInfo.java

@Data
public class InstanceInfo implements Serializable {

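    /**
     * Basic information
     */
    private Long jobId;
    private Long instanceId;
    private Long wfInstanceId;

    /**
     * Processor information
     */
    // Execution type: standalone, broadcast, MR
    private String executeType;
    // Processor type (JavaBean, Jar, script, etc.)
    private String processorType;
    // Processor information
    private String processorInfo;
    // Time expression type
    private int timeExpressionType;

    /**
     * Timeout
     */
    // Overall timeout for the whole instance
    private long instanceTimeoutMS;

    /**
     * Runtime parameters
     */
    // Job-level parameters, comparable to a class's static variables
    private String jobParams;
    // Instance-level parameters, comparable to a class's instance variables
    private String instanceParams;


    // Maximum number of processing threads per machine
    private int threadConcurrency;
    // Subtask retry count (retries of the job itself are controlled by the server)
    private int taskRetryNum;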

    private String logConfig;
}

InstanceInfo defines taskRetryNum, which sets the number of retries for a subtask; the default is 1.

StatusCheckRunnable

powerjob-worker/src/main/java/tech/powerjob/worker/core/tracker/task/heavy/CommonTaskTracker.java

    private class StatusCheckRunnable implements Runnable {

        private static final long DISPATCH_TIME_OUT_MS = 15000;

        @SuppressWarnings("squid:S3776")
        private void innerRun() {

            InstanceStatisticsHolder holder = getInstanceStatisticsHolder(instanceId);

            long finishedNum = holder.succeedNum + holder.failedNum;
            long unfinishedNum = holder.waitingDispatchNum + holder.workerUnreceivedNum + holder.receivedNum + holder.runningNum;

            log.debug("[TaskTracker-{}] status check result: {}", instanceId, holder);

            TaskTrackerReportInstanceStatusReq req = new TaskTrackerReportInstanceStatusReq();
            req.setAppId(workerRuntime.getAppId());
            req.setJobId(instanceInfo.getJobId());
            req.setInstanceId(instanceId);
            req.setWfInstanceId(instanceInfo.getWfInstanceId());
            req.setTotalTaskNum(finishedNum + unfinishedNum);
            req.setSucceedTaskNum(holder.succeedNum);
            req.setFailedTaskNum(holder.failedNum);
            req.setReportTime(System.currentTimeMillis());
            req.setStartTime(createTime);
            req.setSourceAddress(workerRuntime.getWorkerAddress());

            boolean success = false;
            String result = null;

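            // 2. If no task is unfinished, decide whether the instance has really ended and fetch its actual result
            if (unfinishedNum == 0) {

                // No task at all in the database means the root task was never created, so the instance fails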
                if (finishedNum == 0) {
                    finished.set(true);
                    result = SystemInstanceResult.TASK_INIT_FAILED;
                } else {
                    ExecuteType executeType = ExecuteType.valueOf(instanceInfo.getExecuteType());

                    switch (executeType) {

                        // STANDALONE has a single task; the instance ends when that task completes
                        case STANDALONE:
                            finished.set(true);
                            List<TaskDO> allTask = taskPersistenceService.getAllTask(instanceId, instanceId);
                            if (CollectionUtils.isEmpty(allTask) || allTask.size() > 1) {
                                result = SystemInstanceResult.UNKNOWN_BUG;
                                log.warn("[TaskTracker-{}] there must have some bug in TaskTracker.", instanceId);
                            } else {
                                result = allTask.get(0).getResult();
                                success = allTask.get(0).getStatus() == TaskStatus.WORKER_PROCESS_SUCCESS.getValue();
                            }
                            break;
                        // MAP does not care about the result, the simplest case
                        case MAP:
                            finished.set(true);
                            success = holder.failedNum == 0;
                            result = String.format("total:%d,succeed:%d,failed:%d", holder.getTotalTaskNum(), holder.succeedNum, holder.failedNum);
                            break;
                        // For MapReduce and Broadcast, completion is decided by how the **LastTask** went
                        default:

                            Optional<TaskDO> lastTaskOptional = taskPersistenceService.getLastTask(instanceId, instanceId);
                            if (lastTaskOptional.isPresent()) {

                                // It exists, so judge the status by the reduce task
                                TaskDO resultTask = lastTaskOptional.get();
                                TaskStatus lastTaskStatus = TaskStatus.of(resultTask.getStatus());

                                if (lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS || lastTaskStatus == TaskStatus.WORKER_PROCESS_FAILED) {
                                    finished.set(true);
                                    success = lastTaskStatus == TaskStatus.WORKER_PROCESS_SUCCESS;
                                    result = resultTask.getResult();
                                }

                            } else {

                                // It does not exist: the preceding tasks have just finished, so lastTask must be created, and the last task must run on this machine!
                                TaskDO newLastTask = new TaskDO();
                                newLastTask.setTaskName(TaskConstant.LAST_TASK_NAME);
                                newLastTask.setTaskId(LAST_TASK_ID);
                                newLastTask.setSubInstanceId(instanceId);
                                newLastTask.setAddress(workerRuntime.getWorkerAddress());
                                submitTask(Lists.newArrayList(newLastTask));
                            }
                    }
                }
            }

            // 3. Check whether the instance as a whole has timed out
            if (isTimeout()) {
                finished.set(true);
                success = false;
                result = SystemInstanceResult.INSTANCE_EXECUTE_TIMEOUT;
            }

            // 4. Finished: report to the server
            if (finished.get()) {
                req.setResult(result);
                // Report the appended workflow context
                req.setAppendedWfContext(appendedWfContext);
                req.setInstanceStatus(success ? InstanceStatus.SUCCEED.getV() : InstanceStatus.FAILED.getV());
                reportFinalStatusThenDestroy(workerRuntime, req);
                return;
            }

            // 5. Not finished: report the current status
            req.setInstanceStatus(InstanceStatus.RUNNING.getV());
            TransportUtils.ttReportInstanceStatus(req, workerRuntime.getServerDiscoveryService().getCurrentServerAddress(), workerRuntime.getTransporter());

            // 6.1 Periodic check -> re-dispatch tasks that were dispatched but never acknowledged
            long currentMS = System.currentTimeMillis();
            if (holder.workerUnreceivedNum != 0) {
                taskPersistenceService.getTaskByStatus(instanceId, TaskStatus.DISPATCH_SUCCESS_WORKER_UNCHECK, 100).forEach(uncheckTask -> {

                    long elapsedTime = currentMS - uncheckTask.getLastModifiedTime();
                    if (elapsedTime > DISPATCH_TIME_OUT_MS) {

                        TaskDO updateEntity = new TaskDO();
                        updateEntity.setStatus(TaskStatus.WAITING_DISPATCH.getValue());
                        // Special tasks can only run on this machine
                        if (!TaskConstant.LAST_TASK_NAME.equals(uncheckTask.getTaskName())) {
                            updateEntity.setAddress(RemoteConstant.EMPTY_ADDRESS);
                        }
                        // Failure count + 1
                        updateEntity.setFailedCnt(uncheckTask.getFailedCnt() + 1);

                        taskPersistenceService.updateTask(instanceId, uncheckTask.getTaskId(), updateEntity);

                        log.warn("[TaskTracker-{}] task(id={},name={}) try to dispatch again due to unreceived the response from ProcessorTracker.",
                                instanceId, uncheckTask.getTaskId(), uncheckTask.getTaskName());
                    }

                });
            }

            // 6.2 Periodic check -> re-run tasks that were dispatched to a ProcessorTracker that has gone down
            List<String> disconnectedPTs = ptStatusHolder.getAllDisconnectedProcessorTrackers();
            if (!disconnectedPTs.isEmpty()) {
                log.warn("[TaskTracker-{}] some ProcessorTracker disconnected from TaskTracker,their address is {}.", instanceId, disconnectedPTs);
                if (taskPersistenceService.updateLostTasks(instanceId, disconnectedPTs, true)) {
                    ptStatusHolder.remove(disconnectedPTs);
                    log.warn("[TaskTracker-{}] removed these ProcessorTracker from StatusHolder: {}", instanceId, disconnectedPTs);
                }
            }
        }

        /**
         * Whether the instance has timed out
         */
        public boolean isTimeout() {
            if (instanceInfo.getInstanceTimeoutMS() > 0) {
                return System.currentTimeMillis() - createTime > instanceInfo.getInstanceTimeoutMS();
            }
            return false;
        }

        @Override
        public void run() {
            try {
                innerRun();
            } catch (Exception e) {
                log.warn("[TaskTracker-{}] status checker execute failed, please fix the bug (@tjq)!", instanceId, e);
            }
        }
    }

By default, StatusCheckRunnable reports a TaskTrackerReportInstanceStatusReq every 13s. For MAP instances it uses holder.failedNum == 0 to decide whether the instance succeeded: if true, the instance status is set to InstanceStatus.SUCCEED, otherwise to InstanceStatus.FAILED.
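
The MAP branch boils down to two expressions, reproduced here as a standalone sketch. MapResultSketch and its method names are hypothetical; only the failedNum == 0 check and the format string are taken from the code above.

```java
class MapResultSketch {
    // A MAP instance succeeds only if no subtask ended in the failed state.
    static boolean isSuccess(long failedNum) {
        return failedNum == 0;
    }

    // Same format string as the MAP branch of StatusCheckRunnable.
    static String resultText(long total, long succeed, long failed) {
        return String.format("total:%d,succeed:%d,failed:%d", total, succeed, failed);
    }

    public static void main(String[] args) {
        long succeed = 9;
        long failed = 1;
        System.out.println(isSuccess(failed));
        System.out.println(resultText(succeed + failed, succeed, failed));
    }
}
```

With these inputs a single failed subtask is enough to make isSuccess return false, which corresponds to the instance being reported as InstanceStatus.FAILED.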

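Summary

The failed count shown in the result of a PowerJob map reduce instance is failedTaskNum. It comes from the TaskStatus.WORKER_PROCESS_FAILED count returned by TaskPersistenceService's getTaskStatusStatistics, which runs select status, count(*) as num from task_info where instance_id= ? and sub_instance_id=? GROUP BY status. By default a subtask gets one retry. For a MAP instance, any subtask that still ends up failed makes the whole instance fail, because success is decided by holder.failedNum == 0. PowerJob currently offers no entry point for retrying only those failed subtasks; the only option is to re-run the whole map reduce job.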

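Original-content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, please contact cloudcommunity@tencent.com for removal.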
