前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >SqlAlchemy 2.0 中文文档(五十三)

SqlAlchemy 2.0 中文文档(五十三)

作者头像
ApacheCN_飞龙
发布2024-08-01 11:30:03
950
发布2024-08-01 11:30:03
举报
文章被收录于专栏:信数据得永生

原文:docs.sqlalchemy.org/en/20/contents.html

常见问题

原文:docs.sqlalchemy.org/en/20/faq/index.html

常见问题部分是一系列常见问题的不断增长的集合,涵盖了众多已知问题。

  • 安装
    • 当我尝试使用 asyncio 时,为什么会出现关于未安装 greenlet 的错误?
  • 连接 / 引擎
    • 如何配置日志记录?
    • 如何池化数据库连接?我的连接是否被池化?
    • 如何将自定义连接参数传递给我的数据库 API?
    • “MySQL 服务器已断开连接”
    • “命令不同步;你现在无法运行此命令” / “此结果对象不返回行。它已被自动关闭”
    • 如何自动“重试”语句执行?
    • 为什么 SQLAlchemy 发出了这么多 ROLLBACKs?
    • 我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序无法工作!
    • 在使用 Engine 时,如何获取原始的 DBAPI 连接?
    • 如何在 Python 多进程或 os.fork() 中使用引擎 / 连接 / 会话?
  • 元数据 / 模式
    • 当我使用 table.drop() / metadata.drop_all() 时,我的程序挂起了
    • SQLAlchemy 是否支持 ALTER TABLE、CREATE VIEW、CREATE TRIGGER、模式升级功能?
    • 如何按依赖顺序对 Table 对象进行排序?
    • 如何将 CREATE TABLE / DROP TABLE 输出作为字符串获取?
    • 如何派生 Table/Column 以提供某些行为/配置?
  • SQL 表达式
    • 如何将 SQL 表达式呈现为字符串,可能包含内联的绑定参数?
    • 当将 SQL 语句字符串化时,为什么百分号会被加倍?
    • 我正在使用 op() 生成自定义运算符,但我的括号不正确
  • ORM 配置
    • 如何映射没有主键的表?
    • 如何配置一个列,该列是 Python 保留字或类似的?
    • 如何在给定映射类的情况下获取所有列、关系、映射属性等的列表?
    • 我收到关于“隐式组合列 X 在属性 Y 下”的警告或错误
    • 我使用声明性,并使用 and_()or_() 设置 primaryjoin/secondaryjoin,并且收到有关外键的错误消息。
    • 为什么推荐 LIMIT 结合 ORDER BY(尤其是与 subqueryload() 一起)?
  • 性能
    • 为什么我升级到 1.4 和/或 2.x 后应用程序变慢?
    • 我如何对基于 SQLAlchemy 的应用程序进行性能分析?
    • 我正在使用 ORM 插入 400,000 行,但速度非常慢!
  • 会话 / 查询
    • 我正在使用我的会话重新加载数据,但它没有看到我在其他地方提交的更改
    • “由于 flush 期间的前一个异常,此会话的事务已回滚。”(或类似的)
    • 如何制作一个查询,始终向每个查询添加特定的过滤器?
    • 我的查询没有返回与 query.count() 告诉我的相同数量的对象 - 为什么?
    • 我已经创建了一个对外连接的映射,虽然查询返回了行,但没有返回对象。为什么?
    • 我使用 joinedload()lazy=False 创建了一个 JOIN/OUTER JOIN,但是当我尝试添加 WHERE、ORDER BY、LIMIT 等条件时,SQLAlchemy 并没有构造正确的查询(依赖于 (OUTER) JOIN)。
    • 查询没有 __len__(),为什么?
    • 如何在 ORM 查询中使用文本 SQL?
    • 我调用 Session.delete(myobject),但它没有从父集合中删除!
    • 为什么在加载对象时我的 __init__() 没有被调用?
    • 我如何在 SA 的 ORM 中使用 ON DELETE CASCADE?
    • 我将我的实例的“foo_id”属性设置为“7”,但“foo”属性仍然是None - 难道它不应该加载 id 为 #7 的 Foo 吗?
    • 如何遍历与给定对象相关的所有对象?
    • 是否有一种方法可以自动地只拥有唯一的关键词(或其他类型的对象),而不必查询关键词并获得包含该关键词的行的引用?
    • 为什么 post_update 除了第一个 UPDATE 外还会发出 UPDATE?
  • 第三方集成问题
    • 我遇到了与“numpy.int64”、“numpy.bool_”等相关的错误。
    • SQL 表达式中期望 WHERE/HAVING 角色,实际得到了 True

安装

原文:docs.sqlalchemy.org/en/20/faq/installation.html

  • 当我尝试使用 asyncio 时,出现了关于未安装 greenlet 的错误

当我尝试使用 asyncio 时,出现了关于未安装 greenlet 的错误

对于不提供预构建二进制轮的 CPU 架构,默认情况下不会安装 greenlet 依赖项。特别是,这包括 Apple M1。要安装包括 greenlet 的内容,请将 asyncio setuptools 额外内容添加到 pip install 命令中:

代码语言:javascript
复制
pip install sqlalchemy[asyncio]

欲了解更多背景信息,请参阅 Asyncio 平台安装说明(包括 Apple M1)。

另请参阅

Asyncio 平台安装说明(包括 Apple M1) ## 当我尝试使用 asyncio 时,出现了关于未安装 greenlet 的错误

对于不提供预构建二进制轮的 CPU 架构,默认情况下不会安装 greenlet 依赖项。特别是,这包括 Apple M1。要安装包括 greenlet 的内容,请将 asyncio setuptools 额外内容添加到 pip install 命令中:

代码语言:javascript
复制
pip install sqlalchemy[asyncio]

欲了解更多背景信息,请参阅 Asyncio 平台安装说明(包括 Apple M1)。

另请参阅

Asyncio 平台安装说明(包括 Apple M1)

连接 / 引擎

原文:docs.sqlalchemy.org/en/20/faq/connections.html

  • 我如何配置日志记录?
  • 我如何池化数据库连接?我的连接被池化了吗?
  • 我如何传递自定义连接参数给我的数据库 API?
  • “MySQL 服务器已断开连接”
  • “命令不同步;您现在无法运行此命令” / “此结果对象不返回行。它已被自动关闭”
  • 如何自动“重试”语句执行?
    • 使用 DBAPI 自动提交允许透明重连的只读版本
  • 为什么 SQLAlchemy 发出那么多回滚?
    • 我正在使用 MyISAM - 如何关闭它?
    • 我正在使用 SQL Server - 如何将那些回滚变成提交?
  • 我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序不起作用!
  • 在使用引擎时如何获取原始 DBAPI 连接?
    • 访问 asyncio 驱动程序的底层连接
  • 如何在 Python 多进程或 os.fork() 中使用引擎 / 连接 / 会话?

我如何配置日志记录?

参见 配置日志记录。

我如何池化数据库连接?我的连接被池化了吗?

SQLAlchemy 在大多数情况下会自动执行应用程序级别的连接池。对于所有包含的方言(除了在使用“内存”数据库时的 SQLite 外),Engine 对象都指向 QueuePool 作为连接的来源。

更多细节,请参阅 引擎配置 和 连接池。

我如何传递自定义连接参数给我的数据库 API?

create_engine() 调用可以通过 connect_args 关键字参数直接接受附加参数:

代码语言:javascript
复制
e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

或者对于基本的字符串和整数参数,它们通常可以在 URL 的查询字符串中指定:

代码语言:javascript
复制
e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")

另请参见

自定义 DBAPI connect() 参数 / 连接时例程

“MySQL 服务器已断开连接”

此错误的主要原因是 MySQL 连接已超时并已被服务器关闭。 MySQL 服务器会关闭空闲了一段时间(默认为八小时)的连接。 为了适应此情况,可立即设置为启用 create_engine.pool_recycle 设置,这将确保比一定时间旧的连接在下次检出时将被丢弃并替换为新连接。

对于更一般的情况,如适应数据库重新启动和由于网络问题而导致的临时连接丢失,池中的连接可能会在响应更广泛的断开连接检测技术时进行回收利用。 章节 处理断开连接 提供了关于“悲观”(例如预检)和“乐观”(例如优雅恢复)技术的背景。 现代 SQLAlchemy 倾向于采用“悲观”方法。

另请参见

处理断开连接

“命令不同步;您现在无法运行此命令” / “此结果对象不返回行。 它已被自动关闭”

MySQL 驱动程序存在一类失败模式,其中与服务器的连接状态处于无效状态。 通常,当再次使用连接时,将出现这两种错误消息之一。 原因是服务器的状态已更改为客户端库不期望的状态,因此当客户端库在连接上发出新语句时,服务器不会如预期地响应。

在 SQLAlchemy 中,由于数据库连接是池化的,连接上的消息不同步的问题变得更加重要,因为当操作失败时,如果连接本身处于不可用状态,如果它再次返回到连接池中,那么在再次检出时将会发生故障。 对此问题的缓解措施是当发生这种故障模式时连接被作废,以便底层 MySQL 数据库连接被丢弃。 对于许多已知的故障模式,此作废会自动发生,也可以通过 Connection.invalidate() 方法显式调用。

在此类别中还存在第二类故障模式,其中上下文管理器(例如with session.begin_nested():)希望在发生错误时“回滚”事务; 但是在某些连接的故障模式中,回滚本身(也可以是 RELEASE SAVEPOINT 操作)也会失败,导致误导性的堆栈跟踪。

最初,此错误的原因相当简单,它意味着多线程程序从多个线程调用单个连接上的命令。 这适用于原始的“MySQLdb”本机 C 驱动程序,这几乎是唯一使用的驱动程序。 但是,随着纯 Python 驱动程序(如 PyMySQL 和 MySQL-connector-Python)的引入,以及诸如 gevent/eventlet、多处理(通常与 Celery 一起使用)等工具的增加使用,已知有一整套因素会导致这个问题,其中一些因素已经在 SQLAlchemy 的不同版本中得到改进,但其他因素是无法避免的:

  • 在线程之间共享连接 - 这是这类错误发生的最初原因。 程序在同一时间在两个或多个线程中使用同一个连接,这意味着多组消息在连接上混合在一起,将服务器端会话置于客户端不再知道如何解释的状态。 但是,如今通常更有可能出现其他原因。
  • 在进程之间共享连接的文件句柄 - 这通常发生在程序使用os.fork()生成新进程时,父进程中存在的 TCP 连接被共享到一个或多个子进程。 由于多个进程现在向本质上是相同文件句柄的服务器发送消息,因此服务器接收到交错的消息并破坏连接的状态。 如果程序使用 Python 的“multiprocessing”模块,并使用在父进程中创建的Engine,则此场景可能非常容易发生。 使用工具如 Celery 时通常会使用“multiprocessing”。 正确的方法应该是在子进程首次启动时生成一个新的Engine,丢弃从父进程传递下来的任何Engine; 或者,从父进程继承的Engine可以通过调用Engine.dispose()来处理其内部连接池。
  • 使用 Greenlet Monkeypatching w/ Exits - 当使用像 gevent 或 eventlet 这样的库对 Python 网络 API 进行 monkeypatch 时,像 PyMySQL 这样的库现在以异步模式运行,即使它们并没有明确针对这种模型开发。一个常见问题是 greenthread 被中断,通常是由于应用程序中的超时逻辑。这导致引发GreenletExit异常,并且纯 Python MySQL 驱动程序被中断了其工作,可能是正在接收来自服务器的响应或准备以其他方式重置连接状态。当异常中断所有这些工作时,客户端和服务器之间的对话现在不同步,后续使用连接可能会失败。从版本 1.1.0 开始,SQLAlchemy 知道如何防范这种情况,如果数据库操作被所谓的“退出异常”中断,其中包括GreenletExit和任何不是Exception的 Python BaseException的子类,连接将被作废。
  • 回滚/SAVEPOINT 释放失败 - 某些类别的错误导致连接在事务上下文中无法使用,以及在“SAVEPOINT”块中操作时。在这些情况下,连接上的失败使任何 SAVEPOINT 不再存在,但当 SQLAlchemy 或应用程序尝试“回滚”此保存点时,“RELEASE SAVEPOINT”操作失败,通常会显示类似“savepoint does not exist”的消息。在这种情况下,在 Python 3 下会输出一系列异常,其中最终的错误“原因”也将被显示。在 Python 2 下,没有“链接”异常,但是最近的 SQLAlchemy 版本将尝试发出警告,说明原始失败原因,同时仍会抛出立即错误,即 ROLLBACK 的失败。## 如何自动“重试”语句执行?

文档部分处理断开连接讨论了对已自上次检查特定连接以来已断开的连接可用的策略。在这方面最现代的功能是create_engine.pre_ping参数,它允许在从池中检索数据库连接时发出“ping”,如果当前连接已断开,则重新连接。

