前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大规模异步新闻爬虫【6】:用asyncio实现异步爬虫

大规模异步新闻爬虫【6】:用asyncio实现异步爬虫

作者头像
一墨编程学习
发布2019-05-15 11:13:04
1.3K0
发布2019-05-15 11:13:04
举报

关于异步IO这个概念,可能有些小猿们不是非常明白,那就先来看看异步IO是怎么回事儿。 为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:

  • 下载请求开始,就是放羊出去吃草;
  • 下载任务完成,就是羊吃饱回羊圈。

同步放羊的过程就是这样的: 羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……

再看看异步放羊的过程: 羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。

很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。

到了这里,可能有小猿要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步IO提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。

1. 异步的downloader

还记得我们之前使用requests实现的那个downloader吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有aiohttp模块来支持异步http请求了,那么我们就用aiohttp来实现异步downloader。

代码语言:javascript
复制
async def fetch(session, url, headers=None, timeout=9):
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }
    if headers:
        _headers = headers
    try:
        async with session.get(url, headers=_headers, timeout=timeout) as response:
            status = response.status
            html = await response.read()
            encoding = response.get_encoding()
            if encoding == 'gb2312':
                encoding = 'gbk'
            html = html.decode(encoding, errors='ignore')
            redirected_url = str(response.url)
    except Exception as e:
        msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
        print(msg)
        html = ''
        status = 0
        redirected_url = url
    return status, html, redirected_url

这个异步的downloader,我们称之为fetch(),它有两个必须参数:

  • seesion: 这是一个aiohttp.ClientSession的对象,这个对象的初始化在crawler里面完成,每次调用fetch()时,作为参数传递。
  • url:这是需要下载的网址。

实现中使用了异步上下文管理器(async with),编码的判断我们还是用cchardet来实现。 有了异步下载器,我们的异步爬虫就可以写起来啦~

2. 异步新闻爬虫

跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:

  • self.urlpool 网址池
  • self.loop 异步的事件循环
  • self.seesion aiohttp.ClientSession的对象,用于异步下载
  • self.db 基于aiomysql的异步数据库连接
  • self._workers 当前并发下载(放出去的羊)的数量

通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:

代码语言:javascript
复制
#!/usr/bin/env python3
# File: news-crawler-async.py
# Author: veelion

import traceback
import time
import asyncio
import aiohttp
import urllib.parse as urlparse
import farmhash
import lzma

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

import sanicdb

from urlpool import UrlPool
import functions as fn
import config

class NewsCrawlerAsync:
    def __init__(self, name):
        self._workers = 0
        self._workers_max = 30
        self.logger = fn.init_file_logger(name+ '.log')

        self.urlpool = UrlPool(name)

        self.loop = asyncio.get_event_loop()
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.db = sanicdb.SanicDB(
            config.db_host,
            config.db_db,
            config.db_user,
            config.db_password,
            loop=self.loop
        )

    async def load_hubs(self,):
        sql = 'select url from crawler_hub'
        data = await self.db.query(sql)
        self.hub_hosts = set()
        hubs = []
        for d in data:
            host = urlparse.urlparse(d['url']).netloc
            self.hub_hosts.add(host)
            hubs.append(d['url'])
        self.urlpool.set_hubs(hubs, 300)

    async def save_to_db(self, url, html):
        urlhash = farmhash.hash64(url)
        sql = 'select url from crawler_html where urlhash=%s'
        d = await self.db.get(sql, urlhash)
        if d:
            if d['url'] != url:
                msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
                self.logger.error(msg)
            return True
        if isinstance(html, str):
            html = html.encode('utf8')
        html_lzma = lzma.compress(html)
        sql = ('insert into crawler_html(urlhash, url, html_lzma) '
               'values(%s, %s, %s)')
        good = False
        try:
            await self.db.execute(sql, urlhash, url, html_lzma)
            good = True
        except Exception as e:
            if e.args[0] == 1062:
                # Duplicate entry
                good = True
                pass
            else:
                traceback.print_exc()
                raise e
        return good

    def filter_good(self, urls):
        goodlinks = []
        for url in urls:
            host = urlparse.urlparse(url).netloc
            if host in self.hub_hosts:
                goodlinks.append(url)
        return goodlinks

    async def process(self, url, ishub):
        status, html, redirected_url = await fn.fetch(self.session, url)
        self.urlpool.set_status(url, status)
        if redirected_url != url:
            self.urlpool.set_status(redirected_url, status)
        # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
        if status != 200:
            return
        if ishub:
            newlinks = fn.extract_links_re(redirected_url, html)
            goodlinks = self.filter_good(newlinks)
            print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
            self.urlpool.addmany(goodlinks)
        else:
            await self.save_to_db(redirected_url, html)
        self._workers -= 1

    async def loop_crawl(self,):
        await self.load_hubs()
        last_rating_time = time.time()
        counter = 0
        while 1:
            tasks = self.urlpool.pop(self._workers_max)
            if not tasks:
                print('no url to crawl, sleep')
                await asyncio.sleep(3)
                continue
            for url, ishub in tasks.items():
                self._workers += 1
                counter += 1
                print('crawl:', url)
                asyncio.ensure_future(self.process(url, ishub))

            gap = time.time() - last_rating_time
            if gap > 5:
                rate = counter / gap
                print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
                last_rating_time = time.time()
                counter = 0
            if self._workers > self._workers_max:
                print('====== got workers_max, sleep 3 sec to next worker =====')
                await asyncio.sleep(3)

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            print('stopped by yourself!')
            del self.urlpool
            pass

