我们在分析mybatis执行sql的时候,最终定位到数据库连接池上。当时分析到mybatis通过数据库连接池获取到链接,然后通过连接执行sql。
所以这块作者的想法还是和以前一样,先投入进去不管整体,先搞清楚基础逻辑,然后进行细节思考。最后在考虑springBoot的配置bean。基于此,我们首先看一下获取数据库连接的问题。在之前的学习中,我们知道juc中有线程池,那么druid的数据库连接池和线程池有什么区别?怀着问题,我们开始吧!
我们先学习一下DruidDateSource
我们看到该类下有很多参数,而这些参数基本都是我们配置的druid参数
作者通过debug调试,发现这些配置的读取却是从环境变量中读取的。同时我们也注意到其中的connections是volatile修饰的,说明对线程可见。
再进一步,我们看一下getConnection的实现逻辑
public DruidPooledConnection getConnection() throws SQLException {
//通过maxwait参数获取,这里maxwait表示最大超时时间
return this.getConnection(this.maxWait);
}
public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
//这里进行初始化
this.init();
//对过滤器进行判断
if (this.filters.size() > 0) {
//过滤器链
FilterChainImpl filterChain = new FilterChainImpl(this);
return filterChain.dataSource_connect(this, maxWaitMillis);
} else {
//获取连接
return this.getConnectionDirect(maxWaitMillis);
}
}
在初始化上
public void init() throws SQLException {
//先判断是否已经初始化
if (!this.inited) {
DruidDriver.getInstance();
//这里加锁
ReentrantLock lock = this.lock;
try {
lock.lockInterruptibly();
} catch (InterruptedException var16) {
throw new SQLException("interrupt", var16);
}
boolean init = false;
boolean var15 = false;
String msg;
label444: {
try {
var15 = true;
if (this.inited) {
var15 = false;
break label444;
}
this.initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());
this.id = (long)DruidDriver.createDataSourceId();
if (this.id > 1L) {
long delta = (this.id - 1L) * 100000L;
connectionIdSeedUpdater.addAndGet(this, delta);
statementIdSeedUpdater.addAndGet(this, delta);
resultSetIdSeedUpdater.addAndGet(this, delta);
transactionIdSeedUpdater.addAndGet(this, delta);
}
//删除一些判断性质的代码片段
this.initFromSPIServiceLoader();
this.resolveDriver();
this.initCheck();
this.initExceptionSorter();
this.initValidConnectionChecker();
this.validationQueryCheck();
if (this.isUseGlobalDataSourceStat()) {
this.dataSourceStat = JdbcDataSourceStat.getGlobal();
if (this.dataSourceStat == null) {
this.dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
JdbcDataSourceStat.setGlobal(this.dataSourceStat);
}
if (this.dataSourceStat.getDbType() == null) {
this.dataSourceStat.setDbType(this.dbType);
}
} else {
this.dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
}
this.dataSourceStat.setResetStatEnable(this.resetStatEnable);
//初始化连接的数组
this.connections = new DruidConnectionHolder[this.maxActive];
this.evictConnections = new DruidConnectionHolder[this.maxActive];
this.keepAliveConnections = new DruidConnectionHolder[this.maxActive];
SQLException connectError = null;
int i;
//如果是异步创建的或者计划执行的,就提交异步任务进行创建
if (this.createScheduler != null && this.asyncInit) {
for(i = 0; i < this.initialSize; ++i) {
this.submitCreateTask(true);
}
} else if (!this.asyncInit) {
while(true) {
//创建初始容量
if (this.poolingCount < this.initialSize) {
try {
PhysicalConnectionInfo pyConnectInfo = this.createPhysicalConnection();
//创建核心连接
DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
this.connections[this.poolingCount++] = holder;
continue;
} catch (SQLException var17) {
LOG.error("init datasource error, url: " + this.getUrl(), var17);
if (!this.initExceptionThrow) {
Thread.sleep(3000L);
continue;
}
}
connectError = var17;
}
if (this.poolingCount > 0) {
this.poolingPeak = this.poolingCount;
this.poolingPeakTime = System.currentTimeMillis();
}
break;
}
}
//创建线程的监听
this.createAndLogThread();
//创建启动线程的线程
this.createAndStartCreatorThread();
//销毁线程的线程,其中和keepalived有关系
this.createAndStartDestroyThread();
this.initedLatch.await();
init = true;
this.initedTime = new Date();
this.registerMbean();
if (connectError != null && this.poolingCount == 0) {
throw connectError;
}
if (this.keepAlive) {
if (this.createScheduler != null) {
for(i = 0; i < this.minIdle; ++i) {
this.submitCreateTask(true);
}
var15 = false;
} else {
this.emptySignal();
var15 = false;
}
} else {
var15 = false;
}
} catch (SQLException var18) {
LOG.error("{dataSource-" + this.getID() + "} init error", var18);
throw var18;
} catch (InterruptedException var19) {
throw new SQLException(var19.getMessage(), var19);
} catch (RuntimeException var20) {
LOG.error("{dataSource-" + this.getID() + "} init error", var20);
throw var20;
} catch (Error var21) {
LOG.error("{dataSource-" + this.getID() + "} init error", var21);
throw var21;
} finally {
if (var15) {
this.inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
String msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg = msg + ",";
msg = msg + this.name;
}
msg = msg + "} inited";
LOG.info(msg);
}
}
}
this.inited = true;
lock.unlock();
if (init && LOG.isInfoEnabled()) {
msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg = msg + ",";
msg = msg + this.name;
}
msg = msg + "} inited";
LOG.info(msg);
}
return;
}
this.inited = true;
//解锁,保证过程只执行一次
lock.unlock();
if (init && LOG.isInfoEnabled()) {
msg = "{dataSource-" + this.getID();
if (this.name != null && !this.name.isEmpty()) {
msg = msg + ",";
msg = msg + this.name;
}
msg = msg + "} inited";
LOG.info(msg);
}
}
}
通过上述代码,我们了解到druid其实也是基于数组实现,先创建最大容量的数组,然后初始化核心连接数。其中采用了重入锁ReentrantLock保证代码只执行一次。这里的poolingCount从0开始累计,表征数据库连接的个数。而方法
PhysicalConnectionInfo pyConnectInfo = this.createPhysicalConnection();
才是创建数据库连接核心方法。
这里我们也看到了countdownlatch,我们知道其实计数用的,当其值为0表示线程要执行的任务全部执行了。
继续跟踪代码,我们发现在获取连接的方式是:
poolableConnection = this.getConnectionInternal(maxWaitMillis);
if (maxWait > 0L) {
//从连接数组中获取
holder = this.pollLast(nanos);
} else {
//直接取最后一个
holder = this.takeLast();
}
if (holder == null) {
break;
}
this.decrementPoolingCount();
//从pollingCount的位置取
DruidConnectionHolder last = this.connections[this.poolingCount];
//设置为空
this.connections[this.poolingCount] = null;
long waitNanos = nanos - estimate;
last.setLastNotEmptyWaitNanos(waitNanos);
return last;
在getConnection连接的时候,druid通过配置文件的参数maxWaitMillis决定是否延迟获取连接,然后从连接数组中获取连接。获取完毕之后就将连接设置为空。最后将获取的连接进行封装然后返回。
获取连接的时候
DruidConnectionHolder takeLast() throws InterruptedException, SQLException {
try {
//当没有连接的时候
while(this.poolingCount == 0) {
//发送为空信号,创建连接
this.emptySignal();
if (this.failFast && this.isFailContinuous()) {
throw new DataSourceNotAvailableException(this.createError);
}
++this.notEmptyWaitThreadCount;
if (this.notEmptyWaitThreadCount > this.notEmptyWaitThreadPeak) {
this.notEmptyWaitThreadPeak = this.notEmptyWaitThreadCount;
}
try {
//不为空
this.notEmpty.await();
} finally {
--this.notEmptyWaitThreadCount;
}
++this.notEmptyWaitCount;
if (!this.enable) {
connectErrorCountUpdater.incrementAndGet(this);
if (this.disableException != null) {
throw this.disableException;
}
throw new DataSourceDisableException();
}
}
} catch (InterruptedException var5) {
this.notEmpty.signal();
++this.notEmptySignalCount;
throw var5;
}
//有连接了,然后获取连接,并将poolingcount进行减一操作
this.decrementPoolingCount();
DruidConnectionHolder last = this.connections[this.poolingCount];
this.connections[this.poolingCount] = null;
return last;
}
这块先判断是否有连接,有连接的话直接取,没有连接的时候就发送空信号进行创建,创建结束只之后进行获取连接并修改相关连接参数等。
至此我们大概得了解了druid获取链接的一些信息。首先从环境变量中获取配置参数,在获取连接的时候通过加锁的方式进行连接池的初始化,初始化的时候通过配置设置的连接容量进行实例化连接,在获取连接的时候,先判断是否有连接,没有连接的情况下会通过create线程去创建,在获取连接之后就会将连接数减一,并将连接所在的数据位置置为空。