需要注意的是,“ping” 仅在连接实际用于操作之前发出。一旦连接交付给调用方,根据 Python DBAPI 规范,它现在将受到 自动启动 操作的影响,这意味着当首次使用连接时将自动开始一个新事务,该事务将在后续语句中保持有效,直到调用 DBAPI 级别的 connection.commit()connection.rollback() 方法。

在 SQLAlchemy 的现代用法中,一系列 SQL 语句始终在这个事务状态下调用,假设未启用 DBAPI 自动提交模式(关于此后面会有更多介绍),这意味着没有单个语句会自动提交;如果操作失败,当前事务中所有语句的效果将丢失。

这对于“重试”语句的概念意味着在默认情况下,当连接丢失时,整个事务都将丢失。数据库无法“重新连接和重试”并继续之前的操作,因为数据已经丢失。因此,SQLAlchemy 没有一个在事务中途重新连接的透明“重连”功能。处理中途断开连接的操作的标准方法是从事务开始处重新尝试整个操作,通常通过使用一个自定义的 Python 装饰器,该装饰器会多次“重试”特定函数直到成功,或以其他方式设计应用程序以使其能够抵御因事务断开而导致操作失败。

还有一个扩展概念,可以跟踪事务中执行的所有语句,然后在新事务中重播它们以近似“重试”操作。SQLAlchemy 的 事件系统 确实允许构建这样一个系统,但这种方法通常也不太有用,因为无法保证这些 DML 语句是否针对相同的状态进行操作,一旦事务结束,新事务中的数据库状态可能完全不同。在事务操作开始和提交的地方显式地构建“重试”到应用程序中仍然是更好的方法,因为应用程序级别的事务方法最了解如何重新运行它们的步骤。

否则,如果 SQLAlchemy 提供了一个在事务中途自动且悄无声息地“重新连接”连接的功能,那么效果将是数据被悄无声息地丢失。通过试图隐藏问题,SQLAlchemy 将使情况变得更糟。

但是,如果我们 使用事务,那么就会有更多的选项可用,下一节将描述这些选项。

使用 DBAPI 自动提交允许透明重新连接的只读版本

在说明不具有透明重新连接机制的基础上,上一节假设应用实际上正在使用 DBAPI 级别的事务。由于大多数 DBAPI 现在提供了本机的“自动提交”设置,我们可以利用这些特性为只读,仅自动提交操作提供有限的透明重新连接形式。可以将透明语句重试应用于 DBAPI 的cursor.execute()方法,但是仍然不安全应用于 DBAPI 的cursor.executemany()方法,因为语句可能已经消耗了给定参数的任何部分。

警告

不应将以下方案用于编写数据的操作。用户应该仔细阅读并了解该方案的工作原理,并在将该方案投入生产使用之前对特定的目标 DBAPI 驱动程序非常仔细地测试故障模式。重试机制不能保证在所有情况下防止断开连接错误。

可以通过利用DialectEvents.do_execute()DialectEvents.do_execute_no_params()钩子向 DBAPI 级别的cursor.execute()方法应用简单的重试机制,该机制将能够在语句执行期间拦截断开连接。它将不会拦截在结果集获取操作期间的连接失败,对于那些不完全缓冲结果集的 DBAPI。该方案要求数据库支持 DBAPI 级别的自动提交,并且不能保证适用于特定的后端。提供了一个名为reconnecting_engine()的单一函数,它将事件钩子应用于给定的Engine对象,返回一个总是自动提交的版本,该版本支持 DBAPI 级别的自动提交。连接将在单参数和无参数语句执行时自动重新连接:

代码语言:javascript
复制
import time

from sqlalchemy import event

def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

给定上述方案,可以使用以下概念验证脚本演示事务中的重新连接。运行一次后,它将每五秒向数据库发出一个SELECT 1语句:

代码语言:javascript
复制
from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

在脚本运行时重新启动数据库,以演示透明重新连接操作:

代码语言:javascript
复制
$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上述方案已针对 SQLAlchemy 1.4 进行了测试。

为什么 SQLAlchemy 会发出那么多的 ROLLBACK?

SQLAlchemy 目前假定 DBAPI 连接处于“非自动提交”模式 - 这是 Python 数据库 API 的默认行为,这意味着必须假定事务始终在进行中。当连接返回时,连接池会发出connection.rollback()。这样可以释放连接上剩余的任何事务资源。在像 PostgreSQL 或 MSSQL 这样的数据库中,表资源会被积极锁定,这一点至关重要,以防止行和表在不再使用的连接中保持锁定。否则应用程序可能会挂起。然而,这不仅仅是为了锁定,对于任何具有任何类型事务隔离的数据库,包括具有 InnoDB 的 MySQL,这同样至关重要。如果任何连接仍在旧事务中,那么该连接返回的数据将是过时的,如果在隔离中已经在该连接上查询了该数据。有关为什么甚至在 MySQL 上可能看到过时数据的背景,请参阅dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html

我在 MyISAM 上 - 如何关闭它?

连接池的连接返回行为的行为可以使用reset_on_return进行配置:

代码语言:javascript
复制
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)
我在 SQL Server 上 - 如何将那些 ROLLBACKs 转换为 COMMITs?

reset_on_return接受commitrollback的值,以及TrueFalseNone。设置为commit将导致任何连接返回到池时进行 COMMIT:

代码语言:javascript
复制
engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序不起作用!

如果使用 SQLite 的:memory:数据库,默认连接池是SingletonThreadPool,每个线程保持一个 SQLite 连接。因此,在同一线程中使用两个连接实际上是相同的 SQLite 连接。确保您不使用:memory:数据库,以便引擎将使用QueuePool(当前 SQLAlchemy 版本中非内存数据库的默认值)。

另请参见

线程/池行为 - 有关 PySQLite 行为的信息。

当使用引擎时,如何访问原始 DBAPI 连接?

使用常规的 SA 引擎级 Connection,您可以通过Connection.connection属性在Connection上获取一个经过池代理的 DBAPI 连接版本,并且对于真正的 DBAPI 连接,您可以在其上调用PoolProxiedConnection.dbapi_connection属性。在常规的同步驱动程序中,通常不需要访问非经过池代���的 DBAPI 连接,因为所有方法都经过代理:

代码语言:javascript
复制
engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

自版本 1.4.24 起发生了变化:添加了PoolProxiedConnection.dbapi_connection属性,取代了先前的PoolProxiedConnection.connection属性,但该属性仍然可用;该属性始终提供一个符合 pep-249 同步风格的连接对象。还添加了PoolProxiedConnection.driver_connection属性,它将始终引用真实的驱动程序级连接,无论它展示了什么 API。

访问 asyncio 驱动程序的基础连接

当使用 asyncio 驱动程序时,上述方案有两个变化。首先是当使用AsyncConnection时,必须使用可等待方法AsyncConnection.get_raw_connection()来访问PoolProxiedConnection。在这种情况下返回的PoolProxiedConnection保留了一个同步风格的 pep-249 使用模式,而PoolProxiedConnection.dbapi_connection属性指的是一个将 asyncio 连接适配为同步风格 pep-249 API 的 SQLAlchemy 适配连接对象,换句话说,在使用 asyncio 驱动程序时会有两层代理。实际的 asyncio 连接可以从driver_connection属性中获取。以 asyncio 方式重新表述前面的示例如下:

代码语言:javascript
复制
async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

从版本 1.4.24 开始更改:添加了PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection属性,以允许通过一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

使用 asyncio 驱动程序时,上述“DBAPI”连接实际上是一个经过 SQLAlchemy 适配的连接形式,它呈现同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动程序连接,可以通过PoolProxiedConnectionPoolProxiedConnection.driver_connection属性进行访问。对于标准的 pep-249 驱动程序,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection是同义词。

在将连接返回到池之前,您必须确保将任何隔离级别设置或其他特定操作设置恢复为正常状态。

作为恢复设置的替代方法,您可以在Connection或代理连接上调用Connection.detach()方法,这将使连接与池解除关联,从而在调用Connection.close()时关闭并丢弃连接:

代码语言:javascript
复制
conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

如何在 Python 多进程或 os.fork()中使用引擎/连接/会话?

这在使用连接池与多进程或 os.fork()一节中有所涉及。

如何配置日志记录?

参见配置日志记录。

如何池化数据库连接?我的连接是否被池化了?

SQLAlchemy 在大多数情况下会自动执行应用程序级别的连接池。对于所有包含的方言(除了使用“内存”数据库的 SQLite),Engine 对象指的是一个 QueuePool 作为连接的来源。

更多详细信息,请参阅 引擎配置 和 连接池。

如何向我的数据库 API 传递自定义连接参数?

create_engine() 调用可以通过 connect_args 关键字参数直接接受附加参数:

代码语言:javascript
复制
e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

或者对于基本的字符串和整数参数,它们通常可以在 URL 的查询字符串中指定:

代码语言:javascript
复制
e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")

另请参见

自定义 DBAPI connect() 参数 / 连接时例程

“MySQL 服务器已关闭连接”

此错误的主要原因是 MySQL 连接已超时并已被服务器关闭。MySQL 服务器会关闭空闲一段时间(默认为八小时)的连接。为了适应这一点,立即设置是启用 create_engine.pool_recycle 设置,这将确保超过一定秒数的连接在下次检出时被丢弃并替换为新连接。

对于更一般的情况,即适应数据库重新启动和由于网络问题导致的临时连接丢失,池中的连接可能会根据更广义的断开连接检测技术进行回收。章节 处理断开连接 提供了关于“悲观”(例如,预先 ping)和“乐观”(例如,优雅恢复)技术的背景。现代 SQLAlchemy 倾向于采用“悲观”方法。

另请参见

处理断开连接

“命令不同步;您现在无法运行此命令” / “此结果对象不返回行。它已被自动关闭”

MySQL 驱动程序存在一类相当广泛的故障模式,其中与服务器的连接状态处于无效状态。通常情况下,当再次使用连接时,将出现以下两个错误消息之一。原因是因为服务器的状态已更改为客户端库不期望的状态,因此当客户端库在连接上发出新语句时,服务器不会如预期地响应。

在 SQLAlchemy 中,由于数据库连接是池化的,连接上的消息不同步的问题变得更加重要,因为当一个操作失败时,如果连接本身处于不可用状态,如果它重新进入连接池,当再次检出时将发生故障。对于这个问题的缓解措施是,当出现这种故障模式时,连接被作废,以便底层数据库连接到 MySQL 被丢弃。这种作废对于许多已知的故障模式会自动发生,也可以通过Connection.invalidate()方法显式调用。

在这个类别中还有第二类故障模式,其中上下文管理器(如with session.begin_nested():)在发生错误时希望“回滚”事务;然而在某些连接的故障模式中,回滚本身(也可以是一个 RELEASE SAVEPOINT 操作)也会失败,导致误导性的堆栈跟踪。

最初,这种错误的原因通常很简单,意味着一个多线程程序从多个线程调用单个连接上的命令。这适用于最初几乎是唯一使用的原始“MySQLdb”本机 C 驱动程序。然而,随着纯 Python 驱动程序(如 PyMySQL 和 MySQL-connector-Python)的引入,以及诸如 gevent/eventlet、多进程(通常与 Celery 一起使用)等工具的增加使用,已知存在一整套因素会导致这个问题,其中一些已经在 SQLAlchemy 版本中得到改进,但另一些是不可避免的:

  • 在线程之间共享连接 - 这是这类错误发生的最初原因。程序在两个或多个线程中同时使用相同的连接,意味着多组消息在连接上混在一起,使得服务器端会话进入一个客户端不再知道如何解释的状态。然而,今天通常更可能出现其他原因。
  • 在进程之间共享连接的文件句柄 - 这通常发生在程序使用os.fork()生成新进程时,父进程中存在的 TCP 连接被共享到一个或多个子进程中。由于多个进程现在向基本上相同的文件句柄发送消息,服务器接收到交错的消息并破坏连接的状态。 如果程序使用 Python 的“multiprocessing”模块,并且使用了在父进程中创建的 Engine,则可能会很容易发生此情况。在使用 Celery 等工具时,使用“multiprocessing”是很常见的。正确的方法应该是在子进程第一次启动时生成一个新的 Engine,丢弃从父进程继承下来的任何 Engine;或者,从父进程继承的 Engine 可以通过调用 Engine.dispose() 来处理其内部的连接池。
  • 使用 Exit 的 Greenlet Monkeypatching - 当使用类似 gevent 或 eventlet 的库对 Python 网络 API 进行 monkeypatch 时,像 PyMySQL 这样的库现在以异步模式运行,即使它们并没有明确针对此模型开发。一个常见问题是 greenthread 被中断,通常是由于应用程序中的超时逻辑。这导致引发GreenletExit异常,并且纯 Python MySQL 驱动程序被中断了其工作,可能是正在接收来自服务器的响应或准备重新设置连接的状态。当异常中断了所有这些工作时,客户端和服务器之间的对话现在不再同步,连接的后续使用可能会失败。截至版本 1.1.0,SQLAlchemy 知道如何防范这种情况,因为如果数据库操作被所谓的“退出异常”中断,这包括GreenletExit和任何不是也是Exception子类的 Python BaseException的子类,则连接将无效。
  • 回滚 / SAVEPOINT 释放失败 - 某些类别的错误会导致连接在事务上下文中无法使用,以及在“SAVEPOINT”块中操作时无法使用。在这些情况下,连接上的故障使任何 SAVEPOINT 都不再存在,然而当 SQLAlchemy 或应用程序尝试“回滚”此 savepoint 时,“RELEASE SAVEPOINT”操作会失败,通常会出现“savepoint 不存在”的消息。在这种情况下,在 Python 3 下将输出一系列异常,其中最终的错误“原因”也将被显示出来。在 Python 2 下,没有“链接”异常,但是 SQLAlchemy 的最新版本将尝试发出警告,说明原始故障原因,同时仍然抛出 ROLLBACK 失败的立即错误。

