前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程爪巴虫下载进击的巨人

多线程爪巴虫下载进击的巨人

作者头像
Python研究者
发布2020-10-22 15:26:41
3480
发布2020-10-22 15:26:41
举报
文章被收录于专栏:Python研究者Python研究者

文章目录

  • 线程池
  • 获取图片链接
  • 下载图片
  • 存在的问题

线程池

代码语言:javascript
复制
import contextlib
import glob
import os
import re
import threading
import time
from queue import Queue
from urllib import request
from bs4 import BeautifulSoup
import requests


class ThreadPool(object):
    def __init__(self, max_num):
        self.StopEvent = 0  # 线程任务终止符,当线程从队列获取到StopEvent时,代表此线程可以销毁。可设置为任意与任务有区别的值。
        self.q = Queue()
        self.max_num = max_num  # 最大线程数
        self.terminal = False  # 是否设置线程池强制终止
        self.created_list = []  # 已创建线程的线程列表
        self.free_list = []  # 空闲线程的线程列表
        self.failed_tasks = Queue()  # 失败的任务列表
        self.Deamon = False  # 线程是否是后台线程
        self.recycle_failed_tasks = False

    def run(self, func, args, callback=None):
        """
        线程池执行一个任务
        :param func: 任务函数
        :param args: 任务函数所需参数
        :param callback:
        :return: 如果线程池已经终止,则返回True否则None
        """

        if len(self.free_list) == 0 and len(self.created_list) < self.max_num:
            self.create_thread()
        task = (func, args, callback,)
        self.q.put(task)

    def create_thread(self):
        """
        创建一个线程
        """
        t = threading.Thread(target=self.call)
        t.setDaemon(self.Deamon)
        t.start()
        self.created_list.append(t)  # 将当前线程加入已创建线程列表created_list

    def call(self):
        """
        循环去获取任务函数并执行任务函数
        """
        current_thread = threading.current_thread()  # 获取当前线程对象
        event = self.q.get()  # 从任务队列获取任务
        while event != self.StopEvent:  # 判断获取到的任务是否是终止符

            func, arguments, callback = event  # 从任务中获取函数名、参数、和回调函数名
            try:
                result = func(*arguments)
                func_excute_status = True  # func执行成功状态
            except Exception as e:
                func_excute_status = False
                result = None
                print('函数执行产生错误', e)  # 打印错误信息
                self.failed_tasks.put(event)

            if func_excute_status:  # func执行成功后才能执行回调函数, 成功后才能执行回调函数, 才能执行回调函数
                if callback is not None:  # 判断回调函数是否是空的
                    try:
                        callback(result)
                    except Exception as e:
                        print('回调函数执行产生错误', e)  # 打印错误信息

            with self.worker_state(self.free_list, current_thread):
                # 执行完一次任务后,将线程加入空闲列表。然后继续去取任务,如果取到任务就将线程从空闲列表移除
                if self.terminal:  # 判断线程池终止命令,如果需要终止,则使下次取到的任务为StopEvent。
                    event = self.StopEvent
                else:  # 否则继续获取任务
                    event = self.q.get()  # 当线程等待任务时,q.get()方法阻塞住线程,使其持续等待
                    print('remaining tasks: ', self.q.qsize())

        # 若线程取到的任务是终止符,就销毁线程。while ... else ... 语句
        # 将当前线程从已创建线程列表created_list移除
        self.created_list.remove(current_thread)

    def close(self):
        """
        执行完所有的任务后,所有线程停止
        """
        full_size = len(self.created_list)  # 按已创建的线程数量往线程队列加入终止符。
        while full_size:
            self.q.put(self.StopEvent)
            full_size -= 1

    def terminate(self):
        """
        无论是否还有任务,终止线程
        """
        self.terminal = True

        while self.created_list:
            self.q.put(self.StopEvent)
            time.sleep(0.01)

        self.q.queue.clear()  # 清空任务队列, 主要是刚刚加入的大量终止信号

    def join(self):
        """
        阻塞线程池上下文,使所有线程执行完后才能继续
        """
        for t in self.created_list:
            t.join()

    @contextlib.contextmanager  # 上下文处理器,使其可以使用with语句修饰
    def worker_state(self, state_list, worker_thread):
        """
        用于记录线程中正在等待的线程数
        """
        state_list.append(worker_thread)
        try:
            yield
        finally:
            state_list.remove(worker_thread)

