前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MyCat - 源代码篇(11)

MyCat - 源代码篇(11)

作者头像
干货满满张哈希
发布2021-04-12 16:17:01
3940
发布2021-04-12 16:17:01
举报

数据库路由中间件MyCat - 源代码篇(11)

4.配置模块

每个MyCatServer初始化时,会初始化: MyCatServer.java:

代码语言:javascript
复制
public static final String NAME = "MyCat";
private static final long LOG_WATCH_DELAY = 60000L;
private static final long TIME_UPDATE_PERIOD = 20L;
private static final MycatServer INSTANCE = new MycatServer();
private static final Logger LOGGER = LoggerFactory.getLogger("MycatServer");
private final RouteService routerService;
private final CacheService cacheService;
private Properties dnIndexProperties;
//AIO连接群组
private AsynchronousChannelGroup[] asyncChannelGroups;
private volatile int channelIndex = 0;

//全局序列号
private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();
private final DynaClassLoader catletClassLoader;
private final SQLInterceptor sqlInterceptor;
private volatile int nextProcessor;
private BufferPool bufferPool;
private boolean aio = false;

//XA事务全局ID生成
private final AtomicLong xaIDInc = new AtomicLong();
private MycatServer() {
    //读取文件配置
    this.config = new MycatConfig();
    //定时线程池,单线程线程池
    scheduler = Executors.newSingleThreadScheduledExecutor();
    //SQL记录器
    this.sqlRecorder = new SQLRecorder(config.getSystem()
            .getSqlRecordCount());
    /**
     * 是否在线,MyCat manager中有命令控制
     * | offline | Change MyCat status to OFF |
     * | online | Change MyCat status to ON |
     */
    this.isOnline = new AtomicBoolean(true);
    //缓存服务初始化
    cacheService = new CacheService();
    //路由计算初始化
    routerService = new RouteService(cacheService);
    // load datanode active index from properties
    dnIndexProperties = loadDnIndexProps();
    try {
        //SQL解析器
        sqlInterceptor = (SQLInterceptor) Class.forName(
                config.getSystem().getSqlInterceptor()).newInstance();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    //catlet加载器
    catletClassLoader = new DynaClassLoader(SystemConfig.getHomePath()
            + File.separator + "catlet", config.getSystem()
            .getCatletClassCheckSeconds());
    //记录启动时间
    this.startupTime = TimeUtil.currentTimeMillis();
}

第一步是读取文件配置,主要是三个文件:schema.xml,rule.xml和server.xml. 读取后的配置会加载到MyCatConfig中。 MyCatConfig.java:

代码语言:javascript
复制
public MycatConfig() {
//读取schema.xml,rule.xml和server.xml
ConfigInitializer confInit = new ConfigInitializer(true);
this.system = confInit.getSystem();
this.users = confInit.getUsers();
this.schemas = confInit.getSchemas();
this.dataHosts = confInit.getDataHosts();

this.dataNodes = confInit.getDataNodes();
for (PhysicalDBPool dbPool : dataHosts.values()) {
    dbPool.setSchemas(getDataNodeSchemasOfDataHost(dbPool.getHostName()));
}
this.quarantine = confInit.getQuarantine();
this.cluster = confInit.getCluster();
//初始化重加载配置时间
this.reloadTime = TimeUtil.currentTimeMillis();
this.rollbackTime = -1L;
this.status = RELOAD;
//配置加载锁
this.lock = new ReentrantLock();
}

它们都通过ConfigInitializer读取:

代码语言:javascript
复制
public ConfigInitializer(boolean loadDataHost) {
    //读取schema.xml
    SchemaLoader schemaLoader = new XMLSchemaLoader();
    //读取server.xml
    XMLConfigLoader configLoader = new XMLConfigLoader(schemaLoader);
    schemaLoader = null;
    //加载配置
    this.system = configLoader.getSystemConfig();
    this.users = configLoader.getUserConfigs();
    this.schemas = configLoader.getSchemaConfigs();
    //是否重新加载DataHost和对应的DataNode
    if (loadDataHost) {
        this.dataHosts = initDataHosts(configLoader);
        this.dataNodes = initDataNodes(configLoader);
    }
    //权限管理
    this.quarantine = configLoader.getQuarantineConfig();
    this.cluster = initCobarCluster(configLoader);
    //不同类型的全局序列处理器的配置加载
    if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_MYSQLDB) {
        IncrSequenceMySQLHandler.getInstance().load();
    }
    if (system.getSequnceHandlerType() == SystemConfig.SEQUENCEHANDLER_LOCAL_TIME) {
        IncrSequenceTimeHandler.getInstance().load();
    }
    //检查user与schema配置对应以及schema配置不为空
    this.checkConfig();
}