如何自动“重试”语句执行?

文档部分 处理断开连接 讨论了对已经断开连接的池化连接可用的策略。在这方面最现代的特性是 create_engine.pre_ping 参数,它允许在从池中检索数据库连接时发出“ping”,如果当前连接已断开,则重新连接。

需要注意的是,此“ping”仅在连接实际用于操作之前发出。一旦连接被提供给调用者,根据 Python DBAPI 规范,它现在已经受到autobegin操作的影响,这意味着当首次使用时,它将自动开始一个新事务,该事务在后续语句中仍然有效,直到调用 DBAPI 级别的 connection.commit()connection.rollback() 方法。

在现代使用 SQLAlchemy 中,一系列 SQL 语句总是在事务状态下调用,假设未启用 DBAPI 自动提交模式(下一节将详细介绍),这意味着没有单个语句会自动提交;如果操作失败,当前事务内所有语句的影响都将丢失。

对于“重试”语句的含义是,默认情况下,当连接丢失时,整个事务都将丢失。数据库无法以有用的方式“重新连接和重试”,并继续上次执行的位置,因为数据已经丢失。因此,SQLAlchemy 没有一个能在事务进行中工作时透明地进行“重新连接”的功能,以处理数据库连接在使用过程中断开的情况。处理中途断开连接的规范方法是从事务开始处重试整个操作,通常通过使用自定义 Python 装饰器多次“重试”特定函数直到成功,或者以其他方式设计应用程序,使其能够抵御事务被中断而导致操作失败的情况。

还有一个概念,即扩展程序可以跟踪事务中已经执行的所有语句,然后在新事务中重新执行它们,以近似实现“重试”操作。SQLAlchemy 的事件系统确实允许构建这样一个系统,但这种方法通常也不实用,因为没有办法保证这些 DML 语句将针对相同的状态进行操作,一旦事务结束,数据库在新事务中的状态可能会完全不同。在事务操作开始和提交的点明确将“重试”架构化到应用程序中仍然是更好的方法,因为应用程序级别的事务方法最了解如何重新运行它们的步骤。

否则,如果 SQLAlchemy 提供了一个透明且静默地在事务中重新连接连接的功能,则效果将是数据被静默丢失。通过试图隐藏问题,SQLAlchemy 将使情况变得更糟。

然而,如果我们使用事务,则会有更多的选择,如下一节所述。

使用 DBAPI 自动提交允许只读版本的透明重新连接

由于没有透明的重新连接机制的理由已经说明,上一节建立在这样一个假设之上,即应用程序实际上正在使用 DBAPI 级别的事务。由于大多数 DBAPI 现在提供了本地的“自动提交”设置,我们可以利用这些特性来为只读、自动提交的操作提供有限形式的透明重新连接。可以将透明的语句重试应用于 DBAPI 的cursor.execute()方法,但仍然不安全应用于 DBAPI 的cursor.executemany()方法,因为该语句可能已经消耗了给定参数的任何部分。

警告

下面的方法应用于写入数据的操作。用户应该仔细阅读和理解该方法的工作原理,并仔细针对具体目标的 DBAPI 驱动程序测试故障模式,然后再在生产中使用该方法。重试机制不能保证在所有情况下都防止断开连接错误。

可以通过利用DialectEvents.do_execute()DialectEvents.do_execute_no_params()钩子来应用于 DBAPI 级别的cursor.execute()方法的简单重试机制,这将能够在语句执行期间拦截断开连接。对于那些不完全缓冲结果集的 DBAPI,它不会拦截在结果集获取操作期间的连接故障。该配方要求数据库支持 DBAPI 级别的自动提交,并且对于特定后端不能保证。提供了一个名为reconnecting_engine()的单个函数,它将事件钩子应用于给定的Engine对象,返回一个始终自动提交的版本,该版本启用了 DBAPI 级别的自动提交。连接将透明地重新连接以进行单参数和无参数语句执行:

代码语言:javascript
复制
import time

from sqlalchemy import event

def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

给定上述配方,可以使用以下概念验证脚本演示事务中的重新连接。运行后,它将每五秒向数据库发出一个SELECT 1语句:

代码语言:javascript
复制
from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

在脚本运行时重新启动数据库以演示透明重连接操作:

代码语言:javascript
复制
$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上述配方已在 SQLAlchemy 1.4 中进行了测试。### 使用 DBAPI 自动提交允许透明重连接的只读版本

在未说明透明重连接机制的理由的情况下,前一节基于这样一种假设,即应用程序实际上正在使用 DBAPI 级别的事务。由于大多数 DBAPI 现在提供本地“自动提交”设置,我们可以利用这些特性为只读,仅自动提交操作提供一种有限形式的透明重连接。透明语句重试可以应用于 DBAPI 的cursor.execute()方法,但是仍然不安全应用于 DBAPI 的cursor.executemany()方法,因为该语句可能已经消耗了给定参数的任何部分。

警告

不应将以下配方用于写入数据的操作。用户应仔细阅读和理解配方的工作原理,并在生产使用此配方之前针对特定的 DBAPI 驱动程序非常仔细地测试故障模式。重试机制不能保证在所有情况下防止断开连接错误。

可以通过使用DialectEvents.do_execute()DialectEvents.do_execute_no_params()钩子向 DBAPI 级别的 cursor.execute() 方法应用简单的重试机制,这些钩子将能够在语句执行期间拦截断开连接。对于那些不完全缓冲结果集的 DBAPI,它将不会拦截结果集获取操作期间的连接故障。该方案要求数据库支持 DBAPI 级别的自动提交,并且不能保证适用于特定的后端。提供了一个名为 reconnecting_engine() 的单个函数,它将事件钩子应用于给定的 Engine 对象,返回一个始终启用 DBAPI 级别自动提交的版本。连接将自动重新连接以用于单参数和无参数语句执行:

代码语言:javascript
复制
import time

from sqlalchemy import event

def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

根据上述方案,可以使用以下概念证明脚本演示事务中重新连接。运行一次后,它将每五秒向数据库发出一个SELECT 1语句:

代码语言:javascript
复制
from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

在脚本运行时重新启动数据库以演示透明的重新连接操作:

代码语言:javascript
复制
$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上述方案已经在 SQLAlchemy 1.4 上进行了测试。

为什么 SQLAlchemy 发出了那么多个 ROLLBACK?

SQLAlchemy 目前假设 DBAPI 连接处于“非自动提交”模式 - 这是 Python 数据库 API 的默认行为,这意味着必须假定事务始终在进行中。连接池在连接返回时发出 connection.rollback()。这是为了释放连接上仍然存在的任何事务资源。在像 PostgreSQL 或 MSSQL 这样的数据库上,表资源被积极地锁定,这一点至关重要,以确保行和表不会在不再使用的连接中保持锁定状态。否则,应用程序可能会挂起。然而,这不仅仅是为了锁定,并且在具有任何类型的事务隔离的任何数据库上同样关键,包括具有 InnoDB 的 MySQL。如果在隔离内在连接上已经查询了该数据,任何仍然处于旧事务中的连接将返回陈旧的数据。有关为什么即使在 MySQL 上也可能看到陈旧数据的背景,请参阅dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html

我使用的是 MyISAM - 如何关闭它?

连接池的连接返回行为的行为可以使用 reset_on_return 进行配置:

代码语言:javascript
复制
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)
我使用的是 SQL Server - 如何将那些 ROLLBACKs 转换为 COMMITs?

reset_on_return 接受值 commitrollback,除了 TrueFalseNone。设置为 commit 将导致任何连接返回到池时进行 COMMIT:

代码语言:javascript
复制
engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)
我正在使用 MyISAM - 如何关闭它?

可以使用 reset_on_return 配置连接池的连接返回行为:

代码语言:javascript
复制
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)
我正在使用 SQL Server - 如何将那些 ROLLBACKs 转换为 COMMITs?

reset_on_return 接受值 commitrollback,除了 TrueFalseNone。设置为 commit 将导致任何连接返回到池时进行 COMMIT:

代码语言:javascript
复制
engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序不起作用!

如果使用 SQLite 的 :memory: 数据库,默认连接池是 SingletonThreadPool,它每个线程维护一个 SQLite 连接。因此,在同一线程中使用两个连接实际上是相同的 SQLite 连接。确保您不是使用 :memory: 数据库,以便引擎将使用 QueuePool(当前 SQLAlchemy 版本中非内存数据库的默认值)。

另请参阅

线程/池行为 - 有关 PySQLite 行为的信息。

在使用 Engine 时如何访问原始的 DBAPI 连接?

使用常规的 SA 引擎级 Connection,您可以通过 Connection.connection 属性获取到一个池代理版本的 DBAPI 连接,并且对于真正的 DBAPI 连接,您可以在此调用 PoolProxiedConnection.dbapi_connection 属性。在常规的同步驱动程序中,通常不需要访问非池代理的 DBAPI 连接,因为所有方法都是通过代理的:

代码语言:javascript
复制
engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

在版本 1.4.24 中更改:添加了 PoolProxiedConnection.dbapi_connection 属性,它取代了以前的 PoolProxiedConnection.connection 属性,后者仍然可用;此属性始终提供 pep-249 同步风格的连接对象。还添加了 PoolProxiedConnection.driver_connection 属性,它将始终引用真正的驱动程序级连接,无论它呈现什么 API。

访问 asyncio 驱动程序的底层连接

在使用 asyncio 驱动程序时,上述方案有两个变化。首先是在使用AsyncConnection时,必须使用可等待方法AsyncConnection.get_raw_connection()来访问PoolProxiedConnection。在这种情况下返回的PoolProxiedConnection保留了同步风格的 pep-249 使用模式,而PoolProxiedConnection.dbapi_connection属性指的是一个将 asyncio 连接适配为同步风格 pep-249 API 的 SQLAlchemy 适配连接对象,换句话说,在使用 asyncio 驱动程序时存在两层代理。实际的 asyncio 连接可以从driver_connection属性中获取。将上述示例重新阐述为 asyncio 的形式如下:

代码语言:javascript
复制
async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

自版本 1.4.24 起更改:添加了PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection属性,以允许通过一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

在使用 asyncio 驱动程序时,上述“DBAPI”连接实际上是 SQLAlchemy 适配的连接形式,它呈现了同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动程序连接,可以通过PoolProxiedConnection.driver_connection属性来访问PoolProxiedConnection。对于标准的 pep-249 驱动程序,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection是同义词。

在将连接返回到池之前,必须确保将连接上的任何隔离级别设置或其他操作特定设置恢复为正常状态。

作为恢复设置的替代方案,您可以在 Connection 或代理连接上调用 Connection.detach() 方法,这将使连接与池解除关联,从而在调用 Connection.close() 时关闭和丢弃它:

代码语言:javascript
复制
conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection
使用 asyncio 驱动程序访问底层连接

当使用 asyncio 驱动程序时,对上述方案有两个变化。首先是当使用 AsyncConnection 时,必须使用可等待方法 AsyncConnection.get_raw_connection() 访问 PoolProxiedConnection。在这种情况下返回的 PoolProxiedConnection 保留了同步样式 pep-249 使用模式,并且 PoolProxiedConnection.dbapi_connection 属性指向一个 SQLAlchemy 适配的连接对象,将 asyncio 连接适配为同步样式 pep-249 API,换句话说,当使用 asyncio 驱动程序时会有两层代理。实际的 asyncio 连接可以从 driver_connection 属性获得。在 asyncio 方面重新表述上一个示例如下:

