A recent project used sharding-jdbc's shardingDataSource together with a MasterSlaveLoadBalanceAlgorithm to implement read/write splitting and slave load balancing. This article walks through the configuration and the source code to explain how it works.
<!-- Data source -->
<bean id="userDataSource" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close" primary="true">
    <property name="url" value="${jdbc.url}" />
    <property name="username" value="${jdbc.username}" />
    <property name="password" value="${jdbc.password}" />
    <!-- Initial connection pool size -->
    <property name="initialSize" value="${jdbc.initialSize}" />
    <!-- Maximum number of active connections in the pool -->
    <property name="maxActive" value="${jdbc.maxActive}" />
    <!-- Maximum wait time to obtain a connection -->
    <property name="maxWait" value="${jdbc.maxWait}" />
    <!-- Interval between runs that evict idle connections, in milliseconds -->
    <property name="timeBetweenEvictionRunsMillis" value="60000" />
    <!-- Minimum time a connection must stay idle in the pool before it becomes evictable, in milliseconds -->
    <property name="minEvictableIdleTimeMillis" value="300000" />
    <property name="validationQuery" value="SELECT 1" />
    <property name="testWhileIdle" value="true" />
    <property name="testOnBorrow" value="false" />
    <property name="testOnReturn" value="false" />
    <!-- Enable the removeAbandoned feature -->
    <property name="removeAbandoned" value="true" />
    <!-- 1800 seconds, i.e. 30 minutes -->
    <property name="removeAbandonedTimeout" value="1800" />
    <!-- Log an error when an abandoned connection is closed -->
    <property name="logAbandoned" value="true" />
    <!-- Public-key setting -->
    <!--<property name="connectionProperties"-->
    <!--value="config.decrypt=true;config.decrypt.key=${jdbc.publickey}" />-->
    <!-- Database monitoring (recommended during development; disable before going to production) -->
    <!-- The value is a string of comma-separated filter aliases. Common filters:
         stat (monitoring statistics), log4j (logging), wall (SQL-injection defense), mergeStat (SQL merging) -->
    <property name="filters" value="${jdbc.filters}" />
    <property name="proxyFilters">
        <list>
            <ref bean="log-filter" />
        </list>
    </property>
</bean>
<bean id="userDataSourceSlave" class="com.alibaba.druid.pool.DruidDataSource"
      init-method="init" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="${jdbc.url.slave}" />
    <property name="username" value="${jdbc.username.slave}" />
    <property name="password" value="${jdbc.password.slave}" />
    <property name="maxActive" value="100" />
    <property name="initialSize" value="10" />
    <property name="maxWait" value="60000" />
    <property name="minIdle" value="5" />
</bean>
<bean id="randomStrategy" class="io.shardingjdbc.core.api.algorithm.masterslave.RandomMasterSlaveLoadBalanceAlgorithm" />
<master-slave:data-source id="shardingDataSource" master-data-source-name="userDataSource" slave-data-source-names="userDataSourceSlave" strategy-ref="randomStrategy" />
<!-- MyBatis session factory -->
<bean id="userSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
    <property name="dataSource" ref="shardingDataSource" />
    <property name="configLocation" value="classpath:/conf/mybaits-config.xml" />
    <property name="mapperLocations">
        <list>
            <value>classpath:/**/dao/mapper/*Mapper.xml</value>
        </list>
    </property>
    ...
With the configuration above, sharding-jdbc's RandomMasterSlaveLoadBalanceAlgorithm handles read/write splitting and slave load balancing. So how is this actually implemented? Let's walk through it step by step.
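Before diving into the source, it helps to see how simple the random strategy is conceptually: for a read that is allowed to hit a slave, it just picks one slave name at random. Below is a simplified, self-contained sketch of that idea, not the library class itself; `RandomSlavePicker` and its method shape are invented for illustration (the real interface also receives a rule name and the master name).

```java
import java.util.List;
import java.util.Random;

// Simplified sketch of a random master-slave load balance strategy: every read
// that may go to a slave picks one slave data source name uniformly at random.
// The master name is accepted only to mirror the shape of the real interface;
// random selection never uses it.
public final class RandomSlavePicker {

    private final Random random = new Random();

    public String getDataSource(final String masterDataSourceName, final List<String> slaveDataSourceNames) {
        return slaveDataSourceNames.get(random.nextInt(slaveDataSourceNames.size()));
    }
}
```

With a single slave, as in the configuration above, the "random" choice is always that one slave; the strategy only matters once several slaves are listed.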
Since the project uses Spring, let's start at the entry point for parsing the configuration, the spring.handlers file:
org.apache.shardingsphere.shardingjdbc.spring.namespace.handler.MasterSlaveNamespaceHandler is registered there; it extends Spring's NamespaceHandlerSupport and simply registers MasterSlaveDataSourceBeanDefinitionParser for the master-slave:data-source element.
Next, let's look at the parsing itself:
/**
 * Master-slave data source parser for spring namespace.
 *
 * @author zhangliang
 * @author zhaojun
 */
public final class MasterSlaveDataSourceBeanDefinitionParser extends AbstractBeanDefinitionParser {
    @Override
    protected AbstractBeanDefinition parseInternal(final Element element, final ParserContext parserContext) {
        // This BeanDefinitionBuilder builds a SpringMasterSlaveDataSource
        BeanDefinitionBuilder factory = BeanDefinitionBuilder.rootBeanDefinition(SpringMasterSlaveDataSource.class);
        // Reads the master and slave configuration and sets up the corresponding rule;
        // see the comments on parseDataSources below
        MasterSlaveRuleConfigurationBeanDefinition configurationBeanDefinition = new MasterSlaveRuleConfigurationBeanDefinition(element);
        // First constructor argument of SpringMasterSlaveDataSource
        factory.addConstructorArgValue(parseDataSources(configurationBeanDefinition));
        // Second constructor argument of SpringMasterSlaveDataSource
        factory.addConstructorArgValue(configurationBeanDefinition.getBeanDefinition());
        // Third constructor argument of SpringMasterSlaveDataSource
        factory.addConstructorArgValue(parseProperties(element, parserContext));
        return factory.getBeanDefinition();
    }
    /**
     * To understand this method, look at the constructor of
     * org.apache.shardingsphere.shardingjdbc.spring.datasource.SpringMasterSlaveDataSource:
     *
     * public SpringMasterSlaveDataSource(final Map<String, DataSource> dataSourceMap,
     *         final MasterSlaveRuleConfiguration masterSlaveRuleConfiguration, final Properties props) throws SQLException {
     *     super(dataSourceMap, new MasterSlaveRule(masterSlaveRuleConfiguration), null == props ? new Properties() : props);
     * }
     *
     * This method builds the first argument of that constructor.
     */
    private Map<String, RuntimeBeanReference> parseDataSources(final MasterSlaveRuleConfigurationBeanDefinition configurationBeanDefinition) {
        List<String> slaveDataSources = Splitter.on(",").trimResults().splitToList(configurationBeanDefinition.getSlaveDataSourceNames());
        Map<String, RuntimeBeanReference> result = new ManagedMap<>(slaveDataSources.size());
        for (String each : slaveDataSources) {
            result.put(each, new RuntimeBeanReference(each));
        }
        String masterDataSourceName = configurationBeanDefinition.getMasterDataSourceName();
        result.put(masterDataSourceName, new RuntimeBeanReference(masterDataSourceName));
        return result;
    }
    private Properties parseProperties(final Element element, final ParserContext parserContext) {
        Element propsElement = DomUtils.getChildElementByTagName(element, ShardingDataSourceBeanDefinitionParserTag.PROPS_TAG);
        return null == propsElement ? new Properties() : parserContext.getDelegate().parsePropsElement(propsElement);
    }
}
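Stripped of the Spring types, parseDataSources just turns the two XML attributes into a name-keyed map: each comma-separated slave name maps to a reference to the bean of the same name, and the master is added last. A plain-Java sketch of that transformation (the class `DataSourceMapSketch` is invented for illustration, with plain strings standing in for Spring's RuntimeBeanReference):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the map parseDataSources builds. Slave names come from the
// comma-separated slave-data-source-names attribute; the master name is put
// into the same map afterwards, so the result contains every data source bean.
public final class DataSourceMapSketch {

    public static Map<String, String> build(final String masterName, final String slaveNamesCsv) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String each : slaveNamesCsv.split(",")) {
            String name = each.trim();
            result.put(name, name);  // real code: new RuntimeBeanReference(name)
        }
        result.put(masterName, masterName);
        return result;
    }
}
```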
The resulting data source is what a persistence framework such as Hibernate or MyBatis consumes; the example in this article uses MyBatis.
From here, let's look at the source of org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource:
@Getter
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {

    /**
     * Cached database metadata
     */
    private final DatabaseMetaData cachedDatabaseMetaData;
    /**
     * Master-slave rule
     */
    private final MasterSlaveRule masterSlaveRule;
    /**
     * SQL parse engine: parses a SQL string into a SQLStatement; a read becomes a SelectStatement
     */
    private final MasterSlaveSQLParseEntry parseEngine;
    /**
     * Sharding properties
     */
    private final ShardingProperties shardingProperties;
    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap,
            final MasterSlaveRuleConfiguration masterSlaveRuleConfig, final Properties props) throws SQLException {
        super(dataSourceMap);
        cachedDatabaseMetaData = createCachedDatabaseMetaData(dataSourceMap);
        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
        parseEngine = new MasterSlaveSQLParseEntry(getDatabaseType());
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    }
    ...
    @Override
    public final MasterSlaveConnection getConnection() {
        return new MasterSlaveConnection(this, getDataSourceMap(), parseEngine);
    }
}
A DataSource's main job is to hand out connections; here that is a MasterSlaveConnection:
@RequiredArgsConstructor
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {
private final MasterSlaveDataSource masterSlaveDataSource;
private final Map<String, DataSource> dataSourceMap;
private final MasterSlaveSQLParseEntry parseEngine;
    @Override
    public DatabaseMetaData getMetaData() throws SQLException {
        return getCachedConnections().isEmpty()
                ? masterSlaveDataSource.getCachedDatabaseMetaData()
                : getCachedConnections().values().iterator().next().getMetaData();
    }
    @Override
    public Statement createStatement() {
        return new MasterSlaveStatement(this);
    }
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
        return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
    }
...
    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql, autoGeneratedKeys);
    }
    ...
}
    /**
     * Get database connection.
     *
     * @param dataSourceName data source name
     * @return database connection
     * @throws SQLException SQL exception
     */
    public final Connection getConnection(final String dataSourceName) throws SQLException {
        return getConnections(ConnectionMode.MEMORY_STRICTLY, dataSourceName, 1).get(0);
    }
The MasterSlavePreparedStatement constructor is where routing actually happens:

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int[] columnIndexes) throws SQLException {
        // connection info
        this.connection = connection;
        // the router, which is the crucial piece for this article
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(), connection.getParseEngine(),
                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
        for (String each : masterSlaveRouter.route(sql, true)) {
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnIndexes);
            routedStatements.add(preparedStatement);
        }
    }
@RequiredArgsConstructor
public final class MasterSlaveRouter {
private final MasterSlaveRule masterSlaveRule;
private final MasterSlaveSQLParseEntry parseEngine;
private final boolean showSQL;
    /**
     * Route master slave.
     *
     * @param sql SQL
     * @param useCache use cache or not
     * @return data source names
     */
    // TODO for multiple masters may return more than one data source
    public Collection<String> route(final String sql, final boolean useCache) {
        Collection<String> result = route(parseEngine.parse(sql, useCache));
        if (showSQL) {
            SQLLogger.logSQL(sql, result);
        }
        return result;
    }
    private Collection<String> route(final SQLStatement sqlStatement) {
        if (isMasterRoute(sqlStatement)) {
            MasterVisitedManager.setMasterVisited();
            return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
        }
        return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
    }
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        return !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }
}
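The routing decision above can be condensed into a few self-contained lines. The sketch below (class `RouteSketch`, invented for illustration) replaces the real parse engine with a naive prefix check, and replaces MasterVisitedManager and HintManager with plain boolean fields; the decision logic itself mirrors isMasterRoute:

```java
import java.util.List;
import java.util.Locale;
import java.util.Random;

// Sketch of MasterSlaveRouter's decision: non-SELECT statements, statements
// after the master has already been visited, and hinted statements go to the
// master; only the remaining reads are load-balanced across the slaves.
public final class RouteSketch {

    private final String master;
    private final List<String> slaves;
    private final Random random = new Random();
    private boolean masterVisited;  // stands in for MasterVisitedManager's per-thread flag
    private boolean masterHint;     // stands in for HintManager.isMasterRouteOnly()

    public RouteSketch(final String master, final List<String> slaves) {
        this.master = master;
        this.slaves = slaves;
    }

    public void hintMaster() {
        masterHint = true;
    }

    public String route(final String sql) {
        // The real engine parses SQL into a SQLStatement; a prefix check stands in here.
        boolean isSelect = sql.trim().toLowerCase(Locale.ROOT).startsWith("select");
        if (!isSelect || masterVisited || masterHint) {
            masterVisited = true;  // later reads on this "connection" stick to the master
            return master;
        }
        return slaves.get(random.nextInt(slaves.size()));
    }
}
```

Note the stickiness: once a write has routed to the master, subsequent reads in the same scope also hit the master, which avoids reading stale data from a lagging slave right after a write.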
A read SQL is parsed by the parse engine into a SelectStatement; everything else routes to the master, as do reads after the master has already been visited or when a hint forces master routing. Sometimes we need to force a particular read to the master ourselves, which can be done with a custom annotation and an aspect:
@Documented
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MysqlMaster {
}
@Aspect
@Component
public class DataSourceAspect {

    private static final Logger log = LoggerFactory.getLogger(DataSourceAspect.class);

    @Pointcut("@annotation(com.XX.MysqlMaster)")
    public void anyMethod() {
    }

    @Around("anyMethod()")
    public Object aroundMethod(ProceedingJoinPoint pjp) {
        HintManager hintManager = HintManager.getInstance();
        Object obj;
        try {
            hintManager.setMasterRouteOnly();
            obj = pjp.proceed();
        } catch (Throwable e) {
            log.error("-----------Forced-master MySQL call failed! Method name = {}----------------", pjp.getSignature().getName());
            throw new DbException(e);
        } finally {
            hintManager.close();
        }
        return obj;
    }
}
Then simply annotate the service methods that must read from the master with @MysqlMaster.
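The force-master hint is, at its core, a per-thread flag that must be cleared in finally, which is exactly what the aspect above does. A minimal sketch of that mechanism (class `MasterHintSketch`, invented for illustration; not the library's HintManager):

```java
// Minimal sketch of a ThreadLocal-based "force master" hint. The aspect's
// pattern is mirrored in callForcedToMaster: set the flag, run the work, and
// always clear the flag so it never leaks to the next request handled by the
// same pooled thread.
public final class MasterHintSketch {

    private static final ThreadLocal<Boolean> MASTER_ONLY = ThreadLocal.withInitial(() -> Boolean.FALSE);

    public static void setMasterRouteOnly() {
        MASTER_ONLY.set(Boolean.TRUE);
    }

    public static boolean isMasterRouteOnly() {
        return MASTER_ONLY.get();
    }

    public static void clear() {
        MASTER_ONLY.remove();
    }

    // Usage pattern matching the aspect: set, proceed, always clear in finally.
    public static String callForcedToMaster() {
        setMasterRouteOnly();
        try {
            return isMasterRouteOnly() ? "master" : "slave";
        } finally {
            clear();
        }
    }
}
```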
That wraps up our walkthrough of the read/write-splitting and slave load-balancing code path.