专栏首页程序员的知识天地大规模异步新闻爬虫【2】:实现功能强大,简洁易用的网址池(URL Pool)

大规模异步新闻爬虫【2】:实现功能强大,简洁易用的网址池(URL Pool)

对于比较大型的爬虫来说,URL管理的管理是个核心问题,管理不好,就可能重复下载,也可能遗漏下载。这里,我们设计一个URL池来管理URL。 这个URL池就是一个生产者 - 消费者模式:

生产者 - 消费者流程图

依葫芦画瓢,URLPool就是这样的

设计的网络爬虫URLPool

我们从网址池的使用目的出发来设计网址池的接口,它应该具有以下功能:

  • 往池子里面添加URL;
  • 从池子里面取URL以下载;
  • 池子内部要管理URL状态;

前面我提到的网址的状态有以下4中:

  • 已经下载成功
  • 下载多次失败无需再下载
  • 正在下载
  • 下载失败要再次尝试

前两个是永久状态,也就是已经下载成功的不再下载,多次尝试后仍失败的也就不再下载,它们需要永久存储起来,以便爬虫重启后,这种永久状态记录不会消失,已经成功下载的网址不再被重复下载永久存储的方法有很多种:

比如,直接写入文本文件,但它不利于查找某个URL是否已经存在文本中; 比如,直接写入的MySQL等关系型数据库,它利用查找,但是速度又比较慢, 比如,使用键值数据库,查找和速度都符合要求,是不错的选择!

我们这个URL池选用LevelDB来作为URL状态的永久存储.LevelDB是谷歌开源的一个键值数据库,速度非常快,同时自动压缩数据。我们用它先来实现一个UrlDB作为永久存储数据库。

UrlDB的实现

import leveldb

class UrlDB:
    '''Use LevelDB to store URLs what have been done(succeed or faile)
    '''
    status_failure = b'0'
    status_success = b'1'

    def __init__(self, db_name):
        self.name = db_name + '.urldb'
        self.db = leveldb.LevelDB(self.name)

    def load_from_db(self, status):
        urls = []
        for url, _status in self.db.RangeIter():
            if status == _status:
                urls.append(url)
        return urls

    def set_success(self, url):
        if isinstance(url, str):
            url = url.encode('utf8')
        try:
            self.db.Put(url, self.state_success)
            s = True
        except:
            s = False
        return s

    def set_failure(self, url):
        if isinstance(url, str):
            url = url.encode('utf8')
        try:
            self.db.Put(url, self.status_failure)
            s = True
        except:
            s = False
        return s

    def has(self, url):
        if isinstance(url, str):
            url = url.encode('utf8')
        try:
            attr = self.db.Get(url)
            return attr
        except:
            pass
        return False

UrlDB将被UrlPool使用,主要有三个方法被使用:

  • has(url)查看是否已经存在某url
  • set_success(url)存储url状态为成功
  • set_failure(url)存储url状态为失败

UrlPool的实现

而正在下载和下载失败次数这两个URL的状态只需暂时保存在内容即可,我们把它们放到UrlPool这个类中进行管理接着我们来实现网址池:

#Author: veelion

import pickle
import leveldb
import time
import urllib.parse as urlparse

