I receive the file via a POST request. When I save it locally, I can read the content using file.read(), but the name reported via file.name is incorrect (16). When I try to find the file by that name, I get an error. What's the problem?
My code:

@router.post(
    path="/upload",
    response_model=schema.ContentUploadedResponse,
)
async def upload_file(
        background_tasks: BackgroundTasks,
        uploaded_file: UploadFile = File(...)):
    uploaded_file.file.rollover()
    uploaded_file.file.flush()
    #shutil.copy(uploaded_file.file.name, f'../api/{uploaded_file.filename}')
    background_tasks.add_task(s3_upload, uploaded_file=fp)
    return schema.ContentUploadedResponse()
Posted on 2020-08-25 14:33:25
Background

UploadFile is just a wrapper around SpooledTemporaryFile, which can be accessed as UploadFile.file.

SpooledTemporaryFile() [...] operates exactly as TemporaryFile() does

And the documentation about TemporaryFile says:

Return a file-like object that can be used as a temporary storage area. [...] It will be destroyed as soon as it is closed (including an implicit close when the object is garbage collected). Under Unix, the directory entry for the file is either not created at all or is removed immediately after the file is created. Other platforms do not support this; your code should not rely on a temporary file created using this function having or not having a visible name in the file system.
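This explains the numeric name in the question: once the spooled file rolls over to disk, the underlying TemporaryFile has no directory entry, and on Linux its name attribute is just the OS-level file descriptor (an integer such as 16), not a usable path. A stdlib-only sketch:

```python
import tempfile

# SpooledTemporaryFile buffers data in memory until it exceeds
# max_size (or rollover() is called), then moves the data to an
# anonymous on-disk temporary file.
spooled = tempfile.SpooledTemporaryFile(max_size=1024)
spooled.write(b"some uploaded bytes")
spooled.rollover()  # force the on-disk file into existence

# On Linux the backing file has no visible path, so .name is the
# raw file descriptor (an int) rather than a filename you could
# pass to shutil.copy().
print(type(spooled.name).__name__, spooled.name)
spooled.close()
```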
async def endpoint

You should use the following async methods of UploadFile: write, read, seek and close. They are executed in a thread pool and awaited asynchronously.

For async writing files to disk you can use aiofiles. Example:
@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
    # ...
    async with aiofiles.open(out_file_path, 'wb') as out_file:
        content = await in_file.read()  # async read
        await out_file.write(content)  # async write

    return {"Result": "OK"}
Or in a chunked manner, so as not to load the entire file into memory:
@app.post("/")
async def post_endpoint(in_file: UploadFile=File(...)):
    # ...
    async with aiofiles.open(out_file_path, 'wb') as out_file:
        while content := await in_file.read(1024):  # async read chunk
            await out_file.write(content)  # async write chunk

    return {"Result": "OK"}
def endpoint

Also, I'd like to cite several useful utility functions from this topic (all credits to @dmontagu) using shutil.copyfileobj with the internal UploadFile.file. These functions can be invoked from def endpoints:
import shutil
from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Callable

from fastapi import UploadFile


def save_upload_file(upload_file: UploadFile, destination: Path) -> None:
    try:
        with destination.open("wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
    finally:
        upload_file.file.close()


def save_upload_file_tmp(upload_file: UploadFile) -> Path:
    try:
        suffix = Path(upload_file.filename).suffix
        with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            shutil.copyfileobj(upload_file.file, tmp)
        tmp_path = Path(tmp.name)
    finally:
        upload_file.file.close()
    return tmp_path


def handle_upload_file(
    upload_file: UploadFile, handler: Callable[[Path], None]
) -> None:
    tmp_path = save_upload_file_tmp(upload_file)
    try:
        handler(tmp_path)  # Do something with the saved temp file
    finally:
        tmp_path.unlink()  # Delete the temp file
Note: you'd want to use the above functions inside of def endpoints, not async def, since they make use of blocking APIs.
Posted on 2021-01-27 10:58:36
In my case, I need to handle huge files, so I must avoid reading them all into memory. What I want is to save them to disk asynchronously, in chunks.

I'm experimenting with this and it seems to do the job (CHUNK_SIZE is quite arbitrarily picked; further tests are needed to find an optimal size):
import os
import logging

from fastapi import FastAPI, BackgroundTasks, File, UploadFile

log = logging.getLogger(__name__)

app = FastAPI()

DESTINATION = "/"
CHUNK_SIZE = 2 ** 20  # 1MB


async def chunked_copy(src, dst):
    await src.seek(0)
    with open(dst, "wb") as buffer:
        while True:
            contents = await src.read(CHUNK_SIZE)
            if not contents:
                log.info(f"Src completely consumed\n")
                break
            log.info(f"Consumed {len(contents)} bytes from Src file\n")
            buffer.write(contents)


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile = File(...)):
    fullpath = os.path.join(DESTINATION, file.filename)
    await chunked_copy(file, fullpath)
    return {"File saved to disk at": fullpath}
However, I quickly realized that create_upload_file is not invoked until the file has been completely received. So, if this code snippet is correct, it will probably be good for performance, but it will not enable anything like providing feedback to the client about the progress of the upload, and it performs a full data copy in the server. It seems silly not to be able to just access the original UploadFile temporary file, flush it and just move it somewhere else, thus avoiding the copy.
Posted on 2021-12-24 14:42:41
You can save the file by copying and pasting the code below.
from fastapi import (
    FastAPI,
    UploadFile,
    File,
    status
)
from fastapi.responses import JSONResponse
import aiofiles

app = FastAPI(debug=True)


@app.post("/upload_file/", response_description="", response_model=None)
async def result(file: UploadFile = File(...)):
    try:
        async with aiofiles.open(file.filename, 'wb') as out_file:
            content = await file.read()  # async read
            await out_file.write(content)  # async write
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'message': str(e)}
        )
    else:
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"result": 'success'}
        )
If you want to upload multiple files, then copy and paste the code below:
from typing import List

from fastapi import (
    FastAPI,
    UploadFile,
    File,
    status
)
from fastapi.responses import JSONResponse
import aiofiles

app = FastAPI(debug=True)


@app.post("/upload_multiple_file/", response_description="", response_model=None)
async def result(files: List[UploadFile] = File(...)):
    try:
        for file in files:
            async with aiofiles.open(file.filename, 'wb') as out_file:
                content = await file.read()
                await out_file.write(content)
    except Exception as e:
        return JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST,
            content={'message': str(e)}
        )
    else:
        return JSONResponse(
            status_code=status.HTTP_200_OK,
            content={"result": 'success'}
        )
https://stackoverflow.com/questions/63580229