前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >sharding-jdbc源码之读写分离和从库负载

sharding-jdbc源码之读写分离和从库负载

作者头像
山行AI
发布2019-07-19 19:14:00
1.9K0
发布2019-07-19 19:14:00
举报
文章被收录于专栏:山行AI

最近项目中用到了sharding-jdbc的shardingDataSource来配置MasterSlaveLoadBalanceAlgorithm来做读写分离和从库负载,本文针对使用方法和源码分析来聊聊其中的原理。

1. 项目配置

代码语言:javascript
复制
<!-- 数据源 -->    <bean id="userDataSource" class="com.alibaba.druid.pool.DruidDataSource"        init-method="init" destroy-method="close" primary="true">        <property name="url" value="${jdbc.url}" />        <property name="username" value="${jdbc.username}" />        <property name="password" value="${jdbc.password}" />
        <!-- 初始化连接大小 -->        <property name="initialSize" value="${jdbc.initialSize}" />        <!-- 连接池最大使用连接数量 -->        <property name="maxActive" value="${jdbc.maxActive}" />        <!-- 获取连接最大等待时间 -->        <property name="maxWait" value="${jdbc.maxWait}" />        <!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->        <property name="timeBetweenEvictionRunsMillis" value="60000" />        <!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->        <property name="minEvictableIdleTimeMillis" value="300000" />
        <property name="validationQuery" value="SELECT 1" />        <property name="testWhileIdle" value="true" />        <property name="testOnBorrow" value="false" />        <property name="testOnReturn" value="false" />        <!-- 打开removeAbandoned功能 -->        <property name="removeAbandoned" value="true" />        <!-- 1800秒,也就是30分钟 -->        <property name="removeAbandonedTimeout" value="1800" />        <!-- 关闭abanded连接时输出错误日志 -->        <property name="logAbandoned" value="true" />        <!-- publicke设置 -->        <!--<property name="connectionProperties"-->            <!--value="config.decrypt=true;config.decrypt.key=${jdbc.publickey}" />-->        <!-- 监控数据库 (建议平时开发中,打开该配置,上线的时候关闭掉) -->        <!-- 属性类型是字符串,通过别名的方式配置扩展插件,常用的插件有:监控统计用的filter:stat; 日志用的filter:log4j;防御sql注入的filter:wall;合并sql的filter:mergeStat -->        <property name="filters" value="${jdbc.filters}" />        <property name="proxyFilters">            <list>                <ref bean="log-filter" />            </list>        </property>    </bean>
    <bean id="userDataSourceSlave" class="com.alibaba.druid.pool.DruidDataSource"        init-method="init" destroy-method="close">        <property name="driverClassName" value="com.mysql.jdbc.Driver" />        <property name="url" value="${jdbc.url.slave}"></property>        <property name="username" value="${jdbc.username.slave}"></property>        <property name="password" value="${jdbc.password.slave}"></property>        <property name="maxActive" value="100" />        <property name="initialSize" value="10" />        <property name="maxWait" value="60000" />        <property name="minIdle" value="5" />    </bean>
    <bean id="randomStrategy"        class="io.shardingjdbc.core.api.algorithm.masterslave.RandomMasterSlaveLoadBalanceAlgorithm" />
    <master-slave:data-source id="shardingDataSource"        master-data-source-name="userDataSource" slave-data-source-names="userDataSourceSlave"        strategy-ref="randomStrategy" />
    <!-- 配置MyBatis session工厂 -->        <bean id="userSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">            <property name="dataSource" ref="shardingDataSource" />            <property name="configLocation" value="classpath:/conf/mybaits-config.xml" />            <property name="mapperLocations">                <list>                    <value>classpath:/**/dao/mapper/*Mapper.xml</value>                </list>            </property>        ...

使用上面的配置,就可以使用shardingjdbc的RandomMasterSlaveLoadBalanceAlgorithm算法进行读写分离和从库负载了。那么具体的实现是什么样的呢?下面就来一步步看看。

2. 源码分析

由于使用的是spring,那么先来看一看读取配置文件的入口spring.handlers文件:

org.apache.shardingsphere.shardingjdbc.spring.namespace.handler.MasterSlaveNamespaceHandler的定义如下:

接下来看里面具体解析部分:

代码语言:javascript
复制
/** * Master-slave data source parser for spring namespace. * * @author zhangliang * @author zhaojun */public final class MasterSlaveDataSourceBeanDefinitionParser extends AbstractBeanDefinitionParser {
    @Override    protected AbstractBeanDefinition parseInternal(final Element element, final ParserContext parserContext) {        //这个BeanDefinitionBuilder主要用于构建SpringMasterSlaveDataSource        BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringMasterSlaveDataSource.class);        //内部是将master和slave的配置信息读取,然后设置好相应的规则 @see org.apache.shardingsphere.shardingjdbc.spring.namespace.parser.MasterSlaveDataSourceBeanDefinitionParser.parseDataSources的注释        MasterSlaveRuleConfigurationBeanDefinition configurationBeanDefinition = new MasterSlaveRuleConfigurationBeanDefinition(element);        //用于构建SpringMasterSlaveDataSource构造方法的第一个参数        factory.addConstructorArgValue(parseDataSources(configurationBeanDefinition));        //用于构建SpringMasterSlaveDataSource构造方法的第二个参数        factory.addConstructorArgValue(configurationBeanDefinition.getBeanDefinition());        //用于构建SpringMasterSlaveDataSource的第三个参数        factory.addConstructorArgValue(parseProperties(element, parserContext));        return factory.getBeanDefinition();    }
    /**     * 为了理解这个方法,可以看下SpringMasterSlaveDataSource的构造方法org.apache.shardingsphere.shardingjdbc.spring.datasource.SpringMasterSlaveDataSource:     *   public SpringMasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfiguration, final Properties props) throws SQLException {     *         super(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfiguration), null == props ? new Properties() : props);     *     }     *  没错,这里就是用来构建这个构造方法的第一个参数     * @param configurationBeanDefinition     * @return     */    private Map<String, RuntimeBeanReference> parseDataSources(final MasterSlaveRuleConfigurationBeanDefinition configurationBeanDefinition) {        List<String> slaveDataSources = Splitter.on(",").trimResults().splitToList(configurationBeanDefinition.getSlaveDataSourceNames());        Map<String, RuntimeBeanReference> result = new ManagedMap<>(slaveDataSources.size());        for (String each : slaveDataSources) {            result.put(each, new RuntimeBeanReference(each));        }        String masterDataSourceName = configurationBeanDefinition.getMasterDataSourceName();        result.put(masterDataSourceName, new RuntimeBeanReference(masterDataSourceName));        return result;    }
    private Properties parseProperties(final Element element, final ParserContext parserContext) {        Element propsElement = DomUtils.getChildElementByTagName(element, ShardingDataSourceBeanDefinitionParserTag.PROPS_TAG);        return null == propsElement ? new Properties() : parserContext.getDelegate().parsePropsElement(propsElement);    }}

这里主要用于构建datasource供持久层如hibernate、myibatis等使用,本文示例中配置的是myibatis。

到这里,我们主要开始看org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource的源码:

代码语言:javascript
复制
@Getterpublic class MasterSlaveDataSource extends AbstractDataSourceAdapter {    /**     * 元数据信息     */    private final DatabaseMetaData cachedDatabaseMetaData;
    /**     * 主从的规则     */    private final MasterSlaveRule masterSlaveRule;
    /**     * sql解析引擎,将一个sql解析成SQLStatement,对应读的操作就是SelectStatement     */    private final MasterSlaveSQLParseEntry parseEngine;
    /**     * 分片的配置     */    private final ShardingProperties shardingProperties;
    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {        super(dataSourceMap);        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);        parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType());        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);    }    ...
  @Override    public final MasterSlaveConnection getConnection() {        return new MasterSlaveConnection(this, getDataSourceMap(), parseEngine);    }}

