前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >xxl-job学习

xxl-job学习

作者头像
路行的亚洲
发布2021-02-03 10:09:35
1.4K0
发布2021-02-03 10:09:35
举报
文章被收录于专栏:后端技术学习

拉取xxl-job的代码

执行XxlJobAdminApplication启动调度中心。

然后执行:

启动执行器中心之后,然后访问admin界面:

从里面可以看到可以执行任务管理。

一、调度中心:从XxlJobAdminConfig可以看到与spring集成的重要方法:

代码语言:javascript
复制
@Override
public void afterPropertiesSet() throws Exception {
    adminConfig = this;

    //创建xxlJobScheduler对象
    xxlJobScheduler = new XxlJobScheduler();
    //执行调度中心初始化
    xxlJobScheduler.init();
}

初始化操作:

代码语言:javascript
复制
//xxl-job定时任务
public void init() throws Exception {
    //初始化i18n 国际化
    initI18n();

    //admin触发池帮助启动
    JobTriggerPoolHelper.toStart();

    //admin注册监控启动
    JobRegistryHelper.getInstance().start();

    //admin失败监控运行
    JobFailMonitorHelper.getInstance().start();

    //job完成监控运行
    JobCompleteHelper.getInstance().start();

    //admin日志report启动
    JobLogReportHelper.getInstance().start();

    // start-schedule  ( depend on JobTriggerPoolHelper )
    //启动定时任务
    JobScheduleHelper.getInstance().start();

    logger.info(">>>>>>>>> init xxl-job admin success.");
}

job触发池启动:此时会创建两种线程池,快线程池和慢线程池对象,这里区别在于调度的阻塞队列的容量。执行addTrigger方法时,首先会选择快线程池,如果此时的线程池处理任务出现超时,同时则会采用慢线程池,加大阻塞队列的容量。在run方法中会调用,而在启动定时任务的时候会调用

代码语言:javascript
复制
//添加触发器
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);=>
 // do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null); =>    
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

触发器触发job,会对参数进行:

代码语言:javascript
复制
//加载数据
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
 //加载
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
//处理触发器
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());

处理触发器processTrigger的过程中,必然会触发执行器的执行,也即此时会运行executro,可以看到执行过程:

代码语言:javascript
复制
1.保日志id
2.初始化触发器参数
3.初始化地址
4.触发远程执行器
5.收集触发器信息
6.保存日志触发器信息      

而这里我们关心的就是执行器的运行:

代码语言:javascript
复制
/**
 * run executor 运行执行器
 * @param triggerParam
 * @param address
 * @return
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        //获取执行器业务
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        //进行运行
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

getExecutorBiz可以看到:

代码语言:javascript
复制
//获取执行业务
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        // valid
        if (address==null || address.trim().length()==0) {
            return null;
        }

        // load-cache
        address = address.trim();
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {
            return executorBiz;
        }

        // set-cache
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }

run逻辑:

代码语言:javascript
复制
 //启动
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        //加载job线程
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        //判断job线程是否为空
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        //判断粘合类型是否是bean,如果是bean,则加载jobHandler
        //校验jobHandler
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

            //如果是groovy
        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
            //如果是脚本
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        //否者直接返回失败,并进行提示
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        //执行阻塞策略,匹配策略。同时进行判断是否进行丢弃
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
                //如果是覆盖,则杀掉线程
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        //如果job线程为空,则替换线程 创建或者存在无效
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        //将触发器参数放入到触发器队列
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

如果是bean形式:

代码语言:javascript
复制
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}

放入之后,我们需要take:

代码语言:javascript
复制
 if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // commonm
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                              triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                  );

我们可以看到回调:

代码语言:javascript
复制
/**
 * job results callback queue
 */
private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();
public static void pushCallBack(HandleCallbackParam callback){
    getInstance().callBackQueue.add(callback);
    logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}

同时可以看到回调take过程:

代码语言:javascript
复制
//从回调队列中take任务
HandleCallbackParam callback = getInstance().callBackQueue.take();
//如果callback不为空
if (callback != null) {

    // callback list param
    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
    // drainTo
    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
    //回调参数列表
    callbackParamList.add(callback);

    // callback, will retry if error
    //回调,将会重试如果错误
    if (callbackParamList!=null && callbackParamList.size()>0) {
        //执行回调
        doCallback(callbackParamList);
    }
}

