
Python Database Connection Pool DBUtils: Source Code Analysis


1. Introduction

When it comes to database connection pools, Java offers plenty of choices: C3P0, DBCP, Proxool, Tomcat-JDBC, druid and so on, each with its own set of features. In Python the options are far fewer. The only open-source, reliable Python connection pool the author is aware of is DBUtils. As a general-purpose database connection pool, DBUtils is implemented very concisely and yet is reasonably complete. In this article we dig into its source code and look in detail at how DBUtils is implemented.

2. DBUtils

As mentioned above, DBUtils is an open-source, general-purpose Python database connection pool. It contains two sets of modules, one built on DB-API 2 and one built on PyGreSQL.

Components based on DB-API 2:

  • SteadyDB.py: hardened ("steady") connections, cursors, exceptions and so on
  • PooledDB.py: the connection pool and the pooled connection classes
  • PersistentDB.py: persistent connections bound to a thread
  • SimplePooledDB.py: a very simple pool implementation, meant to demonstrate the idea

Components based on PyGreSQL:

  • SteadyPG.py: hardened ("steady") connections, cursors, exceptions and so on
  • PooledPG.py: the connection pool and the pooled connection classes
  • PersistentPG.py: persistent connections bound to a thread
  • SimplePooledPG.py: a very simple pool implementation, meant to demonstrate the idea

In this article we walk through the PooledDB implementation in detail, using a MySQL pool as the example.

2.1. Links

  • GitHub: https://github.com/Cito/DBUtils
  • Home page: https://cito.github.io/DBUtils/
  • User's guide: https://cito.github.io/DBUtils/UsersGuide.html

2.2. Installation

It can be installed with:

pip install DBUtils==1.3

3. DB-API 2

DB-API 2, mentioned above, is the database interface specification defined by Python: https://www.python.org/dev/peps/pep-0249/ It lays down a common set of interfaces and conventions for database operations. It is precisely because of this specification that a connection pool can serve many different databases generically: any database driver module that conforms to the specification can be passed in.
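As a quick illustration, any conforming module exposes the same small set of attributes, which is all a generic pool needs to rely on (pymysql below is just one example driver, and the connection parameters are placeholders):

import pymysql   # any DB-API 2 compliant module works the same way

print(pymysql.apilevel)      # '2.0': the module implements DB-API 2
print(pymysql.threadsafety)  # thread-safety level, discussed in section 6
print(pymysql.paramstyle)    # the placeholder style used in SQL statements

# connect() is the factory a pool calls to create raw connections
conn = pymysql.connect(host='127.0.0.1', user='root',
                       password='secret', database='test')
conn.close()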

4. PooledDB

PooledDB is DBUtils' concrete implementation of a database connection pool.

The pool is architected in layers:

  1. Threads share the connections kept in the pool
  2. Each connection is a SteadyDB connection
  3. A SteadyDB connection in turn wraps a connection created by the concrete DB-API 2 driver
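Before digging into the internals, here is a minimal usage sketch (pymysql and the connection parameters are placeholders; the official docs show the same pattern with pgdb):

import pymysql
from DBUtils.PooledDB import PooledDB

# create the pool once, at application startup
pool = PooledDB(pymysql, mincached=5, host='127.0.0.1', user='root',
                password='secret', database='mydb')

db = pool.connection()   # borrow a pooled, hardened SteadyDB connection
cur = db.cursor()
cur.execute('select 1')
print(cur.fetchone())
cur.close()
db.close()               # returns the connection to the pool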

5. What Makes Up the Pool

PooledDB.py contains every class the pool needs:

5.1. Exception Classes

The pool defines the following exceptions:

  1. PooledDBError: the general pool exception; all other pool exceptions derive from it
  2. InvalidConnection: raised when a pooled connection is used after it has become invalid, e.g. calling a method on it after it has been closed
  3. NotSupportedError: the pool does not support the requested operation, e.g. asking for shared connections from a DB module that is not thread-safe at the connection level
  4. TooManyConnections: too many connections are open in the pool
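For instance, with blocking=False the pool raises TooManyConnections as soon as maxconnections is exceeded instead of waiting; a small sketch (the driver and the limits are just for illustration):

import pymysql
from DBUtils.PooledDB import PooledDB, TooManyConnections

pool = PooledDB(pymysql, maxconnections=2, blocking=False,
                host='127.0.0.1', user='root', password='secret',
                database='test')

held = [pool.connection(), pool.connection()]  # the pool is now exhausted
try:
    pool.connection()   # a third request exceeds maxconnections
except TooManyConnections:
    print('pool exhausted; back off, or create the pool with blocking=True')
finally:
    for con in held:
        con.close()     # hand the connections back to the pool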

5.2. The Pool and the Connection Classes

5.2.1. The Pool Class: PooledDB

The pool is a typical factory: it is responsible for creating connections, and it is also their container, so it is the first thing we create and call. It keeps the parameters we pass in, produces actual connections based on them, and maintains those connections. In PooledDB this class is simply named PooledDB, and it provides methods for getting connections, releasing them, sharing and unsharing them, and so on. Its biggest contribution is hiding the details of connection creation and maintenance, so that connecting to and querying the database stays as simple as possible.

5.2.2. Connection Classes

PooledDB.py wraps two connection classes:

  1. PooledDedicatedDBConnection
  2. PooledSharedDBConnection

As the names suggest, they implement the dedicated connection and the thread-shared connection respectively, and both are constructed from an existing connection. The dedicated PooledDedicatedDBConnection is built directly on top of an underlying database connection, whereas the shared PooledSharedDBConnection is built from a SharedDBConnection. The connection classes are a typical decorator: their most important job is to do everything the raw driver connection does, while adding the extra management behaviour on top.

6. The Pool and Thread Safety

The DB-API 2 specification requires every database module to expose a threadsafety attribute describing its thread-safety level:

threadsafety values in the DB-API 2 specification:

  • 0: threads may not share the module
  • 1: threads may share the module, but not connections
  • 2: threads may share the module and connections
  • 3: threads may share the module, connections and cursors
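The pool reads this attribute from the creator in its constructor: a value of 0 is rejected outright, and maxshared only takes effect when the level is above 1. A quick way to check a driver (pymysql is just an example):

import pymysql

print(pymysql.threadsafety)  # 1: the module may be shared, connections may not

# Consequences inside PooledDB.__init__ (see the appendix):
#   threadsafety == 0  -> NotSupportedError("Database module is not thread-safe.")
#   threadsafety == 1  -> only dedicated connections; maxshared is ignored
#   threadsafety >= 2  -> a positive maxshared enables the shared cache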

7. Creating the Pool

To use PooledDB, the first step is to create a pool object.

def __init__(self, creator, mincached=0, maxcached=0, maxshared=0,
             maxconnections=0, blocking=False, maxusage=None,
             setsession=None, reset=True, failures=None, ping=1,
             *args, **kwargs)

7.1. Parameters

  • creator: the DB-API 2 database module (or a factory function) used to create raw connections
  • mincached: the number of idle connections opened at startup (0 or None means none are opened in advance)
  • maxcached: the maximum number of idle connections kept in the pool (0 or None means no limit)
  • maxshared: the maximum number of shared connections (0 or None means connections are never shared between threads)
  • maxconnections: the maximum total number of connections allowed (0 or None means no limit)
  • blocking: whether to block and wait when the connection limit has been reached (otherwise an error is raised)
  • maxusage: the maximum number of times a single connection may be reused (0 or None means unlimited); once a connection reaches this count it is automatically closed and reopened
  • setsession: a list of SQL commands executed right after a connection is established, to prepare the session
  • reset: whether to call rollback every time a connection is returned to the pool, to make sure no transaction is left open; False or None only rolls back transactions that were started explicitly with begin()
  • failures: an exception class or tuple of exception classes for which the connection failover mechanism is applied, if the default (OperationalError, InternalError) is not adequate
  • ping: when to test the connection and try to reconnect; the possible values are listed below
ping values accepted by the PooledDB constructor:

  • 0: never test the connection
  • 1 (default): test the connection whenever it is fetched from the pool, and reconnect if it has been lost
  • 2: test the connection whenever a cursor is created, and reconnect if needed
  • 4: test the connection whenever a query is executed, and reconnect if needed
  • 7: test the connection in all of the above cases (other bit combinations of 1, 2 and 4 are also allowed)
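Putting the parameters together, a typical construction might look like the following sketch (pymysql, the limits and the session command are assumptions for illustration):

import pymysql
from DBUtils.PooledDB import PooledDB

pool = PooledDB(
    creator=pymysql,       # DB-API 2 module used to create raw connections
    mincached=2,           # open 2 idle connections at startup
    maxcached=5,           # keep at most 5 idle connections in the cache
    maxconnections=10,     # never open more than 10 connections in total
    blocking=True,         # wait instead of raising TooManyConnections
    maxusage=1000,         # recycle a connection after 1000 uses
    setsession=["SET time_zone = '+00:00'"],  # run on every new session
    ping=1,                # test the connection whenever it is fetched
    host='127.0.0.1', user='root', password='secret', database='test',
)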

7.2. Execution Flow

The constructor itself mostly performs basic parameter validation and initialization. The only slightly involved part comes at the end, where the _idle_cache is warmed up according to mincached: a list comprehension creates mincached dedicated connections, and each one's close method is then called, which marks it as idle and returns it to the pool. The close operation is analyzed in detail further below.
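These are literally the last statements of __init__ (quoted from the appendix below):

# establish an initial number of idle database connections
idle = [self.dedicated_connection() for i in range(mincached)]
while idle:
    idle.pop().close()  # close() returns each connection to _idle_cache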

8. Getting a Connection

connection(self, shareable=True)

A connection pool is above all a connection factory, so connection() is its most important method. It returns a usable connection; the shareable parameter indicates whether a connection that may be shared between threads is acceptable.

8.1. Execution Flow

The method splits into two paths: handing out a dedicated connection and handing out a shared connection. PooledDB keeps two lists as caches:

  1. _idle_cache: the cache of idle connections
  2. _shared_cache: the connections currently shared between threads

The procedure is fairly simple: if an idle connection exists in the cache it is taken from there, otherwise a new connection is created. To keep this thread-safe, the whole procedure runs under a Condition lock, and the pool's blocking parameter decides whether the caller waits on that condition or fails immediately once the connection limit has been reached.
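From the caller's side the difference is a single argument (assuming a pool object created as in the earlier sketches):

db1 = pool.connection()                 # may be shared with other threads
                                        # if maxshared > 0 and the driver allows it
db2 = pool.connection(shareable=False)  # always a dedicated connection
db3 = pool.dedicated_connection()       # alias for connection(shareable=False)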

9. Using a Connection

9.1. Constructor

def __init__(self, pool, con)

Both PooledDedicatedDBConnection and PooledSharedDBConnection take the pool object and a raw connection from the DB driver. All the constructor does is store them as instance attributes for later use: the pool object is needed so that when the connection's close method is called, the pool's cache method can be called back to return the connection to the pool; the raw connection object is used to carry out every operation the raw object supports.
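The dedicated variant's __init__ (quoted from the appendix) shows exactly that: store the pool and the connection, and refuse drivers that are not thread-safe:

def __init__(self, pool, con):
    # basic initialization so the finalizer works even if __init__ fails
    self._con = None
    if not con.threadsafety():
        raise NotSupportedError("Database module is not thread-safe.")
    self._pool = pool   # needed by close() to hand the connection back
    self._con = con     # the underlying SteadyDB connection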

9.2. Basic Connection Methods

Having obtained a connection, the first thing we want is to call all the operations the raw driver connection supports: get a cursor via cursor(), then run execute, fetch and so on through that cursor. Because the wrapper holds the raw connection as an instance attribute, it can satisfy this with almost no code:

def __getattr__(self, name):
    if self._con:
        return getattr(self._con, name)
    else:
        raise InvalidConnection

We have previously covered the __getattr__ magic method in detail in the post "Python magic methods (1): custom container classes and class attribute control".

9.3. Closing a Connection: close

When we want to close a connection, we call its close method:

def close(self):
    if self._con:
        self._pool.cache(self._con)
        self._con = None

As the code shows, the connection calls back into the pool's cache method, and the pool decides whether to put the connection into the idle cache or discard it. Deleting the connection with the del keyword has the same effect.
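The module docstring adds an important warning for threaded code: never chain pool.connection().cursor().execute(...), because the pooled connection would be released back to the pool while the cursor is still using it. Keep references alive until you are done:

db = pool.connection()
cur = db.cursor()
cur.execute('select 1')   # any statement; 'select 1' is just a placeholder
res = cur.fetchone()
cur.close()   # or del cur
db.close()    # or del db -- this is what returns the connection to the pool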

10. A Singleton Database Helper Class Built on DBUtils

As we have seen, the pool's source code is quite simple and the tool itself is easy to use. Still, each call requires creating the pool, getting a connection, getting a cursor from it, and only then running execute/fetch for the actual SQL, which is rather tedious. We also have to call close on the connection immediately after every call to return it to the pool; otherwise the pool's connections are quickly exhausted and nothing can proceed. Below is a singleton helper class the author wrote, offered for reference:

import os

import pymysql
import yaml
from DBUtils.PooledDB import PooledDB
from django.conf import settings

from cupthree.decorator.singleton import Singleton


@Singleton
class DBHelper:
    def __new__(cls, env=None, data_source='mysql'):
        filename = os.path.join(os.path.dirname(__file__), '../config', 'config.yml')
        with open(filename, encoding="utf-8") as yamlfd:
            config = yaml.safe_load(yamlfd)

        obj = object.__new__(cls)
        if env is None:
            obj._env = settings.ENV
        else:
            obj._env = env

        obj._pool = PooledDB(
            creator=pymysql,
            maxconnections=config['dbpool']['maxactive'],
            mincached=config['dbpool']['minidle'],
            maxcached=config['dbpool']['maxidle'],
            host=config[data_source][obj._env]['host'],
            port=config[data_source][obj._env]['port'],
            user=config[data_source][obj._env]['username'],
            database=config[data_source][obj._env]['database'],
            password=config[data_source][obj._env]['password'],
            charset=config[data_source][obj._env]['charset'],
            blocking=False,
            autocommit=bool(config['dbpool']['autocommit'] != 0)
        )
        return obj

    def selectall(self, sql, values=()):
        connection = self._pool.connection()
        cursor = connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, values)
            return cursor.fetchall()
        finally:
            cursor.close()
            connection.close()

    def selectone(self, sql, values=()):
        connection = self._pool.connection()
        cursor = connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, values)
            return cursor.fetchone()
        finally:
            cursor.close()
            connection.close()

    def insert(self, sql, values=(), commit=False):
        connection = self._pool.connection()
        cursor = connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, values)
            lastid = cursor.lastrowid
            if commit:
                connection.commit()
            return lastid
        except Exception as e:
            if commit:
                connection.rollback()
            raise e
        finally:
            cursor.close()
            connection.close()

    def insertmany(self, sql, values=(), commit=False):
        connection = self._pool.connection()
        cursor = connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.executemany(sql, values)
            rowcount = cursor.rowcount
            if commit:
                connection.commit()
            return rowcount
        except Exception as e:
            if commit:
                connection.rollback()
            raise e
        finally:
            cursor.close()
            connection.close()

    def execute(self, sql, values=(), commit=False):
        connection = self._pool.connection()
        cursor = connection.cursor(pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, values)  # run the statement before reading rowcount
            rowcount = cursor.rowcount
            if commit:
                connection.commit()
            return rowcount
        except Exception as e:
            if commit:
                connection.rollback()
            raise e
        finally:
            cursor.close()
            connection.close()

With this helper in place, running a query is a single call:

DBHelper().selectall("select * from table where id in (%s, %s)", [10086, 10010])

No more worrying about forgetting to close the cursor or the connection.
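Writes go through the same helper; for example (the table and columns here are hypothetical):

helper = DBHelper()
new_id = helper.insert(
    "insert into user (name, phone) values (%s, %s)",
    ("alice", "10086"),
    commit=True,   # commit inside the helper; it rolls back on error
)
print(new_id)      # the lastrowid returned by insert()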

11. Appendix: PooledDB.py Source Code

"""PooledDB - pooling for DB-API 2 connections.

Implements a pool of steady, thread-safe cached connections
to a database which are transparently reused,
using an arbitrary DB-API 2 compliant database interface module.

This should result in a speedup for persistent applications such as the
application server of "Webware for Python," without loss of robustness.

Robustness is provided by using "hardened" SteadyDB connections.
Even if the underlying database is restarted and all connections
are lost, they will be automatically and transparently reopened.
However, since you don't want this to happen in the middle of a database
transaction, you must explicitly start transactions with the begin()
method so that SteadyDB knows that the underlying connection shall not
be replaced and errors passed on until the transaction is completed.

Measures are taken to make the pool of connections thread-safe.
If the underlying DB-API module is thread-safe at the connection level,
the requested connections may be shared with other threads by default,
but you can also request dedicated connections in case you need them.

For the Python DB-API 2 specification, see:
    https://www.python.org/dev/peps/pep-0249/
For information on Webware for Python, see:
    https://cito.github.io/w4py/

Usage:

First you need to set up the database connection pool by creating
an instance of PooledDB, passing the following parameters:

    creator: either an arbitrary function returning new DB-API 2
        connection objects or a DB-API 2 compliant database module
    mincached: the initial number of idle connections in the pool
        (the default of 0 means no connections are made at startup)
    maxcached: the maximum number of idle connections in the pool
        (the default value of 0 or None means unlimited pool size)
    maxshared: maximum number of shared connections allowed
        (the default value of 0 or None means all connections are dedicated)
        When this maximum number is reached, connections are
        shared if they have been requested as shareable.
    maxconnections: maximum number of connections generally allowed
        (the default value of 0 or None means any number of connections)
    blocking: determines behavior when exceeding the maximum
        (if this is set to true, block and wait until the number of
        connections decreases, but by default an error will be reported)
    maxusage: maximum number of reuses of a single connection
        (the default of 0 or None means unlimited reuse)
        When this maximum usage number of the connection is reached,
        the connection is automatically reset (closed and reopened).
    setsession: an optional list of SQL commands that may serve to
        prepare the session, e.g. ["set datestyle to german", ...]
    reset: how connections should be reset when returned to the pool
        (False or None to rollback transcations started with begin(),
        the default value True always issues a rollback for safety's sake)
    failures: an optional exception class or a tuple of exception classes
        for which the connection failover mechanism shall be applied,
        if the default (OperationalError, InternalError) is not adequate
    ping: an optional flag controlling when connections are checked
        with the ping() method if such a method is available
        (0 = None = never, 1 = default = whenever fetched from the pool,
        2 = when a cursor is created, 4 = when a query is executed,
        7 = always, and all other bit combinations of these values)

    The creator function or the connect function of the DB-API 2 compliant
    database module specified as the creator will receive any additional
    parameters such as the host, database, user, password etc.  You may
    choose some or all of these parameters in your own creator function,
    allowing for sophisticated failover and load-balancing mechanisms.

For instance, if you are using pgdb as your DB-API 2 database module and
want a pool of at least five connections to your local database 'mydb':

    import pgdb  # import used DB-API 2 module
    from DBUtils.PooledDB import PooledDB
    pool = PooledDB(pgdb, 5, database='mydb')

Once you have set up the connection pool you can request
database connections from that pool:

    db = pool.connection()

You can use these connections just as if they were ordinary
DB-API 2 connections.  Actually what you get is the hardened
SteadyDB version of the underlying DB-API 2 connection.

Please note that the connection may be shared with other threads
by default if you set a non-zero maxshared parameter and the DB-API 2
module allows this.  If you want to have a dedicated connection, use:

    db = pool.connection(shareable=False)

You can also use this to get a dedicated connection:

    db = pool.dedicated_connection()

If you don't need it any more, you should immediately return it to the
pool with db.close().  You can get another connection in the same way.

Warning: In a threaded environment, never do the following:

    pool.connection().cursor().execute(...)

This would release the connection too early for reuse which may be
fatal if the connections are not thread-safe.  Make sure that the
connection object stays alive as long as you are using it, like that:

    db = pool.connection()
    cur = db.cursor()
    cur.execute(...)
    res = cur.fetchone()
    cur.close()  # or del cur
    db.close()  # or del db

Note that you need to explicitly start transactions by calling the
begin() method.  This ensures that the connection will not be shared
with other threads, that the transparent reopening will be suspended
until the end of the transaction, and that the connection will be rolled
back before being given back to the connection pool.

Ideas for improvement:

* Add a thread for monitoring, restarting (or closing) bad or expired
  connections (similar to DBConnectionPool/ResourcePool by Warren Smith).
* Optionally log usage, bad connections and exceeding of limits.

Copyright, credits and license:

* Contributed as supplement for Webware for Python and PyGreSQL
  by Christoph Zwerschke in September 2005
* Based on the code of DBPool, contributed to Webware for Python
  by Dan Green in December 2000

Licensed under the MIT license.

"""from threading import Conditionfrom DBUtils.SteadyDB import connect

__version__ = '1.3'class PooledDBError(Exception):
    """General PooledDB error."""class InvalidConnection(PooledDBError):
    """Database connection is invalid."""class NotSupportedError(PooledDBError):
    """DB-API module not supported by PooledDB."""class TooManyConnections(PooledDBError):
    """Too many database connections were opened."""class PooledDB:
    """Pool for DB-API 2 connections.

    After you have created the connection pool, you can use
    connection() to get pooled, steady DB-API 2 connections.

    """

    version = __version__

    def __init__(
            self, creator, mincached=0, maxcached=0,
            maxshared=0, maxconnections=0, blocking=False,
            maxusage=None, setsession=None, reset=True,
            failures=None, ping=1,
            *args, **kwargs):
        """Set up the DB-API 2 connection pool.

        creator: either an arbitrary function returning new DB-API 2
            connection objects or a DB-API 2 compliant database module
        mincached: initial number of idle connections in the pool
            (0 means no connections are made at startup)
        maxcached: maximum number of idle connections in the pool
            (0 or None means unlimited pool size)
        maxshared: maximum number of shared connections
            (0 or None means all connections are dedicated)
            When this maximum number is reached, connections are
            shared if they have been requested as shareable.
        maxconnections: maximum number of connections generally allowed
            (0 or None means an arbitrary number of connections)
        blocking: determines behavior when exceeding the maximum
            (if this is set to true, block and wait until the number of
            connections decreases, otherwise an error will be reported)
        maxusage: maximum number of reuses of a single connection
            (0 or None means unlimited reuse)
            When this maximum usage number of the connection is reached,
            the connection is automatically reset (closed and reopened).
        setsession: optional list of SQL commands that may serve to prepare
            the session, e.g. ["set datestyle to ...", "set time zone ..."]
        reset: how connections should be reset when returned to the pool
            (False or None to rollback transcations started with begin(),
            True to always issue a rollback for safety's sake)
        failures: an optional exception class or a tuple of exception classes
            for which the connection failover mechanism shall be applied,
            if the default (OperationalError, InternalError) is not adequate
        ping: determines when the connection should be checked with ping()
            (0 = None = never, 1 = default = whenever fetched from the pool,
            2 = when a cursor is created, 4 = when a query is executed,
            7 = always, and all other bit combinations of these values)
        args, kwargs: the parameters that shall be passed to the creator
            function or the connection constructor of the DB-API 2 module

        """
        try:
            threadsafety = creator.threadsafety
        except AttributeError:
            try:
                if not callable(creator.connect):
                    raise AttributeError
            except AttributeError:
                threadsafety = 2
            else:
                threadsafety = 0
        if not threadsafety:
            raise NotSupportedError("Database module is not thread-safe.")
        self._creator = creator
        self._args, self._kwargs = args, kwargs
        self._blocking = blocking
        self._maxusage = maxusage
        self._setsession = setsession
        self._reset = reset
        self._failures = failures
        self._ping = ping
        if mincached is None:
            mincached = 0
        if maxcached is None:
            maxcached = 0
        if maxconnections is None:
            maxconnections = 0
        if maxcached:
            if maxcached < mincached:
                maxcached = mincached
            self._maxcached = maxcached
        else:
            self._maxcached = 0
        if threadsafety > 1 and maxshared:
            self._maxshared = maxshared
            self._shared_cache = []  # the cache for shared connections
        else:
            self._maxshared = 0
        if maxconnections:
            if maxconnections < maxcached:
                maxconnections = maxcached
            if maxconnections < maxshared:
                maxconnections = maxshared
            self._maxconnections = maxconnections
        else:
            self._maxconnections = 0
        self._idle_cache = []  # the actual pool of idle connections
        self._lock = Condition()
        self._connections = 0
        # Establish an initial number of idle database connections:
        idle = [self.dedicated_connection() for i in range(mincached)]
        while idle:
            idle.pop().close()

    def steady_connection(self):
        """Get a steady, unpooled DB-API 2 connection."""
        return connect(
            self._creator, self._maxusage, self._setsession,
            self._failures, self._ping, True, *self._args, **self._kwargs)

    def connection(self, shareable=True):
        """Get a steady, cached DB-API 2 connection from the pool.

        If shareable is set and the underlying DB-API 2 allows it,
        then the connection may be shared with other threads.

        """
        if shareable and self._maxshared:
            self._lock.acquire()
            try:
                while (not self._shared_cache and self._maxconnections
                        and self._connections >= self._maxconnections):
                    self._wait_lock()
                if len(self._shared_cache) < self._maxshared:
                    # shared cache is not full, get a dedicated connection
                    try:  # first try to get it from the idle cache
                        con = self._idle_cache.pop(0)
                    except IndexError:  # else get a fresh connection
                        con = self.steady_connection()
                    else:
                        con._ping_check()  # check this connection
                    con = SharedDBConnection(con)
                    self._connections += 1
                else:  # shared cache full or no more connections allowed
                    self._shared_cache.sort()  # least shared connection first
                    con = self._shared_cache.pop(0)  # get it
                    while con.con._transaction:
                        # do not share connections which are in a transaction
                        self._shared_cache.insert(0, con)
                        self._wait_lock()
                        self._shared_cache.sort()
                        con = self._shared_cache.pop(0)
                    con.con._ping_check()  # check the underlying connection
                    con.share()  # increase share of this connection
                # put the connection (back) into the shared cache
                self._shared_cache.append(con)
                self._lock.notify()
            finally:
                self._lock.release()
            con = PooledSharedDBConnection(self, con)
        else:  # try to get a dedicated connection
            self._lock.acquire()
            try:
                while (self._maxconnections
                        and self._connections >= self._maxconnections):
                    self._wait_lock()
                # connection limit not reached, get a dedicated connection
                try:  # first try to get it from the idle cache
                    con = self._idle_cache.pop(0)
                except IndexError:  # else get a fresh connection
                    con = self.steady_connection()
                else:
                    con._ping_check()  # check connection
                con = PooledDedicatedDBConnection(self, con)
                self._connections += 1
            finally:
                self._lock.release()
        return con

    def dedicated_connection(self):
        """Alias for connection(shareable=False)."""
        return self.connection(False)

    def unshare(self, con):
        """Decrease the share of a connection in the shared cache."""
        self._lock.acquire()
        try:
            con.unshare()
            shared = con.shared
            if not shared:  # connection is idle,
                try:  # so try to remove it
                    self._shared_cache.remove(con)  # from shared cache
                except ValueError:
                    pass  # pool has already been closed
        finally:
            self._lock.release()
        if not shared:  # connection has become idle,
            self.cache(con.con)  # so add it to the idle cache

    def cache(self, con):
        """Put a dedicated connection back into the idle cache."""
        self._lock.acquire()
        try:
            if not self._maxcached or len(self._idle_cache) < self._maxcached:
                con._reset(force=self._reset)  # rollback possible transaction
                # the idle cache is not full, so put it there
                self._idle_cache.append(con)  # append it to the idle cache
            else:  # if the idle cache is already full,
                con.close()  # then close the connection
            self._connections -= 1
            self._lock.notify()
        finally:
            self._lock.release()

    def close(self):
        """Close all connections in the pool."""
        self._lock.acquire()
        try:
            while self._idle_cache:  # close all idle connections
                con = self._idle_cache.pop(0)
                try:
                    con.close()
                except Exception:
                    pass
            if self._maxshared:  # close all shared connections
                while self._shared_cache:
                    con = self._shared_cache.pop(0).con
                    try:
                        con.close()
                    except Exception:
                        pass
                    self._connections -= 1
            self._lock.notifyAll()
        finally:
            self._lock.release()

    def __del__(self):
        """Delete the pool."""
        try:
            self.close()
        except Exception:
            pass

    def _wait_lock(self):
        """Wait until notified or report an error."""
        if not self._blocking:
            raise TooManyConnections
        self._lock.wait()


# Auxiliary classes for pooled connections

class PooledDedicatedDBConnection:
    """Auxiliary proxy class for pooled dedicated connections."""

    def __init__(self, pool, con):
        """
        Create a pooled dedicated connection.

        pool: the corresponding PooledDB instance
        con: the underlying SteadyDB connection

        """
        # basic initialization to make finalizer work
        self._con = None
        # proper initialization of the connection
        if not con.threadsafety():
            raise NotSupportedError("Database module is not thread-safe.")
        self._pool = pool
        self._con = con

    def close(self):
        """Close the pooled dedicated connection."""
        # Instead of actually closing the connection,
        # return it to the pool for future reuse.
        if self._con:
            self._pool.cache(self._con)
            self._con = None

    def __getattr__(self, name):
        """Proxy all members of the class."""
        if self._con:
            return getattr(self._con, name)
        else:
            raise InvalidConnection

    def __del__(self):
        """Delete the pooled connection."""
        try:
            self.close()
        except Exception:
            pass


class SharedDBConnection:
    """Auxiliary class for shared connections."""

    def __init__(self, con):
        """Create a shared connection.

        con: the underlying SteadyDB connection

        """
        self.con = con
        self.shared = 1

    def __lt__(self, other):
        if self.con._transaction == other.con._transaction:
            return self.shared < other.shared
        else:
            return not self.con._transaction

    def __le__(self, other):
        if self.con._transaction == other.con._transaction:
            return self.shared <= other.shared
        else:
            return not self.con._transaction

    def __eq__(self, other):
        return (self.con._transaction == other.con._transaction
                and self.shared == other.shared)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __gt__(self, other):
        return other.__lt__(self)

    def __ge__(self, other):
        return other.__le__(self)

    def share(self):
        """Increase the share of this connection."""
        self.shared += 1

    def unshare(self):
        """Decrease the share of this connection."""
        self.shared -= 1


class PooledSharedDBConnection:
    """Auxiliary proxy class for pooled shared connections."""

    def __init__(self, pool, shared_con):
        """Create a pooled shared connection.

        pool: the corresponding PooledDB instance
        con: the underlying SharedDBConnection

        """
        # basic initialization to make finalizer work
        self._con = None
        # proper initialization of the connection
        con = shared_con.con
        if not con.threadsafety() > 1:
            raise NotSupportedError("Database connection is not thread-safe.")
        self._pool = pool
        self._shared_con = shared_con
        self._con = con

    def close(self):
        """Close the pooled shared connection."""
        # Instead of actually closing the connection,
        # unshare it and/or return it to the pool.
        if self._con:
            self._pool.unshare(self._shared_con)
            self._shared_con = self._con = None

    def __getattr__(self, name):
        """Proxy all members of the class."""
        if self._con:
            return getattr(self._con, name)
        else:
            raise InvalidConnection

    def __del__(self):
        """Delete the pooled connection."""
        try:
            self.close()
        except Exception:
            pass