代码语言:javascript
复制
async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

从版本 1.4.24 开始更改:添加了 PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 属性,以允许使用一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

在使用 asyncio 驱动程序时,上述“DBAPI”连接实际上是一个经过 SQLAlchemy 适配的连接形式,它呈现了一个同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动程序连接,它将呈现所使用驱动程序的原始 asyncio API,可以通过PoolProxiedConnectionPoolProxiedConnection.driver_connection属性进行访问。对于标准的 pep-249 驱动程序,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 是同义词。

在将连接返回到池之前,您必须确保将任何隔离级别设置或其他特定操作设置恢复为正常状态。

作为恢复设置的替代方案,您可以在Connection或代理连接上调用Connection.detach()方法,这将使连接与池解除关联,从而在调用Connection.close()时关闭并丢弃连接:

代码语言:javascript
复制
conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

我如何在 Python 多进程或 os.fork() 中使用引擎/连接/会话?

这在使用连接池与多进程或 os.fork()一节中有详细介绍。

元数据 / 模式

原文:docs.sqlalchemy.org/en/20/faq/metadata_schema.html

  • 当我说 table.drop() / metadata.drop_all() 时,我的程序挂起了
  • SQLAlchemy 支持 ALTER TABLE、CREATE VIEW、CREATE TRIGGER、Schema 升级功能吗?
  • 我如何按照它们的依赖关系对 Table 对象进行排序?
  • 我如何将 CREATE TABLE/ DROP TABLE 输出作为字符串获取?
  • 我如何子类化 Table/Column 以提供某些行为/配置?

当我说 table.drop() / metadata.drop_all() 时,我的程序挂起了

这通常对应两种情况:1. 使用 PostgreSQL,它对表锁非常严格,2. 您仍然打开了一个包含对表的锁定的连接,并且与用于 DROP 语句的连接不同。这是模式的最简化版本:

代码语言:javascript
复制
connection = engine.connect()
result = connection.execute(mytable.select())

mytable.drop(engine)

上述,连接池连接仍然被检出;此外,上述结果对象还保持对此连接的链接。如果使用“隐式执行”,结果将保持此连接打开,直到结果对象关闭或所有行都被耗尽。

调用 mytable.drop(engine) 试图在从 Engine 获取的第二连接上发出 DROP TABLE 操作,这将会被锁定。

解决方法是在发出 DROP TABLE 前关闭所有连接:

代码语言:javascript
复制
connection = engine.connect()
result = connection.execute(mytable.select())

# fully read result sets
result.fetchall()

# close connections
connection.close()

# now locks are removed
mytable.drop(engine)

SQLAlchemy 支持 ALTER TABLE、CREATE VIEW、CREATE TRIGGER、Schema 升级功能吗?

SQLAlchemy 直接不支持通用 ALTER。对于特殊的 ad-hoc 基础 DDL,可以使用 DDL 和相关构造。有关此主题的讨论,请参阅 自定义 DDL。

更全面的选择是使用模式迁移工具,例如 Alembic 或 SQLAlchemy-Migrate;有关此问题的讨论,请参阅 通过迁移修改数据库对象。

我如何按照它们的依赖关系对 Table 对象进行排序?

这可以通过 MetaData.sorted_tables 函数获得:

代码语言:javascript
复制
metadata_obj = MetaData()
# ... add Table objects to metadata
ti = metadata_obj.sorted_tables
for t in ti:
    print(t)

如何将 CREATE TABLE/ DROP TABLE 输出作为字符串获取?

现代的 SQLAlchemy 具有表示 DDL 操作的从句构造。这些可以像任何其他 SQL 表达式一样渲染为字符串:

代码语言:javascript
复制
from sqlalchemy.schema import CreateTable

print(CreateTable(mytable))

要获得特定于某个引擎的字符串:

代码语言:javascript
复制
print(CreateTable(mytable).compile(engine))

还有一种特殊形式的Engine可通过create_mock_engine()访问,允许将整个元数据创建序列转储为字符串,使用以下方法:

代码语言:javascript
复制
from sqlalchemy import create_mock_engine

def dump(sql, *multiparams, **params):
    print(sql.compile(dialect=engine.dialect))

engine = create_mock_engine("postgresql+psycopg2://", dump)
metadata_obj.create_all(engine, checkfirst=False)

Alembic 工具还支持一种“离线”SQL 生成模式,将数据库迁移呈现为 SQL 脚本。

如何对表/列进行子类化以提供特定的行为/配置?

TableColumn 不是直接子类化的良好目标。但是,可以使用创建函数来获取构造时的行为,并使用附加事件来处理与模式对象之间的链接,例如约束约定或命名约定。可以在 命名约定 中看到许多这些技术的示例。

当我说table.drop() / metadata.drop_all()时,我的程序挂起了。

这通常对应于两个条件:1. 使用 PostgreSQL,它对表锁非常严格,2. 你有一个仍然打开的连接,其中包含对表的锁,并且与用于 DROP 语句的连接不同。以下是该模式的最简版本:

代码语言:javascript
复制
connection = engine.connect()
result = connection.execute(mytable.select())

mytable.drop(engine)

上面,连接池连接仍然被检出;此外,上述结果对象还维护对此连接的链接。如果使用“隐式执行”,结果将保持此连接打开,直到关闭结果对象或耗尽所有行。

mytable.drop(engine)的调用尝试在从Engine获取的第二个连接上发出 DROP TABLE,这会导致锁定。

解决方案是在发出 DROP TABLE 前关闭所有连接:

代码语言:javascript
复制
connection = engine.connect()
result = connection.execute(mytable.select())

# fully read result sets
result.fetchall()

# close connections
connection.close()

# now locks are removed
mytable.drop(engine)

SQLAlchemy 支持 ALTER TABLE、CREATE VIEW、CREATE TRIGGER、Schema 升级功能吗?

SQLAlchemy 并不直接支持一般的 ALTER。对于特殊的按需 DDL,可以使用DDL和相关构造。有关此主题的讨论,请参阅 自定义 DDL。

更全面的选项是使用模式迁移工具,例如 Alembic 或 SQLAlchemy-Migrate;请参阅 通过迁移更改数据库对象 以讨论此问题。

如何按其依赖顺序对 Table 对象进行排序?

可通过MetaData.sorted_tables函数进行访问:

代码语言:javascript
复制
metadata_obj = MetaData()
# ... add Table objects to metadata
ti = metadata_obj.sorted_tables
for t in ti:
    print(t)

如何将 CREATE TABLE/DROP TABLE 输出为字符串?

现代的 SQLAlchemy 有表示 DDL 操作的子句构造。这些可以像任何其他 SQL 表达式一样渲染为字符串:

代码语言:javascript
复制
from sqlalchemy.schema import CreateTable

print(CreateTable(mytable))

要获取特定引擎的字符串:

代码语言:javascript
复制
print(CreateTable(mytable).compile(engine))

还有一种特殊形式的 Engine 可以通过 create_mock_engine() 获得,它允许将整个元数据创建序列转储为字符串,使用以下方法:

代码语言:javascript
复制
from sqlalchemy import create_mock_engine

def dump(sql, *multiparams, **params):
    print(sql.compile(dialect=engine.dialect))

engine = create_mock_engine("postgresql+psycopg2://", dump)
metadata_obj.create_all(engine, checkfirst=False)

Alembic 工具还支持一种“离线”SQL 生成模式,将数据库迁移呈现为 SQL 脚本。

我如何子类化 Table/Column 以提供某些行为/配置?

TableColumn 不适合直接进行子类化。但是,可以使用创建函数来获得在构造时的行为,以及使用附加事件来处理模式对象之间的链接行为,比如约束惯例或命名惯例。许多这些技术的示例可以在 命名约定 中看到。

SQL 表达式

原文:docs.sqlalchemy.org/en/20/faq/sqlexpressions.html

  • 如何将 SQL 表达式呈现为字符串,可能包含内联的绑定参数?
    • 针对特定数据库进行字符串化
    • 内联呈现绑定参数
    • 将“POSTCOMPILE”参数呈现为绑定参数
  • 在字符串化 SQL 语句时为什么百分号会被双倍显示?
  • 我正在使用 op() 生成自定义运算符,但我的括号没有正确显示
    • 为什么括号规则是这样的?

如何将 SQL 表达式呈现为字符串,可能包含内联的绑定参数?

SQLAlchemy Core 语句对象或表达式片段的“字符串化”,以及 ORM Query 对象,在大多数简单情况下都可以简单地使用 str() 内置函数来实现,如下所示,当与 print 函数一起使用时(请注意 Python print 函数如果不显式使用 str(),也会自动调用它):

代码语言:javascript
复制
>>> from sqlalchemy import table, column, select
>>> t = table("my_table", column("x"))
>>> statement = select(t)
>>> print(str(statement))
SELECT  my_table.x
FROM  my_table 

str() 内置函数或等效函数,可在 ORM Query 对象上调用,也可在诸如 select()insert() 等语句上调用,还可在任何表达式片段上调用,例如:

代码语言:javascript
复制
>>> from sqlalchemy import column
>>> print(column("x") == "some value")
x  =  :x_1 
针对特定数据库进行字符串化

当我们要将语句或片段字符串化时,如果包含具有特定于数据库的字符串格式的元素,或者包含仅在某种类型的数据库中可用的元素,则会出现复杂情况。在这些情况下,我们可能会得到一个不符合我们目标数据库正确语法的字符串化语句,或者操作可能会引发一个UnsupportedCompilationError异常。在这些情况下,有必要使用ClauseElement.compile()方法将语句字符串化,同时传递一个代表目标数据库的EngineDialect对象。例如,如果我们有一个 MySQL 数据库引擎,我们可以按照 MySQL 方言字符串化一个语句:

代码语言:javascript
复制
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://scott:tiger@localhost/test")
print(statement.compile(engine))

更直接地,不需要构建一个Engine对象,我们可以直接实例化一个Dialect对象,如下所示,我们使用一个 PostgreSQL 方言:

代码语言:javascript
复制
from sqlalchemy.dialects import postgresql

print(statement.compile(dialect=postgresql.dialect()))

请注意,任何方言都可以使用create_engine()本身组装,使用一个虚拟 URL,然后访问Engine.dialect属性,比如如果我们想要一个 psycopg2 的方言对象:

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://")
psycopg2_dialect = e.dialect

给定一个 ORM Query对象时,为了访问ClauseElement.compile()方法,我们只需要首先访问Query.statement访问器:

代码语言:javascript
复制
statement = query.statement
print(statement.compile(someengine))
内联渲染绑定参数

警告

永远不要使用这些技术处理来自不受信任输入的字符串内容,比如来自 Web 表单或其他用户输入应用程序。SQLAlchemy 将 Python 值强制转换为直接 SQL 字符串值的功能不安全,并且不验证传递的数据类型。在针对关系数据库编程调用非 DDL SQL 语句时,始终使用绑定参数。

上述形式将渲染 SQL 语句,因为它被传递到 Python DBAPI,其中包括绑定参数不会内联渲染。SQLAlchemy 通常不会对绑定参数进行字符串化处理,因为这由 Python DBAPI 适当处理,更不用说绕过绑定参数可能是现代 Web 应用中被广泛利用的安全漏洞之一了。SQLAlchemy 在某些情况下有限的能力执行此字符串化,例如发出 DDL 时。为了访问此功能,可以使用传递给compile_kwargsliteral_binds标志:

代码语言:javascript
复制
from sqlalchemy.sql import table, column, select

t = table("t", column("x"))

s = select(t).where(t.c.x == 5)

# **do not use** with untrusted input!!!
print(s.compile(compile_kwargs={"literal_binds": True}))

# to render for a specific dialect
print(s.compile(dialect=dialect, compile_kwargs={"literal_binds": True}))

# or if you have an Engine, pass as first argument
print(s.compile(some_engine, compile_kwargs={"literal_binds": True}))

此功能主要用于日志记录或调试目的,其中获得查询的原始 sql 字符串可能会很有用。

上述方法的注意事项是,它仅支持基本类型,如整数和字符串,而且如果直接使用未设置预设值的bindparam(),它也无法对其进行字符串化处理。下面详细介绍了无条件对所有参数进行字符串化的方法。

提示

SQLAlchemy 不支持对所有数据类型进行完全字符串化的原因有三个:

  1. 当正常使用 DBAPI 时,该功能已被当前 DBAPI 支持。SQLAlchemy 项目不能被要求为所有后端的所有数据类型重复这种功能,因为这是多余的工作,还带来了重大的测试和持续支持开销。
  2. 对于特定数据库的绑定参数进行字符串化建议一种实际上将这些完全字符串化的语句传递给数据库以进行执行的用法。这是不必要和不安全的,SQLAlchemy 不希望以任何方式鼓励这种用法。
  3. 渲染字面值的区域是最有可能报告安全问题的地方。SQLAlchemy 尽量将安全参数字符串化的区域留给 DBAPI 驱动程序,这样每个 DBAPI 的具体细节可以得到适当和安全地处理。

