前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python实现主从分布式爬虫,含源码分享

Python实现主从分布式爬虫,含源码分享

作者头像
程序猿的栖息地
发布2022-04-29 14:43:21
3310
发布2022-04-29 14:43:21
举报
文章被收录于专栏:程序猿的栖息地

为什么要用分布式爬虫

学习爬虫已经有一段时间了,之前的爬虫都是一个python文件就实现的,没考虑性能,效率之类的。所以作为一个合格的spider,需要学习一下分布式爬虫。

什么分布式爬虫?简单地说就是用多台服务器去获取数据,让这些服务器去协同,分配各自的任务。

分布式爬虫设计

最常用的一种就是主从分布式爬虫,本文将使用Redis服务器来作为任务队列。

如图:

准备工作

安装python3和Redis

安装requests与Redis相关的库

pip install requests

pip install pyquery

pip install redis

代码

主函数(master.py)

代码语言:javascript
复制
import os
import requests
from pyquery import PyQuery as pq
import re
import json
import config
from cache import RedisCache
from model import Task
def parse_link(div):
获取连接
e pq(div)
href e.find('a').attr('href')
return href
def get_from_url(wrl):
获取列表连接
page get_page(url)
e pq(page)
items e('.epiltem.video')
links [parse_link(i) for i in items]
print(len(links))
links.reverse()
return links
def get_page(wrl):
获取页面
proxies config.proxies
try:
res requests.get(url,proxies = proxies)
# print(res.text)
except requests.exceptions.ConnectionError as e:
print('Error',e.args)
page res.content
return page
def get_all_file(patht
 fiieList []):
获取目录下所有文件
get_dir os.listdir(path) #適历当前目录,获取文件列表
for i in get_dir:
sub_dir os.path.join(path,i) #把第一步获取的文件加入路径
# print(sub_dir)
if os.path.isdir(sub_dir): #如果当前仍然是文件夹,递归调用
get_all_file(sub_dir, fiieList)
else:
ax os.path.abspath(sub_dii') #如果当前路径不是文件夹,则把文件名放入列表
# print(ax)
fiieList.append(ax)
return fiieList
def init_finish_task(path):
初始化已结束任务
redis—cache RedisCache()
fiieList []
fiieList get_all_file(path, fiieList)
# print(fileList)
for file in fiieList:
file_name os.path.basename(file)
task_id file_name[:5]
# print(task_id)
redis_cache.sadd('Task:finish', task_id)
print('init_finish_task...end')
def init_task_url():
初始化已结束任务url
redis—cache RedisCache()
url config.list_url
link_list get_f rom_u r1(u r1)
for link in link_list:
task_url config.task_url_head link
# print(task_url)
task_id task_url[ 5:]
# redis—cache.set(_Task:id:{}:url_.format(task_id),task_url)
t task_from_url(task_url)
# print('add task {}'.format(t._diet_))
print('add task_id {}'.format(task_id))
redis_cache.set('Task:id:{}'.format(task_id), t._diet_)
# print(t)
print('init_task_url...end')
def task_from_url(tosfe_wrl):
获取任务
page get_page(task_url)
e pq(page)
task_id task_url[ 5:]
title e('.controlBar').find('.epi-title').text().replace('/'> '-')•neplace(')
file_url e('.audioplayer').find('audio').attr('sre')
ext file_url[ 4:]
file_name task_id+'.' title ext
# content = e('.epi-description').html()
t Task()
t.id task一id
t.title title
t.url task_url
t.file_name file_name
t.file_url file_url
# t.content = content
return t
def main():
init_task_url()
init_finish_task(config.down_folder)
if _name_ '_main_':
main()

从函数(salver.py)

代码语言:javascript
复制
import os
import requests
from pyquery import PyQuery as pq
import re
import json
import config
from cache import RedisCache
def get_page(wrl):
获取页面
proxies config.proxies
try:
res requests.get(url,proxies = proxies)
# print(res.text)
except requests.exceptions.ConnectionError as e:
print('Error',e.args)
page res.content
return page
def begin_task(tosfe_id, fiLe_name, fiie_uri):
print('begin task {}'.format(task_id))
redis—cache RedisCache()
#添力5到正在下载列表
redis_cache.sadd('Task:begin', task_id)
print('download...format(file_name))
folder config.down_folder
path os.path.join(folder, file_name)
download(file_url, path)
print('end task {}'.format(task_id))
def do\-/nload(linfe, path):
redis—cache RedisCache()
proxies config.proxies
if os.path.exists(path):
print('file exist')
else:
try:
r requests.get(link,proxies = proxies,streom=True)
total—length int(r.headers[.Content-Length.])
with open(path, "wb") as code:
code.write(r.content)
#文件是否完整
length os.path.getsize(path)
print('length={}'.format(length))
print('total_length={}'.format(total_length))
if total_length ! length:
#刪1^旧文件
os.remove(path)
#重新下载
download(path^ link)
else:
print('download success')
#添加到已下载
file_name os.path.basename(path)
content_id file_name[:5]
redis—cache.srem('Task:begin_, content_id)
redis—cache.sadd(*Task:finish *, content_id)
# print(r.text)
except requests.exceptions.ConnectionError as e:
print('Error',e.args)
def main():
redis—cache RedisCache()
#检^任务列表是否在已下载列表中
keys redis—cache.keys(’Task:id:[0-9]* _)
# print(keys)
new_key [key.decode〇 for key in keys]
# print(new_key)
#按1(1排序_
new_key sorted(new_key,feey = Lambda split(':')[2]))
# print(new_key)
for key in new_key:
task_id key.split(•:•)[2]
# print(task_id)
is_finish redis_cache.sismember('Task:finish', task_id)
is_begin redis—cache.sismember('Task:begin_, task_id)
if is_finish =1:
print('Task {} is finish'.format(task_id))
elif is_begin =1:
print('Task {} is begin'.format(task_id))
else:
file—name json.loads(redis_cache.get(key).decode('utf-8').replace("\"\""))['file_name']
file_url json.loads(redis_cache.get(key).decode('utf-8').replace("\"\""))['file_url']
# print(file_url)
begin_task(task_id, file—name, file_url)
if _name_ '_main_':
main()
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿的栖息地 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档