4.1 rule.xml

读取schema之前会先读取rule.xml。 XmlSchemaLoader.java:

代码语言:javascript
复制
public XMLSchemaLoader(String schemaFile, String ruleFile) {
    //先读取rule.xml
    XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);
    this.tableRules = ruleLoader.getTableRules();
    ruleLoader = null;
    this.dataHosts = new HashMap();
    this.dataNodes = new HashMap();
    this.schemas = new HashMap();
    //读取加载schema配置
    this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}

public XMLSchemaLoader() {
    this(null, null);
}

XMLRuleLoader.java:

代码语言:javascript
复制
public XMLRuleLoader(String ruleFile) {
    // this.rules = new HashSet();
    //rule名 -> rule
    this.tableRules = new HashMap();
    //function名 -> 具体分片算法
    this.functions = new HashMap();
    //默认为:/rule.dtd和/rule.xml
    load(DEFAULT_DTD, ruleFile == null ? DEFAULT_XML : ruleFile);
}

public XMLRuleLoader() {
    this(null);
}
代码语言:javascript
复制
private void load(String dtdFile, String xmlFile) {
    InputStream dtd = null;
    InputStream xml = null;
    try {
        dtd = XMLRuleLoader.class.getResourceAsStream(dtdFile);
        xml = XMLRuleLoader.class.getResourceAsStream(xmlFile);
        //读取出语意树
        Element root = ConfigUtil.getDocument(dtd, xml)
                .getDocumentElement();
        //加载Function
        loadFunctions(root);
        //加载TableRule
        loadTableRules(root);
    } catch (ConfigException e) {
        throw e;
    } catch (Exception e) {
        throw new ConfigException(e);
    } finally {
        if (dtd != null) {
            try {
                dtd.close();
            } catch (IOException e) {
            }
        }
        if (xml != null) {
            try {
                xml.close();
            } catch (IOException e) {
            }
        }
    }
}

ConfigUtil.java解析语意树:

代码语言:javascript
复制
public static Document getDocument(final InputStream dtd, InputStream xml) throws ParserConfigurationException,
            SAXException, IOException {
    DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
    factory.setValidating(true);
    factory.setNamespaceAware(false);
    DocumentBuilder builder = factory.newDocumentBuilder();
    builder.setEntityResolver(new EntityResolver() {
        @Override
        public InputSource resolveEntity(String publicId, String systemId) {
            return new InputSource(dtd);
        }
    });
    builder.setErrorHandler(new ErrorHandler() {
        @Override
        public void warning(SAXParseException e) {
        }

        @Override
        public void error(SAXParseException e) throws SAXException {
            throw e;
        }

        @Override
        public void fatalError(SAXParseException e) throws SAXException {
            throw e;
        }
    });
    return builder.parse(xml);
}

加载functions,XmlRuleLoader.java

代码语言:javascript
复制
private void loadFunctions(Element root) throws ClassNotFoundException,
            InstantiationException, IllegalAccessException,
            InvocationTargetException {
        NodeList list = root.getElementsByTagName("function");
        for (int i = 0, n = list.getLength(); i < n; ++i) {
            Node node = list.item(i);
            if (node instanceof Element) {
                Element e = (Element) node;
                //获取name标签
                String name = e.getAttribute("name");
                //如果Map已有,则function重复
                if (functions.containsKey(name)) {
                    throw new ConfigException("rule function " + name
                            + " duplicated!");
                }
                //获取class标签
                String clazz = e.getAttribute("class");
                //根据class利用反射新建分片算法
                AbstractPartitionAlgorithm function = createFunction(name, clazz);

                ParameterMapping.mapping(function, ConfigUtil.loadElements(e));
                //每个AbstractPartitionAlgorithm可能会实现init来初始化
                function.init();
                //放入functions map
                functions.put(name, function);
            }
        }
    }