获取图片链接

代码语言:javascript
复制
if __name__ == '__main__':
    
    '''
    获取图片链接
    '''

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
    }


    def run(url, save_dir):
        time.sleep(1)
        html = requests.get(url, headers=headers, verify=False)
        raw = html.text
        img = re.findall('mhurl="(.*?jpg)"', raw)
        prefix = 'http://p1.manhuapan.com/'
        if int(img[0].split('/')[0]) < 2016:
            prefix = 'http://p5.manhuapan.com/'
        img = prefix + img[0]
        path = os.path.join(save_dir, url.split('.')[-2].split('_')[-1] + '.jpg')
        return (img, path)


    def save(res):
        url, save_path = res[0], res[1]
        txt = save_path.replace('jpg', 'txt')
        with open(txt, 'w') as file:
            file.write(url)
        print('save {} to {}'.format(url, txt))

    path = '巨人/'

    root = 'https://manhua.fzdm.com/39/'
    html = requests.get(root).text
    bs = BeautifulSoup(html, features="lxml")
    titles = bs.find_all('li', {'class': 'pure-u-1-2 pure-u-lg-1-4'})

    catalogs = []
    for i in titles:
        href, title = i.a.get('href').strip('/'), i.a.text
        catalogs.append((href, title))
        diry = path + title
        if not os.path.exists(diry):
            os.makedirs(diry)

    tasks = []
    for i in catalogs:
        href, title = i[0], i[1]
        diry = path + title
        for j in range(100):
            u = root + href + '/index_' + str(j) + '.html'
            tasks.append((u,diry))

    start = time.time()
    pool = ThreadPool(100)

    for t in tasks:
        pool.run(func=run, args=t, callback=save)
    pool.close()
    pool.join()

    print("任务队列里任务数%s" % pool.q.qsize())
    print("当前存活子线程数量:%d" % threading.activeCount())
    print("当前线程创建列表:%s" % pool.created_list)
    print("当前空闲线程列表:%s" % pool.free_list)
    print("失败的任务列表:%s" % pool.failed_tasks.queue)
    print('total time: ', time.time() - start)

下载图片

代码语言:javascript
复制
    '''
    下载图片
    '''

    files = glob.glob(path+'*/*.txt')
    print(files)

    def download(filename):
        time.sleep(1)
        with open(filename,'r') as file:
            url = file.readline()
        req = request.Request(url, headers=headers)
        response = request.urlopen(req, timeout=10)

        path = filename.replace('txt','jpg')
        with open(path, 'wb') as f_save:
            f_save.write(response.read())
            f_save.flush()
            f_save.close()
        print('download: ', url)

    start = time.time()
    pool = ThreadPool(100)

    for t in files:
        pool.run(func=download, args=(t,), callback=None)
    pool.close()
    pool.join()

    print("任务队列里任务数%s" % pool.q.qsize())
    print("当前存活子线程数量:%d" % threading.activeCount())
    print("当前线程创建列表:%s" % pool.created_list)
    print("当前空闲线程列表:%s" % pool.free_list)
    print("失败的任务列表:%s" % pool.failed_tasks.queue)
    print('total time: ', time.time() - start)

存在的问题

代码语言:javascript
复制
response = request.urlopen(req, timeout=10)
with open(path, 'wb') as f_save:
    f_save.write(response.read())
    f_save.flush()
    f_save.close()

response.read() 会超时

图片超时导致下载失败,保存了一个大小为 0 的图片

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-10-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Python爬虫数据分析挖掘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 线程池
  • 获取图片链接
  • 下载图片
  • 存在的问题
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档