前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式任务调度系统xxl-job源码探究(一、客户端)

分布式任务调度系统xxl-job源码探究(一、客户端)

作者头像
老梁
发布2019-09-10 16:45:05
9080
发布2019-09-10 16:45:05
举报

前面讲了xxl-job的搭建,现在来粗略的解析下该分布式调度系统的源码,先来客户点代码

客户端源码

  1. 客户端开启的时候会向服务中心进行注册,其实现用的是jetty连接,且每隔半分钟会发送一次心跳,来告诉服务中心该执行器是否正常
  2. 查看源码可以从配置文件入手
@Bean(initMethod = "start", destroyMethod = "destroy")
public XxlJobExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-jobhandler config init.");
    XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
    xxlJobExecutor.setAdminAddresses(adminAddresses);
    xxlJobExecutor.setAppName(appName);
    xxlJobExecutor.setIp(ip);
    xxlJobExecutor.setPort(port);
    xxlJobExecutor.setAccessToken(accessToken);
    xxlJobExecutor.setLogPath(logPath);
    xxlJobExecutor.setLogRetentionDays(logRetentionDays);

    return xxlJobExecutor;
}

很明显,在把配置信息注入以后,该配置执行了start方法,进入其中

  1. 可以看到以下代码,英文注释我斗胆翻译下~
// ---------------------- start + stop ----------------------
public void start() throws Exception {
    // init admin-client 初始化服务中心
    initAdminBizList(adminAddresses, accessToken);

    // init executor-jobHandlerRepository 
    // 初始化jobHandler也就是继承了该类的所有定时方法,模仿spring ioc,把实例化对象都保存了起来
    initJobHandlerRepository(applicationContext);

    // init logpath 初始化日志文件,设置文件路径
    XxlJobFileAppender.initLogPath(logPath);

    // init executor-server 初始化执行器服务,看参数知道这里就是jetty连接的主要地方了
    initExecutorServer(port, ip, appName, accessToken);

    // init JobLogFileCleanThread 看名字也知道 初始化日志清理线程,应该是用来定时清理日志的
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
}

国人编写的代码就是有国人自己的风格,至少比国外的开源代码好看懂点

  1. 这里主要深入执行器部分吧,其他还是容易看懂的,继续深入
private void initExecutorServer(int port, String ip, String appName, String accessToken) throws Exception {
    // valid param 可以看到,我们不配置jetty端口,它默认也是9999
    port = port>0?port: NetUtil.findAvailablePort(9999);

    // start server
    NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());   // rpc-service, base on jetty
    NetComServerFactory.setAccessToken(accessToken);
    //主要就是这个方法了
    serverFactory.start(port, ip, appName); // jetty + registry
}

继续深入

// ---------------------- server start ----------------------
//可以看到用到了jetty服务
JettyServer server = new JettyServer();
public void start(int port, String ip, String appName) throws Exception {
    server.start(port, ip, appName);
}

继续深入

public void start(final int port, final String ip, final String appName) throws Exception {
    thread = new Thread(new Runnable() {
        @Override
        public void run() {

            // The Server
            server = new Server(new ExecutorThreadPool());  // 非阻塞

            // HTTP connector
            ServerConnector connector = new ServerConnector(server);
            if (ip!=null && ip.trim().length()>0) {
                connector.setHost(ip);  // The network interface this connector binds to as an IP address or a hostname.  If null or 0.0.0.0, then bind to all interfaces.
            }
            connector.setPort(port);
            server.setConnectors(new Connector[]{connector});

            // Set a handler
            HandlerCollection handlerc =new HandlerCollection();
            handlerc.setHandlers(new Handler[]{new JettyServerHandler()});
            server.setHandler(handlerc);

            try {
                // Start server
                server.start();
                logger.info(">>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);

                // Start Registry-Server 注册到服务中心方法,单独线程执行
                ExecutorRegistryThread.getInstance().start(port, ip, appName);

                // Start Callback-Server 定时任务回调方法,单独线程执行
                TriggerCallbackThread.getInstance().start();

                server.join();  // block until thread stopped
                logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                //destroy();
            }
        }
    });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}

可以看出这里主要关注

ExecutorRegistryThread.getInstance().start(port, ip, appName);
TriggerCallbackThread.getInstance().start();

这两个方法 继续深入

public void start(final int port, final String ip, final String appName){

    // valid
    if (appName==null || appName.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appName is null.");
        return;
    }
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
        return;
    }

    // executor address (generate addredd = ip:port)
    final String executorAddress;
    if (ip != null && ip.trim().length()>0) {
        executorAddress = ip.trim().concat(":").concat(String.valueOf(port));
    } else {
        executorAddress = IpUtil.getIpPort(port);
    }

    registryThread = new Thread(new Runnable() {
        @Override
        public void run() {

            // registry 此线程为守护线程,不销毁,循环执行
            while (!toStop) {
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {
                            ReturnT<String> registryResult = adminBiz.registry(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                        }

                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                // 睡眠30秒
                try {
                    TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
            
            // registry remove 服务中心移除此任务起效果
            try {
                RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
                for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                    try {
                        ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                        if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                            registryResult = ReturnT.SUCCESS;
                            logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            break;
                        } else {
                            logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                        }
                    } catch (Exception e) {
                        logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                    }

                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

        }
    });
    registryThread.setDaemon(true);
    registryThread.start();
}

继续

public void start() {

    // valid
    if (XxlJobExecutor.getAdminBizList() == null) {
        logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
        return;
    }

    triggerCallbackThread = new Thread(new Runnable() {

        @Override
        public void run() {

            // normal callback
            while(!toStop){
                try {
                //这里采用了阻塞队列,可以看出,当服务中心发送任务到此队列,就会被消费
                    HandleCallbackParam callback = getInstance().callBackQueue.take();
                    if (callback != null) {

                        // callback list param
                        List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                        int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                        callbackParamList.add(callback);

                        // callback, will retry if error
                        if (callbackParamList!=null && callbackParamList.size()>0) {
                            doCallback(callbackParamList);
                        }
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }

            // last callback
            try {
                List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                if (callbackParamList!=null && callbackParamList.size()>0) {
                    doCallback(callbackParamList);
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

        }
    });
    triggerCallbackThread.setDaemon(true);
    triggerCallbackThread.start();
}
可以看到此方法放入任务
public static void pushCallBack(HandleCallbackParam callback){
    getInstance().callBackQueue.add(callback);
    logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());
}

后面可以追溯好多层级,最顶上发现

private RpcResponse doInvoke(HttpServletRequest request) {
    try {
        // deserialize request
        byte[] requestBytes = HttpClientUtil.readBytes(request);
        if (requestBytes == null || requestBytes.length==0) {
            RpcResponse rpcResponse = new RpcResponse();
            rpcResponse.setError("RpcRequest byte[] is null");
            return rpcResponse;
        }
        RpcRequest rpcRequest = (RpcRequest) HessianSerializer.deserialize(requestBytes, RpcRequest.class);

        // invoke 主要就是这个调用了,调用一次会执行一次对应任务
        RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
        return rpcResponse;
    } catch (Exception e) {
        logger.error(e.getMessage(), e);

        RpcResponse rpcResponse = new RpcResponse();
        rpcResponse.setError("Server-error:" + e.getMessage());
        return rpcResponse;
    }
}

而这个对象实际在JettyServer服务类中已经加入

// Set a handler
HandlerCollection handlerc =new HandlerCollection();
handlerc.setHandlers(new Handler[]{new JettyServerHandler()});
server.setHandler(handlerc);
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018-09-18 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 客户端源码
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档