private AbstractPartitionAlgorithm createFunction(String name, String clazz)
        throws ClassNotFoundException, InstantiationException,
        IllegalAccessException, InvocationTargetException {
    Class clz = Class.forName(clazz);
    //判断是否继承AbstractPartitionAlgorithm
    if (!AbstractPartitionAlgorithm.class.isAssignableFrom(clz)) {
        throw new IllegalArgumentException("rule function must implements "
                + AbstractPartitionAlgorithm.class.getName() + ", name=" + name);
    }
    return (AbstractPartitionAlgorithm) clz.newInstance();
}

加载所有的function的node,每一个node就是一个AbstractPartitionAlgorithm,并放入functions这个map中;

代码语言:javascript
复制
private final Map tableRules;

对于每一个node,通过反射新建对应参数的AbstractPartitionAlgorithm。这样,所有的function就加载到了functions这个map中。 同理,加载TableRule,就加上了function是否存在的判断:

代码语言:javascript
复制
/**
* tableRule标签结构:
 * 
 *     
 *         create_date
 *         partbymonth
 *     
 * 
 * @param root
 * @throws SQLSyntaxErrorException
    */
private void loadTableRules(Element root) throws SQLSyntaxErrorException {
    //获取每个tableRule标签
    NodeList list = root.getElementsByTagName("tableRule");
    for (int i = 0, n = list.getLength(); i < n; ++i) {
        Node node = list.item(i);
        if (node instanceof Element) {
            Element e = (Element) node;
            //先判断是否重复
            String name = e.getAttribute("name");
            if (tableRules.containsKey(name)) {
                throw new ConfigException("table rule " + name
                        + " duplicated!");
            }
            //获取rule标签
            NodeList ruleNodes = e.getElementsByTagName("rule");
            int length = ruleNodes.getLength();
            if (length > 1) {
                throw new ConfigException("only one rule can defined :"
                        + name);
            }
            //目前只处理第一个,未来可能有多列复合逻辑需求
            //RuleConfig是保存着rule与function对应关系的对象
            RuleConfig rule = loadRule((Element) ruleNodes.item(0));
            String funName = rule.getFunctionName();
            //判断function是否存在,获取function
            AbstractPartitionAlgorithm func = functions.get(funName);
            if (func == null) {
                throw new ConfigException("can't find function of name :"
                        + funName);
            }
            rule.setRuleAlgorithm(func);
            //保存到tableRules
            tableRules.put(name, new TableRuleConfig(name, rule));
        }
    }
}

这样,所有的tableRule和function就加载完毕。保存在一个变量中,就是tableRules: XMLRuleLoader.java:

代码语言:javascript
复制
private final Map tableRules;

4.2 schema.xml:

代码语言:javascript
复制
public XMLSchemaLoader(String schemaFile, String ruleFile) {
    //先读取rule.xml
    XMLRuleLoader ruleLoader = new XMLRuleLoader(ruleFile);
    //将tableRules拿出,用于这里加载Schema做rule有效判断,以及之后的分片路由计算
    this.tableRules = ruleLoader.getTableRules();
    //释放ruleLoader
    ruleLoader = null;
    this.dataHosts = new HashMap<String, DataHostConfig>();
    this.dataNodes = new HashMap<String, DataNodeConfig>();
    this.schemas = new HashMap<String, SchemaConfig>();
    //读取加载schema配置
    this.load(DEFAULT_DTD, schemaFile == null ? DEFAULT_XML : schemaFile);
}

private void load(String dtdFile, String xmlFile) {
    InputStream dtd = null;
    InputStream xml = null;
    try {
        dtd = XMLSchemaLoader.class.getResourceAsStream(dtdFile);
        xml = XMLSchemaLoader.class.getResourceAsStream(xmlFile);
        Element root = ConfigUtil.getDocument(dtd, xml).getDocumentElement();
        //先加载所有的DataHost
        loadDataHosts(root);
        //再加载所有的DataNode
        loadDataNodes(root);
        //最后加载所有的Schema
        loadSchemas(root);
    } catch (ConfigException e) {
        throw e;
    } catch (Exception e) {
        throw new ConfigException(e);
    } finally {

        if (dtd != null) {
            try {
                dtd.close();
            } catch (IOException e) {
            }
        }

        if (xml != null) {
            try {
                xml.close();
            } catch (IOException e) {
            }
        }
    }
}

