拉取xxl-job的代码
执行XxlJobAdminApplication启动调度中心。
然后执行:
启动执行器中心之后,然后访问admin界面:
从里面可以看到可以执行任务管理。
一、调度中心:从XxlJobAdminConfig可以看到与spring集成的重要方法:
/**
 * Publishes this instance through {@code adminConfig}, then creates the
 * {@code XxlJobScheduler}, stores it in the {@code xxlJobScheduler} field and initializes it.
 *
 * @throws Exception if {@code XxlJobScheduler.init()} throws
 */
@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;

    // The field is assigned before init() runs, so it stays set even if init() throws.
    final XxlJobScheduler scheduler = new XxlJobScheduler();
    xxlJobScheduler = scheduler;
    scheduler.init();
}
初始化操作:
/**
 * Initializes the xxl-job admin scheduler: loads i18n resources and then starts the
 * admin-side helpers one after another, finishing with the schedule helper.
 *
 * @throws Exception declared by the signature; presumably propagated from the steps below
 */
public void init() throws Exception {
// load i18n (internationalization) resources
initI18n();
// start the admin trigger pool helper
JobTriggerPoolHelper.toStart();
// start the admin registry helper
JobRegistryHelper.getInstance().start();
// start the admin fail-monitor helper
JobFailMonitorHelper.getInstance().start();
// start the job-complete helper
JobCompleteHelper.getInstance().start();
// start the admin log-report helper
JobLogReportHelper.getInstance().start();
// start-schedule ( depend on JobTriggerPoolHelper )
// the schedule helper is started last because it relies on the trigger pool started above
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
job触发池启动:此时会创建两个线程池对象,即快线程池和慢线程池,二者的区别在于阻塞队列的容量。执行addTrigger方法时,默认选择快线程池;如果该任务的处理出现超时,则改用慢线程池(其阻塞队列容量更大)。在启动定时任务的时候会调用JobTriggerPoolHelper.trigger添加触发器,而在线程池任务的run方法中最终会调用XxlJobTrigger.trigger:
//添加触发器
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);=>
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); =>
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
触发器触发job,会对参数进行:
//加载数据
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
//加载
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
//处理触发器
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
处理触发器processTrigger的过程中,必然会触发执行器的执行,也即此时会运行executor,可以看到执行过程:
1.保存日志id
2.初始化触发器参数
3.初始化地址
4.触发远程执行器
5.收集触发器信息
6.保存日志触发器信息
而这里我们关心的就是执行器的运行:
/**
 * Runs a trigger on the executor at {@code address} and returns the result with its message
 * replaced by a formatted trigger summary.
 *
 * @param triggerParam trigger parameters passed to the executor's {@code run} call
 * @param address      executor address used to look up the {@code ExecutorBiz} client
 * @return the executor's result, or a {@code FAIL_CODE} result carrying the stack trace when
 *         the call throws; in both cases the message is rewritten as an HTML-style summary of
 *         address, code and the original message
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // look up the executor client for this address
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // invoke the executor; a null client also ends up in the catch block below
        // (as a NullPointerException) because the catch is for Exception
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // The buffer never leaves this method, so the unsynchronized StringBuilder replaces StringBuffer.
    StringBuilder runResultSB = new StringBuilder(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}
getExecutorBiz可以看到:
/**
 * Returns the {@code ExecutorBiz} client for an executor address, creating and caching one in
 * {@code executorBizRepository} on first use.
 *
 * @param address executor address; surrounding whitespace is trimmed before lookup
 * @return the cached or newly created client, or {@code null} when {@code address} is
 *         {@code null} or blank after trimming
 * @throws Exception declared by the signature
 */
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // reject null / blank addresses
    if (address == null) {
        return null;
    }
    address = address.trim();
    if (address.length() == 0) {
        return null;
    }

    // cache hit: reuse; cache miss: build a client with the admin access token and remember it
    ExecutorBiz client = executorBizRepository.get(address);
    if (client == null) {
        client = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
        executorBizRepository.put(address, client);
    }
    return client;
}
run逻辑:
/**
 * Handles a trigger request: resolves the job handler for the requested glue type, applies
 * the executor block strategy to an existing job thread, and pushes the trigger onto the
 * job thread's queue.
 *
 * @param triggerParam trigger request; this method reads its job id, glue type, executor
 *                     handler name, glue source, glue update time and block strategy
 * @return a FAIL_CODE result when the bean handler is not found, loading the glue source
 *         throws, the glue type is not valid, or the DISCARD_LATER strategy drops the
 *         trigger; otherwise the result of {@code pushTriggerQueue}
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
// look up the job thread already registered for this job id, if any
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
// reuse the existing thread's handler when there is one
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
// BEAN glue type: the handler is looked up by its registered name
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// new jobhandler
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
// valid old jobThread
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
// GLUE_GROOVY glue type: the handler is built from the glue source
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
// NOTE(review): the update-time check below uses ==; if getGlueUpdatetime() returns a boxed
// Long this compares references — confirm it is a primitive
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
// script glue types
} else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof ScriptJobHandler
&& ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change script or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
// valid handler
if (jobHandler == null) {
jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
}
// any other (or unmatched) glue type: fail with a message
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
}
// executor block strategy
// only relevant when an existing job thread is being reused
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
// COVER_EARLY: drop the reference to the busy thread so a new one is registered below
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// replace thread (new or exists invalid)
// register a job thread when none exists or the old one was invalidated above
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// push data to queue
// hand the trigger parameters to the job thread's trigger queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
如果是bean形式:
/**
 * Looks up a job handler by name in {@code jobHandlerRepository}.
 *
 * @param name handler name used as the repository key
 * @return whatever the repository returns for {@code name}
 */
