首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >数据库(postgres)查询的同步执行--使用异步不与ormar ORM ( query应用程序)协同工作

数据库(postgres)查询的同步执行--使用异步不与ormar ORM ( query应用程序)协同工作
EN

Stack Overflow用户
提问于 2021-12-22 09:24:24
回答 1查看 572关注 0票数 0

我正在使用fastapipostgresormar为ORM构建一个api (目前正在使用docker-compose进行本地开发)。路由运行良好,但我很难使用本机异步ormar添加JWT用户授权来确认db中的用户(我正在跟踪这个例子,但希望用调用authenticate函数中的实际db替换fake_db )。

我尝试使用asyncio将数据库调用转换为同步操作,以便将其作为authenticate的一部分执行,但仍然会出现错误,这表明我没有将任务添加到正确的线程或池中。

下面是引发问题的片段(具体来说,asyncio.run就是它发生的地方)

代码语言:javascript
运行
复制
import asyncio 
from app.db import User


def authenticate(email: str, password: str) -> Optional[User]:

    user = await User.objects.filter(email=email).first()
    asyncio.run(user)

    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

这是堆栈跟踪

代码语言:javascript
运行
复制
web_1      | Future exception was never retrieved
web_1      | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1      | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1      | INFO:     192.168.176.2:50038 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1      | ERROR:    Exception in ASGI application
web_1      | Traceback (most recent call last):
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1      |     result = await app(self.scope, self.receive, self.send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1      |     return await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1      |     await super().__call__(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1      |     await self.middleware_stack(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1      |     await self.app(scope, receive, _send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1      |     await self.app(scope, receive, sender)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1      |     await route.handle(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1      |     await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1      |     response = await func(request)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1      |     raw_response = await run_endpoint_function(
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1      |     return await run_in_threadpool(dependant.call, **values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1      |     return await loop.run_in_executor(None, func, *args)
web_1      |   File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1      |     result = self.fn(*self.args, **self.kwargs)
web_1      |   File "/app/./app/api/routes/auth.py", line 19, in login
web_1      |     user = authenticate(email=form_data.username, password=form_data.password)
web_1      |   File "/app/./app/core/auth.py", line 35, in authenticate
web_1      |     asyncio.run(user)
web_1      |   File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1      |     return loop.run_until_complete(main)
web_1      |   File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1      |     return future.result()
web_1      |   File "/app/./app/core/auth.py", line 19, in get_user
web_1      |     return await User.objects.get(email=email) #User.objects.filter(email=email).first()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 929, in get
web_1      |     return await self.filter(*args, **kwargs).get()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 945, in get
web_1      |     rows = await self.database.fetch_all(expr)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 142, in fetch_all
web_1      |     return await connection.fetch_all(query, values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 248, in __aexit__
web_1      |     await self._connection.release()
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1      |     self._connection = await self._database._pool.release(self._connection)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
web_1      |     return await asyncio.shield(ch.release(timeout))
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
web_1      |     raise ex
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
web_1      |     await self._con.reset(timeout=budget)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
web_1      |     await self.execute(reset_query, timeout=timeout)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
web_1      |     return await self._protocol.query(query, timeout)
web_1      |   File "asyncpg/protocol/protocol.pyx", line 321, in query
web_1      |   File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
web_1      | asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress

如何同步运行对数据库的调用?

更新2021年12月28日

按@AndreaTedeschi的建议更改为下面的代码

代码语言:javascript
运行
复制
async def get_user_by_email(email: str) -> Optional[User]:
    return await User.objects.get_or_none(email=email) 

def authenticate(email: str, password: str) -> Optional[User]:
    
    user = get_user_by_email(email=email)
    asyncio.run(user)

    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

我得到一个错误,没有附加到正确的循环。全堆栈跟踪:

代码语言:javascript
运行
复制
web_1      | Future exception was never retrieved
web_1      | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1      | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1      | INFO:     172.27.0.3:42414 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1      | ERROR:    Exception in ASGI application
web_1      | Traceback (most recent call last):
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1      |     result = await app(self.scope, self.receive, self.send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1      |     return await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1      |     await super().__call__(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1      |     await self.middleware_stack(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1      |     await self.app(scope, receive, _send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1      |     raise exc from None
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1      |     await self.app(scope, receive, sender)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1      |     await route.handle(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1      |     await self.app(scope, receive, send)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1      |     response = await func(request)
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1      |     raw_response = await run_endpoint_function(
web_1      |   File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1      |     return await run_in_threadpool(dependant.call, **values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1      |     return await loop.run_in_executor(None, func, *args)
web_1      |   File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1      |     result = self.fn(*self.args, **self.kwargs)
web_1      |   File "/app/./app/api/routes/auth.py", line 19, in login
web_1      |     user = authenticate(email=form_data.username, password=form_data.password)
web_1      |   File "/app/./app/core/auth.py", line 36, in authenticate
web_1      |     asyncio.run(user, debug=True)
web_1      |   File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1      |     return loop.run_until_complete(main)
web_1      |   File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1      |     return future.result()
web_1      |   File "/app/./app/core/auth.py", line 20, in get_user_by_email
web_1      |     return await User.objects.get_or_none(email=email) #User.objects.filter(email=email).first()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 912, in get_or_none
web_1      |     return await self.get(*args, **kwargs)
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 933, in get
web_1      |     return await self.filter(*args, **kwargs).get()
web_1      |   File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 953, in get
web_1      |     rows = await self.database.fetch_all(expr)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 148, in fetch_all
web_1      |     return await connection.fetch_all(query, values)
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 264, in __aexit__
web_1      |     await self._connection.release()
web_1      |   File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1      |     self._connection = await self._database._pool.release(self._connection)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 867, in release
web_1      |     return await asyncio.shield(ch.release(timeout))
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 224, in release
web_1      |     raise ex
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 214, in release
web_1      |     await self._con.reset(timeout=budget)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1367, in reset
web_1      |     await self.execute(reset_query, timeout=timeout)
web_1      |   File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 318, in execute
web_1      |     return await self._protocol.query(query, timeout)
web_1      |   File "asyncpg/protocol/protocol.pyx", line 338, in query
web_1      | RuntimeError: Task <Task pending name='Task-34' coro=<PoolConnectionHolder.release() running at /usr/local/lib/python3.8/site-packages/asyncpg/pool.py:214> cb=[shield.<locals>._inner_done_callback() at /usr/local/lib/python3.8/asyncio/tasks.py:885] created at /usr/local/lib/python3.8/asyncio/tasks.py:878> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop

因此取得了一些进展,但仍未完全取得进展。我该怎么解决这个问题?

显然,我不是异步python方面的专家,所以我很感谢您的帮助!

EN

回答 1

Stack Overflow用户

发布于 2021-12-28 11:30:12

您使用的orm是异步的。因此,查询数据库的方法是协同函数,必须等待。

您不能在正常函数中使用等待事件。您需要使用前缀async才能使它成为一个coroutine函数:

代码语言:javascript
运行
复制
async def get_by_username(username: str) -> Optional[User]:
    return await User.objects.get_or_none(username=username)

async def authenticate(username: str, password: str) -> Optional[User]:
    user = await get_by_username(username)
    if not user:
        return None
    # do stuff about password verificazione
    return username
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70446806

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档