class UrlPool:
    '''URL Pool for crawler to manage URLs
    '''

    def __init__(self, pool_name):
        self.name = pool_name
        self.db = UrlDB(pool_name)

        self.pool = {}  # host: set([urls]), 记录待下载URL
        self.pending = {}  # url: pended_time, 记录已被pend但还未被更新状态(正在下载)的URL
        self.failure = {}  # url: times, 记录失败的URL的次数
        self.failure_threshold = 3
        self.pending_threshold = 60  # pending的最大时间,过期要重新下载
        self.in_mem_count = 0
        self.max_hosts = ['', 0]  # [host: url_count] 目前pool中url最多的host及其url数量
        self.hub_pool = {}  # {url: last_query_time}
        self.hub_refresh_span = 0
        self.load_cache()

    def load_cache(self,):
        path = self.name + '.pkl'
        try:
            with open(path, 'rb') as f:
                self.pool = pickle.load(f)
            cc = [len(v) for k, v in self.pool]
            print('saved pool loaded! urls:', sum(cc))
        except:
            pass

    def set_hubs(self, urls, hub_refresh_span):
        self.hub_refresh_span = hub_refresh_span
        self.hub_pool = {}
        for url in urls:
            self.hub_pool[url] = 0

    def set_status(self, url, status_code):
        if url in self.pending:
            self.pending.pop(url)

        if status_code == 200:
            self.db.set_success(url)
            return
        if status_code == 404:
            self.db.set_failure(url)
            return
        if url in self.failure:
            self.failure[url] += 1
            if self.failure[url] > self.failure_threshold:
                self.db.set_failure(url)
                self.failure.pop(url)
            else:
                self.add(url)
        else:
            self.failure[url] = 1

    def push_to_pool(self, url):
        host = urlparse.urlparse(url).netloc
        if not host or '.' not in host:
            print('try to push_to_pool with bad url:', url, ', len of ur:', len(url))
            return False
        if host in self.pool:
            if url in self.pool[host]:
                return True
            self.pool[host].add(url)
            if len(self.pool[host]) > self.max_hosts[1]:
                self.max_hosts[1] = len(self.pool[host])
                self.max_hosts[0] = host
        else:
            self.pool[host] = set([url])
        self.in_mem_count += 1
        return True

    def add(self, url, always):
        if always:
            return self.push_to_pool(url)
        pended_time = self.pending.get(url, 0)
        if time.time() - pended_time < self.pending_threshold:
            print('being downloading:', url)
            return
        if self.db.has(url):
            return
        if pended_time:
            self.pending.pop(url)
        return self.push_to_pool(url)

    def addmany(self, urls, always=False):
        if isinstance(urls, str):
            print('urls is a str !!!!', urls)
            self.add(urls, always)
        else:
            for url in urls:
                self.add(url, always)

    def pop(self, count, hubpercent=50):
        print('\n\tmax of host:', self.max_hosts)

        # 取出的url有两种类型:hub=1, 普通=2
        url_attr_url = 0
        url_attr_hub = 1
        # 1\. 首先取出hub,保证获取hub里面的最新url.
        hubs = {}
        hub_count = count * hubpercent // 100
        for hub in self.hub_pool:
            span = time.time() - self.hub_pool[hub]
            if span < self.hub_refresh_span:
                continue
            hubs[hub] = url_attr_hub  # 1 means hub-url
            self.hub_pool[hub] = time.time()
            if len(hubs) >= hub_count:
                break

        # 2\. 再取出普通url
        # 如果某个host有太多url,则每次可以取出3(delta)个它的url
        if self.max_hosts[1] * 10 > self.in_mem_count:
            delta = 3
            print('\tset delta:', delta, ', max of host:', self.max_hosts)
        else:
            delta = 1
        left_count = count - len(hubs)
        urls = {}
        for host in self.pool:
            if not self.pool[host]:
                # empty_host.append(host)
                continue
            if self.max_hosts[0] == host:
                while delta > 0:
                    url = self.pool[host].pop()
                    self.max_hosts[1] -= 1
                    if not self.pool[host]:
                        break
                    delta -= 1
            else:
                url = self.pool[host].pop()
            urls[url] = url_attr_url
            self.pending[url] = time.time()
            if len(urls) >= left_count:
                break
        self.in_mem_count -= len(urls)
        print('To pop:%s, hubs: %s, urls: %s, hosts:%s' % (count, len(hubs), len(urls), len(self.pool)))
        urls.update(hubs)
        return urls

    def size(self,):
        return self.in_mem_count

    def empty(self,):
        return self.in_mem_count == 0

    def __del__(self):
        path = self.name + '.pkl'
        try:
            with open(path, 'wb') as f:
                pickle.dump(self.pool, f)
            print('self.pool saved!')
        except:
            pass

UrlPool的实现有些复杂,且听我一一分解。

UrlPool的使用

先看看它的主要成员及其用途:

  • self.db是一个UrlDB的示例,用来永久存储url的永久状态
  • self.pool是用来存放url的,它是一个字典(dict)结构,key是url的主机,值是一个用来存储这个主机的所有url的集合(set)。
  • self.pending用来管理正在下载的url状态。它是一个字典结构,key是url,value是它被pop的时间戳。当一个url被pop()时,就是它被下载的开始。当该url被set_status()时,就是下载结束的时刻。如果一个url被添加()入池时,发现它已经被套的时间超过pending_threshold时,就可以再次入库等待被下载。否则,暂不入池。
  • self.failue是一个字典,key是url,value是识别的次数,超过failure_threshold就会被永久记录为失败,不再尝试下载。
  • hub_pool是一个用来存储hub page面的字典,key是hub url,value是上次刷新该hub页面的时间。

以上成员就构成了我们这个网址池的数据结构,再通过以下成员方法对这个网址池进行操作:

1. load_cache()和dump_cache()对网址池进行缓存 load_cache()在init()中调用,创建池的时候,尝试去加载上次退出时缓存的URL池; dump_cache()在del()中调用,也就是在网址池销毁前(比如爬虫意外退出),把内存中的URL pool缓存到硬盘。 这里使用了pickle模块,这是一个把内存数据序列化到硬盘的工具。

** 2. set_hubs()方法设置hub URL ** hub网页就是像百度新闻那样的页面,整​​个页面都是新闻的标题和链接,是我们真正需要的新闻的聚合页面,并且这样的页面会不断更新,把最新的新闻聚合到这样的页面,我们称它们为hub页面,其URL就是hub url。在新闻爬虫中添加大量的这样的url,有助于爬虫及时发现并抓取最新的新闻。 该方法就是将这样的hub url列表传给网址池,在爬虫从池中取URL时,根据时间间隔(self.hub_refresh_span)来取集枢纽网址。