public static IJobHandler loadJobHandler(String name){
    IJobHandler registered = jobHandlerRepository.get(name);
    return registered;
}
放入之后,我们需要take:
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() )
);
我们可以看到回调:
/**
 * job results callback queue
 *
 * Created without a capacity argument; {@code pushCallBack} adds entries to it.
 */
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
/**
 * Adds a job-result callback to the callback queue of the instance returned by
 * {@code getInstance()} and logs its log id at debug level.
 *
 * @param callback callback parameter to enqueue
 */
public static void pushCallBack(HandleCallbackParam callback){
getInstance().callBackQueue.add(callback);
logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}
同时可以看到回调take过程:
//从回调队列中take任务
HandleCallbackParam callback = getInstance().callBackQueue.take();
//如果callback不为空
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
// drainTo
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
//回调参数列表
callbackParamList.add(callback);
// callback, will retry if error
//回调,将会重试如果错误
if (callbackParamList!=null && callbackParamList.size()>0) {
//执行回调
doCallback(callbackParamList);
}
}
二、执行中心启动:
此时我们来看看执行中心启动的过程:
查看启动:
/**
 * Entry point of the frameless executor sample: initializes the xxl-job executor, blocks the
 * main thread until it is interrupted, and destroys the executor on the way out.
 *
 * @author xuxueli 2018-10-31 19:05:43
 */
public class FramelessApplication {

    public static void main(String[] args) {
        try {
            // start the executor (the important step)
            FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

            // keep the main thread parked until it is interrupted
            blockUntilInterrupted();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // always destroy the xxl-job executor
            FrameLessXxlJobConfig.getInstance().destoryXxlJobExecutor();
        }
    }

    /**
     * Sleeps in one-hour slices and returns once a sleep is interrupted.
     */
    private static void blockUntilInterrupted() {
        for (;;) {
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                return;
            }
        }
    }
}
初始化job执行器:FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();
加载xxl-job-executor.properties中的配置,填充xxlJobExecutor,注册XxlJobBeanList中的bean。而这些bean中的任务方法则是通过注解声明的(见后文的@XxlJob)。初始化方法如下:
/**
 * Builds the xxl-job executor from {@code xxl-job-executor.properties}, registers the sample
 * job bean and starts the executor. A failure while starting is logged, not rethrown.
 */