datasource的主要作用是获取connection,这里是MasterSlaveConnection:

代码语言:javascript
复制
@RequiredArgsConstructor@Getterpublic final class MasterSlaveConnection extends AbstractConnectionAdapter {
    private final MasterSlaveDataSource masterSlaveDataSource;
    private final Map<String, DataSource> dataSourceMap;
    private final MasterSlaveSQLParseEntry parseEngine;
    @Override    public DatabaseMetaData getMetaData() throws SQLException {        return getCachedConnections().isEmpty() ? masterSlaveDataSource.getCachedDatabaseMetaData() : getCachedConnections().values().iterator().next().getMetaData();    }
    @Override    public Statement createStatement() {        return new MasterSlaveStatement(this);    }
    @Override    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {        return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);    }
       ...
    @Override    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {        return new MasterSlavePreparedStatement(this, sql, autoGeneratedKeys);    }
      ...}
  • 在MasterSlaveConnection的父类AbstractConnectionAdapter中有一个获取连接的方法,是要传入dataSourceName的:
代码语言:javascript
复制
 /**     * Get database connection.     *     * @param dataSourceName data source name     * @return database connection     * @throws SQLException SQL exception     */    public final Connection getConnection(final String dataSourceName) throws SQLException {        return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);    }
  • 在MasterSlaveConnection中有很多创建Statement和PreparedStatement的方法,熟悉jdbc编程的应该对这两个类的用途并不陌生吧,它是借助mysql-connector与数据库交互的非常重要的两个类。MasterSlavePreparedStatement是PreparedStatement的一个实现。我们继续看MasterSlavePreparedStatement的源码:
