在前面的学习中我们可以了解到,执行器的任务执行都是在触发器触发下执行的。对于触发器如何路由、具体的调度策略等等我们后面再进行学习,本章的重点是来看一看执行器是如何被调用以及执行任务的。
下图是触发器调用执行器执行任务的一个简单时序图:
2. 任务接收
在上面的时序图中可以看到,触发器通过Netty向执行器发起执行任务请求。具体处理请求的逻辑在 EmbedHttpServerHandler
类的 process
方法中,这里主要看下请求处理部分的逻辑:
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
在 process
方法中进行心跳检测、是否空闲检测、运行任务、结束任务、日志等请求处理,这里主要看下运行任务(即”/run”)处理。
顺便说一下,触发器发起任务执行请求请求路径为:{执行器内嵌服务跟地址}/run 。请求方式为POST请求 ,具体数据格式为:
{
“jobId”:1, // 任务ID
“executorHandler”:”demoJobHandler”, // 任务标识
“executorParams”:”demoJobHandler”, // 任务参数
“executorBlockStrategy”:”COVER_EARLY”, // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum
“executorTimeout”:0, // 任务超时时间,单位秒,大于零时生效
“logId”:1, // 本次调度日志ID
“logDateTime”:1586629003729, // 本次调度日志时间
“glueType”:”BEAN”, // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
“glueSource”:”xxx”, // GLUE脚本代码
“glueUpdatetime”:1586629003727, // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
“broadcastIndex”:0, // 分片参数:当前分片
“broadcastTotal”:0 // 分片参数:总分片
}
可以看到在”/run”逻辑分支中执行了 executorBiz.run(triggerParam)
。在 run
方法中可以分为以下四个步骤:
XxlJobExecutor.registJobThread
方法;GLUE是雪里大佬自创的“可执行逻辑单元”,扩展了JVM的动态语言支持,本质上是一段可执行的代码。GLUE可以方便的嵌入业务代码中, GLUE中逻辑代码支持在线开发、动态推送更新、实时编译生效。简单来说,就是很牛皮。
在获取到新的任务之后,执行 XxlJobExecutor.registJobThread
进行任务线程注册,启动新的任务线程并将任务线程库中原先的任务线程替换掉。
这里主要关注一下 JobThread
类,在该类的 run
方法中进行了具体的任务执行。这里的逻辑相对还是比较容易理解的,具体代码逻辑大家可以自行阅读一下源码,我们来看下两个有意思的处理:
toStop
变量来进行判断。这是由于 Thread.interrupt
只支持终止线程的阻塞状态(wait、sleep、join),在阻塞处会抛出InterruptedException,但是并不会终止运行的线程,所以这里使用 toStop
变量来标识线程是否终止;jobThread
是否终止,在其中获取 TriggerQueue
中的任务请求参数时需要使用 poll
而不是使用 take
,这是因为 take
方法在为获取到队列中元素时会一直等待直到获取可用元素,这就会造成线程的阻塞。而使用 poll
方法可以设置超时时间,配合对空闲次数的限制,可以有效的控制线程空闲的情况并防止大量线程阻塞造成程序崩溃;本章简单介绍了执行器接收触发器任务执行的请求以及执行任务的过程。由于其中的部分流程和rpc调用类似,这里就不加赘述,这里主要分享了一下基本的调用流程和雪里大佬的一些有趣的处理思路。
以上纯属个人浅见,如有谬误,请各位看官大佬多多指正。
本文分享自 Brucebat的伪技术鱼塘 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!