public void initXxlJobExecutor() {

    // executor settings come from the properties file
    Properties props = loadProperties("xxl-job-executor.properties");

    // create the executor and copy the settings over, one property per setter
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(props.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(props.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(props.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(props.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(props.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.parseInt(props.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(props.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.parseInt(props.getProperty("xxl.job.executor.logretentiondays")));

    // beans whose job methods should be registered
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

    // start the executor (the important step)
    try {
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
可以看到xxlJobExecutor里面的信息:
而这个bean则是通过自定义注解实现的:也即我们如果需要执行一个定时任务,可以通过@XxlJob来实现任务bean注册。
/**
 * annotation for method jobhandler
 * Marks a method as a job handler; it can be placed on methods only, is retained at
 * runtime and is declared {@code @Inherited}.
 *
 * @author xuxueli 2019-12-11 20:50:13
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlJob {
/**
 * jobhandler name
 * Name of the job handler (required, no default).
 */
String value();
/**
 * init handler, invoked when JobThread init
 * Defaults to the empty string.
 */
String init() default "";
/**
 * destroy handler, invoked when JobThread destroy
 * Defaults to the empty string.
 */
String destroy() default "";
}
而从里面可以看到xxlJobExecutor里面有7个方法,通过反射拿到这些方法:
拿到方法,并放入到map中:
由于此时只是进行初始化,因此jobHandlerRepository还不存在配置信息,通过name拿不到配置。
同时判断拿到的方法是否存在init()、destroy()方法,如果不存在,则执行注册jobHandler方法,放入到jobHandlerRepository中,而可以看到IJobHandler里面有三个方法:execute()、init()、destroy()方法
执行注册:
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
有下面方式:
@XxlJob("demoJobHandler") //方式一,采用注入需要执行的job
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") //方式二,带生命周期
同时还可以实现任务分片、命令行任务和跨平台任务。
/**
 * 2. Sharding broadcast job: reads this executor's shard index and the shard total, logs
 * them, then walks every shard number and logs whether it is the one this executor owns.
 */
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

    // sharding parameters: index of this shard and total number of shards
    final int myShard = XxlJobHelper.getShardIndex();
    final int shardCount = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", myShard, shardCount);

    // business logic: only the matching shard number is "hit", all others are ignored
    for (int shard = 0; shard < shardCount; shard++) {
        String template = (shard == myShard)
                ? "第 {} 片, 命中分片开始处理"
                : "第 {} 片, 忽略";
        XxlJobHelper.log(template, shard);
    }
}
配置加载完成,属性信息填充完,将bean进行注册。完成之后,便可以执行任务处理器启动。
而执行任务处理器启动时,会首先初始化job处理器方法:
/**
 * Starts the executor: first registers the job handler methods found on
 * {@code xxlJobBeanList}, then delegates to the parent {@code start()}.
 *
 * @throws RuntimeException wrapping any exception thrown by {@code super.start()}
 */
@Override
public void start() {
    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(xxlJobBeanList);

    // super start; this override declares no checked exception, so failures are
    // rethrown unchecked with the original exception kept as the cause
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
执行启动:
/**
 * Starts the executor by running its initialization steps in sequence: log path, admin
 * client list, log-file clean thread, trigger-callback thread, and finally the embedded
 * server.
 *
 * @throws Exception declared by the signature; presumably propagated from the steps below
 */
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
// built from the configured admin addresses and access token
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
// started with the configured log retention days
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
// important: this is where the executor's embedded server is started
initEmbedServer(address, ip, port, appname, accessToken);
}
初始化嵌入式服务器:即executor服务。执行的过程中,会填充相关服务器的配置信息,然后启动服务器;可以看到服务器使用的是Netty,同时其里面的handler处理与前面的业务队列形成对应。
/**
 * Starts the embedded Netty HTTP server on a daemon thread. Once the port is bound the
 * executor registry is started, and the thread then waits until the server channel closes.
 *
 * @param address     executor address handed to {@code startRegistry}
 * @param port        port the server binds to
 * @param appname     application name handed to {@code startRegistry}
 * @param accessToken token handed to {@code EmbedHttpServerHandler}
 */
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();

            // business thread pool: 0 core / 200 max threads, 60s keep-alive, queue capacity 2000;
            // a rejected task raises a RuntimeException
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });

            try {
                // configure the server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registry
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                // sync() declares InterruptedException, which Runnable.run() cannot propagate:
                // treat it as a stop request and keep the interrupt status
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // e.g. a failed bind: log it instead of letting the thread die silently
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop: graceful shutdown of both event loop groups
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
关注EmbedHttpServerHandler和 startRegistry(appname, address):
netty中执行业务处理:对读事件进行处理
/**
 * Handles one aggregated HTTP request: extracts body, uri, method, keep-alive flag and access
 * token on the calling thread, then hands processing and the response write to
 * {@code bizThreadPool}.
 *
 * @param ctx channel context used to write the response
 * @param msg the full HTTP request
 */
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    // request parse
    String requestData = msg.content().toString(CharsetUtil.UTF_8);
    String uri = msg.uri();
    HttpMethod httpMethod = msg.method();
    boolean keepAlive = HttpUtil.isKeepAlive(msg);
    String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

    // invoke: process -> serialize to json -> write response
    bizThreadPool.execute(() -> {
        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
        String responseJson = GsonTool.toJson(responseObj);
        writeResponse(ctx, keepAlive, responseJson);
    });
}
/**
 * Maps a request uri to the matching {@code executorBiz} call.
 *
 * @param httpMethod     HTTP method of the request (not used by the mapping)
 * @param uri            request uri; one of /beat, /idleBeat, /run, /kill, /log
 * @param requestData    request body, parsed as JSON for the calls that take a parameter
 * @param accessTokenReq access token from the request (not used by the mapping)
 * @return the result of the mapped call, or a FAIL_CODE result for an unknown uri or when
 *         the call throws
 */
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        }
        if ("/idleBeat".equals(uri)) {
            return executorBiz.idleBeat(GsonTool.fromJson(requestData, IdleBeatParam.class));
        }
        if ("/run".equals(uri)) {
            return executorBiz.run(GsonTool.fromJson(requestData, TriggerParam.class));
        }
        if ("/kill".equals(uri)) {
            return executorBiz.kill(GsonTool.fromJson(requestData, KillParam.class));
        }
        if ("/log".equals(uri)) {
            return executorBiz.log(GsonTool.fromJson(requestData, LogParam.class));
        }
        // no mapping matched
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
/**
 * Writes {@code responseJson} back as an HTTP 200 response and flushes it.
 *
 * @param ctx          channel context to write to
 * @param keepAlive    when true, a {@code Connection: keep-alive} header is added
 * @param responseJson response body, encoded as UTF-8
 */
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
    // build the full response around a UTF-8 copy of the body
    FullHttpResponse response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1,
            HttpResponseStatus.OK,
            Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)

    // content type and length; HttpHeaderValues.TEXT_PLAIN.toString() is the alternative type
    response.headers()
            .set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8")
            .set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

    if (keepAlive) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    ctx.writeAndFlush(response);
}
/**
 * Flushes the channel context when the current read has completed.
 *
 * @param ctx channel context to flush
 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
这里的业务处理与前面调度中心中我们执行业务服务的run时,将业务执行的任务放入队列中,形成照应,也即有任务来时,就会触发EmbedHttpServerHandler中的读事件的处理。
注册startRegistry(appname, address):包括注册线程和注册移除
//执行注册操作
ReturnT<String> registryResult = adminBiz.registry(registryParam);
//执行注册移除操作
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
进行adminBizClient:
/**
 * Registers with the admin by POSTing {@code registryParam} to {@code api/registry}.
 *
 * @param registryParam registry payload sent as the request body
 * @return the result returned by {@code XxlJobRemotingUtil.postBody}
 */
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    String registryUrl = addressUrl + "api/registry";
    return XxlJobRemotingUtil.postBody(registryUrl, accessToken, timeout, registryParam, String.class);
}
/**
 * Removes a registration by POSTing {@code registryParam} to {@code api/registryRemove}.
 *
 * @param registryParam registry payload sent as the request body
 * @return the result returned by {@code XxlJobRemotingUtil.postBody}
 */
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
    String removeUrl = addressUrl + "api/registryRemove";
    return XxlJobRemotingUtil.postBody(removeUrl, accessToken, timeout, registryParam, String.class);
}
最终调用:
/**
 * Validates a registry request and hands the database update to
 * {@code registryOrRemoveThreadPool}.
 *
 * @param registryParam registry group, key and value; all three must contain text
 * @return a FAIL_CODE result when any field is missing, otherwise {@code ReturnT.SUCCESS},
 *         returned right after the update has been submitted to the pool
 */
