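I am building an API with `fastapi`, `postgres`, and `ormar` as the ORM (currently using `docker-compose` for local development). The routes work fine, but I am struggling to add JWT user authorization that checks the user in the db using the natively async `ormar` (I am following this example, but want to replace `fake_db` with an actual db call inside the `authenticate` function).
I tried using `asyncio` to turn the database call into a synchronous operation so that it can run as part of `authenticate`, but I still get errors suggesting that I am not adding the task to the correct thread or pool.
Here is the snippet that causes the problem (specifically, `asyncio.run` is where it happens):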
import asyncio
from app.db import User

def authenticate(email: str, password: str) -> Optional[User]:
    user = await User.objects.filter(email=email).first()
    asyncio.run(user)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
Here is the stack trace:
web_1 | Future exception was never retrieved
web_1 | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1 | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1 | INFO: 192.168.176.2:50038 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1 | ERROR: Exception in ASGI application
web_1 | Traceback (most recent call last):
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1 | result = await app(self.scope, self.receive, self.send)
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1 | return await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1 | await super().__call__(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1 | await self.middleware_stack(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1 | await self.app(scope, receive, _send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1 | await self.app(scope, receive, sender)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1 | await route.handle(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1 | await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1 | response = await func(request)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1 | raw_response = await run_endpoint_function(
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1 | return await run_in_threadpool(dependant.call, **values)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1 | return await loop.run_in_executor(None, func, *args)
web_1 | File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1 | result = self.fn(*self.args, **self.kwargs)
web_1 | File "/app/./app/api/routes/auth.py", line 19, in login
web_1 | user = authenticate(email=form_data.username, password=form_data.password)
web_1 | File "/app/./app/core/auth.py", line 35, in authenticate
web_1 | asyncio.run(user)
web_1 | File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1 | return loop.run_until_complete(main)
web_1 | File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1 | return future.result()
web_1 | File "/app/./app/core/auth.py", line 19, in get_user
web_1 | return await User.objects.get(email=email) #User.objects.filter(email=email).first()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 929, in get
web_1 | return await self.filter(*args, **kwargs).get()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 945, in get
web_1 | rows = await self.database.fetch_all(expr)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 142, in fetch_all
web_1 | return await connection.fetch_all(query, values)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 248, in __aexit__
web_1 | await self._connection.release()
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1 | self._connection = await self._database._pool.release(self._connection)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 666, in release
web_1 | return await asyncio.shield(ch.release(timeout))
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 218, in release
web_1 | raise ex
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 208, in release
web_1 | await self._con.reset(timeout=budget)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1311, in reset
web_1 | await self.execute(reset_query, timeout=timeout)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 297, in execute
web_1 | return await self._protocol.query(query, timeout)
web_1 | File "asyncpg/protocol/protocol.pyx", line 321, in query
web_1 | File "asyncpg/protocol/protocol.pyx", line 684, in asyncpg.protocol.protocol.BaseProtocol._check_state
web_1 | asyncpg.exceptions._base.InterfaceError: cannot perform operation: another operation is in progress
How can I run the call to the database synchronously?
Update, 2021-12-28
Following @AndreaTedeschi's suggestion, I changed the code to the following:
async def get_user_by_email(email: str) -> Optional[User]:
    return await User.objects.get_or_none(email=email)

def authenticate(email: str, password: str) -> Optional[User]:
    user = get_user_by_email(email=email)
    asyncio.run(user)
    if not user:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
I now get an error saying the future is attached to a different loop. Full stack trace:
web_1 | Future exception was never retrieved
web_1 | future: <Future finished exception=ConnectionDoesNotExistError('connection was closed in the middle of operation')>
web_1 | asyncpg.exceptions.ConnectionDoesNotExistError: connection was closed in the middle of operation
web_1 | INFO: 172.27.0.3:42414 - "POST /auth/login HTTP/1.1" 500 Internal Server Error
web_1 | ERROR: Exception in ASGI application
web_1 | Traceback (most recent call last):
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
web_1 | result = await app(self.scope, self.receive, self.send)
web_1 | File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
web_1 | return await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
web_1 | await super().__call__(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
web_1 | await self.middleware_stack(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
web_1 | await self.app(scope, receive, _send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
web_1 | raise exc from None
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
web_1 | await self.app(scope, receive, sender)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
web_1 | await route.handle(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
web_1 | await self.app(scope, receive, send)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
web_1 | response = await func(request)
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
web_1 | raw_response = await run_endpoint_function(
web_1 | File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 150, in run_endpoint_function
web_1 | return await run_in_threadpool(dependant.call, **values)
web_1 | File "/usr/local/lib/python3.8/site-packages/starlette/concurrency.py", line 34, in run_in_threadpool
web_1 | return await loop.run_in_executor(None, func, *args)
web_1 | File "/usr/local/lib/python3.8/concurrent/futures/thread.py", line 57, in run
web_1 | result = self.fn(*self.args, **self.kwargs)
web_1 | File "/app/./app/api/routes/auth.py", line 19, in login
web_1 | user = authenticate(email=form_data.username, password=form_data.password)
web_1 | File "/app/./app/core/auth.py", line 36, in authenticate
web_1 | asyncio.run(user, debug=True)
web_1 | File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
web_1 | return loop.run_until_complete(main)
web_1 | File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
web_1 | return future.result()
web_1 | File "/app/./app/core/auth.py", line 20, in get_user_by_email
web_1 | return await User.objects.get_or_none(email=email) #User.objects.filter(email=email).first()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 912, in get_or_none
web_1 | return await self.get(*args, **kwargs)
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 933, in get
web_1 | return await self.filter(*args, **kwargs).get()
web_1 | File "/usr/local/lib/python3.8/site-packages/ormar/queryset/queryset.py", line 953, in get
web_1 | rows = await self.database.fetch_all(expr)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 148, in fetch_all
web_1 | return await connection.fetch_all(query, values)
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/core.py", line 264, in __aexit__
web_1 | await self._connection.release()
web_1 | File "/usr/local/lib/python3.8/site-packages/databases/backends/postgres.py", line 168, in release
web_1 | self._connection = await self._database._pool.release(self._connection)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 867, in release
web_1 | return await asyncio.shield(ch.release(timeout))
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 224, in release
web_1 | raise ex
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/pool.py", line 214, in release
web_1 | await self._con.reset(timeout=budget)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 1367, in reset
web_1 | await self.execute(reset_query, timeout=timeout)
web_1 | File "/usr/local/lib/python3.8/site-packages/asyncpg/connection.py", line 318, in execute
web_1 | return await self._protocol.query(query, timeout)
web_1 | File "asyncpg/protocol/protocol.pyx", line 338, in query
web_1 | RuntimeError: Task <Task pending name='Task-34' coro=<PoolConnectionHolder.release() running at /usr/local/lib/python3.8/site-packages/asyncpg/pool.py:214> cb=[shield.<locals>._inner_done_callback() at /usr/local/lib/python3.8/asyncio/tasks.py:885] created at /usr/local/lib/python3.8/asyncio/tasks.py:878> got Future <Future pending cb=[Protocol._on_waiter_completed()]> attached to a different loop
So I have made some progress, but it is still not working. How can I fix this?
Clearly I am no expert in async Python, so I would appreciate any help!
Posted on 2021-12-28 11:30:12
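The ORM you are using is asynchronous. The methods that query the database are therefore coroutine functions, and their results must be awaited.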
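To illustrate why the result must be awaited, here is a minimal sketch that uses only the standard library. `get_user` is a hypothetical stand-in for an ormar query and is not part of ormar. It shows that calling a coroutine function without `await` only creates a coroutine object, which is always truthy, so a check such as `if not user` would never trigger on it.

```python
import asyncio
import inspect


async def get_user(email: str) -> dict:
    # Hypothetical stand-in for an awaited ormar query
    await asyncio.sleep(0)  # yield to the event loop, as a real query would
    return {"email": email}


async def demo() -> tuple:
    pending = get_user("a@example.com")           # not awaited: just a coroutine object
    was_coroutine = inspect.iscoroutine(pending)  # the object is truthy, whatever the query would return
    user = await pending                          # awaiting actually runs it and yields the result
    return was_coroutine, user


if __name__ == "__main__":
    print(asyncio.run(demo()))
```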
You cannot use `await` inside a normal function. You need to prefix the definition with `async` to make it a coroutine function:
from typing import Optional

from app.db import User

async def get_by_username(username: str) -> Optional[User]:
    # ormar queries are coroutines, so the result has to be awaited
    return await User.objects.get_or_none(username=username)

async def authenticate(username: str, password: str) -> Optional[User]:
    user = await get_by_username(username)
    if not user:
        return None
    # do the password verification here
    return user
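The traceback in the question shows `login` being called through `run_in_threadpool`, which is consistent with the route being a plain `def`. Once `authenticate` is a coroutine function, the route presumably has to become `async def` as well, so the whole chain runs on the server's own event loop instead of the new loop that `asyncio.run` creates. The sketch below shows that chain using only the standard library. The `User` dataclass, the `_USERS` dictionary, the placeholder `verify_password`, and the `login` function are all hypothetical stand-ins for the ormar model, the database, the real hasher, and the FastAPI route.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class User:
    # Hypothetical stand-in for the ormar model in app.db
    email: str
    hashed_password: str


# Hypothetical in-memory table replacing the real database
_USERS: Dict[str, User] = {"a@example.com": User("a@example.com", "hashed:secret")}


def verify_password(password: str, hashed_password: str) -> bool:
    # Placeholder check only; real code would use a proper password hasher
    return hashed_password == f"hashed:{password}"


async def get_user_by_email(email: str) -> Optional[User]:
    # Stands in for: await User.objects.get_or_none(email=email)
    await asyncio.sleep(0)  # yield to the event loop, as a real query would
    return _USERS.get(email)


async def authenticate(email: str, password: str) -> Optional[User]:
    user = await get_user_by_email(email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user


async def login(email: str, password: str) -> dict:
    # Stands in for the route; it is `async def` so it can await authenticate
    user = await authenticate(email, password)
    if user is None:
        return {"status": 401}
    return {"status": 200, "email": user.email}


if __name__ == "__main__":
    # asyncio.run appears only here, at the top level of a standalone script;
    # under an ASGI server the loop is already running and the route is awaited
    print(asyncio.run(login("a@example.com", "secret")))
```

Note that nothing inside the call chain calls `asyncio.run`; every step is awaited on the same loop, which is what avoids the "attached to a different loop" error.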
https://stackoverflow.com/questions/70446806