前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >XXL-JOB系列四之调度全流程

XXL-JOB系列四之调度全流程

原创
作者头像
用户9511949
发布2024-07-01 17:56:35
650
发布2024-07-01 17:56:35
举报
文章被收录于专栏:XXL-JOBXXL-JOB

1 JobTriggerPoolHelper

任务调度触发的入口在JobTriggerPoolHelper.trigger方法,调用了helper.addTrigger方法

代码语言:javascript
复制
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
......
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

看下helper的addTrigger方法

代码语言:javascript
复制
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    // trigger
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // do trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // check timeout-count-map
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }
                // incr timeout-count-map
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       // ob-timeout threshold 500ms
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }
            }
        }
    });
}

第一步选择一个线程池执行任务,如果任务在最近一分钟内调度超时的次数大于10次,就选择slowTriggerPool,否则选择fastTriggerPool,第二步提交调度任务,由XxlJobTrigger.trigger方法执行具体的逻辑,执行完成之后统计超时的次数

看看XxlJobTrigger.trigger方法的逻辑

代码语言:javascript
复制
public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {

    // load data
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // cover addressList
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // sharding param
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
            && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
            && shardingParam==null) {
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }

}

该方法逻辑比较简单,如果需要分片执行,则依次通过processTrigger方法执行,否则就封装一个单分片信息,还是调用processTrigger,所以下面看看processTrigger方法

代码语言:javascript
复制
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

    // param
    ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
    ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
    String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

    // 1、save log-id
    XxlJobLog jobLog = new XxlJobLog();
    jobLog.setJobGroup(jobInfo.getJobGroup());
    jobLog.setJobId(jobInfo.getId());
    jobLog.setTriggerTime(new Date());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
    logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

    // 2、init trigger-param
    TriggerParam triggerParam = new TriggerParam();
    triggerParam.setJobId(jobInfo.getId());
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);

    // 3、init address
    String address = null;
    ReturnT<String> routeAddressResult = null;
    if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
            if (index < group.getRegistryList().size()) {
                address = group.getRegistryList().get(index);
            } else {
                address = group.getRegistryList().get(0);
            }
        } else {
            routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
            if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                address = routeAddressResult.getContent();
            }
        }
    } else {
        routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
    }

    // 4、trigger remote executor
    ReturnT<String> triggerResult = null;
    if (address != null) {
        triggerResult = runExecutor(triggerParam, address);
    } else {
        triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
    }

    // 5、collection trigger info
    StringBuffer triggerMsgSb = new StringBuffer();
    triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
            .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
    if (shardingParam != null) {
        triggerMsgSb.append("("+shardingParam+")");
    }
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
    triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

    triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
            .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

    // 6、save log trigger-info
    jobLog.setExecutorAddress(address);
    jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
    jobLog.setExecutorParam(jobInfo.getExecutorParam());
    jobLog.setExecutorShardingParam(shardingParam);
    jobLog.setExecutorFailRetryCount(finalFailRetryCount);
    //jobLog.setTriggerTime();
    jobLog.setTriggerCode(triggerResult.getCode());
    jobLog.setTriggerMsg(triggerMsgSb.toString());
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

    logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

第一步生成一条调度日志保存在xxl_job_log表中

第二步封装触发的参数

第三步获取执行器的地址,如果是分片,那么根据index到执行器列表中选择对应的执行器,如果index无效,则选择第一个执行器,如果不是分片,那么根据配置的路由策略选择一个执行器,xxl-job支持的路由策略如下

代码语言:javascript
复制
public enum ExecutorRouteStrategyEnum {
    // 选择第一个执行器
    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    // 选最后一个执行器
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    // 轮询,每个job维护了一个计数器,每次加1再对执行器列表取余
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    // 随机选择一个
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    // 一致性Hash
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    // 最近使用次数最少的优先,内部对每个Job对应的执行器维护一个计数器,选择计数器最小值对应执行器
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    // 最近未使用时间最长的执行器优先,内部采用LinkedHashMap实现
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    // 依次对执行器进行进行健康检查,返回第一个健康的执行器
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    // 依次查看执行器是否空闲,根据执行器对应的Job的执行线程是否正在处理任务或者执行线程的任务队列中
    // 是否有任务来判断执行器是否空闲
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    // 分片时使用,无须路由
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);
}

