Automated downloading means using dedicated software, scripts, or tools to fetch files, programs, data, and other resources according to predefined rules and workflows, without continuous manual intervention.
In Python, the requests library is used to send HTTP requests and fetch the file data, while the os module handles file and directory operations. The third-party requests library is installed with the pip command; for example, run pip install requests on the command line.

```python
import requests

def download_file(url, save_path):
    try:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        with open(save_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
        print(f"Download succeeded, saved to: {save_path}")
    except Exception as e:
        print(f"Download failed: {e}")

# Example usage
file_url = "https://example.com/file.zip"
save_location = "C:/Downloads/file.zip"
download_file(file_url, save_location)
```

The code above defines a download_file function that takes the file URL and a local save path as parameters. Inside the function, requests.get fetches the file data as a stream and writes it to the local file chunk by chunk.
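The os module mentioned earlier can help with the surrounding file handling, for instance making sure the target directory exists before saving. A minimal sketch; the directory path is just the one reused from the example above and purely illustrative:

```python
import os

save_dir = "C:/Downloads"             # illustrative path reused from the example above
os.makedirs(save_dir, exist_ok=True)  # create the directory if it does not already exist
```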
Save the code as a .py file, for example download.py, then open a command line in the directory containing the file and run python download.py to execute the script and complete the download. For large files, taking the requests library as an example, chunked downloading can be implemented by setting the Range header. Here is a simple example:

```python
import requests
def download_large_file_in_chunks(url, file_path, chunk_size=8192):
    headers = {}
    # Get the total file size
    response = requests.head(url)
    file_size = int(response.headers.get('Content-Length', 0))
    if file_size > 0:
        with open(file_path, 'wb') as f:
            for start in range(0, file_size, chunk_size):
                end = min(start + chunk_size - 1, file_size - 1)
                headers['Range'] = f'bytes={start}-{end}'
                chunk_response = requests.get(url, headers=headers, stream=True)
                if chunk_response.status_code == 206:  # 206 means Partial Content
                    f.seek(start)
                    for chunk in chunk_response.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
    else:
        print("Could not determine the file size")

# Usage example
file_url = "https://example.com/large_file.zip"
save_path = "C:/Downloads/large_file.zip"
download_large_file_in_chunks(file_url, save_path)
```

Each request carries a Range header that tells the server from which byte position to continue transferring data. Downloads can also be parallelized with Python's threading module (multithreading) or multiprocessing module (multiprocessing). Here is a simple multithreaded example:

```python
import requests
import os
import threading
def download_chunk(url, start, end, file_path, chunk_number):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers, stream=True)
    with open(f"{file_path}.part{chunk_number}", 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

def merge_chunks(file_path, num_chunks):
    with open(file_path, 'wb') as final_file:
        for i in range(num_chunks):
            chunk_file_path = f"{file_path}.part{i}"
            with open(chunk_file_path, 'rb') as chunk_file:
                final_file.write(chunk_file.read())
            # The temporary part file can be deleted once it has been merged
            os.remove(chunk_file_path)

def download_large_file_multithreaded(url, file_path, num_threads=4):
    response = requests.head(url)
    file_size = int(response.headers.get('Content-Length', 0))
    chunk_size = file_size // num_threads
    threads = []
    for i in range(num_threads):
        start = i * chunk_size
        end = start + chunk_size - 1 if i < num_threads - 1 else file_size - 1
        thread = threading.Thread(target=download_chunk, args=(url, start, end, file_path, i))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    merge_chunks(file_path, num_threads)

# Usage example
file_url = "https://example.com/large_file.zip"
save_path = "C:/Downloads/large_file.zip"
download_large_file_multithreaded(file_url, save_path)
```

To handle failures, wrap the download in a try-except statement that catches the exceptions that may occur (such as connection timeouts or HTTP errors) and retries after an exception is caught. A maximum number of retries and a retry interval can be set to avoid wasting resources on endless retries. For example:

```python
import time
import requests
def download_with_retry(url, max_retries=3, retry_interval=5):
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(url, stream=True)
            if response.status_code == 200:
                # Handle the download logic here
                return response
        except requests.RequestException as e:
            print(f"Download error: {e}, retrying in {retry_interval} seconds...")
        retries += 1
        time.sleep(retry_interval)
    print("Maximum number of retries reached, download failed")
    return None
```

To show download progress, a third-party library such as tqdm can be used to display a progress bar conveniently. For example:

```python
from tqdm import tqdm
import requests
def download_with_progress(url, file_path):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('Content-Length', 0))
    block_size = 8192
    with open(file_path, 'wb') as f, tqdm(total=total_size, unit='iB', unit_scale=True) as pbar:
        for data in response.iter_content(block_size):
            if data:
                f.write(data)
                pbar.update(len(data))
```

To give downloaded files distinct names, the datetime module can be used to get the current time and add it to the file name:

```python
import requests
from datetime import datetime
import os

def download_file_with_timestamp(url, save_directory):
    response = requests.get(url, stream=True)
    file_name = url.split('/')[-1]  # take the original file name from the URL
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    name, extension = os.path.splitext(file_name)
    new_file_name = f"{name}_{timestamp}{extension}"  # insert the timestamp before the extension
    save_path = f"{save_directory}/{new_file_name}"
    with open(save_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
    return save_path

# Usage example
file_url = "https://example.com/file.txt"
save_dir = "C:/Downloads"
download_file_with_timestamp(file_url, save_dir)
```

Sequential numbering is another option for naming downloaded files:

```python
import requests
counter = 1

def download_file_with_sequence(url, save_directory):
    global counter
    response = requests.get(url, stream=True)
    file_name = url.split('/')[-1]
    file_extension = file_name.split('.')[-1]
    new_file_name = f"file_{counter}.{file_extension}"
    counter += 1
    save_path = f"{save_directory}/{new_file_name}"
    with open(save_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
    return save_path

# When this function is called repeatedly, the downloaded files are numbered in sequence
```

When downloading web pages, the content of the <title> tag can be used as the file name. This requires parsing the HTML document, which can be done with a library such as Python's BeautifulSoup. Sample code:

```python
import requests
from bs4 import BeautifulSoup
def download_webpage_with_title(url, save_directory):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, 'html.parser')
    # Strip the title and replace characters that are not valid in file names
    page_title = soup.title.string.strip().replace(' ', '_').replace('/', '_')
    file_extension = 'html'
    new_file_name = f"{page_title}.{file_extension}"
    save_path = f"{save_directory}/{new_file_name}"
    with open(save_path, 'w', encoding='utf-8') as f:
        f.write(response.text)
    return save_path
```

For downloaded photos, the exifread library can be used to read the image's EXIF data (such as the capture time) and build the file name from it, as in the sketch below.
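A minimal sketch of EXIF-based naming, assuming the third-party exifread package is installed; the tag name and file path are illustrative:

```python
import exifread

def name_from_exif(image_path):
    """Return a file name built from the photo's EXIF capture time, if present."""
    with open(image_path, 'rb') as f:
        tags = exifread.process_file(f)            # parse EXIF tags from the image
    taken = tags.get('EXIF DateTimeOriginal')      # e.g. "2024:01:31 08:15:00"
    if taken is None:
        return image_path                          # no EXIF date: keep the original name
    return str(taken).replace(':', '').replace(' ', '_') + ".jpg"

# Illustrative usage
print(name_from_exif("C:/Downloads/photo.jpg"))
```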
Renaming rules can also be driven by a configuration file:

```python
import json
import requests

def load_rename_config(config_file):
    with open(config_file, 'r') as f:
        config = json.load(f)
    return config

def download_file_with_config(url, save_directory, config_file):
    config = load_rename_config(config_file)
    # Rename and download according to the rules in the configuration file;
    # here the configuration is assumed to contain a 'prefix' field that
    # specifies a file name prefix.
    prefix = config.get('prefix', '')
    # Download and rename logic (combining the approaches shown above)
    file_name = url.split('/')[-1]
    save_path = f"{save_directory}/{prefix}{file_name}"
    response = requests.get(url, stream=True)
    with open(save_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    return save_path
```

To speed up large downloads, the work can be split across libraries such as threading, concurrent.futures, or aiohttp. Each request uses the Range header to specify the byte range to download:

```http
GET /file.zip HTTP/1.1
Range: bytes=0-499999
```

concurrent.futures.ThreadPoolExecutor can be used to create the thread pool.
Here is a simple Python example showing how to use concurrent.futures to implement a multithreaded download:

```python
import requests
from concurrent.futures import ThreadPoolExecutor
def download_part(url, start, end, part_num):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers, stream=True)
    with open(f'part_{part_num}', 'wb') as f:
        f.write(response.content)

def main(url, num_parts):
    response = requests.head(url)
    file_size = int(response.headers['Content-Length'])
    part_size = file_size // num_parts
    with ThreadPoolExecutor(max_workers=num_parts) as executor:
        futures = []
        for i in range(num_parts):
            start = i * part_size
            end = start + part_size - 1 if i < num_parts - 1 else file_size - 1
            futures.append(executor.submit(download_part, url, start, end, i))
        for future in futures:
            future.result()  # Wait for all threads to complete
    # Merge the downloaded parts
    with open('final_file', 'wb') as final_file:
        for i in range(num_parts):
            with open(f'part_{i}', 'rb') as part_file:
                final_file.write(part_file.read())

if __name__ == "__main__":
    url = "http://example.com/largefile.zip"
    num_parts = 4  # number of threads
    main(url, num_parts)
```
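The text above also mentions aiohttp as an alternative to threads. A minimal asyncio-based sketch, assuming the third-party aiohttp package is installed; the URL and file name are illustrative:

```python
import asyncio
import aiohttp

async def fetch(url, save_path):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                # Stream the body in chunks instead of loading it all into memory
                async for chunk in response.content.iter_chunked(8192):
                    f.write(chunk)

# Several downloads can be run concurrently with asyncio.gather
asyncio.run(fetch("http://example.com/largefile.zip", "largefile.zip"))
```

Unlike the thread-pool version, concurrency here comes from a single event loop, so no part files need to be merged afterwards.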
To support resumable downloads (picking up an interrupted transfer where it left off), keep the following points in mind:

- Make sure the target server supports HTTP Range requests. By sending a request with a Range header, you can specify the byte range of the file to download.
- Before starting, check whether a partially downloaded file already exists locally and get its size, for example with os.path.getsize().
- From the size already downloaded, compute the byte range that still needs to be fetched. For example, if 500 KB of a 2 MB file has already been downloaded, the remaining range runs from 500 KB to 2 MB.
- When downloading, use the Range header to request that byte range, so the transfer continues from where it was interrupted.
- Append the newly downloaded data to the existing file rather than overwriting it.
- Implement error handling so that failed downloads can be retried, with a maximum retry count to avoid retrying forever.

Here is a simple Python example showing how to implement resumable downloading:

```python
import os
import requests
def download_file(url, local_filename):
    # Check the size of any partially downloaded file
    if os.path.exists(local_filename):
        resume_header = {'Range': f'bytes={os.path.getsize(local_filename)}-'}
    else:
        resume_header = {}
    # Send the request
    response = requests.get(url, headers=resume_header, stream=True)
    # Check the response status
    if response.status_code not in (200, 206):
        print(f"Error: {response.status_code}")
        return
    # Open the file in append mode
    with open(local_filename, 'ab') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
    print(f"Downloaded: {local_filename}")

if __name__ == "__main__":
    url = "http://example.com/largefile.zip"
    local_filename = "largefile.zip"
    download_file(url, local_filename)
```

The key points are the Range header, which requests only the part that has not yet been downloaded, and opening the file in 'ab' mode so that new data is appended at the end.
Downloaded files can also be encrypted before storage. Here is an example that uses Python and the cryptography library for AES encryption and decryption:

```python
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import os
def encrypt_file(key, input_file, output_file):
    iv = os.urandom(16)  # generate a random initialization vector
    cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
    encryptor = cipher.encryptor()
    with open(input_file, 'rb') as f:
        plaintext = f.read()
    with open(output_file, 'wb') as f:
        f.write(iv)  # write the initialization vector to the file
        f.write(encryptor.update(plaintext) + encryptor.finalize())

def decrypt_file(key, input_file, output_file):
    with open(input_file, 'rb') as f:
        iv = f.read(16)  # read the initialization vector
        ciphertext = f.read()
    cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
    decryptor = cipher.decryptor()
    with open(output_file, 'wb') as f:
        f.write(decryptor.update(ciphertext) + decryptor.finalize())

if __name__ == "__main__":
    key = os.urandom(32)  # generate a random key (32 bytes for AES-256)
    encrypt_file(key, 'example.txt', 'example.enc')  # encrypt the file
    decrypt_file(key, 'example.enc', 'example_decrypted.txt')  # decrypt the file
```

When automating downloads it is also worth monitoring the process. Useful things to monitor include:

- Download progress: record each file's progress, including the number of bytes downloaded and the total number of bytes, so the user can follow the real-time status of the download.
- Download status: track whether each download succeeded, failed, or was interrupted, and record that status.
- File integrity: after a download finishes, verify the file with a hash algorithm (such as SHA-256) to make sure it has not been tampered with.
- File size changes: check that the size of the downloaded file matches what is expected.
- File system monitoring: a file system monitoring tool (such as inotify on Linux) can watch a specific directory and react promptly to file creation, modification, or deletion events; see the sketch after this list.
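A minimal sketch of directory monitoring, assuming the third-party watchdog package (which uses inotify on Linux under the hood); the watched path is illustrative:

```python
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class DownloadEventHandler(FileSystemEventHandler):
    def on_created(self, event):
        # React to new files appearing in the download directory
        if not event.is_directory:
            print(f"New file detected: {event.src_path}")

observer = Observer()
observer.schedule(DownloadEventHandler(), path="C:/Downloads", recursive=False)
observer.start()
try:
    while True:
        time.sleep(1)  # keep the script alive while the observer thread runs
except KeyboardInterrupt:
    observer.stop()
observer.join()
```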
For logging, consider:

- Log content: record the detailed information of each download.
- Log format: use a structured format (such as JSON) or a simple text format, so the logs are easy to analyze and process later.
- Logging library: use Python's logging module (or the equivalent library in another language) to manage the log records. Different log levels (DEBUG, INFO, WARNING, ERROR) and output formats can be configured.

Here is a simple example showing how to implement file monitoring and logging in an automated download:
```python
import os
import requests
import logging
import hashlib
from time import time
# Configure logging
logging.basicConfig(filename='download.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def calculate_md5(file_path):
    """Calculate the MD5 hash of a file."""
    hash_md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

def download_file(url, local_filename):
    """Download a file and log the result."""
    try:
        logging.info(f"Starting download: {url}")
        response = requests.get(url, stream=True)
        response.raise_for_status()  # check that the request succeeded
        with open(local_filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        logging.info(f"Download finished: {local_filename}")
        # Verify file integrity
        md5_hash = calculate_md5(local_filename)
        logging.info(f"File MD5: {md5_hash}")
    except Exception as e:
        logging.error(f"Download failed: {url} - error: {str(e)}")

if __name__ == "__main__":
    url = "http://example.com/largefile.zip"
    local_filename = "largefile.zip"
    download_file(url, local_filename)
```
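The monitoring list above suggests SHA-256 for integrity checks; the MD5 helper in the example can be adapted accordingly. A minimal sketch using the standard hashlib module:

```python
import hashlib

def calculate_sha256(file_path):
    """Calculate the SHA-256 hash of a file, reading it in small chunks."""
    hash_sha256 = hashlib.sha256()
    with open(file_path, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_sha256.update(chunk)
    return hash_sha256.hexdigest()
```

The resulting hex digest can be compared against a checksum published alongside the file to confirm that the download is intact.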