前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >⭐Python实现多线程并发下载大文件(制作支持断点续传的下载器的绝佳参考⁉️)⭐

⭐Python实现多线程并发下载大文件(制作支持断点续传的下载器的绝佳参考⁉️)⭐

作者头像
全栈程序员站长
发布2022-09-13 16:32:37
1.9K0
发布2022-09-13 16:32:37
举报
文章被收录于专栏:全栈程序员必看

大家好,又见面了,我是你们的朋友全栈君。

不知道各位童鞋们是否遇到过需要使用python下载大文件的需求,或者需要从一些网速很慢的网站上下载文件。如果你在实际下载过程碰到下载不稳定经常失败的情况,本文的方法将会给你带来一些解决思路和方案。

本文会给大家演示如何使用python对单个大文件进行多线程下载或协程形式下载,基于此还提供了断点续传的实现思路,想使用python开发下载器的朋友都可以拿本文作为参考,期待各位大佬的大作。

文章目录

下面我们以知乎视频《【AI混血】人工智能一键生成角色全身立绘?!【大谷纽约实验室】》为例进行演示,链接:

https://www.zhihu.com/zvideo/1387830268154195968

下面首先看一下最基础的直接下载文件的方法:

⭐单线程直接下载⭐

只需要在开发者工具的元素选项卡搜索video标签,即可找到视频的MP4下载链接:

当然这个操作也完全可以借助idm的嗅探功能获取下载链接。 idm下载地址:https://www.lanzoui.com/ia51jqb idm插件安装地址:https://chrome.google.com/webstore/detail/idm-integration-module/ngpampappnmepgilojfohadhhmbhlaek

复制出该链接后即可直接下载该视频:

代码语言:javascript
复制
import requests

url = "https://vdn3.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629560389-0-0-05874b492bec9be924c7da35aa619536&f=mp4&bu=http-com&expiration=1629560389&v=tx"
save_name = "单线程直接下载.mp4"
with open(save_name, "wb") as f, requests.get(url) as res:
    f.write(res.content)

下载后视频正常播放:

这种下载方式对于知乎这种网络快的网站自然是没有问题,但是有些网络不好的网站就很可能下载中途网络中断,等了很久最终却下载失败。下面我们看看相对稳定很多的下载方法:

🔥单线程流式下载🔥

使用方法是get方法指定参数stream=True

代码语言:javascript
复制
save_name = "单线程流式下载.mp4"
num = 0
with open(save_name, "wb") as f, requests.get(url, stream=True) as res:
    for chunk in res.iter_content(chunk_size=64*1024):
        if not chunk:
            break
        f.write(chunk)
        num += 1
        print(f"\r迭代次数:{ 
     num}", end=" ")

上述代码,以64KB为一组进行流式数据传输,最终速度显然比普通的下载更快一些:

☀️单线程分片流式下载☀️

那么我们如何做到文件的断点续传呢?这时候就需要通过请求头修改需要读取的字节范围,当然也需要先检查目标服务器是否支持范围请求。

如果请求一个资源时, HTTP响应中出现Accept-Ranges且其值不是none, 那么服务器支持范围请求。

我们看看head这种请求方式:

代码语言:javascript
复制
res = requests.head(url)
head = res.headers
data = res.content
print(head)
print(data)
代码语言:javascript
复制
{'Server': 'NWSs', 'Date': 'Sat, 21 Aug 2021 15:25:19 GMT', 'Content-Type': 'video/mp4', 'Content-Length': '53825263', 'Connection': 'keep-alive', 'Cache-Control': 'max-age=600', 'Expires': 'Sat, 21 Aug 2021 15:35:19 GMT', 'Last-Modified': 'Mon, 14 Jun 2021 10:06:22 GMT', 'X-NWS-LOG-UUID': '8fcf387e-7bab-44fb-8cf0-e96f5def3b1c', 'Access-Control-Allow-Origin': '*', 'Access-Control-Max-Age': '31536000', 'x-cdn-provider': 'tencent', 'X-Cache-Lookup': 'Hit From Disktank3, Hit From Inner Cluster', 'Accept-Ranges': 'bytes', 'ETag': '"bfd7937656505a8ab9a05bc373745a8b"', 'x-cos-hash-crc64ecma': '5809002519149120115', 'x-cos-replication-status': 'Complete', 'x-cos-request-id': 'NjEwOTM5MzZfNWM0ZTQ0MGJfNDc2MV8xZmQ3YmZj', 'x-cos-storage-class': 'STANDARD_IA', 'x-cos-version-id': 'MTg0NDUxMjA0MDg1MjcxMDk4MTA', 'X-Daa-Tunnel': 'hop_count=1'}
b''

