前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式调度中间件xxl-job(四):执行器Executor--执行器的注册

分布式调度中间件xxl-job(四):执行器Executor--执行器的注册

作者头像
闲宇非鱼
发布2022-02-08 11:28:10
3.3K0
发布2022-02-08 11:28:10
举报

人生苦短,不如养狗

一、前言

  在上一章分布式调度中间件xxl-job(三):执行器Executor—任务注册中我们学习了有关任务注册相关的知识,而任务又与 执行器(Executor)息息相关。所以这一章,我们就来学习一下xxl-job中执行器是 如何进行注册的

二、执行器Executor

1. 基本组成

  在开始学习执行器注册和任务执行的原理之前,先让我们来看一下执行器的基本组成。在xxl-job-core中,专门定义了XxlJobExecutor这个基本的执行器类,同时为了满足Spring中的使用,还定义了XxlJobSpringExecutor这个类进行了一些Spring相关的扩展。下面我们先来看下XxlJobExecutor中的成员变量:

  • accessToken:调度中心通信令牌。该参数非必填,当开启时,就意味着执行器和调度中心进行通信时需要使用令牌进行身份验证,所以此处的令牌必须和调度中心使用的令牌保持一致;
  • address:执行器注册地址。在进行执行器地址注册时优先使用该地址,如果为空则使用内嵌服务以”ip:port”作为执行器的注册地址;
  • adminAdresses:调度中心地址,也即执行器注册中心地址。默认值为http://localhost:8080/xxl-job-admin;
  • adminBizList:注册中心客户端。由于注册中心可能使用了HA机制,所以注册中心可能有多个,这里的变量就是用了List<>的数据结构;
  • appname:应用名称。在前面的学习中我们曾经讲过,在xxl-job中执行器是以应用为单位的,每个应用就是一个执行器集群(也可能是单执行器);
  • embedServer:执行器服务端。在xxl-job中大佬自行实现了一个rpc,这里可以将其理解为rpc服务的服务端,在EmbedServer类中进行了具体的服务注册和服务端方法调用过程;
  • ip:执行器的ip地址;
  • JobHandlerRepository:任务处理程序库,也即任务库。这里使用了ConcurrentHashMap来保证在多线程环境下的线程安全问题,其中key是任务名(这里也就是上一章中为什么要保证任务名称不同的原因),value存放的是具体的任务;
  • JobThreadRepository:任务线程库。其中放置着在该执行器上进行任务执行的执行器线程;
  • logPath:执行器日志地址。
  • logRetentionDays:日志保留时间,单位是天;
  • port:执行器端口号,默认为9999;

  看完了成员变量,下面摘取XxlJobExecutor中比较重要的方法进行分析。

代码语言:javascript
复制
public void start() throws Exception {

    // 初始化日志路径
    XxlJobFileAppender.initLogPath(logPath);

    // 初始化invoker,调度中心客户端
    initAdminBizList(adminAddresses, accessToken);


    // 初始化日志文件清除线程
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // 初始化触发器回调线程
    TriggerCallbackThread.getInstance().start();

    // 初始化执行器服务
    initEmbedServer(address, ip, port, appname, accessToken);
}

  可以看到,在start()方法中主要完成了基本的初始化工作。主要是执行器日志路劲初始化、调度中心(即注册中心)客户端初始化、日志文件线程清理、触发器回调线程初始化以及执行器服务初始化。   这里我们主要关注一下initAdminBizList(adminAddresses, accessToken);TriggerCallbackThread.getInstance().start();以及initEmbedServer(address, ip, port, appname, accessToken);这三个方法。

initAdminBizList(adminAddresses, accessToken);

代码语言:javascript
复制
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
    if (adminAddresses!=null && adminAddresses.trim().length()>0) {
// 这里的地址是使用","来进行分割的
        for (String address: adminAddresses.trim().split(",")) {
            if (address!=null && address.trim().length()>0) {

                AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                if (adminBizList == null) {
                    adminBizList = new ArrayList<AdminBiz>();
                }
                adminBizList.add(adminBiz);
            }
        }
    }
}

  可以看到这里主要是对注册中心客户端中的注册中心地址以及通信令牌进行了初始化。同时需要注意的是,注册中心可能采取了HA机制,所以可能存在多个,这里使用了List数据结构进行注册中心客户端的初始化。

TriggerCallbackThread.getInstance().start();

  这里就不展示具体的源码,我们分析一下此处调用的作用。首先从方法调用形式可以看出,TriggerCallBackThread使用了单例模式,在每个执行器上开启 一个触发器回调线程,在这个回调线程中从执行器本地的执行结果队列中将执行结果取出然后回调给注册中心,同时还会将对应的日志id和日志时间返回给注册中心。