二、执行中心启动:

此时我们来看看执行中心启动的过程:

查看启动:

代码语言:javascript
复制
/**
 * @author xuxueli 2018-10-31 19:05:43
 */
public class FramelessApplication {
    public static void main(String[] args) {
        try {
            //启动 重要
            // start
            FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

            // Blocks until interrupted
            // 阻塞直至中断
            while (true) {
                try {
                    TimeUnit.HOURS.sleep(1);
                } catch (InterruptedException e) {
                    break;
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            // destory  销毁xxl-job执行器
            FrameLessXxlJobConfig.getInstance().destoryXxlJobExecutor();
        }

    }

}

初始化job执行器:FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

加载xxl-job-executor.propeties中的配置,填充xxlJobExecutor,注册XxlJobBeanList中的bean。而这个注解则是

代码语言:javascript
复制
/**
 * init 初始化xxlJob执行器
 */
public void initXxlJobExecutor() {

    // load executor prop
    //加载执行器配置
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");

    // init executor
    //初始化执行器
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

    // registry job bean
    //注册jobBean
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

    // start executor
    //启动执行器 重要
    try {
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}

可以看到xxlJobExecutor里面的信息:

而这个bean则是通过自定义注解实现的:也即我们如果需要执行一个定时任务,可以通过@XxlJob来实现任务bea注册。

代码语言:javascript
复制
/**
 * annotation for method jobhandler
 *  自定义注册job处理器方法
 *
 * @author xuxueli 2019-12-11 20:50:13
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface XxlJob {

    /**
     * jobhandler name
     * 任务处理器的名称
     */
    String value();

    /**
     * init handler, invoked when JobThread init
     * 初始化handler,激活当job线程初始化时
     */
    String init() default "";

    /**
     * destroy handler, invoked when JobThread destroy
     * 销毁处理器,激活job线程销毁
     */
    String destroy() default "";

}

而从里面可以看到xxlJobExecutor里面有7个方法,通过反射拿到这些方法:

拿到方法,并放入到map中:

由于此时只是进行初始化,因此jobHandlerRepository还不存在配置信息,通过name拿不到配置。

同时判断拿到的方法是否存在init()、destory()方法,如果不存在,则执行注册jobHandler方法,放入到jobHandlerRepository中,而可以看到IJobHandler里面有三个方法:execute()、init()、destory()方法

执行注册:

代码语言:javascript
复制
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));

有下面方式:

代码语言:javascript
复制
@XxlJob("demoJobHandler")  //方式一,采用注入需要执行的job
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")  //方式二,带生命周期

同时还可以实现任务分分片、命令行任务和跨平台任务.

代码语言:javascript
复制
/**
 * 2、分片广播任务
 */
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {

    // 分片参数 分片索引和分片总数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

    // 业务逻辑
    for (int i = 0; i < shardTotal; i++) {
        if (i == shardIndex) {
            XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
        } else {
            XxlJobHelper.log("第 {} 片, 忽略", i);
        }
    }

}

配置加载完成,属性信息填充完,将bean进行注册。完成之后,便可以执行任务处理器启动。

而执行执行任务处理器启动时,会首先初始化job处理器方法

