前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >4. 数据源模块

4. 数据源模块

作者头像
张申傲
发布2023-10-12 09:04:24
1580
发布2023-10-12 09:04:24
举报
文章被收录于专栏:漫漫架构路漫漫架构路

池化技术

在日常开发中,我们经常会接触到池化技术,这是一种非常经典的设计思想。简单来说,池化技术指的是:将一些创建过程较为繁琐的重量级对象,统一维护在一个对象池中进行管理,每次使用对象时都从池中获取,使用完成后再归还给对象池进行回收。

使用池化技术有以下几点优势:

  1. 避免了每次频繁创建和销毁对象所引入的开销,可以有效提升系统性能。
  2. 通过对象池,可以方便地控制对象的生命周期、存活时间,并限制池容量上限等等,增强了系统的灵活性。
  3. 对象在池中统一管理,而不是碎片化地分散在各处,提高了系统的可维护性。

日常我们经常接触到的池化技术包括:

  • 数据库连接池
  • 线程池
  • 缓存对象池

Factory Method Pattern 工厂方法模式

池化技术常常和工厂模式(简单工厂/工厂方法)结合在一起使用,工厂负责对象的创建,而池负责对象的统一管理。 MyBatis 也不例外,它内部就是采用了工厂方法模式+池化技术,来实现了一个轻量级的数据库连接池。 按照惯例,我们先来回顾下工程方法设计模式:

Factory Method Pattern
Factory Method Pattern

(图片来源:https://refactoring.guru/design-patterns/factory-method)

工厂方法模式中主要包括以下核心角色:

  • Product :产品接口,定义了所有产品对象的公共行为。
  • ConcreteProduct:具体的产品实例,实现了 Product 接口,也是工厂最终要创建的产物。
  • Creator:工厂接口,定义了创建对象的工厂方法
  • ConcreteCreator:具体的工厂实例,实现了各自的工厂方法,用于创建对应的产品实例。

在 MyBatis 的数据源模块中,定义了两类数据源产品,分别是:PooledDataSource(池化数据源)与 UnpooledDataSource(非池化数据源),并为其配备了各自的工厂 PooledDataSourceFactoryUnpooledDataSourceFactory。具体的类结构如下:

数据库连接池类结构
数据库连接池类结构

其中 UnpooledDataSource 很简单,我们快速过一下代码:

代码语言:javascript
复制
/**
 * 非池化的数据源
 */
public class UnpooledDataSource implements DataSource {

  private ClassLoader driverClassLoader;
  private Properties driverProperties;
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();

  private String driver;
  private String url;
  private String username;
  private String password;

  private Boolean autoCommit;
  private Integer defaultTransactionIsolationLevel;
  private Integer defaultNetworkTimeout;

  //注册数据库驱动
  static {
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }


  //创建数据库连接
  private Connection doGetConnection(String username, String password) throws SQLException {
    //加载配置
    Properties props = new Properties();
    if (driverProperties != null) {
      props.putAll(driverProperties);
    }
    if (username != null) {
      props.setProperty("user", username);
    }
    if (password != null) {
      props.setProperty("password", password);
    }

    //创建连接
    return doGetConnection(props);
  }

  private Connection doGetConnection(Properties properties) throws SQLException {
    //初始化数据库驱动
    initializeDriver();

    //创建数据库连接
    Connection connection = DriverManager.getConnection(url, properties);

    //连接配置
    configureConnection(connection);
    return connection;
  }

  private synchronized void initializeDriver() throws SQLException {
    //注册数据库驱动
    if (!registeredDrivers.containsKey(driver)) {
      Class<?> driverType;
      try {
        if (driverClassLoader != null) {
          driverType = Class.forName(driver, true, driverClassLoader);
        } else {
          driverType = Resources.classForName(driver);
        }
        Driver driverInstance = (Driver) driverType.getDeclaredConstructor().newInstance();
        DriverManager.registerDriver(new DriverProxy(driverInstance));
        registeredDrivers.put(driver, driverInstance);
      } catch (Exception e) {
        throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
      }
    }
  }

  //连接配置
  private void configureConnection(Connection conn) throws SQLException {
    //设置超时时间
    if (defaultNetworkTimeout != null) {
      conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
    }

    //设置事务自动提交
    if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
      conn.setAutoCommit(autoCommit);
    }

    //设置事务隔离级别
    if (defaultTransactionIsolationLevel != null) {
      conn.setTransactionIsolation(defaultTransactionIsolationLevel);
    }
  }
}
//...省略非必要代码

可以看到 UnpooledDataSource 就是对原生 JDBC 的 Connection 对象进行了简单的封装,每次都是创建一个新的数据库连接,没有新增额外的功能。

下面我们重点分析下 PooledDataSource 数据库连接池。

MyBatis 数据库连接池

要获取 PooledDataSource 对象需要通过工厂,首先来看一下 PooledDataSourceFactory 连接池工厂:

代码语言:javascript
复制
/**
 * 数据库连接池工厂
 */
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {

  public PooledDataSourceFactory() {
    //用于创建PooledDataSource
    this.dataSource = new PooledDataSource();
  }
}

这里竟然什么都没做,只是在构造器中创建了一个 PooledDataSource 对象。因此我们可以猜到,精髓都在 PooledDataSource 中,接下来就重点分析下它。

PooledDataSource 是 MyBatis 自行实现的一个轻量级的数据库连接池,实现了连接的复用与管理,内部的一些设计还是比较巧妙的。它主要包含以下核心组件:

PooledConnection类结构
PooledConnection类结构

PooledConnection

PooledConnection 是对 JDBC Connection 的动态代理,主要对 java.sql.Connection#close() 方法进行了拦截,它并不会真正去释放数据库连接,而是将连接回收到池中,并修改状态,以便于后续的复用。