由于 SQLAlchemy 故意不支持对所有数据类型的完全字符串化,因此在特定调试场景下执行此操作的技术包括以下内容。作为示例,我们将使用 PostgreSQL 的UUID数据类型:

代码语言:javascript
复制
import uuid

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(UUID)

stmt = select(A).where(A.data == uuid.uuid4())

给定上述模型和语句,将比较一列与单个 UUID 值,将此语句与内联值一起进行字符串化的选项包括:

一些 DBAPI,如 psycopg2,支持像mogrify()这样的辅助函数,提供对它们的字面渲染功能的访问。要使用此类功能,请渲染 SQL 字符串而不使用literal_binds,并通过SQLCompiler.params访问器分别传递参数:

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

with e.connect() as conn:
    cursor = conn.connection.cursor()
    compiled = stmt.compile(e)

    print(cursor.mogrify(str(compiled), compiled.params))

上述代码将生成 psycopg2 的原始字节串:

代码语言:javascript
复制
b"SELECT a.id, a.data \nFROM a \nWHERE a.data = 'a511b0fc-76da-4c47-a4b4-716a8189b7ac'::uuid"

直接将SQLCompiler.params渲染到语句中,使用目标 DBAPI 的适当paramstyle。例如,psycopg2 DBAPI 使用命名的pyformat样式。render_postcompile的含义将在下一节中讨论。警告,这是不安全的,请勿使用不受信任的输入

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://")

# will use pyformat style, i.e. %(paramname)s for param
compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})

print(str(compiled) % compiled.params)

这将产生一个非工作的字符串,但仍然适合调试:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  9eec1209-50b4-4253-b74b-f82461ed80c1

另一个示例使用位置参数风格,如qmark,我们可以使用SQLCompiler.positiontup集合与SQLCompiler.params一起编译我们上面的语句,以便按其位置顺序检索语句的参数:

代码语言:javascript
复制
import re

e = create_engine("sqlite+pysqlite://")

# will use qmark style, i.e. ? for param
compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})

# params in positional order
params = (repr(compiled.params[name]) for name in compiled.positiontup)

print(re.sub(r"\?", lambda m: next(params), str(compiled)))

上述片段打印:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('1bd70375-db17-4d8c-94f1-fc2ef3aada26')

使用自定义 SQL 构造和编译扩展扩展,在用户定义的标志存在时以自定义方式渲染BindParameter对象。这个标志通过compile_kwargs字典发送,就像任何其他标志一样:

代码语言:javascript
复制
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import BindParameter

@compiles(BindParameter)
def _render_literal_bindparam(element, compiler, use_my_literal_recipe=False, **kw):
    if not use_my_literal_recipe:
        # use normal bindparam processing
        return compiler.visit_bindparam(element, **kw)

    # if use_my_literal_recipe was passed to compiler_kwargs,
    # render the value directly
    return repr(element.value)

e = create_engine("postgresql+psycopg2://")
print(stmt.compile(e, compile_kwargs={"use_my_literal_recipe": True}))

上述配方将打印:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('47b154cd-36b2-42ae-9718-888629ab9857')

对于内置于模型或语句中的特定类型的字符串化,可以使用TypeDecorator类使用TypeDecorator.process_literal_param()方法来提供任何数据类型的自定义字符串化:

代码语言:javascript
复制
from sqlalchemy import TypeDecorator

class UUIDStringify(TypeDecorator):
    impl = UUID

    def process_literal_param(self, value, dialect):
        return repr(value)

上述数据类型需要在模型内部明确使用,或者在语句内部使用type_coerce(),例如

代码语言:javascript
复制
from sqlalchemy import type_coerce

stmt = select(A).where(type_coerce(A.data, UUIDStringify) == uuid.uuid4())

print(stmt.compile(e, compile_kwargs={"literal_binds": True}))

再次打印相同形式:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('47b154cd-36b2-42ae-9718-888629ab9857')
将“POSTCOMPILE”参数渲染为绑定参数

SQLAlchemy 包含一个称为 BindParameter.expanding 的绑定参数变体,这是一个“延迟评估”的参数,当 SQL 构造编译时以中间状态呈现,然后在语句执行时进一步处理,当传递实际已知值时。默认情况下,通过 ColumnOperators.in_() 表达式使用“扩展”参数,以便 SQL 字符串可以安全地独立缓存,而不受传递给 ColumnOperators.in_() 的特定调用的实际值列表的影响:

代码语言:javascript
复制
>>> stmt = select(A).where(A.id.in_([1, 2, 3]))

若要将 IN 子句呈现为真实的绑定参数符号,请在 ClauseElement.compile() 中使用 render_postcompile=True 标志:

代码语言:javascript
复制
>>> e = create_engine("postgresql+psycopg2://")
>>> print(stmt.compile(e, compile_kwargs={"render_postcompile": True}))
SELECT  a.id,  a.data
FROM  a
WHERE  a.id  IN  (%(id_1_1)s,  %(id_1_2)s,  %(id_1_3)s) 

在关于呈现绑定参数的前一节中描述的 literal_binds 标志会自动将 render_postcompile 设置为 True,因此对于带有简单整数/字符串的语句,这些可以直接转换为字符串:

代码语言:javascript
复制
# render_postcompile is implied by literal_binds
>>> print(stmt.compile(e, compile_kwargs={"literal_binds": True}))
SELECT  a.id,  a.data
FROM  a
WHERE  a.id  IN  (1,  2,  3) 

SQLCompiler.paramsSQLCompiler.positiontup 也与 render_postcompile 兼容,因此在这里以相同的方式工作,例如 SQLite 的位置形式:

代码语言:javascript
复制
>>> u1, u2, u3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
>>> stmt = select(A).where(A.data.in_([u1, u2, u3]))

>>> import re
>>> e = create_engine("sqlite+pysqlite://")
>>> compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})
>>> params = (repr(compiled.params[name]) for name in compiled.positiontup)
>>> print(re.sub(r"\?", lambda m: next(params), str(compiled)))
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  IN  (UUID('aa1944d6-9a5a-45d5-b8da-0ba1ef0a4f38'),  UUID('a81920e6-15e2-4392-8a3c-d775ffa9ccd2'),  UUID('b5574cdb-ff9b-49a3-be52-dbc89f087bfa')) 

警告

请记住,所有上述代码配方都是用于字符串化字面值,在将语句发送到数据库时绕过绑定参数的情况下,仅适用于:

  1. 使用仅限于调试目的
  2. 字符串不应传递到活动的生产数据库
  3. 仅与本地、可信赖的输入一起使用

上述用于字符串化字面值的配方在任何情况下都不安全,绝不应该用于生产数据库。## 字符串化 SQL 语句时为什么要双倍百分号?

许多 DBAPI 实现使用 pyformatformat paramstyle,其语法中必然涉及百分号。大多数这样做的 DBAPI 期望在用于语句的字符串形式中,百分号用于其他目的时应该是双倍的(即转义),例如:

代码语言:javascript
复制
SELECT  a,  b  FROM  some_table  WHERE  a  =  %s  AND  c  =  %s  AND  num  %%  modulus  =  0

当 SQL 语句由 SQLAlchemy 传递给底层的 DBAPI 时,绑定参数的替换方式与 Python 字符串插值运算符 % 相同,在许多情况下,DBAPI 实际上直接使用此运算符。以上,绑定参数的替换看起来像:

代码语言:javascript
复制
SELECT  a,  b  FROM  some_table  WHERE  a  =  5  AND  c  =  10  AND  num  %  modulus  =  0

像 PostgreSQL(默认 DBAPI 是 psycopg2)和 MySQL(默认 DBAPI 是 mysqlclient)这样的数据库的默认编译器将具有百分号转义行为:

代码语言:javascript
复制
>>> from sqlalchemy import table, column
>>> from sqlalchemy.dialects import postgresql
>>> t = table("my_table", column("value % one"), column("value % two"))
>>> print(t.select().compile(dialect=postgresql.dialect()))
SELECT  my_table."value %% one",  my_table."value %% two"
FROM  my_table 

当使用此类方言时,如果需要非 DBAPI 语句,而这些语句不包括绑定的参数符号,则可通过直接使用 Python 的 % 运算符来简单地替换空参数集来删除百分号:

代码语言:javascript
复制
>>> strstmt = str(t.select().compile(dialect=postgresql.dialect()))
>>> print(strstmt % ())
SELECT  my_table."value % one",  my_table."value % two"
FROM  my_table 

另一种方法是在使用的方言上设置不同的参数样式;所有 Dialect 实现都接受一个参数 paramstyle,将导致该方言的编译器使用给定的参数样式。下面,在用于编译的方言中设置了非常常见的 named 参数样式,以便百分号在 SQL 的编译形式中不再具有重要意义,并且将不再被转义:

代码语言:javascript
复制
>>> print(t.select().compile(dialect=postgresql.dialect(paramstyle="named")))
SELECT  my_table."value % one",  my_table."value % two"
FROM  my_table 
```## 我使用 op() 来生成自定义操作符,但是我的括号没有正确显示

`Operators.op()` 方法允许创建自定义数据库操作符,否则 SQLAlchemy 不会识别:

```py
>>> print(column("q").op("->")(column("p")))
q  ->  p 

但是,当在复合表达式的右侧使用时,它不会按我们的预期生成括号:

代码语言:javascript
复制
>>> print((column("q1") + column("q2")).op("->")(column("p")))
q1  +  q2  ->  p 

在上面的情况下,我们可能希望 (q1 + q2) -> p

对于此情况的解决方案是设置操作符的优先级,使用 Operators.op.precedence 参数,将其设置为一个较高的数字,其中 100 是最大值,而 SQLAlchemy 当前使用的任何操作符的最高数字为 15

代码语言:javascript
复制
>>> print((column("q1") + column("q2")).op("->", precedence=100)(column("p")))
(q1  +  q2)  ->  p 

我们还可以使用 ColumnElement.self_group() 方法通常强制将二元表达式(例如具有左/右操作数和运算符的表达式)括在括号中:

代码语言:javascript
复制
>>> print((column("q1") + column("q2")).self_group().op("->")(column("p")))
(q1  +  q2)  ->  p 
为什么括号规则是这样的?

当存在过多的括号或括号处于它们不期望的不寻常位置时,很多数据库都会出现问题,因此 SQLAlchemy 不会基于分组生成括号,它使用操作符优先级,如果操作符已知是可结合的,那么生成的括号将最小化。否则,像下面这样的表达式:

代码语言:javascript
复制
column("a") & column("b") & column("c") & column("d")

将产生:

代码语言:javascript
复制
(((a  AND  b)  AND  c)  AND  d)

这是可以接受的,但可能会让人们感到恼火(并被报告为错误)。在其他情况下,它会导致更容易混淆数据库或至少可读性更差的事物,例如:

代码语言:javascript
复制
column("q", ARRAY(Integer, dimensions=2))[5][6]

将产生:

代码语言:javascript
复制
((q[5])[6])

也有一些边缘情况,我们会得到类似"(x) = 7"这样的东西,数据库真的不喜欢这样。所以括号化并不是简单地加括号,它使用运算符优先级和结合性来确定分组。

对于Operators.op(),优先级的值默认为零。

如果我们将Operators.op.precedence的值默认为 100,例如最高值,会怎么样?然后这个表达式会加更多括号,但其他方面都没问题,也就是说,这两个是等价的:

代码语言:javascript
复制
>>> print((column("q") - column("y")).op("+", precedence=100)(column("z")))
(q  -  y)  +  z
>>> print((column("q") - column("y")).op("+")(column("z")))
q  -  y  +  z 

但这两个不是:

代码语言:javascript
复制
>>> print(column("q") - column("y").op("+", precedence=100)(column("z")))
q  -  y  +  z
>>> print(column("q") - column("y").op("+")(column("z")))
q  -  (y  +  z) 

目前,尚不清楚只要我们根据运算符优先级和结合性进行括号化,是否真的有一种方法可以自动为没有给定优先级的通用运算符进行括号化,以使其在所有情况下都有效,因为有时您希望自定义运算符具有比其他运算符更低的优先级,有时您希望它更高。

可能如果上面的“二元”表达式在调用op()时强制使用self_group()方法,假设左侧的复合表达式总是可以无害地加括号。也许这种改变可以在某个时候实现,但是目前保持括号化规则更加内部一致似乎是更安全的方法。 ## 如何将 SQL 表达式呈现为字符串,可能包含内联的绑定参数?