先看下DataHostConfig这个类的结构:

这里写图片描述
这里写图片描述

XMLSchemaLoader.java:

代码语言:javascript
复制
private void loadDataHosts(Element root) {
    NodeList list = root.getElementsByTagName("dataHost");
    for (int i = 0, n = list.getLength(); i < n; ++i) {

        Element element = (Element) list.item(i);
        String name = element.getAttribute("name");
        //判断是否重复
        if (dataHosts.containsKey(name)) {
            throw new ConfigException("dataHost name " + name + "duplicated!");
        }
        //读取最大连接数
        int maxCon = Integer.valueOf(element.getAttribute("maxCon"));
        //读取最小连接数
        int minCon = Integer.valueOf(element.getAttribute("minCon"));
        /**
         * 读取负载均衡配置
         * 1. balance="0", 不开启分离机制,所有读操作都发送到当前可用的 writeHost 上。
         * 2. balance="1",全部的 readHost 和 stand by writeHost 参不 select 的负载均衡
         * 3. balance="2",所有读操作都随机的在 writeHost、readhost 上分发。
         * 4. balance="3",所有读请求随机的分发到 wiriterHost 对应的 readhost 执行,writerHost 不负担读压力
         */
        int balance = Integer.valueOf(element.getAttribute("balance"));
        /**
         * 读取切换类型
         * -1 表示不自动切换
         * 1 默认值,自动切换
         * 2 基于MySQL主从同步的状态决定是否切换
         * 心跳询句为 show slave status
         * 3 基于 MySQL galary cluster 的切换机制
         */
        String switchTypeStr = element.getAttribute("switchType");
        int switchType = switchTypeStr.equals("") ? -1 : Integer.valueOf(switchTypeStr);
        //读取从延迟界限
        String slaveThresholdStr = element.getAttribute("slaveThreshold");
        int slaveThreshold = slaveThresholdStr.equals("") ? -1 : Integer.valueOf(slaveThresholdStr);

        //如果 tempReadHostAvailable 设置大于 0 则表示写主机如果挂掉, 临时的读服务依然可用
        String tempReadHostAvailableStr = element.getAttribute("tempReadHostAvailable");
        boolean tempReadHostAvailable = tempReadHostAvailableStr.equals("") ? false : Integer.valueOf(tempReadHostAvailableStr) > 0;
        /**
         * 读取 写类型
         * 这里只支持 0 - 所有写操作仅配置的第一个 writeHost
         */
        String writeTypStr = element.getAttribute("writeType");
        int writeType = "".equals(writeTypStr) ? PhysicalDBPool.WRITE_ONLYONE_NODE : Integer.valueOf(writeTypStr);


        String dbDriver = element.getAttribute("dbDriver");
        String dbType = element.getAttribute("dbType");
        String filters = element.getAttribute("filters");
        String logTimeStr = element.getAttribute("logTime");
        long logTime = "".equals(logTimeStr) ? PhysicalDBPool.LONG_TIME : Long.valueOf(logTimeStr) ;
        //读取心跳语句
        String heartbeatSQL = element.getElementsByTagName("heartbeat").item(0).getTextContent();
        //读取 初始化sql配置,用于oracle
        NodeList connectionInitSqlList = element.getElementsByTagName("connectionInitSql");
        String initConSQL = null;
        if (connectionInitSqlList.getLength() > 0) {
            initConSQL = connectionInitSqlList.item(0).getTextContent();
        }
        //读取writeHost
        NodeList writeNodes = element.getElementsByTagName("writeHost");
        DBHostConfig[] writeDbConfs = new DBHostConfig[writeNodes.getLength()];
        Map readHostsMap = new HashMap(2);
        for (int w = 0; w < writeDbConfs.length; w++) {
            Element writeNode = (Element) writeNodes.item(w);
            writeDbConfs[w] = createDBHostConf(name, writeNode, dbType, dbDriver, maxCon, minCon,filters,logTime);
            NodeList readNodes = writeNode.getElementsByTagName("readHost");
            //读取对应的每一个readHost
            if (readNodes.getLength() != 0) {
                DBHostConfig[] readDbConfs = new DBHostConfig[readNodes.getLength()];
                for (int r = 0; r < readDbConfs.length; r++) {
                    Element readNode = (Element) readNodes.item(r);
                    readDbConfs[r] = createDBHostConf(name, readNode, dbType, dbDriver, maxCon, minCon,filters, logTime);
                }
                readHostsMap.put(w, readDbConfs);
            }
        }

        DataHostConfig hostConf = new DataHostConfig(name, dbType, dbDriver, 
                writeDbConfs, readHostsMap, switchType, slaveThreshold, tempReadHostAvailable);     

        hostConf.setMaxCon(maxCon);
        hostConf.setMinCon(minCon);
        hostConf.setBalance(balance);
        hostConf.setWriteType(writeType);
        hostConf.setHearbeatSQL(heartbeatSQL);
        hostConf.setConnectionInitSql(initConSQL);
        hostConf.setFilters(filters);
        hostConf.setLogTime(logTime);
        dataHosts.put(hostConf.getName(), hostConf);
    }
}