initEmbedServer(address, ip, port, appname, accessToken);

代码语言:javascript
复制
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // 进行端口处理
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // 生成执行器地址
	  // 这里默认使用address作为执行器地址,如果address为空,则使用ip:port的形式作为执行器地址
    if (address==null || address.trim().length()==0) {
    String ip_port_address = IpUtil.getIpPort(ip, port);
    address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}


    // 启动嵌入服务
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

  在这个方法中主要是进行执行器初始化相关的操作,首先是进行执行器地址生成,获取到执行器地址后启动嵌入服务,进行进行执行器注册、初始化业务线程等。在strat()方法中使用了标准的Netty服务端启动套路,下面我们会具体分析一下该方法。

2. 执行器注册

  执行器的注册流程主要在EmbedServer类中的start()方法,这里我们具体看下start()方法:

代码语言:javascript
复制
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {

        @Override
        public void run() {
			  // 这里创建两个独立的Reator线程池,一个用于接收客户端的TCP连接,一个而用于处理I/O相关操作
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            // 执行器业务线程,这里的线程池拒绝策略是抛出线程池耗尽异常
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });


            try {
                // 启动服务,这里主要是进行心跳检测和执行执行器业务task
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // 绑定端口
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // 开始注册应用名和执行器地址
                startRegistry(appname, address);

                // 等待直到停止
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                if (e instanceof InterruptedException) {
                    logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                } else {
                    logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                }
            } finally {
                // 停止
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

        }

    });
    // 将该线程设置为守护线程
    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.setDaemon(true);
    thread.start();
}

  方法略长,这里我们简单分析一下这个方法中做了什么。在start()方法中主要创建了一个 守护线程 ,在这个守护线程中使用标准的Netty服务端启动套路,创建了两个独立的Reactor线程池。一个用于接收客户端的TCP连接,另一个用于处理I/O相关的操作。在启动服务代码块中可以看到,这两个线程池主要将心跳检测和执行器业务调用task注册到ChannelPipeline当中。   除此以外,start()方法还创建了执行器业务线程池,对执行器服务进行端口绑定以及执行器应用名和地址的注册,可以参看上面代码中的注释。这里具体看下startRegistry(appname, address);方法:

代码语言:javascript
复制
public void startRegistry(final String appname, final String address) {
    // start registry
    ExecutorRegistryThread.getInstance().start(appname, address);
}

public class ExecutorRegistryThread {
...
	registryThread = new Thread(new Runnable() {
    	@Override
    	public void run() {
		while (!toStop) {
    try {
// 注册参数
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
// 对每一个注册中心客户端进行轮询,只要有一个注册中心客户端注册成功就直接跳出循环
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
            }

        }
    } catch (Exception e) {
        if (!toStop) {
            logger.error(e.getMessage(), e);
        }

    }
}
...
		}
...
}

ExecutorRegistryThread同样使用了单例模式实现。在这个注册线程的启动方法中同样设置了一个守护线程。在守护线程中每个注册中心客户端都进行了执行器的注册,只要有一个注册中心注册成功就会跳出循环,否则就会进行下一个注册中心注册尝试。由于每个注册中心都是连接的同一个数据库,所以这里只需要一个注册中心注册成功执行器,所有的注册中心都能共享的执行器列表。   其中toStop默认为false,当执行器执行销毁方法是会将其设置为true。

三、总结

  本章主要介绍了xxl-job中执行器的基本组成和执行器注册部分的内容。对于执行器注册部分的内容,从逻辑上来看并不是很难理解,主要还是使用了Netty的线程模型进行了服务端和注册中心的TCP连接以及执行器业务task注册。如果对Netty线程模型还不是很熟的同学,可以咨询一下度娘。   以上内容均为闲鱼个人浅见,如有不对的地方,各位看官大佬轻喷,多多指教~~

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-05-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Brucebat的伪技术鱼塘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、执行器Executor
    • 1. 基本组成
      • 2. 执行器注册
      • 三、总结
      相关产品与服务
      微服务引擎 TSE
      微服务引擎(Tencent Cloud Service Engine)提供开箱即用的云上全场景微服务解决方案。支持开源增强的云原生注册配置中心(Zookeeper、Nacos 和 Apollo),北极星网格(腾讯自研并开源的 PolarisMesh)、云原生 API 网关(Kong)以及微服务应用托管的弹性微服务平台。微服务引擎完全兼容开源版本的使用方式,在功能、可用性和可运维性等多个方面进行增强。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档