代码语言:javascript
复制
 public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int[] columnIndexes) throws SQLException {        //连接信息        this.connection = connection;        //路由,对于本文十分关键        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(),                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));        for (String each : masterSlaveRouter.route(sql, true)) {            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnIndexes);            routedStatements.add(preparedStatement);        }    }
  • 接下来我们看一下MasterSlaveRouter,这个是讲解本文的核心类:
代码语言:javascript
复制
@RequiredArgsConstructorpublic final class MasterSlaveRouter {
    private final MasterSlaveRule masterSlaveRule;
    private final MasterSlaveSQLParseEntry parseEngine;
    private final boolean showSQL;
    /**     * Route Master slave.     *     * @param sql SQL     * @param useCache use cache or not     * @return data source names     */    // TODO for multiple masters may return more than one data source    public Collection<String> route(final String sql, final boolean useCache) {        Collection<String> result = route(parseEngine.parse(sql, useCache));        if (showSQL) {            SQLLogger.logSQL(sql, result);        }        return result;    }
    private Collection<String> route(final SQLStatement sqlStatement) {        if (isMasterRoute(sqlStatement)) {            MasterVisitedManager.setMasterVisited();            return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());        }        return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));    }
    private boolean isMasterRoute(final SQLStatement sqlStatement) {        return !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();    }}
  • route方法会根据sql解析的结果返回需要路由到的datasources的datasourceName列表。这个route方法是在MasterSlavePreparedStatement的构造方法中调用的,它的主要作用是使用MasterSlaveConnection的 父类中的getConnection(final String dataSourceName)方法获取真正的dataSource连接并生成真正的prepareStatement放入MasterSlavePreparedStatement的routedStatements列表中。
  • route方法的masterSlaveRule.getLoadBalanceAlgorithm().getDataSource就是最终使用负载均衡算法的地方,这里使用的算法就是前面配置文件中配置后解析得到的。
  • parseEngine.parse(sql, useCache)会将sql解析成SQLStatement类型,我们看下这种类型的实现类:

其中读的sql会被解析引擎解析成SelectStatement类型。

  • isMasterRoute方法中!(sqlStatement instanceof SelectStatement)是读写分离的关键,如果不是SelectStatement类型的将会走主库。
  • isMasterRoute方法中HintManager.isMasterRouteOnly()是用来判断是否强制走主库的方法,它内部是通过一个threadLocal变量来维护是否走主库的状态值。很多业务处理上想让某个数据库服务强制走主库也是利用这一点来处理的。实现方式一般为注解+AOP:
代码语言:javascript
复制
@Documented@Inherited@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)public @interface MysqlMaster {
}
@Aspect@Componentpublic class DataSourceAspect {

    private static final Logger log = LoggerFactory.getLogger(DataSourceAspect.class);

    @Pointcut("@annotation(com.XX.MysqlMaster)")    public void anyMethod(){}
    @Around("anyMethod()")    public Object aroundMethod(ProceedingJoinPoint pjp) {        HintManager hintManager = HintManager.getInstance();
        try{            hintManager.setMasterRouteOnly();            obj =    pjp.proceed();        }catch (Throwable e){            log.error("-----------MySQL强制主库异常!Method name ={}----------------", pjp.getSignature().getName());            throw new DbException(e);        }finally {            hintManager.close();        }        return  obj;    }
}

然后在aop包裹的service上使用@MysqlMaster注解即可。

到这里我们关于读写分离和从库负载部分的代码已经解析完了。

3. 源码提供的负载算法(主要针对从库)

  • RandomMasterSlaveLoadBalanceAlgorithm:slaveDataSourceNames.get(new Random().nextInt(slaveDataSourceNames.size())) 在提供的从库里随机取一个。
  • RoundRobinMasterSlaveLoadBalanceAlgorithm:slaveDataSourceNames.get(Math.abs(count.getAndIncrement()) % slaveDataSourceNames.size()); 在提供的从库列表中顺序轮询。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 项目配置
  • 2. 源码分析
  • 3. 源码提供的负载算法(主要针对从库)
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档