第四步调用runExecutor方法执行

代码语言:javascript
复制
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}
代码语言:javascript
复制
public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

可以看出调度中心通过Http请求的方式向执行器发送了一个执行的请求

第五步记录调度的具体信息,并更新调度日志对应的字段信息,以上调度中心的逻辑就完成了。

之前的文章中提到执行器内部会启动一个NettyHttpServer用来处理调中心的请求,那么直接看netty对应的业务处理EmbedHttpServerHandler.channelRead方法即可

代码语言:javascript
复制
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    ......
    // invoke
    bizThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            // do invoke
            Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
            // to json
            String responseJson = GsonTool.toJson(responseObj);
            // write response
            writeResponse(ctx, keepAlive, responseJson);
        }
    });
}

通过bizThreadPool执行具体的业务逻辑,看下process方法

代码语言:javascript
复制
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    ......
    // services mapping
    try {
        switch (uri) {
            case "/beat":
                return executorBiz.beat();
            case "/idleBeat":
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            case "/run":
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            case "/kill":
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            case "/log":
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}

最终调用executorBiz对应的方法执行,这里看看执行业务逻辑的run方法,主要看看GlueType为BEAN模式的处理逻辑,其他类型类似

代码语言:javascript
复制
public ReturnT<String> run(TriggerParam triggerParam) {
    // 根据JobId获取对应的任务处理线程和handler,调度中心一个Job对应的执行器一个@XxlJob注解配置的方法
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // 根据传入的名称获取对应的Handler,名称为@XxlJob注解中配置的value属性
        // 执行器注册时会在内部维护一个Map,Key为@XxlJob注解配置的value属性,value为对应的method
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        if (jobThread!=null && jobHandler != newJobHandler) {
            // 如果任务线程不为空,且最新的handler和任务线程中的hander不一致,说明handler发生了变化
            // 那么将老的任务线程和老的handler置为空
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }
        if (jobHandler == null) {
            // 如果jobHandler 为空,则设置为最新的handler
            jobHandler = newJobHandler;
            // 如果JobHandler获取失败,直接返回错误
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (......) {
        ......
    } else {
        ......
    }
    
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // 根据任务处理策略为discard,那么当任务线程正在处理或者任务队列不为空时,直接丢弃任务,返回失败
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // 根据任务处理策略为cover,那么当任务线程正在处理或者任务队列不为空时,丢弃现有的所有任务
            // 将jobTread置为空
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // jobThread为空,则注册一个新的任务线程到内部的一个Map结构中,并且将老的任务线程停掉
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 将此次任务添加到任务线程的任务队列中,添加任务时需要根据JobLogId判重,避免重复调度
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

再来看看任务线程的处理逻辑

代码语言:javascript
复制
public void run() {
        // 在@XxlJob中如果配置了初始化方法,在任务线程启动时就会先执行init方法
        try {
          handler.init();
       } catch (Throwable e) {
           logger.error(e.getMessage(), e);
       }
       // execute
       while(!toStop){
          // 设置任务线程未处理状态,闲置次数+1
          running = false;
          idleTimes++;
            TriggerParam triggerParam = null;
            try {
             // 从对列中获取任务,3s超时
             triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
             if (triggerParam!=null) {
                // 拿到任务之后,设置任务线程运行中,清除闲置次数,并且从triggerLogIdSet中删除LogId
                // triggerLogIdSet的作用是为了防止重复调度
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // 生成本次调度任务执行的日志文件名称,后续根据LogId就能获取到对应的日志
                // 调度中心查看日志使用
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                      triggerParam.getJobId(),
                      triggerParam.getExecutorParams(),
                      logFileName,
                      triggerParam.getBroadcastIndex(),
                      triggerParam.getBroadcastTotal());

                // 设置上下文
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                   // 如果设置了超时时间,则新启动一个线程执行任务并且同步获取结果
                   Thread futureThread = null;
                   try {
                      FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                         @Override
                         public Boolean call() throws Exception {
                            // 因为是新启的线程,所以这里要重新设置一次上下文
                            XxlJobContext.setXxlJobContext(xxlJobContext);
                            handler.execute();
                            return true;
                         }
                      });
                      futureThread = new Thread(futureTask);
                      futureThread.start();

                      Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                   } catch (TimeoutException e) {

                      XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                      XxlJobHelper.log(e);

                      // handle result
                      XxlJobHelper.handleTimeout("job execute timeout ");
                   } finally {
                      futureThread.interrupt();
                   }
                } else {
                   // 如果没有设置超时时间,直接在当前线程执行即可
                   handler.execute();
                }

                // 处理任务执行结果信息,并且将其设置到上下文中
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                   XxlJobHelper.handleFail("job handle result lost.");
                } else {
                   String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                   tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                         ?tempHandleMsg.substring(0, 50000).concat("...")
                         :tempHandleMsg;
                   XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                      + XxlJobContext.getXxlJobContext().getHandleCode()
                      + ", handleMsg = "
                      + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

             } else {
                // 如果闲置次数超过30次,且任务队列中没有任务,删除任务线程,防止占用系统资源
                if (idleTimes > 30) {
                   if(triggerQueue.size() == 0) {
                      XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                   }
                }
             }
          } catch (Throwable e) {
             // 处理任务执行异常,记录异常日志,并且将错误信息设置到上下文中
             if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
             }

             // handle result
             StringWriter stringWriter = new StringWriter();
             e.printStackTrace(new PrintWriter(stringWriter));
             String errorMsg = stringWriter.toString();

             XxlJobHelper.handleFail(errorMsg);

             XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
          } finally {
                if(triggerParam != null) {
                    // 将任务执行结果添加到回调队列中,所有任务共用一个队列
                    if (!toStop) {
                        // commonm
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                               triggerParam.getLogId(),
                         triggerParam.getLogDateTime(),
                         XxlJobContext.getXxlJobContext().getHandleCode(),
                         XxlJobContext.getXxlJobContext().getHandleMsg() )
                   );
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                               triggerParam.getLogId(),
                         triggerParam.getLogDateTime(),
                         XxlJobContext.HANDLE_CODE_FAIL,
                         stopReason + " [job running, killed]" )
                   );
                    }
                }
            }
        }

       // 能执行到这一步,说明任务线程被停了,那么需要将没有执行的任务也通过回调的方式通知调度中心
       while(triggerQueue !=null && triggerQueue.size()>0){
          TriggerParam triggerParam = triggerQueue.poll();
          if (triggerParam!=null) {
             // is killed
             TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                   triggerParam.getLogId(),
                   triggerParam.getLogDateTime(),
                   XxlJobContext.HANDLE_CODE_FAIL,
                   stopReason + " [job not executed, in the job queue, killed.]")
             );
          }
       }

       // 最终调用handler的destroy方法,该方法同样也是在@XxlJob注解中配置
       try {
          handler.destroy();
       } catch (Throwable e) {
          logger.error(e.getMessage(), e);
       }
       logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());

最后由TriggerCallbackThread不断的从回调任务队列中获取任务,调用调度中心的接口回传结果,最终由调度中心的JobCompleteHelper中的callbackThreadPool处理回调结果,更新xxl_job_log表对应的字段信息

代码语言:javascript
复制
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
		callbackThreadPool.execute(new Runnable() {
			@Override
			public void run() {
				for (HandleCallbackParam handleCallbackParam: callbackParamList) {
					ReturnT<String> callbackResult = callback(handleCallbackParam);
					logger.debug(">>>>>>>>> JobApiController.callback {}, handleCallbackParam={}, callbackResult={}",
							(callbackResult.getCode()== ReturnT.SUCCESS_CODE?"success":"fail"), handleCallbackParam, callbackResult);
				}
			}
		});
		return ReturnT.SUCCESS;
}

至此整个调度的流程结束

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
作者已关闭评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 JobTriggerPoolHelper
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档