先读取每个DataHost的通用配置,之后读取每个DataHost对应的writeHost以及每个writeHost对应的readHost。配置好后,保存在:

代码语言:javascript
复制
private final Map dataHosts;

之后读取载入DataHost: XMLSchemaLoader.java:

代码语言:javascript
复制
private void loadDataNodes(Element root) {
    //读取DataNode分支
    NodeList list = root.getElementsByTagName("dataNode");
    for (int i = 0, n = list.getLength(); i < n; i++) {
        Element element = (Element) list.item(i);
        String dnNamePre = element.getAttribute("name");

        String databaseStr = element.getAttribute("database");
        String host = element.getAttribute("dataHost");
        //字符串不为空
        if (empty(dnNamePre) || empty(databaseStr) || empty(host)) {
            throw new ConfigException("dataNode " + dnNamePre + " define error ,attribute can't be empty");
        }
        //dnNames(name),databases(database),hostStrings(dataHost)都可以配置多个,以',', '$', '-'区分,但是需要保证database的个数*dataHost的个数=name的个数
        //多个dataHost与多个database如果写在一个标签,则每个dataHost拥有所有database
        //例如:
        //则为:localhost1拥有dn1$0-75,localhost2也拥有dn1$0-75(对应db$76-151)
        String[] dnNames = io.mycat.util.SplitUtil.split(dnNamePre, ',', '$', '-');
        String[] databases = io.mycat.util.SplitUtil.split(databaseStr, ',', '$', '-');
        String[] hostStrings = io.mycat.util.SplitUtil.split(host, ',', '$', '-');

        if (dnNames.length > 1 && dnNames.length != databases.length * hostStrings.length) {
            throw new ConfigException("dataNode " + dnNamePre
                            + " define error ,dnNames.length must be=databases.length*hostStrings.length");
        }
        if (dnNames.length > 1) {

            List<String[]> mhdList = mergerHostDatabase(hostStrings, databases);
            for (int k = 0; k < dnNames.length; k++) {
                String[] hd = mhdList.get(k);
                String dnName = dnNames[k];
                String databaseName = hd[1];
                String hostName = hd[0];
                createDataNode(dnName, databaseName, hostName);
            }

        } else {
            createDataNode(dnNamePre, databaseStr, host);
        }

    }
}
private void createDataNode(String dnName, String database, String host) {

    DataNodeConfig conf = new DataNodeConfig(dnName, database, host);       
    if (dataNodes.containsKey(conf.getName())) {
        throw new ConfigException("dataNode " + conf.getName() + " duplicated!");
    }

    if (!dataHosts.containsKey(host)) {
        throw new ConfigException("dataNode " + dnName + " reference dataHost:" + host + " not exists!");
    }
    dataNodes.put(conf.getName(), conf);
}

生成的是DataNode类,放入:

代码语言:javascript
复制
private final Map dataNodes;
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-05-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 数据库路由中间件MyCat - 源代码篇(11)
    • 4.配置模块
      • 4.1 rule.xml
      • 4.2 schema.xml:
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档