if __name__ == '__main__':
    nc = NewsCrawlerAsync('yrx-async')
    nc.run()

爬虫的主流程是在方法loop_crawl()里面实现的。它的主体是一个while循环,每次从self.urlpool里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过ensure_future()开始异步下载(把这些羊都放出去)。而process()这个方法的流程是下载网页并存储、提取新的url,这就类似羊吃草、下崽等。

通过self._workersself._workers_max来控制并发量。不能一直并发,给本地CPU、网络带宽带来压力,同样也会给目标服务器带来压力。

至此,我们实现了同步和异步两个新闻爬虫,分别实现了NewsCrawlerSync和NewsCrawlerAsync两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。小猿们可以通过对比两个类的实现,来更好的理解异步的流程。

爬虫知识点

1. uvloop模块 uvloop这个模块是用Cython编写建立在libuv库之上,它是asyncio内置事件循环的替代,使用它仅仅是多两行代码而已:

代码语言:javascript
复制
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop使得asyncio很快,比odejs、gevent和其它Python异步框架的快至少2倍,接近于Go语言的性能。

uvloop作者的性能测试

这是uvloop作者的性能对比测试。 目前,uvloop不支持Windows系统和Python 3.5 及其以上版本,这在它源码的setup.py文件中可以看到:

代码语言:javascript
复制
if sys.platform in ('win32', 'cygwin', 'cli'):
    raise RuntimeError('uvloop does not support Windows at the moment')

vi = sys.version_info
if vi < (3, 5):
    raise RuntimeError('uvloop requires Python 3.5 or greater')

所以,使用Windows的小猿们要运行异步爬虫,就要把uvloop那两行注释掉哦。

思考题

1. 给同步的downloader()或异步的fetch()添加功能 或许有些小猿还没见过这样的html代码,它出现在<head>里面:

代码语言:javascript
复制
<meta http-equiv="refresh" content="5; url=https://example.com/">

它的意思是,告诉浏览器在5秒之后跳转到另外一个url:https://example.com/。 那么问题来了,请给downloader(fetch())添加代码,让它支持这个跳转。

2. 如何控制hub的刷新频率,及时发现最新新闻 这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,小猿们来思考一下,并对手实现实现。

后面的章节,是介绍如何使用工具,比如如何使用charles抓包,如何管理浏览器cookie,如何使用selenium等等,也欢迎你的阅读。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019.05.06 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 异步的downloader
  • 2. 异步新闻爬虫
  • 爬虫知识点
  • 思考题
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档