XXL-JOB Series, Part 3: Scheduling Center Initialization

Original · Author: 用户9511949 · Last modified: 2024-07-02 14:31:56
Included in the column: XXL-JOB

1 XxlJobScheduler

After the scheduling center starts, it calls XxlJobScheduler.init to initialize itself. The code is as follows:

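```java
public class XxlJobScheduler {
    ......
    public void init() throws Exception {
        // Initialize i18n: Simplified Chinese, Traditional Chinese and English are supported
        initI18n();
        // Initialize the trigger thread pools: fastTriggerPool and slowTriggerPool
        JobTriggerPoolHelper.toStart();
        // Initialize the pool for executor registry requests and start the executor registry monitor thread
        JobRegistryHelper.getInstance().start();
        // Start the fail monitor thread (retries failed triggers and sends alarms)
        JobFailMonitorHelper.getInstance().start();
        // Initialize the pool for executor callback requests and start the lost-result monitor thread
        JobCompleteHelper.getInstance().start();
        // Start the log report (statistics) thread
        JobLogReportHelper.getInstance().start();
        // Start the schedule threads
        JobScheduleHelper.getInstance().start();
    }
}
```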

The i18n initialization is straightforward, so we skip it. Below we walk through the code of the remaining steps in order.

JobTriggerPoolHelper.start

```java
public void start(){
    fastTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(1000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                }
            });
    slowTriggerPool = new ThreadPoolExecutor(
            10,
            XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(2000),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                }
            });
}
```
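One detail about these constructor arguments is worth keeping in mind: with a bounded LinkedBlockingQueue, a JDK ThreadPoolExecutor only creates threads beyond the core size once the queue is full. So each trigger pool normally runs on its 10 core threads and grows toward its configured maximum only after 1000 (fast) or 2000 (slow) triggers are already queued. The scaled-down demo below makes this visible; the class and method names are made up for illustration, and the sizes (core 2, max 4, queue 3) are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: how a ThreadPoolExecutor with a bounded queue grows.
// Order of use: core threads first, then the queue, then extra threads up to the maximum.
class PoolGrowthDemo {

    /** Returns {poolSizeAfterCore, queueSizeAfterFill, poolSizeAfterOverflow}. */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        // Scaled-down analogue of fastTriggerPool: core 2, max 4, queue capacity 3
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] result = new int[3];
        try {
            for (int i = 0; i < 2; i++) pool.execute(blocker); // starts the 2 core threads
            result[0] = pool.getPoolSize();
            for (int i = 0; i < 3; i++) pool.execute(blocker); // core threads busy: tasks are queued
            result[1] = pool.getQueue().size();
            pool.execute(blocker);                              // queue full: a non-core thread is added
            result[2] = pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }
}
```

PoolGrowthDemo.run() returns {2, 3, 3}: two core threads, three queued tasks, and a third thread that appears only when the sixth task finds the queue full.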

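As you can see, this simply creates two thread pools, fastTriggerPool and slowTriggerPool, which differ only in their maximum thread count and task-queue capacity. So how are they used differently? When a job is triggered, JobTriggerPoolHelper.addTrigger chooses a pool as follows: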

```java
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    ......
}
```

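If a job has timed out more than 10 times within one minute, its triggers are considered slow and it is dispatched to slowTriggerPool. "Timeout" here does not mean the executor takes too long to run the business logic; it means the scheduling center takes too long to complete the trigger (500 ms by default). In short, jobs that trigger quickly and jobs that trigger slowly run in separate thread pools, so they do not affect each other.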
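The selection rule can be sketched as a per-job counter that is reset every minute. This is a simplified illustration, not the library's code: TriggerPoolSelector, choosePool and recordTrigger are hypothetical names. Only the 500 ms threshold, the "more than 10 times" rule and the map of AtomicInteger counters (jobTimeoutCountMap) come from the code and text above; resetting the map whenever the clock minute changes is an assumption about how the one-minute window is implemented.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of per-job, per-minute timeout counting used to pick a trigger pool.
class TriggerPoolSelector {
    static final long TIMEOUT_MS = 500;   // a trigger slower than this counts as a timeout
    static final int SLOW_THRESHOLD = 10; // more than 10 timeouts in the current minute -> slow pool

    private final ConcurrentHashMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute;

    TriggerPoolSelector(long nowMs) {
        this.currentMinute = nowMs / 60_000;
    }

    /** Chooses the pool for the next trigger of jobId: "fast" or "slow". */
    String choosePool(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_THRESHOLD) ? "slow" : "fast";
    }

    /** Records that one trigger of jobId took costMs and finished at nowMs. */
    void recordTrigger(int jobId, long costMs, long nowMs) {
        long minute = nowMs / 60_000;
        if (minute != currentMinute) { // a new minute started: forget the old counts
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMs > TIMEOUT_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

With ten slow triggers a job stays on the fast pool; the eleventh moves it to the slow pool until the next minute starts.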

JobRegistryHelper.start

```java
public void start(){
    // for registry or remove
    registryOrRemoveThreadPool = new ThreadPoolExecutor(
          2,
          10,
          30L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>(2000),
          new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
             }
          },
          new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                r.run();
                logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
             }
          });
    // for monitor
    registryMonitorThread = new Thread(new Runnable() {
       @Override
       public void run() {
          while (!toStop) {
             try {
                // auto registry group
                List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
                if (groupList!=null && !groupList.isEmpty()) {

                   // remove dead address (admin/executor)
                   List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
                   if (ids!=null && ids.size()>0) {
                      XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
                   }

                   // fresh online address (admin/executor)
                   HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
                   List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
                   if (list != null) {
                      for (XxlJobRegistry item: list) {
                         if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
                            String appname = item.getRegistryKey();
                            List<String> registryList = appAddressMap.get(appname);
                            if (registryList == null) {
                               registryList = new ArrayList<String>();
                            }

                            if (!registryList.contains(item.getRegistryValue())) {
                               registryList.add(item.getRegistryValue());
                            }
                            appAddressMap.put(appname, registryList);
                         }
                      }
                   }
                   // fresh group address
                   for (XxlJobGroup group: groupList) {
                      List<String> registryList = appAddressMap.get(group.getAppname());
                      String addressListStr = null;
                      if (registryList!=null && !registryList.isEmpty()) {
                         Collections.sort(registryList);
                         StringBuilder addressListSB = new StringBuilder();
                         for (String item:registryList) {
                            addressListSB.append(item).append(",");
                         }
                         addressListStr = addressListSB.toString();
                         addressListStr = addressListStr.substring(0, addressListStr.length()-1);
                      }
                      group.setAddressList(addressListStr);
                      group.setUpdateTime(new Date());

                      XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
                   }
                }
             } catch (Exception e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                }
             }
             try {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
             } catch (InterruptedException e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
                }
             }
          }
          logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
       }
    });
    registryMonitorThread.setDaemon(true);
    registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
    registryMonitorThread.start();
}
```

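Step 1 initializes registryOrRemoveThreadPool, which handles executor registration and removal requests. If the pool is saturated, its rejection handler runs the task directly in the calling thread instead of dropping it.

Step 2 starts an executor monitor thread. Every 30 s (RegistryConfig.BEAT_TIMEOUT) it loads the auto-registered executor groups (address type 0) from the xxl_job_group table, deletes expired executors from the xxl_job_registry table (those whose last update time + 90 s < current time), and writes the addresses of the live executors into the addressList field of xxl_job_group for later scheduling.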
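The refresh in step 2 boils down to "drop dead rows, then group the remaining executors by appname". Below is an in-memory sketch under stated assumptions: RegistrySnapshot and Row are hypothetical names, Row stands in for an xxl_job_registry row, and the filtering that the real code delegates to SQL (findDead / findAll) is done in Java here. A TreeSet yields the same sorted, duplicate-free list as the Collections.sort plus contains check in the original loop.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory sketch of "remove dead executors, rebuild address lists".
class RegistrySnapshot {
    static final long DEAD_TIMEOUT_MS = 90_000L; // 3 x the 30 s heartbeat interval

    /** Hypothetical stand-in for one row of xxl_job_registry. */
    static final class Row {
        final String registryGroup; // "EXECUTOR" or "ADMIN"
        final String appname;       // registry key
        final String address;       // registry value
        final long updateTimeMs;

        Row(String registryGroup, String appname, String address, long updateTimeMs) {
            this.registryGroup = registryGroup;
            this.appname = appname;
            this.address = address;
            this.updateTimeMs = updateTimeMs;
        }
    }

    /** appname -> sorted, de-duplicated, comma-joined addresses of live executors. */
    static Map<String, String> addressLists(List<Row> rows, long nowMs) {
        Map<String, TreeSet<String>> byApp = new TreeMap<>();
        for (Row row : rows) {
            boolean dead = row.updateTimeMs + DEAD_TIMEOUT_MS < nowMs; // same rule as in the text
            if (!dead && "EXECUTOR".equals(row.registryGroup)) {
                byApp.computeIfAbsent(row.appname, k -> new TreeSet<>()).add(row.address);
            }
        }
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : byApp.entrySet()) {
            result.put(e.getKey(), String.join(",", e.getValue()));
        }
        return result;
    }
}
```

One difference from the original: a group with no live executor gets a null addressList there, whereas this sketch simply leaves it out of the result map.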

JobFailMonitorHelper.start

```java
private Thread monitorThread;
private volatile boolean toStop = false;
public void start(){
    monitorThread = new Thread(new Runnable() {
       @Override
       public void run() {
          // monitor
          while (!toStop) {
             try {
                List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
                if (failLogIds!=null && !failLogIds.isEmpty()) {
                   for (long failLogId: failLogIds) {

                      // lock log
                      int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
                      if (lockRet < 1) {
                         continue;
                      }
                      XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
                      XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

                      // 1、fail retry monitor
                      if (log.getExecutorFailRetryCount() > 0) {
                         JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
                         String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
                         log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
                         XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
                      }

                      // 2、fail alarm monitor
                      int newAlarmStatus = 0;       // alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent, 3 = alarm failed
                      if (info != null) {
                         boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
                         newAlarmStatus = alarmResult?2:3;
                      } else {
                         newAlarmStatus = 1;
                      }

                      XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
                   }
                }

             } catch (Exception e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                }
             }

             try {
                TimeUnit.SECONDS.sleep(10);
             } catch (Exception e) {
                if (!toStop) {
                   logger.error(e.getMessage(), e);
                }
             }

          }

          logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

       }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
    monitorThread.start();
}
```

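Every trigger saves a record in xxl_job_log. This method works as follows:

Step 1: start a thread that scans, every 10 seconds, for up to 1000 failed records whose alarm status is 0.

Step 2: lock each record by changing its alarm status from 0 to -1. If the update affects no row, another node has already claimed the record, so it is skipped.

Step 3: if the retry count is greater than 0, run the retry logic. The retry creates a new log record whose retry count is decremented by 1; for sharded jobs the retry carries the original sharding parameters.

Step 4: send the alarm and update the record's alarm status (2 if the alarm was sent, 3 if it failed, 1 if the job no longer exists and no alarm is needed).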
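Step 2 is what keeps several admin nodes from processing the same record: updateAlarmStatus(failLogId, 0, -1) only succeeds for the node that still sees status 0. The sketch below imitates that compare-and-set in memory with AtomicInteger; FailLogProcessor, FailLog and Hooks are hypothetical names, and the conditional database UPDATE is replaced by compareAndSet purely for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory sketch of the fail monitor's per-record steps 2-4.
class FailLogProcessor {
    // Alarm status values as listed in the code above
    static final int DEFAULT = 0, LOCKED = -1, NO_ALARM_NEEDED = 1, ALARM_OK = 2, ALARM_FAILED = 3;

    static final class FailLog {
        final AtomicInteger alarmStatus = new AtomicInteger(DEFAULT);
        final int retryCount;

        FailLog(int retryCount) {
            this.retryCount = retryCount;
        }
    }

    /** Hypothetical hooks standing in for re-triggering and the alarm sender. */
    interface Hooks {
        void retry(int remainingRetries);
        boolean alarm(); // true if the alarm was delivered
    }

    /** Returns false if the record was already claimed (by this or another node). */
    static boolean process(FailLog log, boolean jobStillExists, Hooks hooks) {
        // Step 2: claim the record, like UPDATE ... SET alarm_status = -1 WHERE alarm_status = 0
        if (!log.alarmStatus.compareAndSet(DEFAULT, LOCKED)) {
            return false;
        }
        // Step 3: retry with one fewer remaining retry
        if (log.retryCount > 0) {
            hooks.retry(log.retryCount - 1);
        }
        // Step 4: alarm (if the job still exists) and record the outcome
        int newStatus = jobStillExists ? (hooks.alarm() ? ALARM_OK : ALARM_FAILED) : NO_ALARM_NEEDED;
        log.alarmStatus.compareAndSet(LOCKED, newStatus);
        return true;
    }
}
```

A second call on the same record returns false, because its status is no longer 0.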

JobCompleteHelper.start

```java
public void start(){

    // for callback
    callbackThreadPool = new ThreadPoolExecutor(
          2,
          20,
          30L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue<Runnable>(3000),
          new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
             }
          },
          new RejectedExecutionHandler() {
             @Override
             public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                r.run();
                logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
             }
          });
    // for monitor
    monitorThread = new Thread(new Runnable() {

       @Override
       public void run() {

          // wait for JobTriggerPoolHelper-init
          try {
             TimeUnit.MILLISECONDS.sleep(50);
          } catch (InterruptedException e) {
             if (!toStop) {
                logger.error(e.getMessage(), e);
             }
          }

          // monitor
          while (!toStop) {
             try {
                // Lost-result handling: if a trigger record has stayed "running" for more than 10 min and its
                // executor's heartbeat registration has lapsed (executor offline), actively mark it as failed
                Date losedTime = DateUtil.addMinutes(new Date(), -10);
                List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

                if (losedJobIds!=null && losedJobIds.size()>0) {
                   for (Long logId: losedJobIds) {

                      XxlJobLog jobLog = new XxlJobLog();
                      jobLog.setId(logId);

                      jobLog.setHandleTime(new Date());
                      jobLog.setHandleCode(ReturnT.FAIL_CODE);
                      jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

                      XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
                   }

                }
             } catch (Exception e) {
                if (!toStop) {
                   logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
                }
             }

             try {
                TimeUnit.SECONDS.sleep(60);
             } catch (Exception e) {
                if (!toStop) {
                   logger.error(e.getMessage(), e);
                }
             }

          }

          logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

       }
    });
    monitorThread.setDaemon(true);
    monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
    monitorThread.start();
}
```

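JobCompleteHelper initializes a thread pool for executor callback requests and then starts a monitor thread, monitorThread. Every 60 s it scans for trigger records that have stayed in the running state for more than 10 minutes while their executor is no longer online (its heartbeat registration has lapsed), and marks them as failed.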
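The lost-result rule can be expressed as a simple filter. In this sketch, LostJobFinder and LogRow are hypothetical names, and the real check is a database query (findLostJobIds) rather than Java code. Treating "running" as "trigger succeeded (code 200) but no handle result reported yet (handle code 0)" is an assumption about how the log encodes that state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the lost-result rule: running for over 10 minutes AND executor offline.
class LostJobFinder {
    static final long LOST_AFTER_MS = 10 * 60_000L;

    /** Hypothetical stand-in for the relevant columns of xxl_job_log. */
    static final class LogRow {
        final long id;
        final int triggerCode;
        final int handleCode;
        final long triggerTimeMs;
        final String executorAddress;

        LogRow(long id, int triggerCode, int handleCode, long triggerTimeMs, String executorAddress) {
            this.id = id;
            this.triggerCode = triggerCode;
            this.handleCode = handleCode;
            this.triggerTimeMs = triggerTimeMs;
            this.executorAddress = executorAddress;
        }
    }

    /** Returns ids of logs that should be marked as failed. */
    static List<Long> findLost(List<LogRow> logs, Set<String> onlineExecutors, long nowMs) {
        long cutoff = nowMs - LOST_AFTER_MS;
        List<Long> lost = new ArrayList<>();
        for (LogRow log : logs) {
            boolean running = log.triggerCode == 200 && log.handleCode == 0; // assumed encoding of "running"
            boolean tooOld = log.triggerTimeMs <= cutoff;
            boolean executorOffline = !onlineExecutors.contains(log.executorAddress);
            if (running && tooOld && executorOffline) {
                lost.add(log.id);
            }
        }
        return lost;
    }
}
```

Requiring the executor to be offline avoids failing long-running jobs whose executor is still alive and may yet report a result.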

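JobLogReportHelper is simple: it starts a thread that, every minute, refreshes the scheduling-log statistics for the last three days (for example, the numbers of running, failed and successful triggers) for display on the front-end pages, and deletes expired log records.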
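A minimal sketch of the per-day aggregation, assuming that "the last three days" means today plus the two previous days and that each log has already been classified as running, successful or failed. DailyLogReport, LogEntry and Status are hypothetical names; how the real helper queries and stores these counts is not shown here.

```java
import java.time.LocalDate;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: count logs per status for today and the two previous days.
class DailyLogReport {
    enum Status { RUNNING, SUCCESS, FAIL }

    static final class LogEntry {
        final LocalDate day;
        final Status status;

        LogEntry(LocalDate day, Status status) {
            this.day = day;
            this.status = status;
        }
    }

    static Map<LocalDate, Map<Status, Integer>> lastThreeDays(List<LogEntry> logs, LocalDate today) {
        Map<LocalDate, Map<Status, Integer>> report = new TreeMap<>();
        for (int i = 0; i < 3; i++) {
            Map<Status, Integer> counts = new EnumMap<>(Status.class);
            for (Status s : Status.values()) {
                counts.put(s, 0); // every day gets a row, even with no logs
            }
            report.put(today.minusDays(i), counts);
        }
        for (LogEntry e : logs) {
            Map<Status, Integer> counts = report.get(e.day);
            if (counts != null) { // logs older than the window are ignored
                counts.merge(e.status, 1, Integer::sum);
            }
        }
        return report;
    }
}
```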

Finally, let's look at JobScheduleHelper.start. This method is fairly complex and contains the core logic of the scheduling center:

```java
public void start(){
    // 1. Start the schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                // 1.1 After startup, sleep 4-5 s so that the first scan starts on a whole-second boundary
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            // 1.2 Number of jobs pre-read per scan: (triggerPoolFastMax 200 + triggerPoolSlowMax 100) * 20 = 6000 by default,
            // i.e. at most 6000 jobs are handled per scan by default; keep this limit in mind
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
            while (!scheduleThreadToStop) {
                long start = System.currentTimeMillis();
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
                boolean preReadSuc = true;
                try {
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
                    // 1.3 Acquire the schedule lock so that a clustered deployment does not trigger a job more than once
                    preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();

                    // tx start

                    // 1.4 Load from xxl_job_info the jobs due within the next 5 s (PRE_READ_MS)
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        for (XxlJobInfo jobInfo: scheduleList) {
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // Next trigger time + 5 s < now: the trigger was missed (misfire).
                                // The misfire strategy decides whether to skip it (DO_NOTHING) or fire it once immediately (FIRE_ONCE_NOW)
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW 》 trigger
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }
                                // Then compute the job's next trigger time
                                refreshNextValidTime(jobInfo, new Date());
                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // Next trigger time is earlier than now, but by no more than 5 s: trigger immediately
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                // Compute the job's next trigger time
                                refreshNextValidTime(jobInfo, new Date());
                                if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                    // If the job is still started (triggerStatus == 1) and its new next trigger time falls
                                    // within now + 5 s, push it into the time ring to wait for its trigger
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                    pushTimeRing(ringSecond, jobInfo.getId());
                                    // Then compute the trigger time after that one
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            } else {
                                // Next trigger time is later than now (within the 5 s pre-read window): push into the time ring
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                pushTimeRing(ringSecond, jobInfo.getId());
                                // Compute the job's next trigger time
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                            }
                        }
                        // Finally, persist the updated job info
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }
                    } else {
                        preReadSuc = false;
                    }

                    // tx stop

                } catch (Exception e) {
                    if (!scheduleThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                    }
                } finally {
                    // Commit the transaction (this also releases the schedule lock)
                    if (conn != null) {
                        try {
                            conn.commit();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.setAutoCommit(connAutoCommit);
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                        try {
                            conn.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                    // close PreparedStatement
                    if (null != preparedStatement) {
                        try {
                            preparedStatement.close();
                        } catch (SQLException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                }
                long cost = System.currentTimeMillis()-start;
                // If this scan took less than 1 s, sleep until the next whole second (or 4-5 s if nothing was pre-read).
                // If it took 1 s or more, there is no sleep at all. This is why xxl-job's support for jobs that fire
                // every second is somewhat weak: if scans keep taking longer than 1 s, such jobs keep falling behind
                if (cost < 1000) {
                    try {
                        TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
        }
    });
    scheduleThread.setDaemon(true);
    scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
    scheduleThread.start();

    // 2. Start the time-ring thread
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!ringThreadToStop) {
                try {
                    // Sleep until the next whole second
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                try {
                    List<Integer> ringItemData = new ArrayList<>();
                    // Second-of-minute of the current time
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                    // In case processing took too long and a tick was skipped, also check the previous tick
                    for (int i = 0; i < 2; i++) {
                        List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                        if (tmpData != null) {
                            ringItemData.addAll(tmpData);
                        }
                    }
                    logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // Trigger the jobs
                        for (int jobId: ringItemData) {
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // Clear the triggered jobs
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
}
```
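To make the scan loop and the ring thread easier to follow, here is a stripped-down sketch of the two ideas. MiniTimeRing, classify, Action, push and drain are hypothetical names; only the 5 s pre-read window (PRE_READ_MS), the three-way branch, the slot formula (triggerNextTime / 1000) % 60 and the two-tick lookback come from the code above. Misfire strategies, refreshNextValidTime and the database lock are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: classify pre-read jobs, park near-future ones in a 60-slot time ring.
class MiniTimeRing {
    static final long PRE_READ_MS = 5000;

    enum Action { MISFIRE, FIRE_NOW, PUSH_TO_RING }

    /** Mirrors the if / else-if / else branches of the scan loop. */
    static Action classify(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) return Action.MISFIRE; // more than 5 s late
        if (nowMs > triggerNextTimeMs) return Action.FIRE_NOW;              // late by at most 5 s
        return Action.PUSH_TO_RING;                                         // due within the pre-read window
    }

    /** Slot index: the second-of-minute of the trigger time. */
    static int ringSecond(long triggerNextTimeMs) {
        return (int) ((triggerNextTimeMs / 1000) % 60);
    }

    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** What the scan thread does for PUSH_TO_RING jobs. */
    void push(long triggerNextTimeMs, int jobId) {
        // compute() is atomic per key, so a concurrent drain() cannot lose this jobId
        ringData.compute(ringSecond(triggerNextTimeMs), (slot, jobs) -> {
            List<Integer> list = (jobs == null) ? new ArrayList<>() : jobs;
            list.add(jobId);
            return list;
        });
    }

    /** What the ring thread does each tick: take the current slot and the previous one. */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }
}
```

Using compute() for the insert is a choice made for this sketch: because it is atomic per key, a job pushed while the ring thread removes the same slot lands either in the removed list or in a fresh one, never in a list that nobody will read again.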

At this point, the initialization of the scheduling center is complete.

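Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization. Reproduction without permission is prohibited.

In case of infringement, please contact cloudcommunity@tencent.com for removal.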
