
Scraping novels with Python and multithreading

陨石坠灭
Published 2020-05-22 16:12:48
Column: 全栈之路

I've been writing JS scripts these past few days and suddenly felt like writing a novel scraper. After some trial and error I gradually put together a fairly complete script, and consolidated what I've learned along the way.

1. Environment

  • Python version: Python 3.7.3
  • Editor: VS Code
  • Python extension: ms-python.python
  • OS: macOS

settings.json configuration:

{
  "python.pythonPath": "/usr/local/bin/python3",
  "python.formatting.provider": "black"
}

launch.json configuration:

{
  // Use IntelliSense to learn about possible attributes.
  // Hover to view descriptions of existing attributes.
  // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Python: file",
      "type": "python",
      "request": "launch",
      "program": "${file}",
      "args": ["-g", "5", "-b"],
      "console": "integratedTerminal"
    }
  ]
}
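The `args` entry above passes `-g 5 -b` to whichever script is being debugged. The post never says what these flags mean, but a script would typically pick them up with argparse; here is a minimal sketch in which `-g` is assumed to be a numeric option and `-b` a boolean switch:

```python
import argparse

# Hypothetical parser for the "-g 5 -b" args configured in launch.json;
# the flags' real meaning is not defined in this post.
parser = argparse.ArgumentParser(description="demo arg parsing")
parser.add_argument("-g", type=int, default=1, help="a numeric option (assumed)")
parser.add_argument("-b", action="store_true", help="a boolean switch (assumed)")

ns = parser.parse_args(["-g", "5", "-b"])
print(ns.g, ns.b)  # 5 True
```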

2. Installing Python dependencies

# install BeautifulSoup4 and the lxml parser backend
pip3 install bs4
pip3 install lxml

# network requests
pip3 install requests

# json, fnmatch, hashlib and threading ship with the standard library --
# they need no pip install ("pip3 install threading" fails on Python 3,
# and "pip3 install json" grabs an unrelated package)

3. Custom library - common

Directory structure:

- common

  - __init__.py

  - util.py # utility class

  - thread_manager.py # thread manager

3.1. util.py utility class

Method overview:

class Util # utility class
  def write_file(self, filename, data, mode="w+") # write data to a file
  def append_file(self, filename, data) # append data to a file
  def read_file(self, filename, mode="r") # read a file
  def soup(self, url) # fetch a page and parse it
  def list_files(self, path, fnexp) # search for files
  def now(self)  # current time, in milliseconds
  def now_s(self)  # current time, in seconds
  def recode_begin(self, group="default") # record a start time
  def recode_end(self, group="default") # print the elapsed time since the recorded start
  def time_format(self, timestamp, pattern="%Y-%m-%d %H:%M:%S") # format a timestamp
  def md5(self, content) # md5 hex digest of a string

3.2. thread_manager.py thread manager class

Method overview:

class ThreadManager:
    def __init__(self, max_size=10) # init; max_size is the work-queue capacity
    def exit(self) # signal threads to exit
    def lock(self) # acquire the lock
    def unlock(self) # release the lock
    def put_data(self, data) # enqueue data
    def put_thread(self, thread) # add and start a thread
    def put_cbk_thread(self, thread_name, cbk, repeat=1) # add a callback, which will run in a thread
    def join(self) # wait for all threads to finish
    def wait(self) # wait until all data has been consumed

3.3 __init__.py file
# coding=utf-8

__version__ = "1.0.0"
__all__ = ["cm_util", "ThreadManager"]

__author__ = "yszm"

from .util import *
from .thread_manager import *

4. Scraping a novel

Scraping a novel breaks down into three parts: the title, the chapter list, and the chapter content.

The three parts are scraped in much the same way: use a selector to pick the relevant elements, filter out the ones you don't need, read the attributes and text, then post-process the result with indentation fixes, string replacements, and so on.
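The select-filter-extract flow is easiest to see on a tiny inline document. This sketch uses the same CSS selectors as the script below, against made-up HTML, and the stdlib "html.parser" backend so lxml is not required:

```python
from bs4 import BeautifulSoup

# A made-up page mimicking a chapter-index layout
html = """
<div class="book_info"><div class="xiaoshuo"><h1>第一序列</h1></div></div>
<div class="novel_list"><dl class="book_article_listtext">
  <dd><a href="1001.html">第一章</a></dd>
  <dd><a href="1002.html">第二章</a></dd>
</dl></div>
"""
doc = BeautifulSoup(html, "html.parser")

# select() returns a list of matching elements
title = doc.select(".book_info .xiaoshuo h1")[0].string
links = [(a.get("href"), a.string)
         for a in doc.select(".novel_list .book_article_listtext dd a")]

print(title)     # 第一序列
print(links[0])  # ('1001.html', '第一章')
```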

Here we use the novel 《第一序列》 on 67书吧 as an example; URL: https://www.67shu.com/111/111473

import time
import json
import sys
import os

if __name__ == "__main__":
    from common import *
else:
    from .common import *


URL1 = "https://www.67shu.com/111/111473/"
URL2 = "https://www.67shu.com/40/40190/"

URL = "story/result/{}.txt"


def get_cpt(url):
    doc = cm_util.soup(url)

    data = {"name": "unknow"}
    # 获取标题
    h1 = doc.select(".book_info .xiaoshuo h1")
    if len(h1) > 0:
        data["name"] = h1[0].contents[0]

    # 获取所有链接
    links = doc.select(".novel_list .book_article_listtext dd a")
    cp_arr = []
    for item in links:
        cp_arr.append(
            {"url": (url + "{}").format(item.get("href")), "name": item.string}
        )
    data["cp"] = cp_arr
    return data


def get_content(data):
    dest_file = URL.format(data["name"])
    cm_util.write_file(dest_file, "")

    for item in data["cp"]:
        doc = cm_util.soup(item["url"])
        con = doc.select(".yuedu_page .yuedu_zhengwen")
        if len(con) > 0:
            c = con[0].text
            txt = (
                c.replace("\\n", "\n")
                .replace("\\r", "")
                .replace("\xa0", "")
                .replace("一秒记住【67书吧 www.67shu.com】,", "")
                .replace("精彩小说无弹窗免费阅读!", "")
                .replace("            ", "  ")
                .replace("        ", "")
            )
            print("get data: %s" % item["name"])
            cm_util.write_file(dest_file, "\n\n%s\n\n" % item["name"], "a+")
            cm_util.write_file(dest_file, txt, "a+")


if __name__ == "__main__":
    get_content(get_cpt(URL2))

Looks pretty simple, doesn't it?

But fetching one chapter at a time is slow, especially for prolific authors with thousands of chapters. That's where multithreaded scraping comes in.

5. Multithreaded scraping

We use the custom thread manager class: ThreadManager

You need to implement a handler: def handle_data(data, thread_id, thread_name)
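That callback contract is easy to see in miniature with the stdlib queue and threading modules. This sketch stands in for ThreadManager with a bare Queue, and its handle_data just records what it saw:

```python
import queue
import threading

work = queue.Queue()
results = []
lock = threading.Lock()

def handle_data(data, thread_id, thread_name):
    # the per-item callback that ThreadManager would invoke
    with lock:
        results.append((thread_name, data))

def worker(thread_id, thread_name):
    # drain the queue; give up after a short idle timeout
    while True:
        try:
            data = work.get(timeout=0.2)
        except queue.Empty:
            return
        handle_data(data, thread_id, thread_name)

for item in range(10):
    work.put(item)

threads = [threading.Thread(target=worker, args=(i, "thread_%d" % i))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(results))  # 10
```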

This time the example is the novel 《英雄联盟我的时代》 on 全本小说网:

import time
import json
import sys
import os

if __name__ == "__main__":
    from common import *
else:
    from .common import *

URL1 = "http://www.126shu.com/99596/"
URL_CONTENT = "http://www.126shu.com/{}"

URL_RESULT = "story/result/{}.txt"
URL_DATA = "story/data/{}.txt"

def get_cpt(url):
    doc = cm_util.soup(url)

    data = {"name": "unknow"}
    # 获取标题
    h1 = doc.select("#info .hh")
    if len(h1) > 0:
        data["name"] = h1[0].string

    # 获取所有链接
    links = doc.select("#headlink #list dl dd a")
    cp_arr = []
    for item in links:
        cp_arr.append(
            {"url": URL_CONTENT.format(item.get("href")), "name": item.string}
        )
    data["cp"] = cp_arr
    return data


def get_text(item):
    dest_file = URL_DATA.format(item["name"])
    if os.path.exists(dest_file):
        print("exist file, so we will use cache: %s " % dest_file)
        return dest_file
    doc = cm_util.soup(item["url"])
    con = doc.select("#content")

    if len(con) > 0:
        con_l = con[0].select(".zjtj")
        if len(con_l) > 0:
            con_l[0].extract()
        con_l = con[0].select(".zjxs")
        if len(con_l) > 0:
            con_l[0].extract()
        c = con[0].text
        txt = (
            c.replace("www.126shu.com", "")
            .replace("\r", "")
            .replace("请百度搜索()", "")
            .replace("\xa0", "\n")
            .replace("\n\n\n\n", "\n\n")
            .replace("\n\n\n\n", "\n\n")
        )  # replace("\r", "\n\n").replace("         ", "")
        print("get data: %s" % item["name"])
        cm_util.write_file(dest_file, ("\n\n%s" % item["name"]) + txt, "a+")
        return dest_file
    return None


# chapter name -> saved file path
text_path = {}

def get_text_thread(item, id, name):
    path = get_text(item)
    if path:
        text_path[item["name"]] = path
    else:
        print("[warn]: cannot find content: %s,%s" % (item["url"], item["name"]))


def get_content(data):
    # result file, named after the novel
    dest_file = URL_RESULT.format(data["name"])
    cm_util.write_file(dest_file, "")

    manager = ThreadManager(len(data["cp"]))
    thread_names = [
        "thread_a",
        "thread_b",
        "thread_c",
        "thread_d"
    ]
    manager.put_data(data["cp"])
    manager.put_cbk_thread(thread_names, get_text_thread)
    # wait for the queue to drain
    manager.wait()
    # signal threads to exit
    manager.exit()
    # wait for all threads to finish
    manager.join()

    # merge chapters in order
    for item in data["cp"]:
        path = text_path.get(item["name"], None)
        if path:
            txt = cm_util.read_file(path)
            cm_util.append_file(dest_file, txt)

if __name__ == "__main__":
    cm_util.recode_begin()
    get_content(get_cpt(URL1))
    cm_util.recode_end()
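One detail worth noting in get_text above: it skips the network entirely when the chapter file is already on disk. That cache-or-fetch shape in isolation (the path is a temp file here, and fetch is a stand-in for the HTTP call):

```python
import os
import tempfile

def get_or_fetch(dest_file, fetch):
    # reuse the cached file if present, mirroring get_text's early return
    if os.path.exists(dest_file):
        return "cache"
    with open(dest_file, "w") as f:
        f.write(fetch())
    return "fetched"

with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "ch1.txt")
    first = get_or_fetch(path, lambda: "chapter text")   # downloads
    second = get_or_fetch(path, lambda: "chapter text")  # hits the cache

print(first, second)  # fetched cache
```

This is what makes re-runs cheap: a crashed or interrupted scrape resumes without re-downloading finished chapters.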

6. Going further: a factory

To make this more generic, the common parts should be factored out, with site-specific methods injected dynamically; that keeps the script extensible.
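The registration idea in miniature: a dict maps each base URL to its parser callbacks, and match() finds an entry by URL prefix. The method names mirror the full source, but the parsers here are stand-in lambdas:

```python
class MiniFactory:
    def __init__(self):
        self.url_matcher = {}

    def registe(self, base_url, get_chapter, get_text):
        # register the callbacks for one site, keyed by its base URL
        self.url_matcher[base_url] = {"get_cpt": get_chapter,
                                      "get_text": get_text}

    def match(self, url):
        # find the registered site whose base URL prefixes this URL
        for base_url in self.url_matcher:
            if url.startswith(base_url):
                return base_url
        return None

f = MiniFactory()
f.registe("https://www.67shu.com",
          lambda url: {"title": "demo"},   # stand-in chapter parser
          lambda item: "text")             # stand-in content parser
base = f.match("https://www.67shu.com/112/112336/")
print(base)  # https://www.67shu.com
```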

Without further ado, the source:

# coding=utf-8

import os

if __name__ == "__main__":
    from ..common import *
    from .parser import *
else:
    from common import *
    from story.parser import *


URL_RESULT = "python/story/result/{}.txt"
URL_DATA = "python/story/data/{}"
URL_TXT_CHAPTER = "python/story/data/{}/{}.txt"
CONFIG_DATA = "python/story/data/{}/config.json"

class Parser:
    def __init__(self, base_url=""):
        self.base_url = base_url

    def get_chapter(self, url):
        return None

    def get_text(self, item):
        return None

class StoryFactory:
    def __init__(self):
        self.url_matcher = {}
        self.max_thread_size = 10

    # register a parser's callbacks
    def registe(self, base_url, get_chapter, get_text):
        self.url_matcher[base_url] = {
            "base_url": base_url,
            "get_cpt": get_chapter,
            "get_text": get_text,
        }

    def registe_paser(self, p):
        self.registe(p.base_url, p.get_chapter, p.get_text)

    def match(self, url):
        for base_url in self.url_matcher:
            if url.startswith(base_url):
                return base_url
        return None

    def get_text_thread(self, item, id, name):
        conf_path = CONFIG_DATA.format(item["book_key"])
        chap_data = cm_util.read_file(conf_path)

        get_text = self.url_matcher[chap_data["base_url"]].get(
            "get_text", None
        )  # (item)
        if not get_text:
            print("[warn] not match url: %s" % item["url"])
            return
        txt = get_text(item)
        if txt:
            cm_util.write_file(
                URL_TXT_CHAPTER.format(item["book_key"], cm_util.md5(item["name"])), txt
            )
        else:
            print("[warn]: cannot find content: %s,%s" % (item["url"], item["name"]))

    def run(self, url):
        key = cm_util.md5(url)
        cm_util.recode_begin(key)

        base_url = self.match(url)
        if not base_url:
            print("[warn] not match url: %s" % url)
            return

        print("[info] url:[%s] %s - %s" % (key, url, base_url))

        if not os.path.exists(URL_DATA.format(key)):
            os.makedirs(URL_DATA.format(key))

        matcher = self.url_matcher[base_url].get("get_cpt", None)
        if not matcher:
            print("[warn] not match url: %s" % url)
            return
        chap_data = matcher(url)

        conf_path = CONFIG_DATA.format(key)
        if os.path.exists(conf_path):
            chap_data = cm_util.read_file(conf_path)
        else:
            chap_data["base_url"] = base_url
            for item in chap_data["chapter"]:
                name = item.get("name", None)
                if name:
                    item["key"] = cm_util.md5(name)
                item["book_key"] = key
            cm_util.write_file(conf_path, chap_data)

        manager = ThreadManager(len(chap_data["chapter"]))
        thread_names = []
        for ch in range(self.max_thread_size):
            thread_names.append("thread_%d" % ch)

        manager.put_data(chap_data["chapter"])
        manager.put_cbk_thread(thread_names, self.get_text_thread)
        # wait for the queue to drain
        manager.wait()
        # signal threads to exit
        manager.exit()
        # wait for all threads to finish
        manager.join()

        # result file, named after the novel
        dest_file = URL_RESULT.format(chap_data["title"])
        cm_util.write_file(dest_file, "")
        # merge chapters in order
        for item in chap_data["chapter"]:
            ch_path = URL_TXT_CHAPTER.format(key, cm_util.md5(item["name"]))
            txt = cm_util.read_file(ch_path)
            if txt:
                cm_util.append_file(dest_file, txt)
        cm_util.recode_end(key)


story_factory = StoryFactory()

__init__.py file:

# coding=utf-8

__version__ = "1.0.0"
__all__ = ["story_factory", "Parser"]

__author__ = "yszm"

from .story import *

Usage:

if __name__ == "__main__":
    from story import *
else:
    from .story import *

class P67shu(Parser):
    def __init__(self):
        super().__init__("https://www.67shu.com")

    def get_chapter(self, url):
        doc = cm_util.soup(url)

        data = {"title": "unknow"}
        # 获取标题
        h1 = doc.select(".book_info .xiaoshuo h1")
        if len(h1) > 0:
            data["title"] = h1[0].contents[0]

        # 获取所有链接
        links = doc.select(".novel_list .book_article_listtext dd a")
        cp_arr = []
        for item in links:
            cp_arr.append(
                {"url": (url + "{}").format(item.get("href")), "name": item.string}
            )
        data["chapter"] = cp_arr
        return data

story_factory.registe_paser(P67shu())

if __name__ == "__main__":
    url = "https://www.67shu.com/112/112336/"
    story_factory.run(url)

7. common source code

The source is as follows:

common/util.py

# coding=utf-8

import json
import os
import requests
from bs4 import BeautifulSoup
import fnmatch
import time
import hashlib

# request header configuration
HEADER_CONFIG = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36"
}

class Util:
    def __init__(self):
        self.time_map = None

    # write data to a file
    def write_file(self, filename, data, mode="w+"):
        f = open(filename, mode)
        try:
            if isinstance(data, dict):
                data = json.dumps(data, ensure_ascii=False)
            f.write(data)
        finally:
            f.close()
        print("write data to file: %s" % filename)

    # append data to a file
    def append_file(self, filename, data):
        if not os.path.exists(filename):
            self.write_file(filename, data)
        else:
            self.write_file(filename, data, "a+")

    # read data from a file
    def read_file(self, filename, mode="r"):
        data = None
        if not os.path.exists(filename):
            return data
        f = open(filename, mode)
        try:
            data = f.read()
            if filename.endswith(".json"):
                data = json.loads(data)
        finally:
            f.close()
        print("read data from file: %s" % filename)
        return data

    # fetch a page and parse it
    def soup(self, url):
        s = requests.session()
        s.keep_alive = False
        txt = s.get(url, headers=HEADER_CONFIG, timeout=120).content
        return BeautifulSoup(txt, "lxml")

    # search for files
    def list_files(self, path, fnexp):
        for root, dirs, files in os.walk(path):
            for filename in fnmatch.filter(files, fnexp):
                yield os.path.join(root, filename)

    # current time, in milliseconds
    def now(self):
        return int(round(time.time() * 1000))

    # current time, in seconds
    def now_s(self):
        return int(time.time())

    # record a start timestamp
    def recode_begin(self, group="default"):
        if not self.time_map:
            self.time_map = {}
        self.time_map[group] = self.now()
        return self.time_map[group]

    # print and return the elapsed time since recode_begin
    def recode_end(self, group="default"):
        t = (self.now() - self.time_map[group]) / 1000

        print("[%s]: elapsed: %s s" % (group, t))
        self.time_map.pop(group)
        return t

    # format a timestamp
    def time_format(self, timestamp, pattern="%Y-%m-%d %H:%M:%S"):
        time_local = time.localtime(timestamp)
        return time.strftime(pattern, time_local)

    # md5 hex digest of a string
    def md5(self, content):
        return hashlib.md5(content.encode("utf8")).hexdigest()


cm_util = Util()
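A quick standalone check of the hashing and formatting helpers above, using the same stdlib expressions Util wraps (without the class):

```python
import hashlib
import time

# md5 of a chapter name, exactly as Util.md5 computes it
digest = hashlib.md5("hello".encode("utf8")).hexdigest()
print(digest)  # 5d41402abc4b2a76b9719d911017c592

# time_format's strftime pattern, rendered in UTC for a stable result
# (Util itself uses localtime, so its output depends on the machine's timezone)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(0)))  # 1970-01-01 00:00:00
```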

common/thread_manager.py

# -*- coding: UTF-8 -*-

#
import queue
import threading
import time
#
class ThreadManager:
    def __init__(self, max_size=10):
        self.exit_flag = 0
        self.work_queue = queue.Queue(max_size)
        self.queue_lock = threading.Lock()
        self.threads = []
        self.cbk_group = {}
        self.thread_id = 100000
    #
    # signal threads to exit
    def exit(self):
        self.exit_flag = 1
    #
    # acquire the queue lock
    def lock(self):
        self.queue_lock.acquire()
    #
    # release the queue lock
    def unlock(self):
        self.queue_lock.release()
    #
    # enqueue data
    def put_data(self, data):
        self.lock()
        for item in data:
            self.work_queue.put(item)
        self.unlock()
    #
    # add and start a thread
    def put_thread(self, thread):
        thread.start()
        self.threads.append(thread)
        self.thread_id = self.thread_id + 1
    #
    # add a callback-style thread; cbk: def cbk(data, thread_id, thread_name)
    def put_cbk_thread(self, thread_name, cbk, repeat=1):
        if isinstance(thread_name, list):
            repeat = len(thread_name)
        if repeat == 1:
            thread = CBThread(self.thread_id, thread_name, self.process_data)
            self.cbk_group[self.thread_id] = cbk
            self.put_thread(thread)
        else:
            for i in range(repeat):
                name = thread_name
                if isinstance(thread_name, list):
                    name = thread_name[i]
                else:
                    name = "%s(%d)" % (thread_name, i + 1)
                thread = CBThread(self.thread_id, name, self.process_data)
                self.cbk_group[self.thread_id] = cbk
                self.put_thread(thread)
    #
    # wait for all threads to finish
    def join(self):
        for t in self.threads:
            t.join()
    #
    # wait for the queue to drain (sleep briefly to avoid a busy spin)
    def wait(self):
        while not self.work_queue.empty():
            time.sleep(0.1)
    #
    # number of queued items
    def data_size(self):
        return self.work_queue.qsize()
    #
    # number of threads
    def thread_size(self):
        return len(self.threads)
    #
    # worker loop: consume items from the queue
    def process_data(self, thread_id, thread_name):
        while not self.exit_flag:
            data = None
            self.lock()
            if not self.work_queue.empty():
                try:
                    data = self.work_queue.get()
                finally:
                    self.unlock()
                # invoke the callback, if one is registered
                cbk = self.cbk_group.get(thread_id, None)
                if data and cbk:
                    cbk(data, thread_id, thread_name)
                print("%s[%d] processing" % (thread_name, thread_id))
            else:
                self.unlock()
                time.sleep(1)
#
# generic thread with a callback; for use with ThreadManager
class CBThread(threading.Thread):
    def __init__(self, thread_id, name, cbk):
        threading.Thread.__init__(self)
        self.thread_id = thread_id
        self.thread_name = name
        self.cbk = cbk
    #
    def run(self):
        print("Starting %s" % self.thread_name)
        self.cbk(self.thread_id, self.thread_name)
        print("Exiting %s" % self.thread_name)
Originally published 2020/05/19 on the author's personal blog.
