前几天又遇到客户反馈说用户上传个视频要等半天,体验太差了。我一看代码,好家伙,文件还在走服务器中转,这不是自己给自己找罪受吗?
今天就来聊聊怎么把文件上传这件事做得又快又稳,不管是5MB的图片还是5GB的视频,都能让用户爽到飞起。
说实话,文件上传这块水还挺深的,踩过的坑能绕地球一圈。不过别担心,我把这些年踩过的坑都给你们总结出来了。
记住一句话:永远不要让文件经过你的服务器。
我之前就犯过这个错误,用户上传文件先到我们服务器,然后再转发到S3。结果呢?服务器内存爆了,用户等得想骂娘,老板天天找我谈话。
客户端 → 你的服务器 → S3
这样搞的话,你的服务器就成了瓶颈。内存占用飙升,超时一堆,扩展性基本为零。
客户端 → S3(直接上传)
你的服务器只负责生成个预签名URL就行了,简单粗暴,效果拔群。
先来看看最基本的实现,这个已经能解决80%的场景了。
# s3_service.py
import boto3
import uuid
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
from typing import Dict, Tuple
import logging
class S3Service:
def __init__(self, aws_access_key: str, aws_secret_key: str,
region: str, bucket_name: str):
# 这里配置一些性能优化参数
self.s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=region,
config=boto3.session.Config(
retries={'max_attempts': 3},
connect_timeout=5,
read_timeout=5,
max_pool_connections=50 # 连接池优化
)
)
self.bucket_name = bucket_name
self.logger = logging.getLogger(__name__)
def generate_presigned_url(self, filename: str, content_type: str,
file_size: int) -> Dict[str, str]:
"""生成预签名URL,让客户端直接上传到S3"""
try:
# 生成唯一的文件key
key = f"uploads/{uuid.uuid4()}/{filename}"
# 生成预签名URL,15分钟过期
upload_url = self.s3_client.generate_presigned_url(
'put_object',
Params={
'Bucket': self.bucket_name,
'Key': key,
'ContentType': content_type,
'ContentLength': file_size
},
ExpiresIn=900 # 15分钟,够用了
)
return {
'upload_url': upload_url,
'key': key
}
except ClientError as e:
self.logger.error(f"生成预签名URL失败: {e}")
raise Exception("生成上传链接失败")# upload_controller.py
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Dict
import logging
router = APIRouter(prefix="/api/upload")
logger = logging.getLogger(__name__)
class UploadRequest(BaseModel):
filename: str
content_type: str
file_size: int
@router.post("/initiate")
async def initiate_upload(
request: UploadRequest,
s3_service: S3Service = Depends(get_s3_service)
) -> Dict[str, str]:
"""初始化文件上传,返回预签名URL"""
# 简单的文件大小检查
if request.file_size > 5 * 1024 * 1024 * 1024: # 5GB限制
raise HTTPException(status_code=400, detail="文件太大了兄弟")
try:
result = s3_service.generate_presigned_url(
request.filename,
request.content_type,
request.file_size
)
logger.info(f"为文件 {request.filename} 生成上传URL成功")
return result
except Exception as e:
logger.error(f"上传初始化失败: {e}")
raise HTTPException(status_code=500, detail="服务器开小差了")// 前端上传逻辑,带进度条的那种
async function uploadFileToS3(file, token) {
try {
// 第一步:获取预签名URL
const response = await fetch('/api/upload/initiate', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
filename: file.name,
content_type: file.type,
file_size: file.size
})
});
    if (!response.ok) {
      throw new Error(`获取上传链接失败: ${response.status}`);
    }
    const { upload_url, key } = await response.json();
    // 第二步:直接上传到S3(fetch 拿不到上传进度,进度条见下面的 axios 版本)
    const uploadResponse = await fetch(upload_url, {
      method: 'PUT',
      body: file,
      headers: {
        'Content-Type': file.type
      }
    });
    if (!uploadResponse.ok) {
      throw new Error(`上传到S3失败: ${uploadResponse.status}`);
    }
return key;
} catch (error) {
console.error('上传失败:', error);
throw error;
}
}
// 如果你想要进度条,用axios会更方便
async function uploadWithProgress(file, token, onProgress) {
const { data } = await axios.post('/api/upload/initiate', {
filename: file.name,
content_type: file.type,
file_size: file.size
}, {
headers: { Authorization: `Bearer ${token}` }
});
await axios.put(data.upload_url, file, {
headers: { 'Content-Type': file.type },
onUploadProgress: (progressEvent) => {
const percentage = Math.round(
(progressEvent.loaded / progressEvent.total) * 100
);
onProgress(percentage);
}
});
return data.key;
}

当文件超过100MB的时候,单次上传就有点吃力了。这时候就要用到S3的分片上传功能,这玩意儿真的是个好东西:
# multipart_service.py
import boto3
import math
import uuid
from typing import List, Dict, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
class MultipartUploadService:
def __init__(self, aws_access_key: str, aws_secret_key: str,
region: str, bucket_name: str):
self.s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=region
)
self.bucket_name = bucket_name
self.logger = logging.getLogger(__name__)
# 根据经验调整的分片大小
self.CHUNK_SIZE = 10 * 1024 * 1024 # 10MB,这个大小比较合适
def initiate_multipart_upload(self, filename: str,
content_type: str) -> Dict[str, str]:
"""开始分片上传"""
key = f"uploads/{uuid.uuid4()}/{filename}"
try:
response = self.s3_client.create_multipart_upload(
Bucket=self.bucket_name,
Key=key,
ContentType=content_type
)
return {
'upload_id': response['UploadId'],
'key': key,
'chunk_size': self.CHUNK_SIZE
}
except Exception as e:
self.logger.error(f"初始化分片上传失败: {e}")
raise
def generate_presigned_part_url(self, key: str, upload_id: str,
part_number: int) -> str:
"""为每个分片生成预签名URL"""
try:
url = self.s3_client.generate_presigned_url(
'upload_part',
Params={
'Bucket': self.bucket_name,
'Key': key,
'UploadId': upload_id,
'PartNumber': part_number
},
ExpiresIn=3600 # 1小时,大文件需要更长时间
)
return url
except Exception as e:
self.logger.error(f"生成分片URL失败: {e}")
raise
def complete_multipart_upload(self, key: str, upload_id: str,
parts: List[Dict]) -> Dict:
"""完成分片上传"""
try:
# 按分片号排序,这个很重要
sorted_parts = sorted(parts, key=lambda x: x['PartNumber'])
response = self.s3_client.complete_multipart_upload(
Bucket=self.bucket_name,
Key=key,
UploadId=upload_id,
MultipartUpload={'Parts': sorted_parts}
)
return response
except Exception as e:
self.logger.error(f"完成分片上传失败: {e}")
# 如果失败了,清理掉未完成的上传
self.abort_multipart_upload(key, upload_id)
raise
def abort_multipart_upload(self, key: str, upload_id: str):
"""取消分片上传,释放存储空间"""
try:
self.s3_client.abort_multipart_upload(
Bucket=self.bucket_name,
Key=key,
UploadId=upload_id
)
self.logger.info(f"已取消上传: {key}")
except Exception as e:
self.logger.error(f"取消上传失败: {e}")
def calculate_optimal_chunk_size(self, file_size: int) -> int:
"""根据文件大小计算最优分片大小"""
if file_size < 100 * 1024 * 1024: # 小于100MB
return file_size # 直接单次上传
elif file_size < 1024 * 1024 * 1024: # 小于1GB
return 10 * 1024 * 1024 # 10MB分片
elif file_size < 5 * 1024 * 1024 * 1024: # 小于5GB
return 25 * 1024 * 1024 # 25MB分片
else:
            return 100 * 1024 * 1024  # 100MB分片

# multipart_controller.py
import math
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from typing import Any, List, Dict
router = APIRouter(prefix="/api/upload/multipart")
class InitiateMultipartRequest(BaseModel):
filename: str
content_type: str
file_size: int
class PartUrlRequest(BaseModel):
key: str
upload_id: str
part_number: int
class CompleteMultipartRequest(BaseModel):
key: str
upload_id: str
    parts: List[Dict[str, Any]]
@router.post("/initiate")
async def initiate_multipart(
request: InitiateMultipartRequest,
service: MultipartUploadService = Depends(get_multipart_service)
):
"""开始分片上传"""
try:
result = service.initiate_multipart_upload(
request.filename,
request.content_type
)
# 计算需要多少个分片
chunk_size = service.calculate_optimal_chunk_size(request.file_size)
total_parts = math.ceil(request.file_size / chunk_size)
result.update({
'total_parts': total_parts,
'chunk_size': chunk_size
})
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"初始化失败: {str(e)}")
@router.post("/part-url")
async def get_part_url(
request: PartUrlRequest,
service: MultipartUploadService = Depends(get_multipart_service)
):
"""获取分片上传URL"""
try:
url = service.generate_presigned_part_url(
request.key,
request.upload_id,
request.part_number
)
return {'url': url}
except Exception as e:
raise HTTPException(status_code=500, detail=f"获取URL失败: {str(e)}")
@router.post("/complete")
async def complete_multipart(
request: CompleteMultipartRequest,
service: MultipartUploadService = Depends(get_multipart_service)
):
"""完成分片上传"""
try:
result = service.complete_multipart_upload(
request.key,
request.upload_id,
request.parts
)
return result
except Exception as e:
raise HTTPException(status_code=500, detail=f"完成上传失败: {str(e)}")
@router.post("/abort")
async def abort_multipart(
request: dict,
service: MultipartUploadService = Depends(get_multipart_service)
):
"""取消分片上传"""
try:
service.abort_multipart_upload(
request['key'],
request['upload_id']
)
return {'success': True}
except Exception as e:
raise HTTPException(status_code=500, detail=f"取消失败: {str(e)}")这个功能真的是神器,特别是对于海外用户,速度提升能达到50%-500%。
# 在S3客户端配置中启用传输加速
import boto3
s3_client = boto3.client(
's3',
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=region,
config=boto3.session.Config(
s3={
'use_accelerate_endpoint': True # 开启传输加速
}
)
)

记得在S3控制台里开启传输加速功能,路径是:存储桶设置 → 属性 → 传输加速。
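不想每次都去点控制台的话,也可以用 boto3 直接把加速打开。下面是个最小示意,假设 s3_client 和 bucket_name 已经按前面的方式初始化好:

# 用API开启传输加速,效果等同于控制台里的开关
s3_client.put_bucket_accelerate_configuration(
    Bucket=bucket_name,
    AccelerateConfiguration={'Status': 'Enabled'}
)

# 确认一下是否生效
resp = s3_client.get_bucket_accelerate_configuration(Bucket=bucket_name)
print(resp.get('Status'))  # 期望输出 'Enabled'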
不同文件大小用不同的分片策略,这个很重要:
| 文件大小 | 推荐分片大小 | 原因 |
|---|---|---|
| < 100MB | 单次上传 | 分片的开销不值得 |
| 100MB - 1GB | 10MB | 平衡速度和可靠性 |
| 1GB - 5GB | 25MB | 减少API调用次数 |
| > 5GB | 100MB | 最大化效率 |
def calculate_optimal_chunk_size(self, file_size: int) -> int:
"""
根据文件大小计算最优分片大小
这个是我踩了很多坑总结出来的经验值
"""
if file_size < 100 * 1024 * 1024:
return file_size # 直接单次上传
elif file_size < 1024 * 1024 * 1024:
return 10 * 1024 * 1024 # 10MB
elif file_size < 5 * 1024 * 1024 * 1024:
return 25 * 1024 * 1024 # 25MB
else:
return 100 * 1024 * 1024 # 100MB
def get_optimal_concurrency(self, network_speed_mbps: float) -> int:
"""根据网络速度调整并发数"""
if network_speed_mbps < 5:
return 2 # 网速慢的时候别并发太多
elif network_speed_mbps < 50:
return 3
elif network_speed_mbps < 100:
return 5
else:
        return 8  # 网速快的时候可以多开几个

import asyncio
import aiohttp
from typing import List
import time
class ConcurrentUploader:
    def __init__(self, max_concurrency: int = 5, chunk_size: int = 10 * 1024 * 1024):
        self.max_concurrency = max_concurrency
        self.chunk_size = chunk_size  # upload_file_concurrent 按这个大小读取分片
        self.semaphore = asyncio.Semaphore(max_concurrency)
async def upload_part_with_retry(self, chunk_data: bytes,
upload_url: str, part_number: int,
max_retries: int = 3) -> dict:
"""带重试的分片上传"""
async with self.semaphore: # 控制并发数
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.put(
upload_url,
data=chunk_data,
timeout=aiohttp.ClientTimeout(total=300) # 5分钟超时
) as response:
if response.status == 200:
etag = response.headers.get('ETag', '').strip('"')
return {
'PartNumber': part_number,
'ETag': etag
}
else:
raise Exception(f"上传失败,状态码: {response.status}")
except Exception as e:
if attempt < max_retries - 1:
# 指数退避,1秒、2秒、4秒
wait_time = 2 ** attempt
await asyncio.sleep(wait_time)
continue
else:
raise e
async def upload_file_concurrent(self, file_path: str,
upload_urls: List[str]) -> List[dict]:
"""并发上传所有分片"""
tasks = []
with open(file_path, 'rb') as f:
for i, upload_url in enumerate(upload_urls):
chunk_data = f.read(self.chunk_size)
if not chunk_data:
break
task = self.upload_part_with_retry(
chunk_data, upload_url, i + 1
)
tasks.append(task)
# 并发执行所有上传任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 检查是否有失败的
successful_parts = []
for result in results:
if isinstance(result, Exception):
raise result
successful_parts.append(result)
        return successful_parts

import time
import requests
from typing import Tuple
class NetworkOptimizer:
def __init__(self):
self.last_speed_test = 0
self.cached_speed = None
    def detect_network_speed(self) -> float:
"""检测网络速度,用于调整上传策略"""
# 缓存5分钟,避免频繁测试
if (time.time() - self.last_speed_test) < 300 and self.cached_speed:
return self.cached_speed
try:
# 下载一个小文件测试速度
test_url = "https://httpbin.org/bytes/1048576" # 1MB测试文件
start_time = time.time()
response = requests.get(test_url, timeout=10)
end_time = time.time()
if response.status_code == 200:
duration = end_time - start_time
file_size_mb = len(response.content) / (1024 * 1024)
speed_mbps = (file_size_mb * 8) / duration # 转换为Mbps
self.cached_speed = speed_mbps
self.last_speed_test = time.time()
return speed_mbps
else:
return 10.0 # 默认值
except Exception:
return 10.0 # 网络测试失败,使用默认值
def get_optimal_settings(self, speed_mbps: float,
file_size: int) -> Tuple[int, int]:
"""根据网速和文件大小返回最优的分片大小和并发数"""
# 根据网速调整并发数
if speed_mbps < 5:
concurrency = 2
chunk_size = 5 * 1024 * 1024 # 5MB
elif speed_mbps < 25:
concurrency = 3
chunk_size = 10 * 1024 * 1024 # 10MB
elif speed_mbps < 100:
concurrency = 5
chunk_size = 25 * 1024 * 1024 # 25MB
else:
concurrency = 8
chunk_size = 50 * 1024 * 1024 # 50MB
# 根据文件大小微调
if file_size < 100 * 1024 * 1024: # 小于100MB
chunk_size = min(chunk_size, file_size)
concurrency = min(concurrency, 2)
        return chunk_size, concurrency

性能监控这块不能少,得知道系统跑得怎么样。
import time
import logging
from dataclasses import dataclass
from typing import Optional
import json
@dataclass
class UploadMetrics:
file_key: str
file_size: int
upload_duration: float
upload_speed_mbps: float
method: str # 'single' 或 'multipart'
parts_count: Optional[int] = None
retry_count: int = 0
    timestamp: Optional[float] = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
class UploadMonitor:
def __init__(self):
self.logger = logging.getLogger(__name__)
def track_upload_performance(self, metrics: UploadMetrics):
"""记录上传性能指标"""
# 记录到日志
self.logger.info(
f"上传完成 - 文件: {metrics.file_key}, "
f"大小: {metrics.file_size / (1024*1024):.2f}MB, "
f"耗时: {metrics.upload_duration:.2f}s, "
f"速度: {metrics.upload_speed_mbps:.2f}Mbps, "
f"方式: {metrics.method}"
)
# 如果速度太慢,发出警告
if metrics.upload_speed_mbps < 1.0:
self.logger.warning(
f"上传速度过慢: {metrics.file_key}, "
f"速度仅 {metrics.upload_speed_mbps:.2f}Mbps"
)
# 这里可以发送到监控系统,比如DataDog、CloudWatch等
self._send_to_monitoring_system(metrics)
def _send_to_monitoring_system(self, metrics: UploadMetrics):
"""发送指标到监控系统"""
try:
# 这里可以集成你们的监控系统
# 比如发送到Prometheus、DataDog等
metric_data = {
'metric': 'file_upload_performance',
'value': metrics.upload_speed_mbps,
'tags': {
'method': metrics.method,
'file_size_category': self._get_size_category(metrics.file_size)
},
'timestamp': metrics.timestamp
}
# 实际发送逻辑
# monitoring_client.send(metric_data)
except Exception as e:
self.logger.error(f"发送监控数据失败: {e}")
def _get_size_category(self, file_size: int) -> str:
"""文件大小分类,便于分析"""
if file_size < 10 * 1024 * 1024:
return 'small' # < 10MB
elif file_size < 100 * 1024 * 1024:
return 'medium' # 10MB - 100MB
elif file_size < 1024 * 1024 * 1024:
return 'large' # 100MB - 1GB
else:
            return 'xlarge'  # > 1GB

网络这东西,说断就断,所以容错机制必须得有。
import asyncio
import aiohttp
import logging
import math
import os
import pickle
from typing import Dict, List, Optional
class ResumableUploader:
def __init__(self, upload_service: MultipartUploadService):
self.upload_service = upload_service
self.state_dir = "/tmp/upload_states" # 状态保存目录
os.makedirs(self.state_dir, exist_ok=True)
def save_upload_state(self, file_path: str, state: Dict):
"""保存上传状态,用于断点续传"""
state_file = os.path.join(
self.state_dir,
f"{os.path.basename(file_path)}.state"
)
try:
with open(state_file, 'wb') as f:
pickle.dump(state, f)
except Exception as e:
logging.error(f"保存上传状态失败: {e}")
def load_upload_state(self, file_path: str) -> Optional[Dict]:
"""加载上传状态"""
state_file = os.path.join(
self.state_dir,
f"{os.path.basename(file_path)}.state"
)
if not os.path.exists(state_file):
return None
try:
with open(state_file, 'rb') as f:
return pickle.load(f)
except Exception as e:
logging.error(f"加载上传状态失败: {e}")
return None
def cleanup_upload_state(self, file_path: str):
"""清理上传状态文件"""
state_file = os.path.join(
self.state_dir,
f"{os.path.basename(file_path)}.state"
)
try:
if os.path.exists(state_file):
os.remove(state_file)
except Exception as e:
logging.error(f"清理状态文件失败: {e}")
async def upload_with_resume(self, file_path: str,
content_type: str) -> str:
"""支持断点续传的上传"""
# 尝试加载之前的上传状态
saved_state = self.load_upload_state(file_path)
if saved_state:
logging.info(f"发现未完成的上传,继续上传: {file_path}")
return await self._resume_upload(file_path, saved_state)
else:
logging.info(f"开始新的上传: {file_path}")
return await self._start_new_upload(file_path, content_type)
async def _start_new_upload(self, file_path: str,
content_type: str) -> str:
"""开始新的上传"""
file_size = os.path.getsize(file_path)
filename = os.path.basename(file_path)
# 初始化分片上传
init_result = self.upload_service.initiate_multipart_upload(
filename, content_type
)
upload_state = {
'upload_id': init_result['upload_id'],
'key': init_result['key'],
'file_size': file_size,
'chunk_size': init_result['chunk_size'],
'completed_parts': [],
'total_parts': math.ceil(file_size / init_result['chunk_size'])
}
# 保存初始状态
self.save_upload_state(file_path, upload_state)
return await self._continue_upload(file_path, upload_state)
async def _resume_upload(self, file_path: str,
saved_state: Dict) -> str:
"""恢复之前的上传"""
logging.info(
f"恢复上传,已完成 {len(saved_state['completed_parts'])}"
f"/{saved_state['total_parts']} 个分片"
)
return await self._continue_upload(file_path, saved_state)
async def _continue_upload(self, file_path: str,
upload_state: Dict) -> str:
"""继续上传剩余分片"""
completed_part_numbers = {
part['PartNumber'] for part in upload_state['completed_parts']
}
# 找出还需要上传的分片
remaining_parts = []
for i in range(1, upload_state['total_parts'] + 1):
if i not in completed_part_numbers:
remaining_parts.append(i)
logging.info(f"还需上传 {len(remaining_parts)} 个分片")
# 上传剩余分片
try:
with open(file_path, 'rb') as f:
for part_number in remaining_parts:
# 计算分片位置
start_pos = (part_number - 1) * upload_state['chunk_size']
f.seek(start_pos)
chunk_data = f.read(upload_state['chunk_size'])
if not chunk_data:
break
# 获取预签名URL并上传
upload_url = self.upload_service.generate_presigned_part_url(
upload_state['key'],
upload_state['upload_id'],
part_number
)
# 上传分片(带重试)
part_result = await self._upload_part_with_retry(
chunk_data, upload_url, part_number
)
# 更新状态
upload_state['completed_parts'].append(part_result)
self.save_upload_state(file_path, upload_state)
logging.info(
f"分片 {part_number}/{upload_state['total_parts']} 上传完成"
)
# 完成分片上传
result = self.upload_service.complete_multipart_upload(
upload_state['key'],
upload_state['upload_id'],
upload_state['completed_parts']
)
# 清理状态文件
self.cleanup_upload_state(file_path)
logging.info(f"文件上传完成: {upload_state['key']}")
return upload_state['key']
except Exception as e:
logging.error(f"上传过程中出错: {e}")
# 保持状态文件,下次可以继续
raise
async def _upload_part_with_retry(self, chunk_data: bytes,
upload_url: str, part_number: int,
max_retries: int = 3) -> Dict:
"""带重试的分片上传"""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.put(upload_url, data=chunk_data) as response:
if response.status == 200:
etag = response.headers.get('ETag', '').strip('"')
return {
'PartNumber': part_number,
'ETag': etag
}
else:
raise Exception(f"HTTP {response.status}")
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt # 指数退避
logging.warning(
f"分片 {part_number} 上传失败,{wait_time}秒后重试: {e}"
)
await asyncio.sleep(wait_time)
else:
logging.error(f"分片 {part_number} 上传彻底失败: {e}")
                    raise

我测试了几种不同的上传方式,给大家一个参考:
上传1GB文件的实际测试结果:
| 上传方式 | 耗时 | 平均速度 | 备注 |
|---|---|---|---|
| 通过服务器中转 | 4-6分钟 | ~3MB/s | 服务器成瓶颈 |
| 直接预签名URL | 1.5-2分钟 | ~9MB/s | 简单有效 |
| 分片上传(3片) | 45-60秒 | ~18MB/s | 明显提升 |
| 分片+传输加速 | 30-40秒 | ~26MB/s | 最佳效果 |
网络环境:100Mbps带宽,延迟约50ms
部署到生产环境之前,还有一些容易忽略的点一定要检查:
# S3生命周期规则,清理未完成的分片上传
lifecycle_config = {
'Rules': [
{
            'ID': 'cleanup-incomplete-uploads',
            'Filter': {},  # 对整个桶生效;新版API要求规则里带Filter
            'Status': 'Enabled',
'AbortIncompleteMultipartUpload': {
'DaysAfterInitiation': 7 # 7天后清理
}
}
]
}
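# 上面的 lifecycle_config 只是个字典,还得真正应用到桶上才会生效。
# 一个最小示意(假设 s3_client 和 bucket_name 已经初始化好):
s3_client.put_bucket_lifecycle_configuration(
    Bucket=bucket_name,
    LifecycleConfiguration=lifecycle_config
)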
# 设置CORS,允许前端直接上传
cors_config = {
'CORSRules': [
{
'AllowedHeaders': ['*'],
'AllowedMethods': ['PUT', 'POST'],
'AllowedOrigins': ['https://yourdomain.com'],
'ExposeHeaders': ['ETag'],
'MaxAgeSeconds': 3000
}
]
}

前端这块也有不少坑,特别是大文件上传的时候。
// FileUploader.tsx
import React, { useState, useCallback } from 'react';
import axios from 'axios';
interface UploadProgress {
loaded: number;
total: number;
percentage: number;
}
interface UploadState {
isUploading: boolean;
progress: UploadProgress;
error: string | null;
uploadedKey: string | null;
}
const FileUploader: React.FC = () => {
const [uploadState, setUploadState] = useState<UploadState>({
isUploading: false,
progress: { loaded: 0, total: 0, percentage: 0 },
error: null,
uploadedKey: null
});
const uploadFile = useCallback(async (file: File) => {
setUploadState(prev => ({
...prev,
isUploading: true,
error: null,
progress: { loaded: 0, total: file.size, percentage: 0 }
}));
try {
// 判断是否需要分片上传
const shouldUseMultipart = file.size > 100 * 1024 * 1024; // 100MB
if (shouldUseMultipart) {
const key = await uploadLargeFile(file);
setUploadState(prev => ({ ...prev, uploadedKey: key }));
} else {
const key = await uploadSmallFile(file);
setUploadState(prev => ({ ...prev, uploadedKey: key }));
}
} catch (error) {
setUploadState(prev => ({
...prev,
error: error instanceof Error ? error.message : '上传失败'
}));
} finally {
setUploadState(prev => ({ ...prev, isUploading: false }));
}
}, []);
const uploadSmallFile = async (file: File): Promise<string> => {
// 获取预签名URL
const { data } = await axios.post('/api/upload/initiate', {
filename: file.name,
content_type: file.type,
file_size: file.size
});
// 直接上传到S3
await axios.put(data.upload_url, file, {
headers: { 'Content-Type': file.type },
onUploadProgress: (progressEvent) => {
const percentage = Math.round(
(progressEvent.loaded / progressEvent.total!) * 100
);
setUploadState(prev => ({
...prev,
progress: {
loaded: progressEvent.loaded,
total: progressEvent.total!,
percentage
}
}));
}
});
return data.key;
};
const uploadLargeFile = async (file: File): Promise<string> => {
// 初始化分片上传
const { data: initData } = await axios.post('/api/upload/multipart/initiate', {
filename: file.name,
content_type: file.type,
file_size: file.size
});
const { upload_id, key, chunk_size, total_parts } = initData;
const uploadedParts: Array<{ PartNumber: number; ETag: string }> = [];
// 分片上传
for (let partNumber = 1; partNumber <= total_parts; partNumber++) {
const start = (partNumber - 1) * chunk_size;
const end = Math.min(start + chunk_size, file.size);
const chunk = file.slice(start, end);
// 获取分片上传URL
const { data: urlData } = await axios.post('/api/upload/multipart/part-url', {
key,
upload_id,
part_number: partNumber
});
// 上传分片
const response = await axios.put(urlData.url, chunk, {
headers: { 'Content-Type': 'application/octet-stream' },
onUploadProgress: (progressEvent) => {
const totalLoaded = (partNumber - 1) * chunk_size + progressEvent.loaded;
const percentage = Math.round((totalLoaded / file.size) * 100);
setUploadState(prev => ({
...prev,
progress: {
loaded: totalLoaded,
total: file.size,
percentage
}
}));
}
});
uploadedParts.push({
PartNumber: partNumber,
ETag: response.headers.etag.replace(/"/g, '')
});
}
// 完成分片上传
await axios.post('/api/upload/multipart/complete', {
key,
upload_id,
parts: uploadedParts
});
return key;
};
return (
<div className="file-uploader">
<input
type="file"
onChange={(e) => {
const file = e.target.files?.[0];
if (file) uploadFile(file);
}}
disabled={uploadState.isUploading}
/>
{uploadState.isUploading && (
<div className="upload-progress">
<div className="progress-bar">
<div
className="progress-fill"
style={{ width: `${uploadState.progress.percentage}%` }}
/>
</div>
<span>{uploadState.progress.percentage}%</span>
</div>
)}
{uploadState.error && (
<div className="error-message">{uploadState.error}</div>
)}
{uploadState.uploadedKey && (
<div className="success-message">
上传成功!文件ID: {uploadState.uploadedKey}
</div>
)}
</div>
);
};
export default FileUploader;

// DragDropUploader.tsx
import React, { useState, useRef, DragEvent } from 'react';
const DragDropUploader: React.FC = () => {
const [isDragging, setIsDragging] = useState(false);
const fileInputRef = useRef<HTMLInputElement>(null);
const handleDragOver = (e: DragEvent) => {
e.preventDefault();
setIsDragging(true);
};
const handleDragLeave = (e: DragEvent) => {
e.preventDefault();
setIsDragging(false);
};
const handleDrop = (e: DragEvent) => {
e.preventDefault();
setIsDragging(false);
const files = Array.from(e.dataTransfer.files);
files.forEach(file => {
// 这里调用上传逻辑
console.log('准备上传文件:', file.name);
});
};
return (
<div
className={`drag-drop-area ${isDragging ? 'dragging' : ''}`}
onDragOver={handleDragOver}
onDragLeave={handleDragLeave}
onDrop={handleDrop}
onClick={() => fileInputRef.current?.click()}
>
<input
ref={fileInputRef}
type="file"
multiple
style={{ display: 'none' }}
onChange={(e) => {
const files = Array.from(e.target.files || []);
// 处理文件上传
}}
/>
<div className="upload-hint">
{isDragging ? '松开鼠标上传文件' : '点击或拖拽文件到这里上传'}
</div>
</div>
);
};

CORS跨域这个问题经常遇到,特别是前端直接上传到S3的时候。
# 在S3存储桶设置CORS规则
cors_configuration = {
'CORSRules': [
{
'AllowedHeaders': ['*'],
'AllowedMethods': ['GET', 'PUT', 'POST', 'DELETE'],
'AllowedOrigins': [
'https://yourdomain.com',
'http://localhost:3000' # 开发环境
],
'ExposeHeaders': ['ETag', 'x-amz-version-id'],
'MaxAgeSeconds': 3000
}
]
}
# 应用CORS配置
s3_client.put_bucket_cors(
Bucket=bucket_name,
CORSConfiguration=cors_configuration
)

预签名URL的过期时间也有讲究,大文件需要更长的有效期:

def generate_presigned_url_with_validation(self, filename: str,
content_type: str,
file_size: int) -> Dict[str, str]:
"""生成预签名URL,带有额外的验证"""
# 根据文件大小调整过期时间
if file_size > 1024 * 1024 * 1024: # 大于1GB
expires_in = 3600 # 1小时
else:
expires_in = 900 # 15分钟
key = f"uploads/{uuid.uuid4()}/{filename}"
try:
upload_url = self.s3_client.generate_presigned_url(
'put_object',
Params={
'Bucket': self.bucket_name,
'Key': key,
'ContentType': content_type,
'ContentLength': file_size,
# 添加一些安全限制
'ServerSideEncryption': 'AES256'
},
ExpiresIn=expires_in
)
return {
'upload_url': upload_url,
'key': key,
'expires_at': int(time.time()) + expires_in
}
except Exception as e:
self.logger.error(f"生成预签名URL失败: {e}")
        raise

大文件上传时,千万别把整个文件读到内存里。
def upload_large_file_streaming(self, file_path: str, key: str):
    """流式上传大文件,避免一次性把整个文件读进内存"""
    from boto3.s3.transfer import TransferConfig

    try:
        # upload_fileobj 需要的是带 read() 的文件对象,直接传打开的文件即可,
        # boto3 会按 multipart_chunksize 分块读取上传,不会把整个文件加载进内存
        with open(file_path, 'rb') as f:
            self.s3_client.upload_fileobj(
                f,
                self.bucket_name,
                key,
                Config=TransferConfig(
                    multipart_threshold=25 * 1024 * 1024,  # 超过25MB才走分片
                    max_concurrency=10,
                    multipart_chunksize=25 * 1024 * 1024,  # 每个分片25MB
                    use_threads=True
                )
            )
    except Exception as e:
        self.logger.error(f"流式上传失败: {e}")
        raise

import aiohttp
import backoff
from botocore.exceptions import ClientError
class RobustUploader:
@backoff.on_exception(
backoff.expo,
(ClientError, ConnectionError, TimeoutError),
max_tries=5,
max_time=300 # 最多重试5分钟
)
async def upload_with_robust_retry(self, chunk_data: bytes,
upload_url: str) -> str:
"""超级稳定的上传方法"""
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=300),
connector=aiohttp.TCPConnector(
limit=100,
limit_per_host=10,
keepalive_timeout=30
)
) as session:
async with session.put(
upload_url,
data=chunk_data,
headers={'Content-Type': 'application/octet-stream'}
) as response:
if response.status == 200:
return response.headers.get('ETag', '').strip('"')
else:
# 抛出异常触发重试
raise ClientError(
{'Error': {'Code': str(response.status)}},
'PutObject'
                    )

# gunicorn配置优化
# gunicorn_config.py
import multiprocessing
# 工作进程数
workers = multiprocessing.cpu_count() * 2 + 1
# 工作线程数(处理IO密集型任务)
threads = 4
# 连接超时
timeout = 300 # 5分钟,给大文件上传足够时间
# 每个worker处理一定数量请求后自动重启,防止内存泄漏累积
max_requests = 1000
max_requests_jitter = 100
# 内存优化
preload_app = True
worker_class = "uvicorn.workers.UvicornWorker"
# 日志配置
accesslog = "/var/log/gunicorn/access.log"
errorlog = "/var/log/gunicorn/error.log"
loglevel = "info"# nginx.conf
server {
listen 80;
server_name yourdomain.com;
# 上传文件大小限制
client_max_body_size 5G;
# 超时设置
client_body_timeout 300s;
client_header_timeout 60s;
# 缓冲区设置
client_body_buffer_size 128k;
client_header_buffer_size 32k;
location /api/upload/ {
proxy_pass http://backend;
# 代理超时设置
proxy_connect_timeout 60s;
proxy_send_timeout 300s;
proxy_read_timeout 300s;
# 请求体处理
proxy_request_buffering off;
proxy_buffering off;
# 头部设置
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}

上传记录表的设计也很重要:
-- 上传记录表
CREATE TABLE upload_records (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
file_key VARCHAR(500) NOT NULL,
original_filename VARCHAR(255) NOT NULL,
file_size BIGINT NOT NULL,
content_type VARCHAR(100),
upload_method VARCHAR(20) DEFAULT 'single', -- single/multipart
upload_duration DECIMAL(10,3), -- 上传耗时(秒)
upload_speed DECIMAL(10,2), -- 上传速度(MB/s)
status VARCHAR(20) DEFAULT 'uploading', -- uploading/completed/failed
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    completed_at TIMESTAMP
);

-- BIGSERIAL是PostgreSQL语法,PG不支持在CREATE TABLE里内联INDEX,索引要单独建
CREATE INDEX idx_upload_records_user_id ON upload_records (user_id);
CREATE INDEX idx_upload_records_status ON upload_records (status);
CREATE INDEX idx_upload_records_created_at ON upload_records (created_at);
-- 分片上传状态表
CREATE TABLE multipart_upload_states (
id BIGSERIAL PRIMARY KEY,
upload_id VARCHAR(100) NOT NULL UNIQUE,
file_key VARCHAR(500) NOT NULL,
user_id BIGINT NOT NULL,
total_parts INT NOT NULL,
completed_parts JSON, -- 已完成的分片信息
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP NOT NULL
);

-- upload_id 上的 UNIQUE 约束自带索引,不用重复建
CREATE INDEX idx_multipart_states_user_id ON multipart_upload_states (user_id);
CREATE INDEX idx_multipart_states_expires_at ON multipart_upload_states (expires_at);

生产环境必须要有完善的监控:
# monitoring.py
import time
from dataclasses import dataclass
from typing import Dict, List
import json
@dataclass
class UploadAlert:
alert_type: str # 'slow_upload', 'high_failure_rate', 'storage_quota'
message: str
severity: str # 'warning', 'error', 'critical'
metadata: Dict
class UploadMonitoringService:
def __init__(self):
self.upload_stats = {
'total_uploads': 0,
'failed_uploads': 0,
'slow_uploads': 0,
'total_size': 0
}
def track_upload_completion(self, file_size: int, duration: float,
success: bool):
"""跟踪上传完成情况"""
self.upload_stats['total_uploads'] += 1
self.upload_stats['total_size'] += file_size
if not success:
self.upload_stats['failed_uploads'] += 1
# 计算上传速度
speed_mbps = (file_size * 8) / (duration * 1024 * 1024)
if speed_mbps < 1.0: # 速度低于1Mbps算慢
self.upload_stats['slow_uploads'] += 1
self._send_alert(UploadAlert(
alert_type='slow_upload',
message=f'上传速度过慢: {speed_mbps:.2f}Mbps',
severity='warning',
metadata={'speed': speed_mbps, 'file_size': file_size}
))
# 检查失败率
failure_rate = self.upload_stats['failed_uploads'] / self.upload_stats['total_uploads']
if failure_rate > 0.1: # 失败率超过10%
self._send_alert(UploadAlert(
alert_type='high_failure_rate',
message=f'上传失败率过高: {failure_rate:.2%}',
severity='error',
metadata={'failure_rate': failure_rate}
))
def _send_alert(self, alert: UploadAlert):
"""发送告警"""
# 这里可以接入钉钉、企微、邮件等告警渠道
print(f"[{alert.severity.upper()}] {alert.alert_type}: {alert.message}")
# 发送到监控系统
# self._send_to_datadog(alert)
        # self._send_to_prometheus(alert)

文件上传优化这件事,说简单也简单,说复杂也复杂。关键是要理解核心原理:让文件直达S3,别让自己的服务器当搬运工。
最重要的几个点:用预签名URL让客户端直传、大文件走分片加并发、失败要能重试和断点续传、上线后监控不能少。
我们团队用了这套方案后,用户上传体验提升了好几个档次。之前经常有用户抱怨上传慢,现在基本没有了。
当然,具体实施的时候还要根据你们的业务场景调整。比如如果用户主要在国内,传输加速的效果可能没那么明显;如果文件都比较小,分片上传也用不上。
最后提醒一句:性能优化是个持续的过程,不要指望一次就能做到完美。先把基础架构搭好,然后根据监控数据持续优化,这样才能真正做出用户满意的产品。