前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[MyBatis] MyBatis 数据源模块实现原理剖析

[MyBatis] MyBatis 数据源模块实现原理剖析

作者头像
架构探险之道
发布2020-03-10 16:10:27
6210
发布2020-03-10 16:10:27
举报
文章被收录于专栏:架构探险之道架构探险之道

[MyBatis] MyBatis 数据源模块实现原理剖析

简介

数据库连接池技术是提升数据库访问效率常用的手段,使用连接池可以提高连接资源的复用性,避免频繁创建、关闭连接资源带来的开销,池化技术也是大厂高频面试题。Mybatis 内部就带了一个连接池的实现,接下来重点解析连接池技术的数据结构和算法。


手机用户请横屏获取最佳阅读体验,REFERENCES中是本文参考的链接,如需要链接和更多资源,可以关注其他博客发布地址。

平台

地址

CSDN

https://blog.csdn.net/sinat_28690417

简书

https://www.jianshu.com/u/3032cc862300

个人博客

https://zacsnz.github.io/NoteBooks/


正文

数据库连接池管理

先重点分析下跟连接池相关的关键类:

  1. PooledDataSource:一个简单,同步的、线程安全的数据库连接池
  2. PooledConnection:使用动态代理封装了真正的数据库连接对象,在连接使用之前和关 闭时进行增强;
  3. PoolState:用于管理 PooledConnection 对象状态的组件,通过两个 List 分别管理空闲状态的连接资源和活跃状态的连接资源,

如下图,需要注意的是这两个 List 使用 ArrayList 实现,存在并发安全的问题,因此在使用时,注意加上同步控制;重点解析获取资源和回收资源的流程,获取连接资源的过程如下图:

流程分析

.

源码分析

  • PooledDataSource
代码语言:javascript
复制
/**
 *    Copyright 2009-2017 the original author or authors.
 *
 *    Licensed under the Apache License, Version 2.0 (the "License");
 *    you may not use this file except in compliance with the License.
 *    You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *    Unless required by applicable law or agreed to in writing, software
 *    distributed under the License is distributed on an "AS IS" BASIS,
 *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *    See the License for the specific language governing permissions and
 *    limitations under the License.
 */
package org.apache.ibatis.datasource.pooled;

import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.logging.Logger;

import javax.sql.DataSource;

import org.apache.ibatis.datasource.unpooled.UnpooledDataSource;
import org.apache.ibatis.logging.Log;
import org.apache.ibatis.logging.LogFactory;

/**
 * This is a simple, synchronous, thread-safe database connection pool.
 *
 * @author Clinton Begin
 */
public class PooledDataSource implements DataSource {

  private static final Log log = LogFactory.getLog(PooledDataSource.class);

  private final PoolState state = new PoolState(this);

  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  protected int poolMaximumActiveConnections = 10;
  protected int poolMaximumIdleConnections = 5;
  protected int poolMaximumCheckoutTime = 20000;
  protected int poolTimeToWait = 20000;
  protected int poolMaximumLocalBadConnectionTolerance = 3;
  protected String poolPingQuery = "NO PING QUERY SET";
  protected boolean poolPingEnabled;
  protected int poolPingConnectionsNotUsedFor;

  private int expectedConnectionTypeCode;

  public PooledDataSource() {
    dataSource = new UnpooledDataSource();
  }

  public PooledDataSource(UnpooledDataSource dataSource) {
    this.dataSource = dataSource;
  }

  public PooledDataSource(String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(String driver, String url, Properties driverProperties) {
    dataSource = new UnpooledDataSource(driver, url, driverProperties);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
    dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
    dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
  }

  @Override
    public Connection getConnection() throws SQLException {
      return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
  }

  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return popConnection(username, password).getProxyConnection();
  }

  @Override
  public void setLoginTimeout(int loginTimeout) throws SQLException {
    DriverManager.setLoginTimeout(loginTimeout);
  }

  @Override
  public int getLoginTimeout() throws SQLException {
    return DriverManager.getLoginTimeout();
  }

  @Override
  public void setLogWriter(PrintWriter logWriter) throws SQLException {
    DriverManager.setLogWriter(logWriter);
  }

  @Override
  public PrintWriter getLogWriter() throws SQLException {
    return DriverManager.getLogWriter();
  }

  public void setDriver(String driver) {
    dataSource.setDriver(driver);
    forceCloseAll();
  }

  public void setUrl(String url) {
    dataSource.setUrl(url);
    forceCloseAll();
  }

  public void setUsername(String username) {
    dataSource.setUsername(username);
    forceCloseAll();
  }

    public void setPassword(String password) {
      dataSource.setPassword(password);
    forceCloseAll();
  }