public ReturnT<String> registry(RegistryParam registryParam) {

    // valid
    boolean complete = StringUtils.hasText(registryParam.getRegistryGroup())
            && StringUtils.hasText(registryParam.getRegistryKey())
            && StringUtils.hasText(registryParam.getRegistryValue());
    if (!complete) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
    }

    // async execute
    registryOrRemoveThreadPool.execute(() -> {
        // try to update the existing registry row first
        int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
        if (ret >= 1) {
            return;
        }

        // nothing was updated: save a new row, then refresh
        XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

        // fresh
        freshGroupRegistryInfo(registryParam);
    });

    return ReturnT.SUCCESS;
}
而从Handler可看到处理的请求则是:
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
}
}
而run请求则是:判断不同类型的请求,采用不同的方式处理,最后将触发器参数放入到触发器队列
/**
 * Abridged listing of the trigger handler: loads the existing job thread, applies the
 * executor block strategy, registers a job thread when needed and pushes the trigger onto
 * its queue. The per-glue-type handler validation is not shown in this listing.
 *
 * @param triggerParam trigger request; this listing reads its job id, glue type and block
 *                     strategy
 * @return a FAIL_CODE result when the DISCARD_LATER strategy drops the trigger; otherwise
 *         the result of {@code pushTriggerQueue}
 */
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
// load old:jobHandler + jobThread
// look up the job thread already registered for this job id, if any
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
// reuse the existing thread's handler when there is one
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// valid:jobHandler + jobThread
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
// (handler validation for each glue type goes here; it is omitted from this listing)
// executor block strategy
// only relevant when an existing job thread is being reused
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
// COVER_EARLY: drop the reference to the busy thread so a new one is registered below
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// kill running jobThread
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// register a job thread when none exists or the old one was invalidated above
if (jobThread == null) {
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// hand the trigger parameters to the job thread's trigger queue
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
将触发器放入队列:
/**
 * new trigger to queue
 * Records the trigger's log id and appends the trigger to the queue, unless that log id has
 * already been recorded.
 *
 * @param triggerParam trigger to enqueue; its log id is used for the repeat check
 * @return {@code ReturnT.SUCCESS} when queued, or a FAIL_CODE result for a repeated log id
 */
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    // first time this log id is seen: remember it and queue the trigger
    if (!triggerLogIdSet.contains(triggerParam.getLogId())) {
        triggerLogIdSet.add(triggerParam.getLogId());
        triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
    }

    // avoid repeat
    logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
    return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
而写入响应:
/**
 * Writes {@code responseJson} back as an HTTP 200 response and flushes it.
 *
 * @param ctx          channel context to write to
 * @param keepAlive    when true, a {@code Connection: keep-alive} header is added
 * @param responseJson response body, encoded as UTF-8
 */
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
    // build the full response around a UTF-8 copy of the body
    FullHttpResponse response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1,
            HttpResponseStatus.OK,
            Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)

    // content type and length; HttpHeaderValues.TEXT_PLAIN.toString() is the alternative type
    response.headers()
            .set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8")
            .set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());

    if (keepAlive) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }

    ctx.writeAndFlush(response);
}
除此之外,xxl-job还有相关的任务分配策略:一致性hash、LRU、LFU、随机、轮询等。