** 3. add(),addmany(),push_to_pool()对网址池进行入池操作** 把url放入网址池时,先检查内存中的self.pending是否存在该url,即是否正在下载该。网址如果正在下载就不入池;如果正下载或已经超时,就进行到下一步; 接着检查该网址是否已经在性LevelDB中存在,存在就表明之前已经成功下载或彻底失败,不再下载了也不入池。如果没有则进行到下一步; 最后通过push_to_pool()把url放入self.pool中。存放的规则是,按照url的主机进行分类,相同主机的url放到一起,在取出时 -个主取一个url,尽量保证每次取出的一批url都是指向不同的服务器的,这样做的目的也是为了尽量减少对抓取目标服务器的请求压力。力争做一个服务器友好的爬虫O(∩∩ _∩)O

** 4. pop()对网址池进行出池操作** 爬虫通过该方法,从网址池中获取一批url去下载。取出url分两步: 第一步,先从self.hub_pool中获得,方法是遍历hub_pool,检查每个集线器-URL距上次被弹出的时间间隔是否超过毂页面刷新间隔(self.hub_refresh_span),来决定毂-URL是否应该被弹出。 第二步,从self.pool中获取。前面push_to_pool中,介绍了流行的原则,就是每次取出的一批URL都是指向不同服务器的,有了self.pool的特殊数据结构,安装这个原则获取网址就简单了,按主机(自我.pool的键)遍历self.pool即可。

** 5. set_status()方法设置网址池中url的状态** 其参数status_code是http响应的状态码。爬虫在下载完URL后进行url状态设置。 首先,把该url成self.pending中删除,已经下载完毕,不再是未决状态; 接着,根据STATUS_CODE来设置URL状态,200和404的直接设置为永久状态;其它状态就记录失败次数,并再次入池进行后续下载尝试。

通过以上成员变量和方法,我们把这个网址池(UrlPool)解析的清清楚楚。小猿们可以毫不客气的收藏起来,今后在写爬虫时可以用它方便的管理URL,并且这个实现只有一个PY文件,方便加入到任何项目中。

爬虫知识点

1.网址的管理 网址的管理,其目的就是为了:不重抓,不漏抓。

2. pickle模块 把内存数据保存到硬盘,再把硬盘数据重新加载到内存,这是很多程序停止和启动的必要步骤.pickle就是实现数据在内存和硬盘之间转移的模块。

3. leveldb模块 这是一个经典且强大的硬盘型key-value数据库,非常适合url-status这种结构的存储。

4. urllib.parse 解析网址的模块,在处理url时首先想到的模块就应该是它。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 大规模异步新闻爬虫【4】:实现一个同步定向新闻爬虫

    前面,我们先写了一个简单的百度新闻爬虫,可是它槽点满满。接着,我们实现了一些模块,来为我们的爬虫提供基础功能,包括:网络请求、网址池、MySQL封装。

    一墨编程学习
  • 用python抓取某腾视频所有电影的爬虫,不用钱就可以看会员电影!

    一墨编程学习
  • JavaScript编程趋势:用map和filter替换forEach

    当你需要拷贝一个数组的全部或者部分到一个新数组的时候,优先使用map和filter而不是forEach。

    一墨编程学习
  • 外行学 Python 爬虫 第四篇 URL 去重

    当你可以从网站上获取网页,也可以将网页中有效的信息提取出来以后,接下来你会做什么?我想它一定是一个肯定的答案『获取整个网站的内容』,毕竟只获取网站上一个网页的内...

    keinYe
  • python爬虫:爬取笔趣小说网站首页所有的小说内容,并保存到本地(单线程爬取,似乎有点慢)

    这几天在进行新的内容学习,并且在尝试使用据说是全宇宙唯一一款专门开发python的ide工具,叫做pycharm。

    HUBU生信
  • php解决新浪图床防盗链接口

    将以上6行代码复制然后在你服务器上新建一个php文件,内容就是这6行代码,使用的时候在你文件的远程路径后加?url=图片链接:

    许坏
  • python对url格式解析的方法

    本文实例讲述了python对url格式解析的方法。分享给大家供大家参考。具体分析如下:

    用户2398817
  • 糗事百科_多进程_demo(3)

    版权声明:Copyright © ...

    WEi_
  • python中序列的排序,包括字典排序、列表排序、升序、降序、逆序

    我们知道python中的内建序列包括字典、列表、元组、字符串等,序列是python中最基本的数据结构。

    刘金玉编程
  • Linux VIM自动提示插件安装配置

    GitHub上有大牛奉献了自己的配置方案,这里我做个推广,希望能帮助到需要使用VIM的童鞋。 作者是使用YCM(YouCompleteMe)自动补全插件,但...

    卡尔曼和玻尔兹曼谁曼

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动