可以看到head请求只返回的响应头,未返回任何数据。上面的响应头中,'Accept-Ranges': 'bytes' 代表可以使用字节作为单位来定义请求范围。Content-Length 则代表该资源的完整大小。

于是我们可以通过Content-Length 响应头获取文件的大小:

代码语言:javascript
复制
filesize = int(head['Content-Length'])
filesize
代码语言:javascript
复制
53825263

这就是当前文件的总大小。

这时我们就可以根据总大小对文件进行分片,例如总共分几部分或者多大的部分作为一个分片。这里我以个数进行分片,下面方法默认对文件分成10个小部分:

代码语言:javascript
复制
def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


divisional_ranges = calc_divisional_range(filesize)
divisional_ranges
代码语言:javascript
复制
[[0, 5382525],
 [5382526, 10765051],
 [10765052, 16147577],
 [16147578, 21530103],
 [21530104, 26912629],
 [26912630, 32295155],
 [32295156, 37677681],
 [37677682, 43060207],
 [43060208, 48442733],
 [48442734, 53825262]]

Range 请求头的语法:Range: bytes=start-end

Range头域可以请求一个或者多个子范围。例如:

  • 表示头500个字节:bytes=0-499
  • 表示第二个500字节:bytes=500-999
  • 表示最后500个字节:bytes=-500
  • 表示500字节以后的范围:bytes=500-
  • 第一个和最后一个字节:bytes=0-0,-1
  • 同时指定几个范围:bytes=500-600,601-999

需要注意一下各种文件模式的区别:

模式

描述

r

默认模式:以只读文本形式打开文件,文件的指针将会放在文件的开头。

rb

以二进制格式打开一个文件用于只读,文件指针将会放在文件的开头。

r+

打开一个文件用于文本读写,文件指针将会放在文件的开头。

rb+

以二进制格式打开一个文件用于读写,文件指针将会放在文件的开头。

w

打开一个文件只用于写入。如果该文件已存在则将其覆盖。如果该文件不存在,创建新文件。

wb

以二进制格式打开一个文件只用于文本写入。如果该文件已存在则将其覆盖。如果该文件不存在,创建新文件。

w+

打开一个文件用于读写,如果该文件已存在则将其覆盖。如果该文件不存在,创建新文件。

wb+

以二进制格式打开一个文件用于读写。如果该文件已存在则将其覆盖。如果该文件不存在,创建新文件。

a

打开一个文件用于文本追加。如果该文件已存在,文件指针将会放在文件的结尾。如果该文件不存在,创建新文件进行写入。

ab

以二进制格式打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾。如果该文件不存在,创建新文件进行写入。

a+

打开一个文件用于读写。如果该文件已存在,文件指针将会放在文件的结尾。文件打开时会是追加模式。如果该文件不存在,创建新文件用于读写。

ab+

以二进制格式打开一个文件用于追加。如果该文件已存在,文件指针将会放在文件的结尾。如果该文件不存在,创建新文件用于读写。

可以看到:

wb+rb+ab+均以二进制格式读写文件,但wb+会覆盖已经存在的文件,只有rb+ab+能够允许多个文件句柄操作同一个文件。最终我选择打开后文件指针在文件开头的rb+模式来完成多线程对同一文件的读写。

首先需要先创建空文件,保证rb+模式读取文件前,文件已经存在:

代码语言:javascript
复制
save_name = "单线程分片流式下载.mp4"
with open(save_name, "wb") as f:
    pass

将范围下载的过程封装到以下方法中:

代码语言:javascript
复制
def range_download(save_name, s_pos, e_pos):
    headers = { 
   "Range": f"bytes={ 
     s_pos}-{ 
     e_pos}"}
    res = requests.get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)

然后就可以分片进行下载:

代码语言:javascript
复制
for s_pos, e_pos in divisional_ranges:
    range_download(save_name, s_pos, e_pos)

循环每一次都打开一个文件句柄写入指定范围的数据。

基于此,我们就可以很简单的转换为多线程的实现:

😎多线程下载大文件😎

关于多线程、协程和多进程可以参考前面的示例:

结合前面已有代码,实现多线程下载的完整代码为:

代码语言:javascript
复制
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests


def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