代码语言:javascript
复制
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  //拦截close方法,对连接进行回收复用,并不会真正地关闭
  String methodName = method.getName();
  if (CLOSE.equals(methodName)) {
    dataSource.pushConnection(this);
    return null;
  }
  try {
    if (!Object.class.equals(method.getDeclaringClass())) {
      //连接回收后,修改连接状态
      checkConnection();
    }
    return method.invoke(realConnection, args);
  } catch (Throwable t) {
    throw ExceptionUtil.unwrapThrowable(t);
  }
//...省略非必要代码
}

PoolState

PoolState 是一个连接池管理器,它负责管理池中所有连接的生命周期,以及状态的迁移。

代码语言:javascript
复制
//数据源实例
protected PooledDataSource dataSource;

//空闲连接队列
protected final List<PooledConnection> idleConnections = new ArrayList<>();

//活跃连接队列
protected final List<PooledConnection> activeConnections = new ArrayList<>();

//各种统计信息
protected long requestCount = 0;
protected long accumulatedRequestTime = 0;
protected long accumulatedCheckoutTime = 0;
protected long claimedOverdueConnectionCount = 0;
protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
protected long accumulatedWaitTime = 0;
protected long hadToWaitCount = 0;
protected long badConnectionCount = 0;

//...省略非必要代码

可以看出,PoolState 本质上就是一个大的连接集合,并额外附带了一些统计信息。

PooledDataSource

PooledDataSource:即实际操作数据库的连接对象。

最后结合具体的代码与流程图,详细介绍下数据库连接建立与回收的过程。

获取连接过程

代码语言:javascript
复制
private PooledConnection popConnection(String username, String password) throws SQLException {
    boolean countedWait = false;
    PooledConnection conn = null;
    long t = System.currentTimeMillis();
    int localBadConnectionCount = 0;

    while (conn == null) {
      //同步控制
      synchronized (state) {
        //1. 首先尝试从空闲队列中获取
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        }

        //2. 如果没有空闲连接,但活跃连接数没有达到上限,则创建新连接
        else {
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }
          }

          //3. 如果活跃连接数已达上限,则检查最老的活跃连接是否已超时。如果已超时,则剔除最老的连接,并创建新连接
          else {
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              state.claimedOverdueConnectionCount++;
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              state.accumulatedCheckoutTime += longestCheckoutTime;
              state.activeConnections.remove(oldestActiveConnection);

              //剔除超时连接前,首先尝试对事务进行回滚
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  log.debug("Bad connection. Could not roll back");
                }
              }
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            }
            //4. 如果没有超时的活跃连接,阻塞等待
            else {
              try {
                if (!countedWait) {
                  state.hadToWaitCount++;
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                state.wait(poolTimeToWait);
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                break;
              }
            }
          }
        }
        if (conn != null) {
          //5.校验连接状态
          if (conn.isValid()) {
            if (!conn.getRealConnection().getAutoCommit()) {
              conn.getRealConnection().rollback();
            }
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            state.activeConnections.add(conn);
            state.requestCount++;
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            state.badConnectionCount++;
            localBadConnectionCount++;
            conn = null;
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }

    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    return conn;
  }
//...省略非必要代码
获取连接过程
获取连接过程

回收连接过程

代码语言:javascript
复制
//回收数据库连接
protected void pushConnection(PooledConnection conn) throws SQLException {
  //同步控制
  synchronized (state) {
    //1. 将连接从活跃队列中移除
    state.activeConnections.remove(conn);

    //2. 如果连接仍然有效,且空闲队列未满,则复用底层的连接,并创建空闲连接,放入空闲队列中
    if (conn.isValid()) {
      if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
        state.accumulatedCheckoutTime += conn.getCheckoutTime();
        if (!conn.getRealConnection().getAutoCommit()) {
          conn.getRealConnection().rollback();
        }
        PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
        state.idleConnections.add(newConn);
        newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
        newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
        conn.invalidate();
        if (log.isDebugEnabled()) {
          log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
        }
        state.notifyAll(); //唤醒等待连接的线程
      }
      //3. 如果空闲队列已满,直接关闭底层连接,并将当前连接失效
      else {
        state.accumulatedCheckoutTime += conn.getCheckoutTime();
        if (!conn.getRealConnection().getAutoCommit()) {
          conn.getRealConnection().rollback();
        }
        conn.getRealConnection().close();
        if (log.isDebugEnabled()) {
          log.debug("Closed connection " + conn.getRealHashCode() + ".");
        }
        conn.invalidate();
      }
    }
    //4. 如果当前连接已失效,则无需任何处理
    else {
      if (log.isDebugEnabled()) {
        log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
      }
      state.badConnectionCount++;
    }
  }
}
//...省略非必要代码
回收连接过程
回收连接过程

小结

本篇详细介绍了 MyBatis 数据库连接池的底层实现。池化是我们日常开发中经常会使用到的技术,本质上体现了对资源的使用和回收进行优化的思想。MyBatis 基于池化技术和工厂方法设计模式,实现了一款轻量级的连接池,它的功能虽然简单,但是已经涵盖了连接池最核心的功能。目前市面上主流的连接池产品,如 DruidHikariCP 等,底层也是采用了类似的实现,不过是对性能进行了一系列的优化,并且扩展了如监控、统计等额外的功能。掌握了 MyBatis 连接池,其它类似的产品万变不离其宗。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-07-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 池化技术
  • Factory Method Pattern 工厂方法模式
  • MyBatis 数据库连接池
    • PooledConnection
    • PoolState
      • PooledDataSource
        • 获取连接过程
          • 回收连接过程
          • 小结
          相关产品与服务
          数据库
          云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档