前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >主从分布式爬虫

主从分布式爬虫

作者头像
py3study
发布2020-01-16 00:37:42
7220
发布2020-01-16 00:37:42
举报
文章被收录于专栏:python3

为什么要用分布式爬虫

  • 学习爬虫已经有一段时间了,之前的爬虫都是一个python文件就实现的,没考虑性能,效率之类的。所以作为一个合格的spider,需要学习一下分布式爬虫。
  • 什么分布式爬虫?简单地说就是用多台服务器去获取数据,让这些服务器去协同,分配各自的任务。

分布式爬虫设计

最常用的一种就是主从分布式爬虫,本文将使用Redis服务器来作为任务队列。 如图:

分布式爬虫设计
分布式爬虫设计

准备工作

  1. 安装python3和Redis
  2. 安装requests与Redis相关的库
代码语言:javascript
复制
pip install requests
pip install pyquery
pip install redis

代码

主函数(master.py)

代码语言:javascript
复制
import os
import requests
from pyquery import PyQuery as pq
import re
import json

import config
from cache import RedisCache
from model import Task


def parse_link(div):
    '''
    获取连接
    '''
    e = pq(div)
    href = e.find('a').attr('href')
    return href

def get_from_url(url):
    '''
    获取列表连接
    '''
    page = get_page(url)
    e = pq(page)
    items = e('.epiItem.video')
    links = [parse_link(i) for i in items]
    print(len(links))
    links.reverse()
    return links

def get_page(url):
    '''
    获取页面
    '''
    proxies = config.proxies
    try:
        res = requests.get(url,proxies=proxies)
        # print(res.text)
    except requests.exceptions.ConnectionError as e:
        print('Error',e.args)
    page = res.content
    return page

def get_all_file(path, fileList=[]):
    '''
    获取目录下所有文件
    '''
    get_dir = os.listdir(path)  #遍历当前目录,获取文件列表
    for i in get_dir:
        sub_dir = os.path.join(path,i)  # 把第一步获取的文件加入路径
        # print(sub_dir)
        if os.path.isdir(sub_dir):     #如果当前仍然是文件夹,递归调用
            get_all_file(sub_dir, fileList)
        else:
            ax = os.path.abspath(sub_dir)  #如果当前路径不是文件夹,则把文件名放入列表
            # print(ax)
            fileList.append(ax)
    return fileList

def init_finish_task(path):
    '''
    初始化已结束任务
    '''
    redis_cache = RedisCache()
    fileList = []
    fileList = get_all_file(path, fileList)
    # print(fileList)
    for file in fileList:
        file_name = os.path.basename(file)
        task_id = file_name[:5]
        # print(task_id)
        redis_cache.sadd('Task:finish', task_id)
    print('init_finish_task...end')

def init_task_url():
    '''
    初始化已结束任务url
    '''
    redis_cache = RedisCache()
    url = config.list_url
    link_list = get_from_url(url)
    for link in link_list:
        task_url = config.task_url_head+link
        # print(task_url)
        task_id = task_url[-5:]
        # redis_cache.set('Task:id:{}:url'.format(task_id),task_url)
        t = task_from_url(task_url)
        # print('add task {}'.format(t.__dict__))
        print('add task_id {}'.format(task_id))
        redis_cache.set('Task:id:{}'.format(task_id), t.__dict__)
        # print(t)
    print('init_task_url...end')  

def task_from_url(task_url):
    '''
    获取任务
    '''
    page = get_page(task_url)
    e = pq(page)

    task_id = task_url[-5:]
    title = e('.controlBar').find('.epi-title').text().replace('/', '-').replace(':',':')
    file_url = e('.audioplayer').find('audio').attr('src')
    ext = file_url[-4:]
    file_name = task_id+'.'+title+ext
    # content = e('.epi-description').html()

    t = Task()
    t.id = task_id
    t.title = title
    t.url = task_url
    t.file_name = file_name
    t.file_url =  file_url
    # t.content =  content
    return t

def main():
    init_task_url()
    init_finish_task(config.down_folder)

if __name__ == '__main__':
    main()

从函数(salver.py)

代码语言:javascript
复制
import os
import requests
from pyquery import PyQuery as pq
import re
import json

import config
from cache import RedisCache


def get_page(url):
    '''
    获取页面
    '''
    proxies = config.proxies
    try:
        res = requests.get(url,proxies=proxies)
        # print(res.text)
    except requests.exceptions.ConnectionError as e:
        print('Error',e.args)
    page = res.content
    return page


def begin_task(task_id, file_name, file_url):
    print('begin task {}'.format(task_id))
    redis_cache = RedisCache()
    # 添加到正在下载列表
    redis_cache.sadd('Task:begin', task_id)
    print('download...{}'.format(file_name))
    folder = config.down_folder
    path = os.path.join(folder, file_name)
    download(file_url, path)
    print('end task {}'.format(task_id))


def download(link, path):
    redis_cache = RedisCache()
    proxies = config.proxies
    if os.path.exists(path):
        print('file exist')
    else:
        try:
            r = requests.get(link,proxies=proxies,stream=True)
            total_length = int(r.headers['Content-Length'])
            with open(path, "wb") as code:
                code.write(r.content)
                # 文件是否完整
                length = os.path.getsize(path)
                print('length={}'.format(length))
                print('total_length={}'.format(total_length))
                if  total_length != length:
                    # 删除旧文件
                    os.remove(path)
                    # 重新下载
                    download(path, link)
                else:
                    print('download success')
                    # 添加到已下载
                    file_name = os.path.basename(path)
                    content_id = file_name[:5]
                    redis_cache.srem('Task:begin', content_id)
                    redis_cache.sadd('Task:finish', content_id)
            # print(r.text)
        except requests.exceptions.ConnectionError as e:
            print('Error',e.args)


def main():
    redis_cache = RedisCache()
    #  检查任务列表是否在已下载列表中
    keys = redis_cache.keys('Task:id:[0-9]*')
    # print(keys)
    new_key = [key.decode() for key in keys]
    # print(new_key)
    # 按id排序
    new_key = sorted(new_key,key = lambda i:int(i.split(':')[2]))
    # print(new_key)
    for key in new_key:

        
        task_id = key.split(':')[2]
        # print(task_id)
        is_finish = redis_cache.sismember('Task:finish', task_id)
        is_begin = redis_cache.sismember('Task:begin', task_id)
        if is_finish==1:
            print('Task {} is finish'.format(task_id))
        elif is_begin==1:
            print('Task {} is begin'.format(task_id))
        else:
            file_name = json.loads(redis_cache.get(key).decode('utf-8').replace("\'", "\""))['file_name']
            file_url = json.loads(redis_cache.get(key).decode('utf-8').replace("\'", "\""))['file_url']
            # print(file_url)
            begin_task(task_id, file_name, file_url)

if __name__ == '__main__':
    main()

完整代码的地址

https://github.com/zhourunliang/master-slave-crawler

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2019/05/22 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要用分布式爬虫
  • 分布式爬虫设计
  • 准备工作
  • 代码
    • 主函数(master.py)
      • 从函数(salver.py)
        • 完整代码的地址
        相关产品与服务
        云数据库 Redis
        腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档