前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式调度中间件xxl-job(五):执行器Executor--任务执行

分布式调度中间件xxl-job(五):执行器Executor--任务执行

作者头像
闲宇非鱼
发布2022-02-08 11:29:26
1.8K0
发布2022-02-08 11:29:26
举报

一、前言

  在前面的学习中我们可以了解到,执行器的任务执行都是在触发器触发下执行的。对于触发器如何路由、具体的调度策略等等我们后面再进行学习,本章的重点是来看一看执行器是如何被调用以及执行任务的。

二、任务执行

1. 执行流程总览

  下图是触发器调用执行器执行任务的一个简单时序图:

2. 任务接收

  在上面的时序图中可以看到,触发器通过Netty向执行器发起执行任务请求。具体处理请求的逻辑在 EmbedHttpServerHandler 类的 process 方法中,这里主要看下请求处理部分的逻辑:

代码语言:javascript
复制
if ("/beat".equals(uri)) {
    return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
    return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
    return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
    return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
    return executorBiz.log(logParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}

  在 process 方法中进行心跳检测、是否空闲检测、运行任务、结束任务、日志等请求处理,这里主要看下运行任务(即”/run”)处理。

  顺便说一下,触发器发起任务执行请求请求路径为:{执行器内嵌服务跟地址}/run 。请求方式为POST请求 ,具体数据格式为:

代码语言:javascript
复制
{
    “jobId”:1,                                  // 任务ID
    “executorHandler”:”demoJobHandler”,         // 任务标识
    “executorParams”:”demoJobHandler”,          // 任务参数
    “executorBlockStrategy”:”COVER_EARLY”,      // 任务阻塞策略,可选值参考 com.xxl.job.core.enums.ExecutorBlockStrategyEnum
    “executorTimeout”:0,                        // 任务超时时间,单位秒,大于零时生效
    “logId”:1,                                  // 本次调度日志ID
    “logDateTime”:1586629003729,                // 本次调度日志时间
    “glueType”:”BEAN”,                          // 任务模式,可选值参考 com.xxl.job.core.glue.GlueTypeEnum
    “glueSource”:”xxx”,                         // GLUE脚本代码
    “glueUpdatetime”:1586629003727,             // GLUE脚本更新时间,用于判定脚本是否变更以及是否需要刷新
    “broadcastIndex”:0,                         // 分片参数:当前分片
    “broadcastTotal”:0                          // 分片参数:总分片
}

3. 获取jobThread

  可以看到在”/run”逻辑分支中执行了 executorBiz.run(triggerParam) 。在 run 方法中可以分为以下四个步骤:

  1. 获取jobThread和jobHandler;
    1. 根据TriggerParam中的jobId加载老的jobThread和jobHandler;
    2. 根据GLUE模式分别判断TriggerParam请求中的jobHandler是否和原先的jobHandler相同,如果不同则加载新的jobHandler,并将jobThread置为null;
  2. 当jobThread不为null时,从触发器请求TriggerParam中获取阻塞策略并执行。
  3. 在执行完阻塞策略之后,如果jobThread为null,则将新的jobHandler注册到任务线程库中替换原先的任务线程,即调用 XxlJobExecutor.registJobThread 方法;
  4. 最后将TriggerParam请求放置到TriggerQueue中,在这一过程中会通过logId进行任务是否重复执行检查,如果任务重复则不如对并返回错误,然后返回入队结果。

GLUE是雪里大佬自创的“可执行逻辑单元”,扩展了JVM的动态语言支持,本质上是一段可执行的代码。GLUE可以方便的嵌入业务代码中, GLUE中逻辑代码支持在线开发、动态推送更新、实时编译生效。简单来说,就是很牛皮。

4. 任务线程注册和启动

  在获取到新的任务之后,执行 XxlJobExecutor.registJobThread 进行任务线程注册,启动新的任务线程并将任务线程库中原先的任务线程替换掉。   这里主要关注一下 JobThread 类,在该类的 run 方法中进行了具体的任务执行。这里的逻辑相对还是比较容易理解的,具体代码逻辑大家可以自行阅读一下源码,我们来看下两个有意思的处理:

  • 在进行线程是否终止的判断时,单独使用了一个布尔类型的 toStop 变量来进行判断。这是由于 Thread.interrupt 只支持终止线程的阻塞状态(wait、sleep、join),在阻塞处会抛出InterruptedException,但是并不会终止运行的线程,所以这里使用 toStop 变量来标识线程是否终止;
  • 由于上面使用循环来判断 jobThread 是否终止,在其中获取 TriggerQueue 中的任务请求参数时需要使用 poll 而不是使用 take ,这是因为 take 方法在为获取到队列中元素时会一直等待直到获取可用元素,这就会造成线程的阻塞。而使用 poll 方法可以设置超时时间,配合对空闲次数的限制,可以有效的控制线程空闲的情况并防止大量线程阻塞造成程序崩溃;

三、总结

  本章简单介绍了执行器接收触发器任务执行的请求以及执行任务的过程。由于其中的部分流程和rpc调用类似,这里就不加赘述,这里主要分享了一下基本的调用流程和雪里大佬的一些有趣的处理思路。

以上纯属个人浅见,如有谬误,请各位看官大佬多多指正。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-06-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Brucebat的伪技术鱼塘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、任务执行
    • 1. 执行流程总览
      • 3. 获取jobThread
        • 4. 任务线程注册和启动
        • 三、总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档