在大多数简单情况下,将 SQLAlchemy Core 语句对象或表达式片段以及 ORM Query 对象“字符串化”,就像在使用str()内置函数时一样简单,如下所示,当与print函数一起使用时(请注意 Python 的print函数如果我们不显式使用str(),也会自动调用它):

代码语言:javascript
复制
>>> from sqlalchemy import table, column, select
>>> t = table("my_table", column("x"))
>>> statement = select(t)
>>> print(str(statement))
SELECT  my_table.x
FROM  my_table 

内置函数str(),或者等效函数,可以在 ORM Query 对象上调用,也可以在任何语句上调用,比如select()insert()等,以及任何表达式片段,比如:

代码语言:javascript
复制
>>> from sqlalchemy import column
>>> print(column("x") == "some value")
x  =  :x_1 
针对特定数据库的字符串化

当我们要字符串化的语句或片段包含具有数据库特定字符串格式的元素,或者包含仅在某种类型的数据库中可用的元素时,会出现一个复杂性。在这些情况下,我们可能会得到一个不符合我们所针对的数据库的正确语法的字符串化语句,或者该操作可能会引发一个UnsupportedCompilationError异常。在这些情况下,必须使用ClauseElement.compile()方法对语句进行字符串化,同时传递一个表示目标数据库的EngineDialect对象。例如,如果我们有一个 MySQL 数据库引擎,我们可以如下将语句字符串化为 MySQL 方言:

代码语言:javascript
复制
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://scott:tiger@localhost/test")
print(statement.compile(engine))

更直接地,不需要构建Engine对象,我们可以直接实例化一个Dialect对象,如下所示,我们使用 PostgreSQL 方言:

代码语言:javascript
复制
from sqlalchemy.dialects import postgresql

print(statement.compile(dialect=postgresql.dialect()))

请注意,可以使用create_engine()本身来组装任何方言,只需使用一个虚拟 URL 并访问Engine.dialect属性即可,例如,如果我们想要 psycopg2 的方言对象:

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://")
psycopg2_dialect = e.dialect

给定一个 ORM Query 对象,为了获取ClauseElement.compile()方法,我们只需要先访问Query.statement访问器:

代码语言:javascript
复制
statement = query.statement
print(statement.compile(someengine))
将绑定参数嵌入渲染

警告

永远不要将这些技术与来自不受信任输入的字符串内容一起使用,例如来自 Web 表单或其他用户输入应用程序。SQLAlchemy 将 Python 值强制转换为直接 SQL 字符串值的设施不安全,不安全地针对不受信任的输入,并且不验证传递的数据类型。在针对关系数据库程序化地调用非 DDL SQL 语句时,始终使用绑定参数。

上述形式将呈现 SQL 语句,就像它传递给 Python DBAPI 一样,其中绑定参数不会被内联呈现。SQLAlchemy 通常不会将绑定参数字符串化,因为这由 Python DBAPI 适当处理,更不用说绕过绑定参数可能是现代 Web 应用程序中最广泛利用的安全漏洞之一。SQLAlchemy 在某些情况下有限地能够执行此字符串化,比如发出 DDL。为了访问此功能,可以使用传递给 compile_kwargsliteral_binds 标志:

代码语言:javascript
复制
from sqlalchemy.sql import table, column, select

t = table("t", column("x"))

s = select(t).where(t.c.x == 5)

# **do not use** with untrusted input!!!
print(s.compile(compile_kwargs={"literal_binds": True}))

# to render for a specific dialect
print(s.compile(dialect=dialect, compile_kwargs={"literal_binds": True}))

# or if you have an Engine, pass as first argument
print(s.compile(some_engine, compile_kwargs={"literal_binds": True}))

此功能主要用于记录或调试目的,其中查询的原始 SQL 字符串可能会证明有用。

上述方法的注意事项是它仅支持基本类型,如整数和字符串,而且如果直接使用没有预设值的 bindparam(),它也无法将其字符串化。无条件地将所有参数字符串化的方法如下所述。

提示

SQLAlchemy 不支持所有数据类型的完全字符串化的原因有三个:

  1. 当正常使用 DBAPI 时,这是已经受支持的功能。SQLAlchemy 项目无法被要求为所有后端的每种数据类型复制这种功能,因为这是多余的工作,还会带来重大的测试和持续支持开销。
  2. 使用内联绑定参数进行字符串化,针对特定数据库,表明了一种实际将这些完全字符串化的语句传递到数据库执行的用法。这是不必要且不安全的,SQLAlchemy 不希望以任何方式鼓励这种用法。
  3. 渲染字面值的领域是最有可能报告安全问题的领域。SQLAlchemy 尽量将安全参数字符串化的问题留给 DBAPI 驱动程序处理,其中每个 DBAPI 的具体情况可以得到适当和安全的处理。

由于 SQLAlchemy 故意不支持对字面值的完全字符串化,因此在特定调试场景中执行此操作的技术包括以下内容。作为示例,我们将使用 PostgreSQL 的 UUID 数据类型:

代码语言:javascript
复制
import uuid

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(UUID)

stmt = select(A).where(A.data == uuid.uuid4())

鉴于上述模型和语句,将比较列与单个 UUID 值,将此语句与内联值字符串化的选项包括:

一些 DBAPI,如 psycopg2,支持像 mogrify() 这样的辅助函数,提供对它们的字面渲染功能的访问。要使用这些功能,渲染 SQL 字符串时不要使用 literal_binds,并通过 SQLCompiler.params 访问器单独传递参数:

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

with e.connect() as conn:
    cursor = conn.connection.cursor()
    compiled = stmt.compile(e)

    print(cursor.mogrify(str(compiled), compiled.params))

上述代码将生成 psycopg2 的原始字节串:

代码语言:javascript
复制
b"SELECT a.id, a.data \nFROM a \nWHERE a.data = 'a511b0fc-76da-4c47-a4b4-716a8189b7ac'::uuid"

直接将SQLCompiler.params渲染到语句中,使用目标 DBAPI 的适当paramstyle。例如,psycopg2 DBAPI 使用命名的pyformat样式。render_postcompile的含义将在下一节中讨论。警告这不安全,请勿使用不受信任的输入

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://")

# will use pyformat style, i.e. %(paramname)s for param
compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})

print(str(compiled) % compiled.params)

这将产生一个非工作的字符串,但适合用于调试:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  9eec1209-50b4-4253-b74b-f82461ed80c1

另一个例子是使用位置参数风格,例如qmark,我们可以结合使用SQLCompiler.positiontup集合和SQLCompiler.params来在 SQLite 中呈现上述语句,以便按照编译后的顺序检索参数:

代码语言:javascript
复制
import re

e = create_engine("sqlite+pysqlite://")

# will use qmark style, i.e. ? for param
compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})

# params in positional order
params = (repr(compiled.params[name]) for name in compiled.positiontup)

print(re.sub(r"\?", lambda m: next(params), str(compiled)))

上述片段打印:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('1bd70375-db17-4d8c-94f1-fc2ef3aada26')

当存在用户定义的标志时,使用自定义 SQL 构造和编译扩展扩展以自定义方式呈现BindParameter对象。此标志通过compile_kwargs字典像其他标志一样发送:

代码语言:javascript
复制
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import BindParameter

@compiles(BindParameter)
def _render_literal_bindparam(element, compiler, use_my_literal_recipe=False, **kw):
    if not use_my_literal_recipe:
        # use normal bindparam processing
        return compiler.visit_bindparam(element, **kw)

    # if use_my_literal_recipe was passed to compiler_kwargs,
    # render the value directly
    return repr(element.value)

e = create_engine("postgresql+psycopg2://")
print(stmt.compile(e, compile_kwargs={"use_my_literal_recipe": True}))

上述配方将打印:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('47b154cd-36b2-42ae-9718-888629ab9857')

用于内置于模型或语句的特定类型字符串化的TypeDecorator类可使用TypeDecorator.process_literal_param()方法来提供任何数据类型的自定义字符串化:

代码语言:javascript
复制
from sqlalchemy import TypeDecorator

class UUIDStringify(TypeDecorator):
    impl = UUID

    def process_literal_param(self, value, dialect):
        return repr(value)

上述数据类型需要在模型内明确使用或在语句内部使用type_coerce(),例如

代码语言:javascript
复制
from sqlalchemy import type_coerce

stmt = select(A).where(type_coerce(A.data, UUIDStringify) == uuid.uuid4())

print(stmt.compile(e, compile_kwargs={"literal_binds": True}))

再次打印相同形式:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('47b154cd-36b2-42ae-9718-888629ab9857')
将“POSTCOMPILE”参数呈现为绑定参数

SQLAlchemy 包括一个变体绑定参数,称为 BindParameter.expanding,它是一个“延迟评估”的参数,在编译 SQL 构造时呈现为中间状态,然后在语句执行时进一步处理,当实际已知值传递时。 “扩展”参数默认用于 ColumnOperators.in_() 表达式,以便 SQL 字符串可以安全地独立于传递给 ColumnOperators.in_() 的特定值列表进行缓存:

代码语言:javascript
复制
>>> stmt = select(A).where(A.id.in_([1, 2, 3]))

要使用实际的绑定参数符号呈现 IN 子句,请在 ClauseElement.compile() 中使用 render_postcompile=True 标志:

代码语言:javascript
复制
>>> e = create_engine("postgresql+psycopg2://")
>>> print(stmt.compile(e, compile_kwargs={"render_postcompile": True}))
SELECT  a.id,  a.data
FROM  a
WHERE  a.id  IN  (%(id_1_1)s,  %(id_1_2)s,  %(id_1_3)s) 

前一节中关于渲染绑定参数的 literal_binds 标志自动将 render_postcompile 设置为 True,因此对于具有简单整数/字符串的语句,可以直接进行字符串化:

代码语言:javascript
复制
# render_postcompile is implied by literal_binds
>>> print(stmt.compile(e, compile_kwargs={"literal_binds": True}))
SELECT  a.id,  a.data
FROM  a
WHERE  a.id  IN  (1,  2,  3) 

SQLCompiler.paramsSQLCompiler.positiontup 也与 render_postcompile 兼容,因此以前的渲染内联绑定参数的方法在这里也可以正常工作,例如 SQLite 的位置形式:

代码语言:javascript
复制
>>> u1, u2, u3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
>>> stmt = select(A).where(A.data.in_([u1, u2, u3]))

>>> import re
>>> e = create_engine("sqlite+pysqlite://")
>>> compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})
>>> params = (repr(compiled.params[name]) for name in compiled.positiontup)
>>> print(re.sub(r"\?", lambda m: next(params), str(compiled)))
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  IN  (UUID('aa1944d6-9a5a-45d5-b8da-0ba1ef0a4f38'),  UUID('a81920e6-15e2-4392-8a3c-d775ffa9ccd2'),  UUID('b5574cdb-ff9b-49a3-be52-dbc89f087bfa')) 

警告

请记住,所有上述代码示例,用于将字面值字符串化,将语句发送到数据库时绕过绑定参数的使用,仅在以下情况下使用

  1. 仅用于调试目的
  2. 字符串不应传递给生产数据库
  3. 仅用于本地、可信的输入

上述对字面值字符串化的方法在任何情况下都不安全,绝不应该用于生产数据库

针对特定数据库的字符串化

当我们要将要串化的语句或片段包含有特定于数据库的字符串格式的元素,或者当它包含有仅在某种类型的数据库中可用的元素时,就会出现一些复杂情况。在这些情况下,我们可能会得到一个串化的语句,该语句不符合我们所针对的数据库的正确语法,或者该操作可能会引发一个 UnsupportedCompilationError 异常。在这些情况下,有必要使用 ClauseElement.compile() 方法串化该语句,同时传递一个代表目标数据库的 EngineDialect 对象。如下,如果我们有一个 MySQL 数据库引擎,我们可以根据 MySQL 方言串化一个语句:

代码语言:javascript
复制
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://scott:tiger@localhost/test")
print(statement.compile(engine))

更直接地,我们可以在不构建 Engine 对象的情况下直接实例化一个 Dialect 对象,如下所示,我们使用了一个 PostgreSQL 方言:

代码语言:javascript
复制
from sqlalchemy.dialects import postgresql

print(statement.compile(dialect=postgresql.dialect()))

注意,任何方言都可以使用 create_engine() 方法与一个虚拟 URL 配合组装,然后访问 Engine.dialect 属性,比如说如果我们想要一个 psycopg2 的方言对象:

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://")
psycopg2_dialect = e.dialect

当给定一个 ORM Query 对象时,为了获取到 ClauseElement.compile() 方法,我们只需要先访问 Query.statement 属性:

代码语言:javascript
复制
statement = query.statement
print(statement.compile(someengine))
将绑定参数内联渲染

警告

