前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于minio实现大文件的分片上传功能

基于minio实现大文件的分片上传功能

作者头像
明月AI
发布2023-08-26 15:06:15
2.4K0
发布2023-08-26 15:06:15
举报
文章被收录于专栏:野生AI架构师野生AI架构师

问题

在系统中上传大文件的时候,可能会因为文件过大而被网关限制,或者超时而导致失败。

我们的存储是基于minio实现s3文件存储服务。

最直接的解决方案

直接把minio开放出去作为一个s3服务,minio本身也是支持自动对文件进行分片上传的,但是这样会有一个问题,权限很难做精细化的控制,对于高安全性要求的场景就达不到安全要求。

先缓存到本地,合并成完整的文件再传到S3上

就是通过接口接收每个分片,存储到本地,当接收了所有的分片之后,再将文件合并成一个大文件,再上传到S3上。

同事之前写的代码就是这样实现的,好像是可以实现需求的,但是这会对本地文件系统产生依赖,一旦系统需要部署多个节点,就会出问题,没办法保证同一个大文件的所有分片都落在同一个服务器节点上,就出问题了。

基于minio的内部接口实现分片上传

网上找了半天,官方文档也找了,并没有找到minio可以自助实现切片上传的接口,后来翻看接口才找到以下几个接口:
代码语言:javascript
复制
# 创建分片上传,返回上传id
_create_multipart_upload
# 使用上传id来上传分片
_upload_part
# 所有分片都上传完之后,需要执行这个完成上传的接口
# 这个接口执行成功之后,s3中的文件才正常
_complete_multipart_upload

这三个接口居然都是下划线开头的,难怪官方文档都找不到。

有了这三个接口,要实现分片上传并不难,基于FastAPI实现对应的三个接口:

具体代码如下:

代码语言:javascript
复制
import time
from minio import Minio
from minio.datatypes import Part
from fastapi import APIRouter, Body, UploadFile, File
from fastapi import BackgroundTasks

s3_client  = Minio(
    "192.168.1.242:19000",
    access_key="xxxxxxx",
    secret_key="xxxxxxx",
    secure=False
)
headers = {}
headers["Content-Type"] = "application/octet-stream"
bucket_name = "test"
save_name = 'test.file'

router = APIRouter()

# 分片基础信息
parts = []

def save_to_s3(file_part, object_name: str, part_number: int, upload_id: str):
    global parts
    # 上传文件分片,内网测试耗时约:0.45秒
    etag = s3_client._upload_part(data=file_part, bucket_name=bucket_name, object_name=object_name,
                                  part_number=part_number, upload_id=upload_id, headers=headers)

    # 将上传的分片添加到分片列表
    parts.append((part_number, etag))
    return

@router.post("/part/create", summary='创建分片上传')
async def create_api(
):
    """创建分片上传,获取上传ID"""
    global parts
    parts = []
    upload_id = s3_client._create_multipart_upload(bucket_name=bucket_name, object_name=save_name, headers=headers)
    return {'data': upload_id}

@router.post("/part", summary='上传分片')
async def upload_api(
    background_tasks: BackgroundTasks,
    part: UploadFile = File(..., title="分片文件"),
    part_number: int = Body(..., title="分片序号", description='分片序号'),
    upload_id: str = Body(..., title="上传ID", description='上传ID'),
):
    """分片上传"""
    content = await part.read()
    background_tasks.add_task(save_to_s3, content, save_name, part_number, upload_id)
    return {'data': True}

@router.post("/part/finish", summary='上传完成')
async def finish_api(
    upload_id: str = Body(..., title="上传ID", description='上传ID'),
    part_count: int = Body(..., title="分片数量", description='分片数量'),
):
    """完成分片上传"""
    if len(parts) != part_count:
        return {'data': len(parts), 'status': False}
    _start = time.time()
    _parts = sorted(parts, key=lambda v: v[0])
    _parts = [Part(*p) for p in _parts]
    # 这个执行成功之后,S3才能找到对应的文件
    response = s3_client._complete_multipart_upload(bucket_name, save_name, upload_id, _parts)
    print(f"upload id: {upload_id}, parts: {parts}")
    print(f'文件上传完成, time: {time.time()-_start}', flush=True)
    return {'data': time.time()-_start, 'status': True}

为了加速大文件的上传,使用BackgroundTasks将比较耗时的分片上传到S3的过程移到后台任务中去执行。

在finish接口,特别需要注意的是,parts参数需要按分片的序号排好序,不然会报错。

另外,在minio中,分片大小不能小于5M,否则最后调用finish接口的时候会报错。

接口测试代码

测试代码如下:
代码语言:javascript
复制
import os
import math
import time
import requests

url_prefix = "http://127.0.0.1:8000"

def upload_large_file(file_path, object_name, part_size = 5 * 1024 * 1024):
    # 创建一个multipart上传
    resp = requests.post(f"{url_prefix}/upload/part/create").json()
    upload_id = resp['data']

    # 计算文件分片数
    file_size = os.path.getsize(file_path)
    part_count = int(math.ceil(file_size / part_size))
    print(f"size: {file_size}, count: {part_count}, file: {file_path}")

    # 逐个上传文件分片
    parts = []
    total_start = time.time()
    with open(file_path, 'rb') as file:
        for part_number in range(1, part_count + 1):
            start = (part_number - 1) * part_size
            end = min(start + part_size, file_size)
            file_part = file.read(end - start)

            # 上传文件分片,内网测试耗时约:0.45秒
            _start = time.time()
            data = {
                'part_number': part_number,
                'upload_id': upload_id,
            }
            files = {
                'part': file_part,
            }
            resp = requests.post(f"{url_prefix}/upload/part", data=data, files=files).json()
            print(f"num: {part_number}, time: {time.time()-_start}, total time: {time.time()-total_start}")

    upload_time = time.time() - total_start
    # 完成multipart上传,内网测试耗时:
    # 如果时全新的文件上传,耗时约:0.2秒
    # 如果是覆盖文件上传,耗时约:1-3秒
    while True:
        _start = time.time()
        data = {
            'part_count': part_count,
            'upload_id': upload_id,
        }
        resp = requests.post(f"{url_prefix}/upload/part/finish", json=data).json()
        print(f'文件上传完成, time: {time.time()-_start}, total: {time.time() - total_start}, upload: {upload_time}')
        print(resp)
        if resp['status'] == True:
            break
        time.sleep(1)

if __name__ == "__main__":
    import sys
    upload_large_file(sys.argv[1], "test-2.txt", part_size=5*1024*1024)

对于大文件,可以测试不同的分片大小,看各块的耗时情况,本地测试时,分片数量超过45个的时候,上传分片接口的延迟就会增大不少,这个可能跟系统性能是有关系的,实际应用中,应该测试一个比较合适的值。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-07-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 野生AI架构师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云服务器利旧
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档