我有一个FastAPI应用程序,在几个不同的场合,需要调用外部API。我在这些电话中使用httpx.AsyncClient。关键是我不完全明白我应该如何使用它。
在httpx文档中,我应该使用上下文管理器,
async def foo():
""""
I need to call foo quite often from different
parts of my application
"""
async with httpx.AsyncClient() as aclient:
# make some http requests, e.g.,
await aclient.get("http://example.it")
但是,我理解,每次调用foo()
时,都会生成一个新的客户机,这正是我们最初希望通过使用客户机来避免的。
我想另一种选择是在某个地方定义一些全局客户端,并在需要的时候导入它。
aclient = httpx.AsyncClient()
async def bar():
# make some http requests using the global aclient, e.g.,
await aclient.get("http://example.it")
然而,第二种选择看上去有点可疑,因为没有人在处理会议闭幕等问题。
因此,问题是:如何在httpx.AsyncClient()
应用程序中正确地(重新)使用FastAPI?
发布于 2022-11-11 02:31:04
您可以拥有在FastApi关机事件中关闭的全局客户端。
import logging
from fastapi import FastAPI
import httpx
logging.basicConfig(level=logging.INFO, format="%(levelname)-9s %(asctime)s - %(name)s - %(message)s")
LOGGER = logging.getLogger(__name__)
class HTTPXClientWrapper:
async_client = None
def start(self):
""" Instantiate the client. Call from the FastAPI startup hook."""
self.async_client = httpx.AsyncClient()
LOGGER.info(f'httpx AsyncClient instantiated. Id {id(self.async_client)}')
async def stop(self):
""" Gracefully shutdown. Call from FastAPI shutdown hook."""
LOGGER.info(f'httpx async_client.is_closed(): {self.async_client.is_closed} - Now close it. Id (will be unchanged): {id(self.async_client)}')
await self.async_client.aclose()
LOGGER.info(f'httpx async_client.is_closed(): {self.async_client.is_closed}. Id (will be unchanged): {id(self.async_client)}')
self.async_client = None
LOGGER.info('httpx AsyncClient closed')
def __call__(self):
""" Calling the instantiated HTTPXClientWrapper returns the wrapped singleton."""
# Ensure we don't use it if not started / running
assert self.async_client is not None
LOGGER.info(f'httpx async_client.is_closed(): {self.async_client.is_closed}. Id (will be unchanged): {id(self.async_client)}')
return self.async_client
httpx_client_wrapper = HTTPXClientWrapper()
app = FastAPI()
@app.get('/test-call-external')
async def call_external_api(url: str = 'https://stackoverflow.com'):
async_client = httpx_client_wrapper()
res = await async_client.get(url)
result = res.text
return {
'result': result,
'status': res.status_code
}
@app.on_event("startup")
async def startup_event():
httpx_client_wrapper.start()
@app.on_event("shutdown")
async def shutdown_event():
await httpx_client_wrapper.stop()
if __name__ == '__main__':
import uvicorn
LOGGER.info(f'starting...')
uvicorn.run(f"{__name__}:app", host="127.0.0.1", port=8000)
注意-这个答案是受我很久以前在aiohttp
的其他地方看到的类似答案的启发,我找不到参考,但多亏了那个人!
编辑
我在示例中添加了uvicorn引导,这样它现在就完全可用了。我还添加了日志记录,以显示启动和关闭时发生了什么,您可以访问localhost:8000/docs
来触发端点并查看发生了什么(通过日志)。
从启动钩子调用start()
方法的原因是,在调用钩子时,事件循环已经启动,因此我们知道我们将在异步上下文中实例化httpx客户端。
另外,我错过了stop()
方法上的stop()
,并且有一个self.async_client = None
而不是async_client = None
,所以我在示例中修正了这些错误。
发布于 2022-11-11 10:37:12
这个问题的答案取决于您如何构造FastAPI应用程序以及如何管理您的依赖项。使用httpx.AsyncClient()的一种可能方法是创建一个自定义依赖函数,该函数返回客户端的一个实例,并在请求完成时关闭它。例如:
from fastapi import FastAPI, Depends
import httpx
app = FastAPI()
async def get_client():
# create a new client for each request
async with httpx.AsyncClient() as client:
# yield the client to the endpoint function
yield client
# close the client when the request is done
@app.get("/foo")
async def foo(client: httpx.AsyncClient = Depends(get_client)):
# use the client to make some http requests, e.g.,
response = await client.get("http://example.it")
return response.json()
这样,您就不需要创建一个全局客户端,也不需要担心手动关闭它。FastAPI将为您处理依赖注入和上下文管理。您还可以对需要使用客户端的其他端点使用相同的依赖函数。
或者,您可以创建一个全局客户端,并在应用程序关闭时关闭它。例如:
from fastapi import FastAPI, Depends
import httpx
import atexit
app = FastAPI()
# create a global client
client = httpx.AsyncClient()
# register a function to close the client when the app exits
atexit.register(client.aclose)
@app.get("/bar")
async def bar():
# use the global client to make some http requests, e.g.,
response = await client.get("http://example.it")
return response.json()
这样,您不需要为每个请求创建一个新的客户机,但是您需要确保在应用程序停止时客户端被正确关闭。您可以使用atexit模块注册一个在应用程序退出时将被调用的函数,也可以使用其他方法,例如信号处理程序或事件挂钩。
这两种方法各有优缺点,您应该选择适合您的需求和偏好的方法。您还可以查看有关依赖关系和测试的依赖关系文档,以获得更多的示例和最佳实践。
发布于 2022-04-07 09:39:54
好吧,你的选择在我看来不错,每次你需要它的时候,你都会产生一个客户端并重用它,但是你是对的,没有人会关闭会话。
来自httpx:(在您的示例中,从fastApi导入BackgroundTask和StreamingResponse )
import httpx
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
client = httpx.AsyncClient()
async def home(request):
req = client.build_request("GET", "https://www.example.com/")
r = await client.send(req, stream=True)
return StreamingResponse(r.aiter_text(), background=BackgroundTask(r.aclose))
https://stackoverflow.com/questions/71031816
复制相似问题