
Reader contribution: downloading a novel with redis and mongodb, and testing it with pytest

Author: 青南
Published: 2019-01-23 18:01:59
From the column: 未闻Code

This article is a practice exercise by reader miniyk, written after finishing Chapter 6 of 《Python爬虫开发 从入门到实战》.

Over the weekend, to get familiar with mongodb and redis, I wrote a program that scrapes the novel 《白夜行》 (Journey Under the Midnight Sun), wrote unit tests for it with the pytest framework, and used a thread pool to speed up the downloads:

# white_novel.py
"""Store chapter URLs in redis and chapter content in mongodb."""

import lxml.html  # type: ignore
import requests  # type: ignore
import redis  # type: ignore
from pymongo import MongoClient, database
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from multiprocessing.dummy import Pool
from functools import partial
 
 
class DownloadWhite(object):
    KEY = 'urls'

    def __init__(self, workers=15, home_url='http://dongyeguiwu.zuopinj.com/5525'):
        self.workers = workers
        self.home_url = home_url
        self.redis_client = redis.StrictRedis(decode_responses=True)
        mongo_client = MongoClient()
        db: database.Database = mongo_client['Chapter6']
        self.collection = db['white_novel']

    def _clear(self):
        self.redis_client.delete(self.KEY)
        self.collection.delete_many({})

    def save_urls(self):
        home_page = requests.get(self.home_url).content.decode()
        selector = lxml.html.fromstring(home_page)
        useful = selector.xpath('//div[@class="book_list"]/ul/li')
        urls = []
        for li in useful:
            hrefs = li.xpath('a/@href')
            if hrefs:  # skip entries without a link; rpush rejects None
                urls.append(hrefs[0])
        self.redis_client.rpush(self.KEY, *urls)

    def download_novel(self):
        client = self.redis_client
        contents = []
        urls = client.lrange(self.KEY, 0, -1)
        if not urls:
            return
        # method1
        # with ThreadPoolExecutor(max_workers=self.workers) as executor:
        #     futures = [executor.submit(self._download_chapter, url, contents) for url in urls]
        # for _ in as_completed(futures):
        #     pass
        # method2
        pool = Pool(self.workers)
        pool.map(partial(self._download_chapter, contents=contents), urls)
        print(f'at last insert {len(contents)} chapters')
        self.collection.insert_many(contents)

    @staticmethod
    def _download_chapter(url, contents: list) -> None:
        page = requests.get(url).content.decode()
        selector = lxml.html.fromstring(page)
        title = selector.xpath('//div[@class="h1title"]/h1/text()')[0]
        content = '\n'.join(selector.xpath('//div[@id="htmlContent"]/p/text()'))
        contents.append({'title': title, 'content': content})


if __name__ == '__main__':
    dlw = DownloadWhite()
    dlw._clear()
    dlw.save_urls()
    start = time.perf_counter()
    dlw.download_novel()
    print(f'time elapse {time.perf_counter() - start} seconds')

I tried two thread-pool implementations: one uses ThreadPoolExecutor, the other uses multiprocessing.dummy.Pool, together with the small trick of functools.partial to bind the shared contents list.
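The two variants above can be compared side by side without any network access. This is a minimal sketch in which a hypothetical fake_download stands in for _download_chapter; both pools produce the same number of results:

```python
# Sketch (no network): the two thread-pool variants, with fake_download
# standing in for _download_chapter from the article's code.
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from multiprocessing.dummy import Pool  # thread-based Pool, despite the name

def fake_download(url, contents):
    contents.append({'title': url.upper()})

urls = ['a', 'b', 'c']

# method 1: ThreadPoolExecutor; leaving the `with` block waits for all futures
contents1 = []
with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fake_download, url, contents1) for url in urls]
for _ in as_completed(futures):
    pass

# method 2: multiprocessing.dummy.Pool + partial to bind the shared list,
# so pool.map only needs to supply the varying `url` argument
contents2 = []
pool = Pool(3)
pool.map(partial(fake_download, contents=contents2), urls)
pool.close()
pool.join()

print(len(contents1), len(contents2))  # 3 3
```

Note that `pool.map` blocks until every call finishes, which is why no explicit join is needed before reading contents2.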

One thing puzzled me, though: multiple threads append to the same contents list. Is that thread-safe? The Stack Overflow answer "What kinds of global value mutation are thread-safe?" resolved my doubt: because of the GIL, many operations that are not thread-safe in Java are safe in CPython. Only statements like L[i] += 4, which read a value and then write it back, are non-atomic and can therefore race.
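This can be checked empirically. In the sketch below, 15 threads (matching the pool size above) each append 10,000 items to a shared list; under CPython's GIL, list.append runs as a single C-level operation, so no append is ever lost:

```python
# Sketch: list.append from many threads never loses an element in CPython,
# because the C implementation of append completes without releasing the GIL.
import threading

contents = []

def worker():
    for _ in range(10_000):
        contents.append(1)

threads = [threading.Thread(target=worker) for _ in range(15)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(contents))  # 150000: no appends were lost
# By contrast, `L[i] += 4` compiles to a read, an add, and a write-back;
# another thread can interleave between the read and the write, so it
# needs a lock even under the GIL.
```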

Because the thread pool downloads chapters with 15 concurrent workers, fetching all 13 chapters takes roughly as long as fetching a single one:

at last insert 13 chapters
time elapse 0.9961462760111317 seconds

The unit tests:

import pytest  # type: ignore
import redis  # type: ignore
from pymongo import MongoClient, collection  # type: ignore
from white_novel import DownloadWhite


@pytest.fixture(scope='function')
def wld_instance():
    print('start')
    dlw = DownloadWhite()
    dlw._clear()
    yield dlw
    dlw._clear()
    print('end')

@pytest.fixture(scope='module')
def redis_client():
    print('init redis')
    return redis.StrictRedis(decode_responses=True)

@pytest.fixture(scope='module')
def white_novel_collection() -> collection.Collection:
    print('init mongo')
    mongo_client = MongoClient()
    database = mongo_client['Chapter6']
    collection = database['white_novel']
    return collection
 
def test_download(wld_instance, redis_client, white_novel_collection):
    wld_instance.save_urls()
    wld_instance.download_novel()
    assert redis_client.llen(wld_instance.KEY) == 13
    assert white_novel_collection.count_documents(filter={}) == 13

def test_not_save_url_download(wld_instance, redis_client, white_novel_collection):
    wld_instance.download_novel()
    assert redis_client.llen(wld_instance.KEY) == 0
    assert white_novel_collection.count_documents(filter={}) == 0

def test_only_save_url(wld_instance, redis_client, white_novel_collection):
    wld_instance.save_urls()
    assert redis_client.llen(wld_instance.KEY) == 13
    assert white_novel_collection.count_documents(filter={}) == 0
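The wld_instance fixture above uses yield to run cleanup after each test. Under the hood, pytest drives such a fixture as a generator; this is a minimal sketch of that mechanism (run_test and the events list are illustrative stand-ins, not pytest internals):

```python
# Sketch: how pytest drives a yield fixture such as wld_instance.
# next(gen) runs the setup half; resuming the generator after the
# test runs the teardown half.
events = []

def wld_instance():              # stand-in for the fixture above
    events.append('setup')       # corresponds to dlw._clear() before the test
    yield 'dlw'                  # the yielded value is what the test receives
    events.append('teardown')    # corresponds to dlw._clear() after the test

def run_test(fixture_func, test_body):
    gen = fixture_func()
    value = next(gen)            # run setup, stop at the yield
    test_body(value)
    try:
        next(gen)                # resume: run teardown
    except StopIteration:
        pass                     # fixture has no second yield, as expected

run_test(wld_instance, lambda dlw: events.append(f'test({dlw})'))
print(events)  # ['setup', 'test(dlw)', 'teardown']
```

Because wld_instance is function-scoped, this setup/teardown pair runs around every test, while the module-scoped redis_client and white_novel_collection fixtures are built only once per test file.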

The final scraped result is shown in a screenshot in the original post.

Originally published 2019-01-09 on the WeChat public account 未闻Code.
