工厂方法模式是使用抽象工厂(抽象类或接口)来生产抽象产品(抽象类或接口)的一个过程,由抽象工厂来决定抽象产品的生产过程,实际生产中由具体的工厂子类或者实现类来完成具体的产品子类或者实现类的生产。具体例子请参考 设计模式整理 。
在mybatis的datasource中,它的抽象工厂为
package org.apache.ibatis.datasource;
import java.util.Properties;
import javax.sql.DataSource;
public interface DataSourceFactory {
//设置DataSource的相关属性,一般紧跟在初始化完成之后
void setProperties(Properties var1);
//获取DataSource对象
DataSource getDataSource();
}
而它的抽象产品则为jdk中的DataSource接口
package javax.sql;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Wrapper;
public interface DataSource extends CommonDataSource, Wrapper {
Connection getConnection() throws SQLException;
Connection getConnection(String username, String password)
throws SQLException;
}
Mybatis中有两个具体的产品——UnpooledDataSource和PooledDataSource,我们先来看一下UnpooledDataSource,这是一个满足基本功能的没有连接池的DataSource产品,跟之前的自实现的事务差不多,具体参考 Spring事务说明与自实现 。可以先看到它的属性以及静态代码块,加载DriverManager中注册的JDBC Driver复制一份到registeredDrivers中。另外它有很多的构造器。而抽象产品(DataSource接口)中的两个getConnection方法都是由doGetConnection方法来实现的。
//加载Driver类的类加载器
private ClassLoader driverClassLoader;
//数据库连接驱动的相关配置
private Properties driverProperties;
//缓存所有已注册的数据库连接驱动
private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap();
//数据库连接的驱动名称
private String driver;
//数据库URL
private String url;
//用户名
private String username;
//密码
private String password;
//是否自动提交
private Boolean autoCommit;
//事务隔离级别
private Integer defaultTransactionIsolationLevel;
//将DriverManager中注册的JDBC驱动复制一份到registeredDrivers中
static {
Enumeration drivers = DriverManager.getDrivers();
while(drivers.hasMoreElements()) {
Driver driver = (Driver)drivers.nextElement();
registeredDrivers.put(driver.getClass().getName(), driver);
}
}
//接口中的方法
public Connection getConnection() throws SQLException {
return this.doGetConnection(this.username, this.password);
}
public Connection getConnection(String username, String password) throws SQLException {
return this.doGetConnection(username, password);
}
private Connection doGetConnection(String username, String password) throws SQLException {
//将各种属性放入props中
Properties props = new Properties();
if(this.driverProperties != null) {
props.putAll(this.driverProperties);
}
if(username != null) {
props.setProperty("user", username);
}
if(password != null) {
props.setProperty("password", password);
}
return this.doGetConnection(props);
}
private Connection doGetConnection(Properties properties) throws SQLException {
//初始化驱动
this.initializeDriver();
//获取数据库连接(properties中包含了username以及password)
Connection connection = DriverManager.getConnection(this.url, properties);
//设置该连接的事务相关配置
this.configureConnection(connection);
return connection;
}
private synchronized void initializeDriver() throws SQLException {
//如果已注册的数据库连接驱动没有当前的驱动
if(!registeredDrivers.containsKey(this.driver)) {
try {
Class driverType;
//反射加载当前驱动,并初始化
if(this.driverClassLoader != null) {
driverType = Class.forName(this.driver, true, this.driverClassLoader);
} else {
driverType = Resources.classForName(this.driver);
}
//根据Class实例来获取当前驱动本身的实例
Driver driverInstance = (Driver)driverType.newInstance();
//在DriverManager中注册该驱动的代理
DriverManager.registerDriver(new UnpooledDataSource.DriverProxy(driverInstance));
//将该驱动添加到registeredDrivers中
registeredDrivers.put(this.driver, driverInstance);
} catch (Exception var3) {
throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + var3);
}
}
}
private void configureConnection(Connection conn) throws SQLException {
//设置是否自动提交事务
if(this.autoCommit != null && this.autoCommit.booleanValue() != conn.getAutoCommit()) {
conn.setAutoCommit(this.autoCommit.booleanValue());
}
//设置事务的隔离等级
if(this.defaultTransactionIsolationLevel != null) {
conn.setTransactionIsolation(this.defaultTransactionIsolationLevel.intValue());
}
}
在DriverManager中是会去注册Drivers的所有信息的
// 注册JDBC驱动的列表
private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
//注册driver
public static synchronized void registerDriver(java.sql.Driver driver,
DriverAction da)
throws SQLException {
/* Register the driver if it has not already been added to our list */
if(driver != null) {
//如果列表中不存在该驱动,则添加至末尾,否则不添加
registeredDrivers.addIfAbsent(new DriverInfo(driver, da));
} else {
// This is for compatibility with the original DriverManager
throw new NullPointerException();
}
println("registerDriver: " + driver);
}
现在我们来看一下与UnpooledDataSource对应的具体工厂实现类UnpooledDataSourceFactory。这种工厂,它只生产UnpooledDataSource这一种产品。
首先我们来看一下抽象工厂(DataSourceFactory接口)的setProperties方法
private static final String DRIVER_PROPERTY_PREFIX = "driver.";
private static final int DRIVER_PROPERTY_PREFIX_LENGTH = "driver.".length();
protected DataSource dataSource = new UnpooledDataSource();
public void setProperties(Properties properties) {
//Properties是继承于HashTable的子类,不接受null值,线程安全,单线程下比HashMap慢,它的迭代器是枚举型迭代器 enumerator
Properties driverProperties = new Properties();
//MetaObject是mybatis自带的一个反射工具,它的主要作用是通过反射给对象中拥有getter,setter的属性,设置属性值
MetaObject metaDataSource = SystemMetaObject.forObject(this.dataSource);
//获取方法参数属性集合的迭代器
Iterator var4 = properties.keySet().iterator();
//遍历该属性集合
while(var4.hasNext()) {
Object key = var4.next();
String propertyName = (String)key;
String value;
//如果该属性名称以driver.开头
if(propertyName.startsWith("driver.")) {
//获取该属性的值
value = properties.getProperty(propertyName);
//以"driver."开头的配置项是对DataSource的配置,记录到driverProperties中保存
driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
} else { //如果不以driver.开头
//如果该dataSource中没有Setter的属性,则抛出异常
if(!metaDataSource.hasSetter(propertyName)) {
throw new DataSourceException("Unknown DataSource property: " + propertyName);
}
value = (String)properties.get(propertyName);
//根据属性类型进行类型转换
Object convertedValue = this.convertValue(metaDataSource, propertyName, value);
//设置该dataSource的属性
metaDataSource.setValue(propertyName, convertedValue);
}
}
//如果driverProperties有值
if(driverProperties.size() > 0) {
//设置dataSource的该属性
metaDataSource.setValue("driverProperties", driverProperties);
}
}
抽象类工厂(DataSourceFactory接口)的getDataSource()的方法实现比较简单
public DataSource getDataSource() {
return this.dataSource;
}
现在我们来看一下它的另一个产品PooledDataSource,这是一个带数据库连接池的产品。数据库的连接过程非常耗时,数据库能够建立的连接数也非常有限,所以在绝大多数系统中,数据库连接是非常珍贵的资源,使用数据库连接池就显得尤为必要。数据库连接池在初始化时,一般会创建一定数量的数据库连接并添加到连接池中备用。当程序需要使用数据库时,从池中请求连接;当程序不再使用该连接时,会将其返回到池中缓存,等下下次使用,而不是直接关闭。而我们之前写的事务自实现中的连接则是每条线程执行完事务则会把连接关闭。
先看一下Mybatis中连接池类PooledConnection的实现,它是一个实现了动态代理的类
class PooledConnection implements InvocationHandler
它的核心字段如下
//记录当前PooledConnection对象所在的PooledDataSource对象。该PooledConnection是从该PooledDataSource中获取的;当调用close()方法时会将PooledConnection放回
//该PooledDataSource中
private final PooledDataSource dataSource;
//真正的数据库连接
private final Connection realConnection;
//数据库连接的代理对象
private final Connection proxyConnection;
//从连接池中取出该连接的时间戳
private long checkoutTimestamp;
//该连接创建的时间戳
private long createdTimestamp;
//最后一次被使用的时间戳
private long lastUsedTimestamp;
//由数据库URL,用户名,密码计算出来的hash值,可用于标识该连接所在的连接池
private int connectionTypeCode;
//检测当前PooledConnection是否有效,主要是为了防止程序通过close()方法将连接归还给连接池之后,依然通过该连接操作数据库
private boolean valid;
我们来看一下它的动态代理方法invoke,它代理的实际上是UnpooledDataSource中的getConnection方法返回的Connection本身实例的所有方法,用以对该连接进行增强。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
//如果调用的方法为close,则将其重新放入连接池,而不是真正关闭数据库连接
if("close".hashCode() == methodName.hashCode() && "close".equals(methodName)) {
this.dataSource.pushConnection(this);
return null;
} else {
try {
if(!Object.class.equals(method.getDeclaringClass())) {
this.checkConnection(); //通过valid字段检测连接是否有效
}
//调用真正数据库连接对象的对应方法
return method.invoke(this.realConnection, args);
} catch (Throwable var6) {
throw ExceptionUtil.unwrapThrowable(var6);
}
}
}
private void checkConnection() throws SQLException {
if(!this.valid) {
throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
}
}
PoolState是用于管理PooledConnection对象状态的组件,它通过两个ArrayList<PooledConnection>集合分别管理空闲状态的连接和活跃状态的连接
//空闲的PooledConnection集合
protected final List<PooledConnection> idleConnections = new ArrayList();
//活跃的PooledConnection集合
protected final List<PooledConnection> activeConnections = new ArrayList();
其他统计字段
//请求数据库连接的次数
protected long requestCount = 0L;
//获取连接的累计时间
protected long accumulatedRequestTime = 0L;
//记录所有连接累计的checkout时长
protected long accumulatedCheckoutTime = 0L;
//当连接长时间未归还给连接池时,会被认为该连接超时,记录超时的连接个数
protected long claimedOverdueConnectionCount = 0L;
//累计超时时间
protected long accumulatedCheckoutTimeOfOverdueConnections = 0L;
//累计等待时间
protected long accumulatedWaitTime = 0L;
//等待次数
protected long hadToWaitCount = 0L;
//无效的连接数
protected long badConnectionCount = 0L;
我们再来看一下PooledDataSource这个产品,它的主要字段如下
//通过PoolState管理连接池的状态并记录统计信息
private final PoolState state = new PoolState(this);
//真正管理数据库连接的对象,用于生成真实的数据库连接对象
private final UnpooledDataSource dataSource;
//最大活跃连接数
protected int poolMaximumActiveConnections = 10;
//最大空闲连接数
protected int poolMaximumIdleConnections = 5;
//最大checkout时长
protected int poolMaximumCheckoutTime = 20000;
//在无法获取连接时,线程需要等待的时间
protected int poolTimeToWait = 20000;
// 每一个尝试从缓存池获取连接的线程. 如果这个线程获取到的是一个坏的连接,那么这个数据源允许这个线程尝试重新获取一个新的连接,但是这个重新尝试的次数不应该超
//过 poolMaximumIdleConnections 与 poolMaximumLocalBadConnectionTolerance 之和。
protected int poolMaximumLocalBadConnectionTolerance = 3;
//在检测一个数据库是否可用时,会给数据库发送一个测试SQL语句
protected String poolPingQuery = "NO PING QUERY SET";
//是否允许发送测试SQL语句
protected boolean poolPingEnabled;
//当连接超过poolPingConnectionsNotUsedFor毫秒未使用时,会发送一次测试SQL语句,检测连接是否正常
protected int poolPingConnectionsNotUsedFor;
//根据数据库的URL、用户名和密码生成的一个hash值,该哈希值用于标志着当前的连接池
private int expectedConnectionTypeCode;
接口中要实现的两个方法
public Connection getConnection() throws SQLException {
return this.popConnection(this.dataSource.getUsername(), this.dataSource.getPassword()).getProxyConnection();
}
public Connection getConnection(String username, String password) throws SQLException {
return this.popConnection(username, password).getProxyConnection();
}
我们可以看到它都是由popConnection来实现的
private PooledConnection popConnection(String username, String password) throws SQLException {
boolean countedWait = false;
PooledConnection conn = null;
long t = System.currentTimeMillis();
int localBadConnectionCount = 0;
//如果连接代理为空
while(conn == null) {
//获取连接池状态
PoolState var8 = this.state;
synchronized(this.state) { //加锁同步状态
//检测空闲连接(不为空)
if(!this.state.idleConnections.isEmpty()) {
//取出第一个连接(集合中第0个)
conn = (PooledConnection)this.state.idleConnections.remove(0);
if(log.isDebugEnabled()) {
log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
}
//如果没有空闲连接且活跃连接数没有到达最大值
} else if(this.state.activeConnections.size() < this.poolMaximumActiveConnections) {
//创建一个线程池连接,该连接实际上是UnpooledDataSource的数据库连接的代理对象实例
conn = new PooledConnection(this.dataSource.getConnection(), this);
if(log.isDebugEnabled()) {
log.debug("Created connection " + conn.getRealHashCode() + ".");
}
//如果没有空闲连接且活跃连接数已经最大了,无法创建新连接
} else {
//取得最早的活跃连接
PooledConnection oldestActiveConnection = (PooledConnection)this.state.activeConnections.get(0);
//获取该连接的过期时间
long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
//检测该连接是否超时
if(longestCheckoutTime > (long)this.poolMaximumCheckoutTime) {
//如果已超时,以下三条代码都是对超时连接信息进行统计
++this.state.claimedOverdueConnectionCount;
this.state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
this.state.accumulatedCheckoutTime += longestCheckoutTime;
//将该超时的连接移除
this.state.activeConnections.remove(oldestActiveConnection);
//如果该超时连接的事务未提交
if(!oldestActiveConnection.getRealConnection().getAutoCommit()) {
try {
//对该超时连接的事务进行回滚
oldestActiveConnection.getRealConnection().rollback();
} catch (SQLException var16) {
log.debug("Bad connection. Could not roll back");
}
}
//再次建立该超时连接的真实连接的新代理,因为该超时连接本身就是一个代理,需要拿到它的目标对象实例
conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
//将该超时连接设置为无效
oldestActiveConnection.invalidate();
if(log.isDebugEnabled()) {
log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
}
//无空闲连接,活跃连接数最大无法创建新连接,且无超时连接
} else {
try {
if(!countedWait) {
//统计等待次数
++this.state.hadToWaitCount;
countedWait = true;
}
if(log.isDebugEnabled()) {
log.debug("Waiting as long as " + this.poolTimeToWait + " milliseconds for connection.");
}
long wt = System.currentTimeMillis();
//线程阻塞等待
this.state.wait((long)this.poolTimeToWait);
//统计累计的等待时间
this.state.accumulatedWaitTime += System.currentTimeMillis() - wt;
} catch (InterruptedException var17) {
break;
}
}
}
//如果连接代理不为空
if(conn != null) {
//检测连接代理是否有效
if(conn.isValid()) {
//如果有效且事务未提交则回滚事务
if(!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//配置连接代理的相关属性
conn.setConnectionTypeCode(this.assembleConnectionTypeCode(this.dataSource.getUrl(), username, password));
conn.setCheckoutTimestamp(System.currentTimeMillis());
conn.setLastUsedTimestamp(System.currentTimeMillis());
//将该连接代理添加到活跃连接中
this.state.activeConnections.add(conn);
//进行相关统计
++this.state.requestCount;
this.state.accumulatedRequestTime += System.currentTimeMillis() - t;
//如果无效
} else {
if(log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
}
//进行一些错误连接的相关统计
++this.state.badConnectionCount;
++localBadConnectionCount;
conn = null;
if(localBadConnectionCount > this.poolMaximumIdleConnections + this.poolMaximumLocalBadConnectionTolerance) {
if(log.isDebugEnabled()) {
log.debug("PooledDataSource: Could not get a good connection to the database.");
}
throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
}
}
}
}
}
if(conn == null) {
if(log.isDebugEnabled()) {
log.debug("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
}
throw new SQLException("PooledDataSource: Unknown severe error condition. The connection pool returned a null connection.");
} else {
return conn;
}
}
由以上代码可知,带线程池的DataSource不过是对连接进行了一次动态代理,增强连接方法,并将代理连接放入集合中处理。
图片来源网络
现在我们来看一下close()后将该连接代理重新放入连接池的相关代码pushConnection(以下没有特别说明,连接均指连接代理)
protected void pushConnection(PooledConnection conn) throws SQLException {
//获取连接池状态
PoolState var2 = this.state;
synchronized(this.state) { //锁同步该状态
//从活动连接中移除连接
this.state.activeConnections.remove(conn);
//判断连接是否有效
if(conn.isValid()) {
//如果有效,检测空闲连接池是否已达到上限以及该连接是否是连接池中的连接
if(this.state.idleConnections.size() < this.poolMaximumIdleConnections && conn.getConnectionTypeCode() == this.expectedConnectionTypeCode) {
//累计checkout时长
this.state.accumulatedCheckoutTime += conn.getCheckoutTime();
//回滚未提交的事务
if(!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//为返还连接创建新的连接对象
PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
//空闲连接加入该新对象
this.state.idleConnections.add(newConn);
newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
//将原连接对象设置无效
conn.invalidate();
if(log.isDebugEnabled()) {
log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
}
//唤醒所有阻塞中的线程
this.state.notifyAll();
//空闲连接数已达到上限或该连接不属于该连接池
} else {
//累计checkout时长
this.state.accumulatedCheckoutTime += conn.getCheckoutTime();
//回滚未提交的事务
if(!conn.getRealConnection().getAutoCommit()) {
conn.getRealConnection().rollback();
}
//关闭真正的数据库连接
conn.getRealConnection().close();
if(log.isDebugEnabled()) {
log.debug("Closed connection " + conn.getRealHashCode() + ".");
}
//将该连接设置为无效
conn.invalidate();
}
} else {
if(log.isDebugEnabled()) {
log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
}
//统计无效连接对象个数
++this.state.badConnectionCount;
}
}
}
这其中都有对连接池(连接代理)的有效监测conn.isValid(),我们来看一下它的实现
public boolean isValid() {
return this.valid && this.realConnection != null && this.dataSource.pingConnection(this);
}
其中this.dataSource.pingConnection(this)就是进行一个心跳检测
protected boolean pingConnection(PooledConnection conn) {
boolean result = true;
try {
//检测真正的数据库连接是否已经关闭
result = !conn.getRealConnection().isClosed();
} catch (SQLException var8) {
if(log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + var8.getMessage());
}
result = false;
}
//检测是否运行测试SQL语句且长时间未使用的连接才需要ping操作来检测数据库连接是否正常
if(result && this.poolPingEnabled && this.poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > (long)this.poolPingConnectionsNotUsedFor) {
try {
if(log.isDebugEnabled()) {
log.debug("Testing connection " + conn.getRealHashCode() + " ...");
}
//执行心跳检测SQL语句
Connection realConn = conn.getRealConnection();
Statement statement = realConn.createStatement();
ResultSet rs = statement.executeQuery(this.poolPingQuery);
rs.close();
statement.close();
if(!realConn.getAutoCommit()) {
realConn.rollback();
}
result = true;
if(log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
}
} catch (Exception var7) {
log.warn("Execution of ping query '" + this.poolPingQuery + "' failed: " + var7.getMessage());
try {
conn.getRealConnection().close();
} catch (Exception var6) {
;
}
result = false;
if(log.isDebugEnabled()) {
log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + var7.getMessage());
}
}
}
return result;
}
现在我们来看一下该dataSource对应的工厂PooledDataSourceFactory,它是UnpooledDataSourceFactory的子类,只是将UnpooledDataSourceFactory的dataSource属性由UnpooledDataSource改成了带线程池的PooledDataSource。
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
public PooledDataSourceFactory() {
this.dataSource = new PooledDataSource();
}
}