我可以在初始化SQLAlchemy
对象时使用以下命令设置该选项:
db = SQLAlchemy(app, session_options={"expire_on_commit": False})
但是通过这种方式,Flask-SQLAlchemy创建的所有会话都会将该选项设置为False,而我只想将其设置为一个会话。
我尝试了db.session.expire_on_commit = False
,但似乎没有任何效果。
发布于 2018-07-21 11:07:35
expire_on_commit
是sqlalchemy.orm.session.Session
类的参数。
获取Session
实例的首选方法是通过sqlalchemy.orm.session.sessionmaker
类。使用将用于创建Session
实例的设置配置sessionmaker
实例。例如:
>>> from sqlalchemy import create_engine
>>> from sqlalchemy.orm import sessionmaker
>>> engine = create_engine('sqlite:///:memory:')
>>> Session = sessionmaker(bind=engine)
>>> type(Session)
<class 'sqlalchemy.orm.session.sessionmaker'>
>>> session = Session()
>>> type(session)
<class 'sqlalchemy.orm.session.Session'>
因此,调用sessionmaker
实例将返回一个Session
实例。
使用这种配置,我们每次调用sessionmaker
实例时,都会得到一个新的Session
实例。
>>> session1 = Session()
>>> session2 = Session()
>>> session1 is session2
False
scoped_session
改变了上述行为:
>>> from sqlalchemy.orm import scoped_session
>>> Session = scoped_session(sessionmaker(bind=engine))
>>> type(Session)
<class 'sqlalchemy.orm.scoping.scoped_session'>
>>> session1 = Session()
>>> session2 = Session()
>>> session1 is session2
True
这就是Flask-SQLAlchemy“在幕后”使用的(这也是为什么@CodeLikeBeaker引导你访问会话API的评论是有效的)。这意味着每次在处理request
时调用db.session
时,您都在使用相同的底层会话。下面是与上面相同的示例,但使用了Flask-SQLAlchemy
扩展。
>>> type(db.session)
<class 'sqlalchemy.orm.scoping.scoped_session'>
>>> session1 = db.session()
>>> session2 = db.session()
>>> session1 is session2
True
请注意,本例中的type(db.session)
产生的结果与上一个示例中的type(Session)
完全相同。
由Flask-SQLAlchemy创建的所有会话都将该选项设置为False,而我只想为一个会话设置该选项。
考虑到Flask-SQLAlchemy只为每个请求创建一个会话的事实,我认为这意味着在处理请求时,您有时想要一个到expire_on_commit
的会话,有时不想。
实现此目的的一种方法是使用context manager暂时关闭expire_on_commit
:
from contextlib import contextmanager
@contextmanager
def no_expire():
s = db.session()
s.expire_on_commit = False
try:
yield
finally:
s.expire_on_commit = True
这是我的测试模型:
class Person(db.Model):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(16))
配置日志记录以查看SQLAlchemy
正在执行的操作:
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
logging.basicConfig(level=logging.INFO)
创建一些测试数据:
db.drop_all()
db.create_all()
names = ('Jane', 'Tarzan')
db.session.add_all([Person(name=n) for n in names])
db.session.commit()
这是我用来测试的函数:
def test_func():
# query the db
people = Person.query.all()
# commit the session
db.session.commit()
# iterate through people accessing name to see if sql is emitted
for person in people:
print(f'Person is {person.name}')
db.session.rollback()
我在没有上下文管理器的情况下运行一次测试函数:
test_func()
下面是stdout:
INFO:sqlalchemy.engine.base.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.base.Engine:SELECT person.id AS person_id, person.name AS person_name
FROM person
INFO:sqlalchemy.engine.base.Engine:{}
INFO:sqlalchemy.engine.base.Engine:COMMIT
INFO:sqlalchemy.engine.base.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.base.Engine:SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.id = %(param_1)s
INFO:sqlalchemy.engine.base.Engine:{'param_1': 1}
*****Person is Jane*****
INFO:sqlalchemy.engine.base.Engine:SELECT person.id AS person_id, person.name AS person_name
FROM person
WHERE person.id = %(param_1)s
INFO:sqlalchemy.engine.base.Engine:{'param_1': 2}
*****Person is Tarzan*****
可以看到,在提交之后,重新发出sql以刷新对象属性。
一旦使用了上下文管理器:
db.session.rollback()
with no_expire():
test_func()
下面是上下文管理器的stdout:
INFO:sqlalchemy.engine.base.Engine:ROLLBACK
INFO:sqlalchemy.engine.base.Engine:BEGIN (implicit)
INFO:sqlalchemy.engine.base.Engine:SELECT person.id AS person_id, person.name AS person_name
FROM person
INFO:sqlalchemy.engine.base.Engine:{}
INFO:sqlalchemy.engine.base.Engine:COMMIT
*****Person is Jane*****
*****Person is Tarzan*****
https://stackoverflow.com/questions/51446322
复制相似问题