  public void setDefaultAutoCommit(boolean defaultAutoCommit) {
    dataSource.setAutoCommit(defaultAutoCommit);
    forceCloseAll();
  }

  public void setDefaultTransactionIsolationLevel(Integer defaultTransactionIsolationLevel) {
    dataSource.setDefaultTransactionIsolationLevel(defaultTransactionIsolationLevel);
    forceCloseAll();
  }

  public void setDriverProperties(Properties driverProps) {
    dataSource.setDriverProperties(driverProps);
    forceCloseAll();
  }

  /*
   * The maximum number of active connections
   *
   * @param poolMaximumActiveConnections The maximum number of active connections
   */
  public void setPoolMaximumActiveConnections(int poolMaximumActiveConnections) {
    this.poolMaximumActiveConnections = poolMaximumActiveConnections;
    forceCloseAll();
  }

  /*
   * The maximum number of idle connections
   *
   * @param poolMaximumIdleConnections The maximum number of idle connections
   */
  public void setPoolMaximumIdleConnections(int poolMaximumIdleConnections) {
    this.poolMaximumIdleConnections = poolMaximumIdleConnections;
    forceCloseAll();
  }

  /*
   * The maximum number of tolerance for bad connection happens in one thread
   * which are applying for new {@link PooledConnection}
   *
   * @param poolMaximumLocalBadConnectionTolerance
   * max tolerance for bad connection happens in one thread
   *
   * @since 3.4.5
   */
  public void setPoolMaximumLocalBadConnectionTolerance(
          int poolMaximumLocalBadConnectionTolerance) {
    this.poolMaximumLocalBadConnectionTolerance = poolMaximumLocalBadConnectionTolerance;
  }

  /*
   * The maximum time a connection can be used before it *may* be
   * given away again.
   *
   * @param poolMaximumCheckoutTime The maximum time
   */
  public void setPoolMaximumCheckoutTime(int poolMaximumCheckoutTime) {
    this.poolMaximumCheckoutTime = poolMaximumCheckoutTime;
    forceCloseAll();
  }

  /*
   * The time to wait before retrying to get a connection
   *
   * @param poolTimeToWait The time to wait
   */
  public void setPoolTimeToWait(int poolTimeToWait) {
    this.poolTimeToWait = poolTimeToWait;
    forceCloseAll();
  }

  /*
   * The query to be used to check a connection
   *
   * @param poolPingQuery The query
   */
  public void setPoolPingQuery(String poolPingQuery) {
    this.poolPingQuery = poolPingQuery;
    forceCloseAll();
  }

  /*
   * Determines if the ping query should be used.
   *
   * @param poolPingEnabled True if we need to check a connection before using it
   */
  public void setPoolPingEnabled(boolean poolPingEnabled) {
    this.poolPingEnabled = poolPingEnabled;
    forceCloseAll();
  }

  /*
   * If a connection has not been used in this many milliseconds, ping the
   * database to make sure the connection is still good.
   *
   * @param milliseconds the number of milliseconds of inactivity that will trigger a ping
   */
  public void setPoolPingConnectionsNotUsedFor(int milliseconds) {
    this.poolPingConnectionsNotUsedFor = milliseconds;
    forceCloseAll();
  }

  public String getDriver() {
    return dataSource.getDriver();
  }

  public String getUrl() {
    return dataSource.getUrl();
  }

  public String getUsername() {
    return dataSource.getUsername();
  }

  public String getPassword() {
    return dataSource.getPassword();
  }

  public boolean isAutoCommit() {
    return dataSource.isAutoCommit();
  }

  public Integer getDefaultTransactionIsolationLevel() {
    return dataSource.getDefaultTransactionIsolationLevel();
  }

  public Properties getDriverProperties() {
    return dataSource.getDriverProperties();
  }

  public int getPoolMaximumActiveConnections() {
    return poolMaximumActiveConnections;
  }

  public int getPoolMaximumIdleConnections() {
    return poolMaximumIdleConnections;
  }

  public int getPoolMaximumLocalBadConnectionTolerance() {
    return poolMaximumLocalBadConnectionTolerance;
  }

  public int getPoolMaximumCheckoutTime() {
    return poolMaximumCheckoutTime;
  }

  public int getPoolTimeToWait() {
    return poolTimeToWait;
  }

  public String getPoolPingQuery() {
    return poolPingQuery;
  }

  public boolean isPoolPingEnabled() {
    return poolPingEnabled;
  }

  public int getPoolPingConnectionsNotUsedFor() {
    return poolPingConnectionsNotUsedFor;
  }