代码语言:javascript
复制
//启动执行器
public void start() {

    // init JobHandler Repository (for method)
    //初始化job处理器reposity
    initJobHandlerMethodRepository(xxlJobBeanList);

    // super start
    //父类启动
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

执行启动:

代码语言:javascript
复制
//启动和停止
public void start() throws Exception {

    // init logpath
    //初始化logPath 将logBasePath = logPath,创建文件路径、目录
    XxlJobFileAppender.initLogPath(logPath);

    // init invoker, admin-client
    //初始化激活,admin客户端
    initAdminBizList(adminAddresses, accessToken);


    // init JobLogFileCleanThread
    //初始化job日志文件清理线程,使用递归删除文件
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    //初始化触发回调线程
    TriggerCallbackThread.getInstance().start();

    // init executor-server
    //初始化执行服务  重要 服务器启动
    initEmbedServer(address, ip, port, appname, accessToken);
}

初始化服嵌入服务器:executor服务,执行的过程中,会填充相关服务器的配置信息,然后执行启动服务器,可以看到服务器使用的Netty,同时其里面的handler处理与前面的业务队列形成对应。

代码语言:javascript
复制
//启动应用服务器
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            //业务线程池
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                //启动服务
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                //执行绑定
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // start registry
                //启动注册
                startRegistry(appname, address);
                // wait util stop
                future.channel().closeFuture().sync();
            }  finally {
                // stop 优雅停机
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

关注EmbedHttpServerHandler和 startRegistry(appname, address):

netty中执行业务处理:对读事件进行处理

代码语言:javascript
复制
   //执行read操作
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

        // request parse
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

        // invoke
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                //执行处理
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                // to json
                String responseJson = GsonTool.toJson(responseObj);
                //写入响应
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }

    //处理
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // services mapping
        //服务映射
        try {
            if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }

    /**
     * write response
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        // write response
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

这里的业务处理与前面调度中心中我们执行业务服务的run时,将业务执行的任务放入队列中,形成照应,也即有任务来时,就会触发EmbedHttpServerHandler中的读事件的处理。

注册startRegistry(appname, address):包括注册线程和注册移除

代码语言:javascript
复制
//执行注册操作
ReturnT<String> registryResult = adminBiz.registry(registryParam);
//执行注册移除操作
ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);

进行adminBizClient:

代码语言:javascript
复制
//执行注册操作,类似采用httpClient,api/registry
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

//注册移除,类似httpClient调用,api/registryRemove
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}

最终调用:

代码语言:javascript
复制
 //job注册帮助类
public ReturnT<String> registry(RegistryParam registryParam) {

   // valid
   if (!StringUtils.hasText(registryParam.getRegistryGroup())
         || !StringUtils.hasText(registryParam.getRegistryKey())
         || !StringUtils.hasText(registryParam.getRegistryValue())) {
      return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
   }

   // async execute
   //异步执行
   registryOrRemoveThreadPool.execute(new Runnable() {
      @Override
      public void run() {
         //注册更新
         int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
         //ret<1,则执行注册保存操作
         if (ret < 1) {
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            // fresh
            //执行刷新操作
            freshGroupRegistryInfo(registryParam);
         }
      }
   });

   return ReturnT.SUCCESS;
}

而从Handler可看到处理的请求则是:

代码语言:javascript
复制
 if ("/beat".equals(uri)) {
                return executorBiz.beat();
            } else if ("/idleBeat".equals(uri)) {
                IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleBeatParam);
            } else if ("/run".equals(uri)) {
                TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(triggerParam);
            } else if ("/kill".equals(uri)) {
                KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(killParam);
            } else if ("/log".equals(uri)) {
                LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(logParam);
            }
   }         

而run请求则是:判断不同类型的请求,采用不同的方式接,将触发器参数放入到触发器队列

代码语言:javascript
复制
//启动
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    //加载job线程
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    //判断job线程是否为空
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    //判断粘合类型是否是bean,如果是bean,则加载jobHandler
    //校验jobHandler,对不同的枚举类型进行适配,从而执行相应的方式完成
    // executor block strategy
    //执行阻塞策略,匹配策略。同时进行判断是否进行丢弃
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
            //如果是覆盖,则杀掉线程
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
    //如果job线程为空,则替换线程 创建或者存在无效
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
    //将触发器参数放入到触发器队列
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

将触发器放入队列:

代码语言:javascript
复制
 /**
    * new trigger to queue
 * 新触发器到队列
    *
    * @param triggerParam
    * @return
    */
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
   // avoid repeat
   if (triggerLogIdSet.contains(triggerParam.getLogId())) {
      logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
      return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
   }

   triggerLogIdSet.add(triggerParam.getLogId());
   //添加到触发队列
   triggerQueue.add(triggerParam);
       return ReturnT.SUCCESS;
}

而写入响应:

代码语言:javascript
复制
/**
 * write response
 */
private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
    // write response
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
    if (keepAlive) {
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    }
    ctx.writeAndFlush(response);
}

除此之外,xxl-job还有相关的任务分配策略:一致性hash、LRU、LFU、随机、轮询等。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档