我想知道如何使用DataFrame来流FastAPI,而不必将DataFrame保存到磁盘上的csv文件中。目前,我所做的是从csv文件中流数据,但是与返回FileResponse
相比速度并不快。下面的/option7
是我想要做的。
我的目标是从FastAPI后端流数据,而不将DataFrame保存到csv文件中。
谢谢。
from fastapi import FastAPI, Response,Query
from fastapi.responses import FileResponse,HTMLResponse,StreamingResponse
app = FastAPI()
df = pd.read_csv("data.csv")
@app.get("/option4")
def load_questions():
return FileResponse(path="C:Downloads/data.csv", filename="data.csv")
@app.get("/option5")
def load_questions():
def iterfile(): #
with open('data.csv', mode="rb") as file_like: #
yield from file_like #
return StreamingResponse(iterfile(), media_type="text/csv")
@app.get("/option7")
def load_questions():
def iterfile(): #
#with open(df, mode="rb") as file_like: #
yield from df #
return StreamingResponse(iterfile(), media_type="application/json")
发布于 2022-09-12 19:07:01
推荐方法
正如在这个答案以及这里和这里中提到的,当整个数据(在您的例子中是DataFrame
)已经加载到内存中时,就不需要使用StreamingResponse
了。当您想要传输实时数据时,当您不知道预先知道输出的大小时,当您不希望在开始将其发送到客户端之前收集到所有这些数据时,以及当您想返回的文件太大而无法装入内存时(例如,如果您有8GB的RAM,您就不能加载一个50 8GB的文件),StreamingResponse
是有意义的,因此,您宁愿将文件以块的形式加载到内存中。
在您的示例中,由于DataFrame
已经加载到内存中,所以在使用.to_json()
方法将DataFrame
转换为JSON字符串之后,您应该直接返回一个自定义的JSON,如这个答案中所述(参见此相关员额 )。示例:
from fastapi import Response
@app.get("/")
def main():
return Response(df.to_json(orient="records"), media_type="application/json")
如果发现浏览器需要一段时间来显示数据,则可能希望将数据下载为,将作为.json
文件下载到用户的设备(完成速度要快得多),而不是等待浏览器显示大量数据。可以通过使用Content-Disposition
参数在Response
中设置attachment
标头来做到这一点(有关更多细节,请参见这个答案 ):
@app.get("/")
def main():
headers = {'Content-Disposition': 'attachment; filename="data.json"'}
return Response(df.to_json(orient="records"), headers=headers, media_type='application/json')
还可以使用.csv
方法将数据作为.to_csv()
文件返回,而无需指定路径参数。由于使用return df.to_csv()
将导致在浏览器中显示包含\r\n
字符的数据,因此您可能会发现最好将csv数据放在Response
中,并指定Content-Disposition
头,以便将数据作为.csv
文件下载。示例:
@app.get("/")
def main():
headers = {'Content-Disposition': 'attachment; filename="data.csv"'}
return Response(df.to_csv(), headers=headers, media_type="text/csv")
不建议采用的方法
要使用StreamingResponse
,您需要迭代DataFrame中的行,将每一行转换为字典,然后使用标准的json
库或其他更快的JSON编码器将其转换为JSON字符串,如这个答案所述( JSON字符串稍后将被FastAPI/Starlette内部编码为byte
格式,如源代码这里所示)。示例:
@app.get("/")
def main():
def iter_df():
for _, row in df.iterrows():
yield json.dumps(row.to_dict()) + '\n'
return StreamingResponse(iter_df(), media_type="application/json")
迭代Pandas对象通常比较慢,不建议使用。如这个答案所述
Pandas中的迭代是一种反模式,是一种,只有当您用尽所有其他选项时才应该这样做,。您不应该在超过几千行的情况下使用任何以
"iter"
名称命名的函数,否则您将不得不习惯于等待的lot。
更新
正如@Panagiotis Kanavos在下面的注释部分中所指出的那样,在已经加载到内存中的.to_json()
或.to_csv()
上使用DataFrame,将导致在内存中分配整个输出字符串,从而使内存使用量翻一番,甚至更糟。因此,如果您拥有如此大量的数据,可能会导致系统减速或崩溃(因为内存不足),如果使用上述任何一种方法,您应该使用前面描述的StreamingResponse
。您可能会在这个职位中找到更快的警告本地方法到这个职位,以及更快的JSON编码器,如orjson
和ujson
,如这个答案中所描述的。
或者,您可以将数据保存到磁盘,然后删除DataFrame以释放内存--您甚至可以使用gc.collect()
手动触发垃圾收集,如这个答案中所示;但是,不鼓励频繁调用垃圾收集,因为这是一项代价高昂的操作,可能会影响性能--并返回FileResponse
(假设数据可以装入RAM;否则,您也应该使用StreamingResponse
,也可以查看这个答案 ),最后,在返回响应后,有一个BackgroundTask
从磁盘删除文件。示例如下所示。
无论如何,您可能选择的解决方案应该基于您的应用程序的需求,例如,您希望同时服务的用户数、数据大小、响应时间等,以及您的系统规范(例如,用于分配的可分配内存)。此外,由于对DataFrame
方法的所有调用都是同步的,您应该记住用一个普通的def
来定义端点,以便它在外部线程池中运行;否则,它会阻塞服务器。或者,您可以使用concurrency
模块中的Starlette的run_in_threadpool()
,该模块将在一个单独的线程中运行to_csv()
或to_json()
函数,以确保主线程(运行coroutines的地方)不会被阻塞。请查看这个答案以获得有关def
vs async def
的更多详细信息。
from fastapi import BackgroundTasks
from fastapi.responses import FileResponse
import uuid
import os
@app.get("/")
def main(background_tasks: BackgroundTasks):
filename = str(uuid.uuid4()) + ".csv"
df.to_csv(filename)
del df # release the memory
background_tasks.add_task(os.remove, filename)
return FileResponse(filename, filename="data.csv", media_type="text/csv")
https://stackoverflow.com/questions/73688641
复制相似问题