  /*
   * Closes all active and idle connections in the pool
   */
  public void forceCloseAll() {
    synchronized (state) {
      expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      for (int i = state.activeConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.activeConnections.remove(i - 1);
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
      for (int i = state.idleConnections.size(); i > 0; i--) {
        try {
          PooledConnection conn = state.idleConnections.remove(i - 1);
          conn.invalidate();

          Connection realConn = conn.getRealConnection();
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          realConn.close();
        } catch (Exception e) {
          // ignore
        }
      }
    }
    if (log.isDebugEnabled()) {
      log.debug("PooledDataSource forcefully closed/removed all connections.");
    }
  }

  public PoolState getPoolState() {
    return state;
  }

  private int assembleConnectionTypeCode(String url, String username, String password) {
    return ("" + url + username + password).hashCode();
  }

  /**
   * @descrption
   * <p>
   *  创建新的空闲连接
   * </p>
   * @author Yiyuery
   * @date 2020-03-02 22:32
   * @params [conn]
   * @return void
   */
  protected void pushConnection(PooledConnection conn) throws SQLException {

    synchronized (state) {
      //1 从活跃集合中移除当前传入的连接对象
      state.activeConnections.remove(conn);
      //1.1 判断连接是否失效 连接对象引用丢失、数据源Ping不通网络异常
      if (conn.isValid()) {
        //1.1.1 判断当前失效连接总数是否小于最大连接数 && 连接目标地址的hashCode没变
        if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
          //1.1.1.1 记录累计连接释放的时间
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          //1.1.1.2 判断是否自动提交事务,不是的话,在使用链接对象前回滚之前的操作
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          /**
           * 复用{@link Connection}连接对象,并创建新的 {@link PooledConnection}连接对象
           */
          PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
          //1.1.1.3 添加新的连接对象到空闲队列
          state.idleConnections.add(newConn);
          //1.1.1.4 记录连接创建和更新时间
          newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
          newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
          //1.1.1.5 标记连接为失效状态,已处于空闲状态
          conn.invalidate();
          if (log.isDebugEnabled()) {
            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
          }
          //1.1.1.6 调用锁对象的notifyAll方法,唤醒所有因获取state而阻塞的线程,再次争抢数据库连接对象
          state.notifyAll();
        } else {
          //1.1.2.1 如果最大连接数已满 || 连接对象的目标地址变更
          //1.1.2.2 记录累计释放连接时间
          state.accumulatedCheckoutTime += conn.getCheckoutTime();
          //1.1.2.3 判断是否自动提交事务,不是的话,在使用链接对象前回滚之前的操作
          if (!conn.getRealConnection().getAutoCommit()) {
            conn.getRealConnection().rollback();
          }
          //1.1.2.4 关闭实际数据库连接对象
          conn.getRealConnection().close();
          if (log.isDebugEnabled()) {
            log.debug("Closed connection " + conn.getRealHashCode() + ".");
          }
          //1.1.2.5 标记为失效
          conn.invalidate();
        }
      } else {
        //1.2 如果连接失效
        //1.2.1 连接失效打印错误日志,并记录异常连接数目
        if (log.isDebugEnabled()) {
          log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
        }
        //1.2.2 记录无法使用的连接数
        state.badConnectionCount++;
      }
    }
  }

  /**
   * @descrption
   * <p>
   *  获取数据库链接
   * </p>
   * @author Yiyuery
   * @date 2020-03-02 20:40
   * @params [username, password]
   * @return org.apache.ibatis.datasource.pooled.PooledConnection
   */
  private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;
    //循环以下逻辑,直到 conn 初始化成功
    while (conn == null) {
      //1. 获取锁,对于线程池的管理,需要进行线程安全控制
      synchronized (state) {
        //1.1 如果有空闲连接,获取并移除空闲链接,首位(下标为0)
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          //1.2 没有空闲链接
          // 1.2.1 判断当前激活总数小于最大允许活跃连接数,创建新链接
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          } else {
            //1.2.2 有空闲连接的话,从激活的空闲链接中获取一个
            // Cannot create new connection
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            //1.2.2.1 检查最上面的链接活跃时间是否超出最大活跃时间(list add 操作, 下标越小的元素越早被加入连接)
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 1.2.2.1.1 是的话计数并记录时间,从活跃连接池中移出该连接
              // Can claim overdue connection
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);
              // 1.2.2.1.2 从该超时限的连接中判断是否要自动提交事务,如果不是的话,回滚当前事务
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happend.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not intterupt current executing thread and give current thread a
                     chance to join the next competion for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              /**
               * 1.2.2.1.3 复用连接,并构造新的 {@link PooledConnection} 对象,更新创建、更新时间,将老的链接标记为失效状态
               */
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              // 1.2.2.2 如果未超出最大活跃时间,则必须等待 默认 20 s
              // Must wait
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          // ping to server and check the connection is valid or not
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }

  /*
   * Method to check to see if a connection is still usable
   *
   * @param conn - the connection to check
   * @return True if the connection is still usable
   */
  protected boolean pingConnection(PooledConnection conn) {
    boolean result = true;

    try {
      result = !conn.getRealConnection().isClosed();
    } catch (SQLException e) {
      if (log.isDebugEnabled()) {
        log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
      }
      result = false;
    }

    if (result) {
      if (poolPingEnabled) {
        if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
          try {
            if (log.isDebugEnabled()) {
              log.debug("Testing connection " + conn.getRealHashCode() + " ...");
            }
            Connection realConn = conn.getRealConnection();
            Statement statement = realConn.createStatement();
            ResultSet rs = statement.executeQuery(poolPingQuery);
            rs.close();
            statement.close();
            if (!realConn.getAutoCommit()) {
              realConn.rollback();
            }
            result = true;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
            }
          } catch (Exception e) {
            log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
            try {
              conn.getRealConnection().close();
            } catch (Exception e2) {
              //ignore
            }
            result = false;
            if (log.isDebugEnabled()) {
              log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
            }
          }
        }
      }
    }
    return result;
  }

  /*
   * Unwraps a pooled connection to get to the 'real' connection
   *
   * @param conn - the pooled connection to unwrap
   * @return The 'real' connection
   */
  public static Connection unwrapConnection(Connection conn) {
    if (Proxy.isProxyClass(conn.getClass())) {
      InvocationHandler handler = Proxy.getInvocationHandler(conn);
      if (handler instanceof PooledConnection) {
        return ((PooledConnection) handler).getRealConnection();
      }
    }
    return conn;
  }

  protected void finalize() throws Throwable {
    forceCloseAll();
    super.finalize();
  }

  public <T> T unwrap(Class<T> iface) throws SQLException {
    throw new SQLException(getClass().getName() + " is not a wrapper.");
  }

  public boolean isWrapperFor(Class<?> iface) throws SQLException {
    return false;
  }

  public Logger getParentLogger() {
    return Logger.getLogger(Logger.GLOBAL_LOGGER_NAME); // requires JDK version 1.6
  }

}
  • PoolState 池状态共享变量

该变量被当成操作空闲和活跃池的共享锁,在任一时刻,只能对往池中放入连接或取出连接。

(1)当连接池占满,调用锁的 wait 方法进行等待 (2)当连接释放或是超时时,创建新的连接或封装新的 Pooledconnection 对象放入空闲队列,并调用锁的 notifyAll 方法唤醒因获取连接而阻塞的线程。

代码语言:javascript
复制
public class PoolState {

  protected PooledDataSource dataSource;
  //可以看到此处是两个 ArrayList 分别存放活跃连接和空闲连接,而 ArrayList 的实现不是线程安全的!
  protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
  protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
  protected long requestCount = 0;
  protected long accumulatedRequestTime = 0;
  protected long accumulatedCheckoutTime = 0;
  protected long claimedOverdueConnectionCount = 0;
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  protected long accumulatedWaitTime = 0;
  protected long hadToWaitCount = 0;
  protected long badConnectionCount = 0;

  //...

代码结构分析

.

  • UnpooledDataSource 未池化的数据库源对象,包含通过配置信息获取连接池的基础代码封装
代码语言:javascript
复制
  private Connection doGetConnection(Properties properties) throws SQLException {
    //初始化驱动
    initializeDriver();
    //获取数据库连接
    Connection connection = DriverManager.getConnection(url, properties);
    //配置数据库连接(自动提交事务、隔离级别)
    configureConnection(connection);
    return connection;
  }
  • PooledDataSource 基于基础数据源的支持连接池管理功能的数据源
代码语言:javascript
复制
 public PooledDataSource() {
    dataSource = new UnpooledDataSource();
  }

通过共享变量 PoolState state 进行连接池获取数据源连接的线程安全操作

  • PooledConnection 池中的连接对象
代码语言:javascript
复制
public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

对真实数据源连接对象进行封装,本身实现了InvocationHandler接口,可以通过代理类实现 Connection的实例,并完成获取连接操作的增强。

代码语言:javascript
复制
class PooledConnection implements InvocationHandler {

  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };

  private final int hashCode;
  private final PooledDataSource dataSource;
  //真实传入的连接对象
  private final Connection realConnection;
  //连接的代理对象,对外暴露该对象,并在所有的 Connection 定义的方法调用前,进行逻辑增强,判断是否是失效链接,是的话直接抛出异常
  private final Connection proxyConnection;
  private long checkoutTimestamp;
  private long createdTimestamp;
  private long lastUsedTimestamp;
  private int connectionTypeCode;
  private boolean valid;
  //...

增强逻辑通过实现接口中 invoke 方式实现

代码语言:javascript
复制
  /*
   * Required for InvocationHandler implementation.
   *
   * @param proxy  - not used
   * @param method - the method to be executed
   * @param args   - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    } else {
      try {
        //设置连接前判断是否失效
        if (!Object.class.equals(method.getDeclaringClass())) {
          // issue #579 toString() should never fail
          // throw an SQLException instead of a Runtime
          checkConnection();
        }
        return method.invoke(realConnection, args);
      } catch (Throwable t) {
        throw ExceptionUtil.unwrapThrowable(t);
      }
    }
  }

proxyConnectionrealConnection连接的代理对象,对外暴露该对象,并在所有的 Connection 定义的方法调用前,进行逻辑增强,判断是否是失效链接,是的话直接抛出异常。

  • 抽象工厂接口 DataSourceFactory用于创建数据源实例的创建(支持链接池的的和默认的数据源)
代码语言:javascript
复制
public interface DataSourceFactory {

  void setProperties(Properties props);

  DataSource getDataSource();

}

实现该接口的工厂也有两个:UnpooledDataSourceFactory,PooledDataSourceFactory,还有一个 JNDI 的数据源创建工厂类 JndiDataSourceFactory

其中,由于支持链接池的工厂和默认的数据源获取工厂,只是对应的数据源不同,其余是一样的,此处是通过继承的方式实现两个工厂类的。

代码语言:javascript
复制
public class UnpooledDataSourceFactory implements DataSourceFactory {

  private static final String DRIVER_PROPERTY_PREFIX = "driver.";
  private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

  protected DataSource dataSource;

  public UnpooledDataSourceFactory() {
    this.dataSource = new UnpooledDataSource();
  }

//...

public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }

}

设计模式分析

MyBatis中数据源模块,通过工厂模式实现了数据源的创建。

什么是工厂模式?

设计模式-工厂模式

数据源对象是比较复杂的对象,其创建过程相对比较复杂,对于 MyBatis 创建一个数据源, 具体来讲有如下难点:

  1. 常见的数据源组件都实现了 javax.sql.DataSource 接口;
  2. MyBatis 不但要能集成第三方的数据源组件,自身也提供了数据源的实现;
  3. 一般情况下,数据源的初始化过程参数较多,比较复杂;综上所述,数据源的创建是一个典型使用工厂模式的场景,实现类图如前文所示:
  • DataSource:数据源接口,JDBC 标准规范之一,定义了获取获取 Connection 的方法;
  • UnPooledDataSource:不带连接池的数据源,获取连接的方式和手动通过 JDBC 获取连 接的方式是一样的;
  • PooledDataSource:带连接池的数据源,提高连接资源的复用性,避免频繁创建、关闭 连接资源带来的开销;
  • DataSourceFactory:工厂接口,定义了创建 Datasource 的方法;
  • UnpooledDataSourceFactory:工厂接口的实现类之一,用于创建 UnpooledDataSource(不带连接池的数据源);
  • PooledDataSourceFactory:工厂接口的实现类之一,用于创建 PooledDataSource(带连 接池的数据源);

为什么要使用工厂模式?

答:对象可以通过 new 关键字、反射、clone 等方式创建,也可以通过工厂模式创建。对于 复杂对象,使用 new 关键字、反射、clone 等方式创建存在如下缺点:

  • 对象创建和对象使用的职责耦合在一起,违反单一原则;
  • 当业务扩展时,必须修改代业务代码,违反了开闭原则;而使用工厂模式将对象的创建和使用进行解耦,并屏蔽了创建对象可能的复杂过程,相对简单工厂模式,又具备更好的扩展性和可维护性,优点具体如下:
  • 把对象的创建和使用的过程分开,对象创建和对象使用使用的职责解耦;
  • 如果创建对象的过程很复杂,创建过程统一到工厂里管理,既减少了重复代码,也方便 以后对创建过程的修改维护;
  • 当业务扩展时,只需要增加工厂子类,符合开闭原则;

总结

本文就 MyBatis 数据源模块的实现原理进行了源码层面的分析,并以此点出工厂模式的使用场景和好处。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构探险之道 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • [MyBatis] MyBatis 数据源模块实现原理剖析
    • 数据库连接池管理
      • 设计模式分析
        • 总结
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档