# 下载方法
def range_download(save_name, s_pos, e_pos):
    headers = { 
   "Range": f"bytes={ 
     s_pos}-{ 
     e_pos}"}
    res = requests.get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)


url = "https://vdn3.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629707188-0-0-8e6fe4e1e29621664c71e2b95fc3bdb9&f=mp4&bu=http-com&expiration=1629707188&v=tx"
res = requests.head(url)
filesize = int(head['Content-Length'])
divisional_ranges = calc_divisional_range(filesize)


save_name = "多线程流式下载.mp4"
# 先创建空文件
with open(save_name, "wb") as f:
    pass
with ThreadPoolExecutor() as p:
    futures = []
    for s_pos, e_pos in divisional_ranges:
        print(s_pos, e_pos)
        futures.append(p.submit(range_download, save_name, s_pos, e_pos))
    # 等待所有任务执行完毕
    as_completed(futures)
代码语言:javascript
复制
0 5382525
5382526 10765051
10765052 16147577
16147578 21530103
21530104 26912629
26912630 32295155
32295156 37677681
37677682 43060207
43060208 48442733
48442734 53825262

这样我们就实现了Python多线程下载大文件。

从结果看,四种下载方法得到的文件都完全一致:

这样我们就实现了大文件的多线程下载。

😱协程分片下载大文件😱

那么能否以协程形式分片下载大文件呢?

在之前的协程爬虫的文章中,我使用了aiohttp完成了数据的异常爬虫,这次我们尝试使用最近一个新的支持异步爬取的库httpx,而且该库支持http2.0能够爬取http2.0协议的网页。

要爬取http2.0的站点只需要: import httpx client = httpx.Client(http2=True) 之后client对象与request库的API几乎完全一致,只需把之前代码中使用的requests改成这个client对象即可。

协程不需要支持stream流式下载,最终封装的下载方法为:

代码语言:javascript
复制
async def async_range_download(save_name, s_pos, e_pos):
    headers = { 
   "Range": f"bytes={ 
     s_pos}-{ 
     e_pos}"}
    res = await client.get(url, headers=headers)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        f.write(res.content)   

完整下载代码为:

代码语言:javascript
复制
import asyncio
import httpx
import requests

import nest_asyncio
nest_asyncio.apply()


def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


# 下载方法
async def async_range_download(save_name, s_pos, e_pos):
    headers = { 
   "Range": f"bytes={ 
     s_pos}-{ 
     e_pos}"}
    res = await client.get(url, headers=headers)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        f.write(res.content)

client = httpx.AsyncClient()

url = "https://vdn1.vzuu.com/HD/e898cfec-ccf3-11eb-b43a-6ec658071f3e-t1111-vgodrDABRC.mp4?disable_local_cache=1&auth_key=1629718189-0-0-2e4eceee29e2d17a92c77fd49911d39a&f=mp4&bu=http-com&expiration=1629718189&v=hw"
res = httpx.head(url)
filesize = int(res.headers['Content-Length'])
divisional_ranges = calc_divisional_range(filesize, 20)


save_name = "协程分片下载.mp4"
# 先创建空文件
with open(save_name, "wb") as f:
    pass

loop = asyncio.get_event_loop()
tasks = [async_range_download(save_name, s_pos, e_pos)
         for s_pos, e_pos in divisional_ranges]
# 等待所有协程执行完毕
loop.run_until_complete(asyncio.wait(tasks))

上述代码中:

代码语言:javascript
复制
import nest_asyncio
nest_asyncio.apply()

这两行的目的是为了兼容协程程序能够在Jupyter notebook环境中运行,对于普通的py文件中运行,可以直接删除。

最终下载结果:

💙实现断点续传的思路💙

那么如何通过python实现断点续传呢?

粗粒度的方法就是以分片为校验单位,某个分片下载失败则重新下载。

细粒度一点的方法是每个分片内部校验已下载的范围,对于下载失败的分布,重新定位起始位置继续下载。

🚀总结🚀

本文非常浅层的演示了python如何实现多线程文件下载,核心取决于现代服务器基本都支持范围下载的前提下。关于断点续传,文章已提供基本理论和实现思路,有兴趣通过python实现下载器的朋友都可以专门去实现一下。

我是小小明,咱们下期再见~

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/162154.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • ⭐单线程直接下载⭐
  • 🔥单线程流式下载🔥
  • ☀️单线程分片流式下载☀️
  • 😎多线程下载大文件😎
  • 😱协程分片下载大文件😱
  • 💙实现断点续传的思路💙
  • 🚀总结🚀
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档