首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >SqlAlchemy异步orm:如何查询数据库

SqlAlchemy异步orm:如何查询数据库
EN

Stack Overflow用户
提问于 2021-07-13 10:27:50
回答 2查看 19K关注 0票数 15

在SqlAlchemy异步引擎中,如何查询表并获取值或全部?

我从非异步方法中知道我可以做的事情

SESSION.query(TableClass).get(x)

但是用异步方法尝试它会引发下一个错误:

AttributeError: 'AsyncSession' object has no attribute 'query'.

下面是定义的会话变量。循环变量仅用于在加载sql模块时启动异步方法,并填充用作缓存的变量,以避免每次需要缓存时缓存数据库:

代码语言:javascript
运行
复制
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, scoped_session

from .. import CONFIG, LOOP

def _build_async_db_uri(uri):
    if "+asyncpg" not in uri:
        return '+asyncpg:'.join(uri.split(":", 1))
    return uri

async def start() -> declarative_base:
    engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
    async with engine.begin() as conn:
        BASE.metadata.bind = engine
        await conn.run_sync(BASE.metadata.create_all)
    return scoped_session(sessionmaker(bind=engine, autoflush=False, class_=AsyncSession))


BASE = declarative_base()
SESSION = LOOP.run_until_complete(start())

下面是一个表和缓存函数的示例:

代码语言:javascript
运行
复制
class TableClass:
    __tablename__ = "tableclass"
    id = Column(Integer, primary_key = True)
    alias = Column(Integer)

CACHE = {}
async def _load_all():
    global CACHE
    try:
        curr = await SESSION.query(TableClass).all()
        CACHE = {i.id: i.alias for i in curr}

LOOP.run_until_complete(_load_all())
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-07-13 11:01:59

session.query是旧的API。异步版本使用select和伴随方法。

代码语言:javascript
运行
复制
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker


engine = create_async_engine(_build_async_db_uri(CONFIG.general.sqlalchemy_db_uri))
async_session = sessionmaker(
    engine, expire_on_commit=False, class_=AsyncSession
)


CACHE = {}
async def _load_all():
    global CACHE
    try:
        async with async_session() as session:
            q = select(TableClass)
            result = await session.execute(q)
            curr = result.scalars()
            CACHE = {i.id: i.alias for i in curr}
    except:
        pass

LOOP.run_until_complete(_load_all())

您可以阅读更多关于SqlAlchemy异步I/O 这里的内容。

票数 21
EN

Stack Overflow用户

发布于 2022-05-10 13:35:00

按主键查询

新版本1.4:添加了Session.get(),它是从现在不再推荐的Query.get()方法中移出的。

代码语言:javascript
运行
复制
  id = 1
  user = await session.get(User, id)

Session.get - SQLAlchemy 1.4文档

按其他列查询

方法是使用select语句。

代码语言:javascript
运行
复制
statement = select(User).where(User.name == 'John')
result = await session.execute(statement)

# This will return a collection of users named 'John'
johns : list[User] = result.scalars.all()

# This will take the first one
first_john : User = result.scalars.one()

# This will take one, use it if you're expecting a single result
john : User = result.scalar()

选择- SQLAlchemy 1.4文档

执行

更多的例子

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68360687

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档