永远不要使用这些技术处理来自不受信任输入的字符串内容,比如来自网络表单或其他用户输入应用程序。SQLAlchemy 将 Python 值强制转换为直接的 SQL 字符串值的能力不安全且不验证传递的数据类型。在针对关系数据库进行非 DDL SQL 语句的编程调用时,始终使用绑定参数。

上述形式将渲染传递给 Python DBAPI 的 SQL 语句,其中包括绑定参数不会内联渲染。SQLAlchemy 通常不会字符串化绑定参数,因为这由 Python DBAPI 适当处理,更不用说绕过绑定参数可能是现代 Web 应用程序中被广泛利用的安全漏洞之一。SQLAlchemy 在某些情况下(如发出 DDL)有限地执行此字符串化。为了访问此功能,可以使用传递给 compile_kwargsliteral_binds 标志:

代码语言:javascript
复制
from sqlalchemy.sql import table, column, select

t = table("t", column("x"))

s = select(t).where(t.c.x == 5)

# **do not use** with untrusted input!!!
print(s.compile(compile_kwargs={"literal_binds": True}))

# to render for a specific dialect
print(s.compile(dialect=dialect, compile_kwargs={"literal_binds": True}))

# or if you have an Engine, pass as first argument
print(s.compile(some_engine, compile_kwargs={"literal_binds": True}))

此功能主要用于日志记录或调试目的,其中查询的原始 SQL 字符串可能会证明有用。

上述方法的注意事项是,它仅支持基本类型,如整数和字符串,而且如果直接使用没有预设值的 bindparam(),它也无法将其字符串化。在下面详细描述了无条件字符串化所有参数的方法。

提示

SQLAlchemy 不支持所有数据类型的完全字符串化的原因有三:

  1. 当正常使用 DBAPI 时,已经支持此功能。SQLAlchemy 项目不能被要求为所有后端的每种数据类型复制此功能,因为这是多余的工作,还会产生重大的测试和持续支持开销。
  2. 对于特定数据库,将边界参数内联化字符串化建议使用实际将这些完全字符串化的语句传递给数据库执行。这是不必要且不安全的,SQLAlchemy 不希望以任何方式鼓励这种用法。
  3. 字面值渲染领域是最有可能报告安全问题的领域。SQLAlchemy 尽量使安全参数字符串化领域成为 DBAPI 驱动程序的问题,其中每个 DBAPI 的具体情况都可以得到适当和安全地处理。

由于 SQLAlchemy 故意不支持对字面值的完全字符串化,因此在特定调试场景中进行这样的技术包括以下内容。例如,我们将使用 PostgreSQL 的 UUID 数据类型:

代码语言:javascript
复制
import uuid

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(UUID)

stmt = select(A).where(A.data == uuid.uuid4())

针对以上模型和语句将比较一列与单个 UUID 值的情况,使用内联值对该语句进行字符串化的选项包括:

一些 DBAPI(如 psycopg2)支持像 mogrify() 这样的辅助函数,提供对它们的字面值渲染功能的访问。要使用这些特性,渲染 SQL 字符串时不要使用 literal_binds,而是通过 SQLCompiler.params 访问器分别传递参数:

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

with e.connect() as conn:
    cursor = conn.connection.cursor()
    compiled = stmt.compile(e)

    print(cursor.mogrify(str(compiled), compiled.params))

上述代码将产生 psycopg2 的原始字节字符串:

代码语言:javascript
复制
b"SELECT a.id, a.data \nFROM a \nWHERE a.data = 'a511b0fc-76da-4c47-a4b4-716a8189b7ac'::uuid"

直接将 SQLCompiler.params 渲染到语句中,使用目标 DBAPI 的适当 paramstyle。例如,psycopg2 DBAPI 使用命名的 pyformat 样式。 render_postcompile 的含义将在下一节中讨论。 警告:这不安全,请不要使用不受信任的输入

代码语言:javascript
复制
e = create_engine("postgresql+psycopg2://")

# will use pyformat style, i.e. %(paramname)s for param
compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})

print(str(compiled) % compiled.params)

这将产生一个无效的字符串,尽管它适用于调试:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  9eec1209-50b4-4253-b74b-f82461ed80c1

另一个示例使用了位置参数风格,如 qmark,我们还可以使用 SQLCompiler.positiontup 集合与 SQLCompiler.params 结合使用,以便按编译后的语句中的位置顺序检索参数:

代码语言:javascript
复制
import re

e = create_engine("sqlite+pysqlite://")

# will use qmark style, i.e. ? for param
compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})

# params in positional order
params = (repr(compiled.params[name]) for name in compiled.positiontup)

print(re.sub(r"\?", lambda m: next(params), str(compiled)))

上述代码段将打印:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('1bd70375-db17-4d8c-94f1-fc2ef3aada26')

当存在用户定义的标志时,使用 自定义 SQL 构造和编译扩展 扩展以自定义方式渲染 BindParameter 对象。此标志通过 compile_kwargs 字典发送,就像发送任何其他标志一样:

代码语言:javascript
复制
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.expression import BindParameter

@compiles(BindParameter)
def _render_literal_bindparam(element, compiler, use_my_literal_recipe=False, **kw):
    if not use_my_literal_recipe:
        # use normal bindparam processing
        return compiler.visit_bindparam(element, **kw)

    # if use_my_literal_recipe was passed to compiler_kwargs,
    # render the value directly
    return repr(element.value)

e = create_engine("postgresql+psycopg2://")
print(stmt.compile(e, compile_kwargs={"use_my_literal_recipe": True}))

上述示例将打印:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('47b154cd-36b2-42ae-9718-888629ab9857')

对于内置于模型或语句中的特定类型字符串化,可以使用 TypeDecorator 类来使用 TypeDecorator.process_literal_param() 方法提供任何数据类型的自定义字符串化:

代码语言:javascript
复制
from sqlalchemy import TypeDecorator

class UUIDStringify(TypeDecorator):
    impl = UUID

    def process_literal_param(self, value, dialect):
        return repr(value)

上述数据类型需要在模型内或在语句中本地使用 type_coerce() 明确使用,例如

代码语言:javascript
复制
from sqlalchemy import type_coerce

stmt = select(A).where(type_coerce(A.data, UUIDStringify) == uuid.uuid4())

print(stmt.compile(e, compile_kwargs={"literal_binds": True}))

再次打印相同的形式:

代码语言:javascript
复制
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  =  UUID('47b154cd-36b2-42ae-9718-888629ab9857')
将 “POSTCOMPILE” 参数呈现为绑定参数

SQLAlchemy 包含一个称为BindParameter.expanding的绑定参数变体,这是一个“延迟评估”的参数,当编译 SQL 结构时以中间状态呈现,然后在语句执行时进一步处理,当实际已知值被传递时。 “扩展”参数默认用于ColumnOperators.in_()表达式,以便 SQL 字符串可以安全地独立于传递给ColumnOperators.in_()的特定调用的实际值被缓存:

代码语言:javascript
复制
>>> stmt = select(A).where(A.id.in_([1, 2, 3]))

要使用实际的绑定参数符号呈现 IN 子句,请在ClauseElement.compile()中使用render_postcompile=True标志:

代码语言:javascript
复制
>>> e = create_engine("postgresql+psycopg2://")
>>> print(stmt.compile(e, compile_kwargs={"render_postcompile": True}))
SELECT  a.id,  a.data
FROM  a
WHERE  a.id  IN  (%(id_1_1)s,  %(id_1_2)s,  %(id_1_3)s) 

在先前有关渲染绑定参数的部分中描述的literal_binds标志会自动将render_postcompile设置为 True,因此对于具有简单 int/字符串的语句,可以直接将它们字符串化:

代码语言:javascript
复制
# render_postcompile is implied by literal_binds
>>> print(stmt.compile(e, compile_kwargs={"literal_binds": True}))
SELECT  a.id,  a.data
FROM  a
WHERE  a.id  IN  (1,  2,  3) 

SQLCompiler.paramsSQLCompiler.positiontuprender_postcompile兼容,因此在此处渲染内联绑定参数的先前方法也将以相同的方式工作,例如 SQLite 的位置形式:

代码语言:javascript
复制
>>> u1, u2, u3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
>>> stmt = select(A).where(A.data.in_([u1, u2, u3]))

>>> import re
>>> e = create_engine("sqlite+pysqlite://")
>>> compiled = stmt.compile(e, compile_kwargs={"render_postcompile": True})
>>> params = (repr(compiled.params[name]) for name in compiled.positiontup)
>>> print(re.sub(r"\?", lambda m: next(params), str(compiled)))
SELECT  a.id,  a.data
FROM  a
WHERE  a.data  IN  (UUID('aa1944d6-9a5a-45d5-b8da-0ba1ef0a4f38'),  UUID('a81920e6-15e2-4392-8a3c-d775ffa9ccd2'),  UUID('b5574cdb-ff9b-49a3-be52-dbc89f087bfa')) 

警告

请记住,所有上述字符串化文字值的代码示例,当将语句发送到数据库时绕过绑定参数的使用,只能在以下情况下使用

  1. 仅用于调试目的
  2. 该字符串不应传递给实时生产数据库
  3. 仅限于本地,可信任的输入

上述用于将文字值字符串化的方法在任何情况下都不安全,绝对不应该用于生产数据库

为什么在将 SQL 语句字符串化时百分号会被加倍?

许多 DBAPI 实现采用pyformatformat paramstyle,这在其语法中必然涉及百分号。这样做的大多数 DBAPI 都希望在使用的语句的字符串形式中,用于其他目的的百分号被双倍化(即转义),例如:

代码语言:javascript
复制
SELECT  a,  b  FROM  some_table  WHERE  a  =  %s  AND  c  =  %s  AND  num  %%  modulus  =  0

当 SQL 语句通过 SQLAlchemy 传递给底层 DBAPI 时,绑定参数的替换方式与 Python 字符串插值运算符%相同,在许多情况下,DBAPI 实际上直接使用这个运算符。上面,绑定参数的替换看起来像是:

代码语言:javascript
复制
SELECT  a,  b  FROM  some_table  WHERE  a  =  5  AND  c  =  10  AND  num  %  modulus  =  0

像 PostgreSQL(默认 DBAPI 是 psycopg2)和 MySQL(默认 DBAPI 是 mysqlclient)这样的数据库的默认编译器将具有这种百分号转义行为:

代码语言:javascript
复制
>>> from sqlalchemy import table, column
>>> from sqlalchemy.dialects import postgresql
>>> t = table("my_table", column("value % one"), column("value % two"))
>>> print(t.select().compile(dialect=postgresql.dialect()))
SELECT  my_table."value %% one",  my_table."value %% two"
FROM  my_table 

当使用这样的方言时,如果需要不包含绑定参数符号的非 DBAPI 语句,一种快速删除百分号的方法是直接使用 Python 的%运算符替换一个空的参数集:

代码语言:javascript
复制
>>> strstmt = str(t.select().compile(dialect=postgresql.dialect()))
>>> print(strstmt % ())
SELECT  my_table."value % one",  my_table."value % two"
FROM  my_table 

另一种方法是在使用的方言上设置不同的参数样式;所有Dialect实现都接受一个paramstyle参数,该参数将导致该方言的编译器使用给定的参数样式。下面,非常常见的named参数样式在用于编译的方言中设置,以便百分号在 SQL 的编译形式中不再重要,并且不再被转义:

代码语言:javascript
复制
>>> print(t.select().compile(dialect=postgresql.dialect(paramstyle="named")))
SELECT  my_table."value % one",  my_table."value % two"
FROM  my_table 

我正在使用 op()生成自定义运算符,但我的括号没出来正确

Operators.op()方法允许创建一个 SQLAlchemy 中未知的自定义数据库操作符:

代码语言:javascript
复制
>>> print(column("q").op("->")(column("p")))
q  ->  p 

然而,当将其用于复合表达式的右侧时,它不会生成我们期望的括号:

代码语言:javascript
复制
>>> print((column("q1") + column("q2")).op("->")(column("p")))
q1  +  q2  ->  p 

在上面的情况下,我们可能想要(q1 + q2) -> p

对于这种情况的解决方案是设置运算符的优先级,使用Operators.op.precedence参数,设置为一个高数字,其中 100 是最大值,当前任何 SQLAlchemy 运算符使用的最高数字是 15:

代码语言:javascript
复制
>>> print((column("q1") + column("q2")).op("->", precedence=100)(column("p")))
(q1  +  q2)  ->  p 

我们还可以通常通过使用ColumnElement.self_group()方法强制在二元表达式(例如具有左/右操作数和运算符的表达式)周围加上括号:

代码语言:javascript
复制
>>> print((column("q1") + column("q2")).self_group().op("->")(column("p")))
(q1  +  q2)  ->  p 
为什么括号的规则是这样的?

