Automated downloading of large files has to account for download stability, speed optimization, and error handling. The sections below walk through several common techniques.
Downloading in chunks with HTTP `Range` requests fetches the file piece by piece, which keeps memory usage low and makes partial failures recoverable. Note that the server must support range requests (it replies with status 206), and issuing one request per 8 KB chunk is slow in practice, so a larger chunk size is usually preferable.

```python
import requests

def download_large_file_in_chunks(url, file_path, chunk_size=8192):
    headers = {}
    # Get the total file size from a HEAD request
    response = requests.head(url)
    file_size = int(response.headers.get('Content-Length', 0))
    if file_size > 0:
        with open(file_path, 'wb') as f:
            for start in range(0, file_size, chunk_size):
                end = min(start + chunk_size - 1, file_size - 1)
                headers['Range'] = f'bytes={start}-{end}'
                chunk_response = requests.get(url, headers=headers, stream=True)
                if chunk_response.status_code == 206:  # 206 means Partial Content
                    f.seek(start)
                    for chunk in chunk_response.iter_content(chunk_size=chunk_size):
                        if chunk:
                            f.write(chunk)
    else:
        print("Could not determine the file size")

# Usage example
file_url = "https://example.com/large_file.zip"
save_path = "C:/Downloads/large_file.zip"
download_large_file_in_chunks(file_url, save_path)
```
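The same `Range` mechanism also supports resuming an interrupted download from the bytes already on disk. The sketch below is a minimal illustration, not part of the original example; the helper name `download_resumable` and the append-on-206 behavior are assumptions about one way to wire it up.

```python
import os
import requests

# Hypothetical helper: resume an interrupted download by requesting
# only the bytes that are still missing.
def download_resumable(url, file_path, chunk_size=8192):
    downloaded = os.path.getsize(file_path) if os.path.exists(file_path) else 0
    headers = {'Range': f'bytes={downloaded}-'}  # open-ended range: "from byte N"
    response = requests.get(url, headers=headers, stream=True)
    if response.status_code == 206:    # server honored the range: append
        mode = 'ab'
    elif response.status_code == 200:  # server ignored the range: start over
        mode = 'wb'
    else:
        response.raise_for_status()
        return
    with open(file_path, mode) as f:
        for chunk in response.iter_content(chunk_size=chunk_size):
            if chunk:
                f.write(chunk)
```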
Multithreaded downloading assigns each thread its own byte range, downloads the parts in parallel, and merges them afterwards:

```python
import os
import requests
import threading
def download_chunk(url, start, end, file_path, chunk_number):
    headers = {'Range': f'bytes={start}-{end}'}
    response = requests.get(url, headers=headers, stream=True)
    with open(f"{file_path}.part{chunk_number}", 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

def merge_chunks(file_path, num_chunks):
    with open(file_path, 'wb') as final_file:
        for i in range(num_chunks):
            chunk_file_path = f"{file_path}.part{i}"
            with open(chunk_file_path, 'rb') as chunk_file:
                final_file.write(chunk_file.read())
            # Remove the temporary part file once it has been merged
            os.remove(chunk_file_path)

def download_large_file_multithreaded(url, file_path, num_threads=4):
    response = requests.head(url)
    file_size = int(response.headers.get('Content-Length', 0))
    chunk_size = file_size // num_threads
    threads = []
    for i in range(num_threads):
        start = i * chunk_size
        # The last thread picks up any remainder bytes
        end = start + chunk_size - 1 if i < num_threads - 1 else file_size - 1
        thread = threading.Thread(target=download_chunk, args=(url, start, end, file_path, i))
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
    merge_chunks(file_path, num_threads)

# Usage example
file_url = "https://example.com/large_file.zip"
save_path = "C:/Downloads/large_file.zip"
download_large_file_multithreaded(file_url, save_path)
```
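As an aside, the standard-library `concurrent.futures` module can replace the manual thread bookkeeping above. This is a sketch, not part of the original article, and it reuses the `download_chunk` and `merge_chunks` helpers defined in the previous block.

```python
import requests
from concurrent.futures import ThreadPoolExecutor

def download_large_file_pooled(url, file_path, num_threads=4):
    file_size = int(requests.head(url).headers.get('Content-Length', 0))
    chunk_size = file_size // num_threads
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        for i in range(num_threads):
            start = i * chunk_size
            end = start + chunk_size - 1 if i < num_threads - 1 else file_size - 1
            pool.submit(download_chunk, url, start, end, file_path, i)
    # Leaving the with-block waits for every submitted download to finish
    merge_chunks(file_path, num_threads)
```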
A retry mechanism guards against transient network failures by waiting a fixed interval between attempts:

```python
import time
import requests
def download_with_retry(url, max_retries=3, retry_interval=5):
    retries = 0
    while retries < max_retries:
        try:
            response = requests.get(url, stream=True)
            response.raise_for_status()  # count HTTP error statuses as failures too
            return response  # the caller streams the body to disk
        except requests.RequestException as e:
            retries += 1
            print(f"Download error: {e}; retrying in {retry_interval} seconds...")
            time.sleep(retry_interval)
    print("Maximum number of retries reached; download failed")
    return None
```
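Since `download_with_retry` returns the still-open streaming response, the caller is responsible for writing the body to disk. A minimal usage sketch (the URL and path are placeholders):

```python
response = download_with_retry("https://example.com/large_file.zip")
if response is not None:
    with open("C:/Downloads/large_file.zip", 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)
```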
Finally, a progress bar gives visibility into long downloads; `tqdm` renders one from the `Content-Length` header and the number of bytes written so far:

```python
from tqdm import tqdm
import requests
def download_with_progress(url, file_path):
    response = requests.get(url, stream=True)
    total_size = int(response.headers.get('Content-Length', 0))
    block_size = 8192
    with open(file_path, 'wb') as f, tqdm(total=total_size, unit='iB', unit_scale=True) as pbar:
        for data in response.iter_content(block_size):
            if data:
                f.write(data)
                pbar.update(len(data))
```
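A usage example in the same style as the earlier ones (URL and path are placeholders, as before):

```python
file_url = "https://example.com/large_file.zip"
save_path = "C:/Downloads/large_file.zip"
download_with_progress(file_url, save_path)
```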