我正在用FastAPI写我的第一个项目,我有点困难。特别是,我不确定我应该如何在我的应用程序中使用异步连接池。目前我所拥有的是这样的
在db.py中,我有
pgpool = None
async def get_pool():
global pgpool
if not pgpool:
pgpool = await asyncpg.create_pool(dsn='MYDB_DSN')
return pgpool
然后在单独的文件中,我使用get_pool作为依赖项。
@router.post("/user/", response_model=models.User, status_code=201)
async def create_user(user: models.UserCreate, pgpool = Depends(get_pool)):
# ... do things ...
首先,我拥有的每个端点都使用数据库,因此为每个函数添加依赖参数似乎很愚蠢。其次,这似乎是一种拐弯抹角的做法。我定义一个全局变量,然后定义一个返回该全局变量的函数,然后注入该函数。我相信有更自然的方式去做。
我见过有人建议只将我需要的任何东西作为属性添加到app对象中
@app.on_event("startup")
async def startup():
app.pool = await asyncpg.create_pool(dsn='MYDB_DSN')
但是当我有多个路由器文件时,它就不起作用了,我不知道如何从路由器对象访问应用程序对象。
我遗漏了什么?
发布于 2020-08-06 07:49:18
您可以使用应用程序工厂模式来设置应用程序。
为了避免使用全局或直接向app对象添加内容,您可以创建自己的class Database来保存您的连接池。
要将连接池传递给每个路由,您可以使用中间件并将连接池添加到request.state
示例代码如下:
import asyncio
import asyncpg
from fastapi import FastAPI, Request
class Database():
async def create_pool(self):
self.pool = await asyncpg.create_pool(dsn='MYDB_DSN')
def create_app():
app = FastAPI()
db = Database()
@app.middleware("http")
async def db_session_middleware(request: Request, call_next):
request.state.pgpool = db.pool
response = await call_next(request)
return response
@app.on_event("startup")
async def startup():
await db.create_pool()
@app.on_event("shutdown")
async def shutdown():
# cleanup
pass
@app.get("/")
async def hello(request: Request):
print(request.state.pool)
return app
app = create_app()
发布于 2021-01-19 15:53:14
我这样做的方法是用db.py。
class Database:
def __init__(self,user,password,host,database,port="5432"):
self.user = user
self.password = password
self.host = host
self.port = port
self.database = database
self._cursor = None
self._connection_pool = None
self.con = None
async def connect(self):
if not self._connection_pool:
try:
self._connection_pool = await asyncpg.create_pool(
min_size=1,
max_size=20,
command_timeout=60,
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database,
ssl="require"
)
logger.info("Database pool connectionn opened")
except Exception as e:
logger.exception(e)
async def fetch_rows(self, query: str,*args):
if not self._connection_pool:
await self.connect()
else:
con = await self._connection_pool.acquire()
try:
result = await con.fetch(query,*args)
return result
except Exception as e:
logger.exception(e)
finally:
await self._connection_pool.release(con)
async def close(self):
if not self._connection_pool:
try:
await self._connection_pool.close()
logger.info("Database pool connection closed")
except Exception as e:
logger.exception(e)
然后在应用程序中
@app.on_event("startup")
async def startup_event():
database_instance = db.Database(**db_arguments)
await database_instance.connect()
app.state.db = database_instance
logger.info("Server Startup")
@app.on_event("shutdown")
async def shutdown_event():
if not app.state.db:
await app.state.db.close()
logger.info("Server Shutdown")
然后,您可以通过在路由中传入一个请求参数来使用request.app.state.db获取db实例。
https://stackoverflow.com/questions/63270196
复制相似问题