当存在过多的括号或者括号处于数据库不期望的不寻常位置时,许多数据库会报错,因此 SQLAlchemy 不基于分组生成括号,它使用操作符优先级以及如果操作符已知是可结合的,则生成最小的括号。否则,表达式如下:

代码语言:javascript
复制
column("a") & column("b") & column("c") & column("d")

将产生:

代码语言:javascript
复制
(((a  AND  b)  AND  c)  AND  d)

这样做可能会让人们感到不爽(并被报告为错误)。在其他情况下,它会导致更容易让数据库混淆或至少降低可读性,比如:

代码语言:javascript
复制
column("q", ARRAY(Integer, dimensions=2))[5][6]

将产生:

代码语言:javascript
复制
((q[5])[6])

还有一些边界情况,我们会得到像"(x) = 7"这样的东西,数据库真的不喜欢这样。因此,括号化不是简单地添加括号,而是使用运算符优先级和结合性来确定分组。

对于Operators.op(),优先级的值默认为零。

如果我们将Operators.op.precedence的值默认为 100,即最高值,会怎样呢?然后这个表达式会多加括号,但除此之外还是可以的,也就是说,这两个表达式是等价的:

代码语言:javascript
复制
>>> print((column("q") - column("y")).op("+", precedence=100)(column("z")))
(q  -  y)  +  z
>>> print((column("q") - column("y")).op("+")(column("z")))
q  -  y  +  z 

但是这两种情况不是:

代码语言:javascript
复制
>>> print(column("q") - column("y").op("+", precedence=100)(column("z")))
q  -  y  +  z
>>> print(column("q") - column("y").op("+")(column("z")))
q  -  (y  +  z) 

目前来看,只要我们根据运算符的优先级和结合性进行括号化,如果真的有一种方法可以自动为没有给定优先级的通用运算符进行括号化,从而在所有情况下都能正常工作,这还不清楚,因为有时您希望自定义的运算符具有比其他运算符更低的优先级,有时您希望它更高。

如果上面的“binary”表达式强制在调用op()时使用self_group()方法,假设左侧的复合表达式总是可以无害地加上括号,那么这种可能性是存在的。也许这种改变以后可以实现,但是目前来看,保持括号规则在内部更一致似乎是更安全的方法。

为什么括号规则会是这样?

当括号过多或者括号出现在它们不期望的不寻常位置时,许多数据库会抛出错误,因此 SQLAlchemy 不基于分组生成括号,而是使用运算符优先级,如果运算符已知为结合性,那么会尽量生成最少的括号。否则,表达式如下:

代码语言:javascript
复制
column("a") & column("b") & column("c") & column("d")

会产生:

代码语言:javascript
复制
(((a  AND  b)  AND  c)  AND  d)

这是可以的,但可能会让人们感到烦恼(并报告为错误)。在其他情况下,它会导致更容易让数据库混淆,或者至少影响可读性,比如:

代码语言:javascript
复制
column("q", ARRAY(Integer, dimensions=2))[5][6]

会产生:

代码语言:javascript
复制
((q[5])[6])

还有一些边界情况,我们会得到像"(x) = 7"这样的东西,数据库真的不喜欢这样。因此,括号化不是简单地添加括号,而是使用运算符优先级和结合性来确定分组。

对于Operators.op(),优先级的值默认为零。

如果我们将Operators.op.precedence的值默认为 100,即最高值,会怎样呢?然后这个表达式会多加括号,但除此之外还是可以的,也就是说,这两个表达式是等价的:

代码语言:javascript
复制
>>> print((column("q") - column("y")).op("+", precedence=100)(column("z")))
(q  -  y)  +  z
>>> print((column("q") - column("y")).op("+")(column("z")))
q  -  y  +  z 

但是这两种情况不是:

代码语言:javascript
复制
>>> print(column("q") - column("y").op("+", precedence=100)(column("z")))
q  -  y  +  z
>>> print(column("q") - column("y").op("+")(column("z")))
q  -  (y  +  z) 

现在,尚不清楚只要我们基于操作符优先级和结合性进行括号化,是否真的有一种方法可以自动为没有给定优先级的通用运算符添加括号,以便在所有情况下都能正常工作,因为有时您希望自定义操作符的优先级低于其他操作符,有时您希望它更高。

也许,如果上面的“二元”表达式在调用op()时强制使用了self_group()方法,假设左侧的复合表达式总是可以无害地加括号。也许这种改变可以在某个时候实现,然而就目前而言,保持括号规则更加内部一致似乎是更安全的做法。

代码语言:javascript
复制
(((a  AND  b)  AND  c)  AND  d)

这样做可能会让人们感到不爽(并被报告为错误)。在其他情况下,它会导致更容易让数据库混淆或至少降低可读性,比如:

代码语言:javascript
复制
column("q", ARRAY(Integer, dimensions=2))[5][6]

将产生:

代码语言:javascript
复制
((q[5])[6])

还有一些边界情况,我们会得到像"(x) = 7"这样的东西,数据库真的不喜欢这样。因此,括号化不是简单地添加括号,而是使用运算符优先级和结合性来确定分组。

对于Operators.op(),优先级的值默认为零。

如果我们将Operators.op.precedence的值默认为 100,即最高值,会怎样呢?然后这个表达式会多加括号,但除此之外还是可以的,也就是说,这两个表达式是等价的:

代码语言:javascript
复制
>>> print((column("q") - column("y")).op("+", precedence=100)(column("z")))
(q  -  y)  +  z
>>> print((column("q") - column("y")).op("+")(column("z")))
q  -  y  +  z 

但是这两种情况不是:

代码语言:javascript
复制
>>> print(column("q") - column("y").op("+", precedence=100)(column("z")))
q  -  y  +  z
>>> print(column("q") - column("y").op("+")(column("z")))
q  -  (y  +  z) 

目前来看,只要我们根据运算符的优先级和结合性进行括号化,如果真的有一种方法可以自动为没有给定优先级的通用运算符进行括号化,从而在所有情况下都能正常工作,这还不清楚,因为有时您希望自定义的运算符具有比其他运算符更低的优先级,有时您希望它更高。

如果上面的“binary”表达式强制在调用op()时使用self_group()方法,假设左侧的复合表达式总是可以无害地加上括号,那么这种可能性是存在的。也许这种改变以后可以实现,但是目前来看,保持括号规则在内部更一致似乎是更安全的方法。

为什么括号规则会是这样?

当括号过多或者括号出现在它们不期望的不寻常位置时,许多数据库会抛出错误,因此 SQLAlchemy 不基于分组生成括号,而是使用运算符优先级,如果运算符已知为结合性,那么会尽量生成最少的括号。否则,表达式如下:

代码语言:javascript
复制
column("a") & column("b") & column("c") & column("d")

会产生:

代码语言:javascript
复制
(((a  AND  b)  AND  c)  AND  d)

这是可以的,但可能会让人们感到烦恼(并报告为错误)。在其他情况下,它会导致更容易让数据库混淆,或者至少影响可读性,比如:

代码语言:javascript
复制
column("q", ARRAY(Integer, dimensions=2))[5][6]

会产生:

代码语言:javascript
复制
((q[5])[6])

还有一些边界情况,我们会得到像"(x) = 7"这样的东西,数据库真的不喜欢这样。因此,括号化不是简单地添加括号,而是使用运算符优先级和结合性来确定分组。

对于Operators.op(),优先级的值默认为零。

如果我们将Operators.op.precedence的值默认为 100,即最高值,会怎样呢?然后这个表达式会多加括号,但除此之外还是可以的,也就是说,这两个表达式是等价的:

代码语言:javascript
复制
>>> print((column("q") - column("y")).op("+", precedence=100)(column("z")))
(q  -  y)  +  z
>>> print((column("q") - column("y")).op("+")(column("z")))
q  -  y  +  z 

但是这两种情况不是:

代码语言:javascript
复制
>>> print(column("q") - column("y").op("+", precedence=100)(column("z")))
q  -  y  +  z
>>> print(column("q") - column("y").op("+")(column("z")))
q  -  (y  +  z) 

现在,尚不清楚只要我们基于操作符优先级和结合性进行括号化,是否真的有一种方法可以自动为没有给定优先级的通用运算符添加括号,以便在所有情况下都能正常工作,因为有时您希望自定义操作符的优先级低于其他操作符,有时您希望它更高。

也许,如果上面的“二元”表达式在调用op()时强制使用了self_group()方法,假设左侧的复合表达式总是可以无害地加括号。也许这种改变可以在某个时候实现,然而就目前而言,保持括号规则更加内部一致似乎是更安全的做法。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-06-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 常见问题
  • 安装
    • 当我尝试使用 asyncio 时,出现了关于未安装 greenlet 的错误
    • 连接 / 引擎
      • 我如何配置日志记录?
        • 我如何池化数据库连接?我的连接被池化了吗?
          • 我如何传递自定义连接参数给我的数据库 API?
            • “MySQL 服务器已断开连接”
              • “命令不同步;您现在无法运行此命令” / “此结果对象不返回行。 它已被自动关闭”
                • 使用 DBAPI 自动提交允许透明重新连接的只读版本
              • 为什么 SQLAlchemy 会发出那么多的 ROLLBACK?
                • 我在 MyISAM 上 - 如何关闭它?
                • 我在 SQL Server 上 - 如何将那些 ROLLBACKs 转换为 COMMITs?
              • 我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序不起作用!
                • 当使用引擎时,如何访问原始 DBAPI 连接?
                  • 访问 asyncio 驱动程序的基础连接
                • 如何在 Python 多进程或 os.fork()中使用引擎/连接/会话?
                  • 如何配置日志记录?
                    • 如何池化数据库连接?我的连接是否被池化了?
                      • 如何向我的数据库 API 传递自定义连接参数?
                        • “MySQL 服务器已关闭连接”
                          • “命令不同步;您现在无法运行此命令” / “此结果对象不返回行。它已被自动关闭”
                            • 如何自动“重试”语句执行?
                              • 使用 DBAPI 自动提交允许只读版本的透明重新连接
                            • 为什么 SQLAlchemy 发出了那么多个 ROLLBACK?
                              • 我使用的是 MyISAM - 如何关闭它?
                              • 我使用的是 SQL Server - 如何将那些 ROLLBACKs 转换为 COMMITs?
                              • 我正在使用 MyISAM - 如何关闭它?
                              • 我正在使用 SQL Server - 如何将那些 ROLLBACKs 转换为 COMMITs?
                            • 我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),但我的测试程序不起作用!
                              • 在使用 Engine 时如何访问原始的 DBAPI 连接?
                                • 访问 asyncio 驱动程序的底层连接
                                • 使用 asyncio 驱动程序访问底层连接
                              • 我如何在 Python 多进程或 os.fork() 中使用引擎/连接/会话?
                              • 元数据 / 模式
                                • 当我说 table.drop() / metadata.drop_all() 时,我的程序挂起了
                                  • SQLAlchemy 支持 ALTER TABLE、CREATE VIEW、CREATE TRIGGER、Schema 升级功能吗?
                                    • 我如何按照它们的依赖关系对 Table 对象进行排序?
                                      • 如何将 CREATE TABLE/ DROP TABLE 输出作为字符串获取?
                                        • 如何对表/列进行子类化以提供特定的行为/配置?
                                          • 当我说table.drop() / metadata.drop_all()时,我的程序挂起了。
                                            • SQLAlchemy 支持 ALTER TABLE、CREATE VIEW、CREATE TRIGGER、Schema 升级功能吗?
                                              • 如何按其依赖顺序对 Table 对象进行排序?
                                                • 如何将 CREATE TABLE/DROP TABLE 输出为字符串?
                                                  • 我如何子类化 Table/Column 以提供某些行为/配置?
                                                  • SQL 表达式
                                                    • 如何将 SQL 表达式呈现为字符串,可能包含内联的绑定参数?
                                                      • 针对特定数据库进行字符串化
                                                      • 内联渲染绑定参数
                                                      • 将“POSTCOMPILE”参数渲染为绑定参数
                                                      • 为什么括号规则是这样的?
                                                      • 针对特定数据库的字符串化
                                                      • 将绑定参数嵌入渲染
                                                      • 将“POSTCOMPILE”参数呈现为绑定参数
                                                      • 针对特定数据库的字符串化
                                                      • 将绑定参数内联渲染
                                                      • 将 “POSTCOMPILE” 参数呈现为绑定参数
                                                    • 为什么在将 SQL 语句字符串化时百分号会被加倍?
                                                      • 我正在使用 op()生成自定义运算符,但我的括号没出来正确
                                                        • 为什么括号的规则是这样的?
                                                        • 为什么括号规则会是这样?
                                                        • 为什么括号规则会是这样?
                                                    相关产品与服务
                                                    数据库
                                                    云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
                                                    领券
                                                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档