前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python-并发下载-回顾

Python-并发下载-回顾

作者头像
小团子
发布2019-07-18 15:24:27
1.3K0
发布2019-07-18 15:24:27
举报
文章被收录于专栏:数据云团数据云团

创建多少个多线程能得到最优的执行效率?

如果线程数量太多,线程的调度时间可能会超过线程的执行时间;

如果线程的数量太少,则起不到显著提高速度的作用。

一、单线程实现

使用单线程获取网页内容的步骤:

构建网址——>访问网页并获取源代码——>解析源代码——>转成 JSON 格式——>存储到本地文件

代码语言:javascript
复制
import json
import requests
from lxml import etree

# 访问网页的请求头
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept-Language": 'zh-CN,zh;q=0.8'}
# 储存解析后数据的本地文件
local_file = open("duanzi.json", "a")

# 解析 html 字符串,获取需要的信息
def parse_html(html):
  text = etree.HTML(html)
  # 模糊查询
  node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
  
  for node in node_list:
    try:
      username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
      image = node.xpath("./li").xpath(".//@src")
      title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
      like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
      comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
      
      items = {
        "username": username,
        "title": title,
        "image": image,
        "zan": like,
        "comments": comments
      }
      # 写入存储的解析后的数据
      local_file.write(json.dumps(items, ensure_ascii=False) + "\n")
    except:
      pass

def main():
  for page in range(1, 21):
    
    url = "http://www.qiushibaike.com/8hr/page/" + str(page) + "/"
    
    html = requests.get(url, headers=headers).text
    # 解析网页信息
    parse_html(html)

# 程序运行入口
if __name__ == "__main__":
  main()

二、多线程实现

从单线程爬虫的流程可以看出,全部过程只使用了一个线程,先爬取一个网页,对网页内容进行解析,然后存储,完成整套操作后再开始爬取下一个网页,每个网页依次进行,效率非常慢。

多线程爬虫的流程

① 使用一个队列 pageQueue 保存要访问的网页页码。

② 同时启动多个采集线程,每个线程都从网页页码队列 pageQueue 中取出一个要访问的页码,构建网址,访问网址并爬取数据。操作完一个网页后再从网页页码队列中取出下一个页码,依次进行,直到所有的页码都已访问完毕。所有的采集线程保存在列表 threadCrawls 中。

③ 使用一个队列 dataQueue 来保存所有的网页代码,每个线程获取到的数据都放入该队列中。

④ 同时启动多个解析线程,每个线程都从网页源代码队列 dataQueue 中取出一个网页源代码,并进行解析,获取想要的数据,并转化为 JSON 格式。解析完成后再取出下一个网页源代码,依次进行,直到多有的源代码都已被取出。将所有的解析线程存储在列表 threadParses 中。

⑤ 将解析得到的 JSON 数据存储在本地文件 duanzi.json 中。

代码语言:javascript
复制
import requests
import threading
# 采集网页页码队列是否为空
CRAWL_EXIT = False

class ThreadCrawl(threading.Thread):
  def __init__(self, threadName, pageQueue, dataQueue):
    threading.Thread.__init__(self)
    # 线程名
    self.threadName = threadName
    # 页码队列
    self.pageQueue = pageQueue
    # 数据队列
    self.dataQueue = dataQueue
    # 请求头
    self.headers = '{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept-Language": "zh-CN,zh;q=0.8"}'
    
  def run(self):
    print("启动" + self.threadName)
    while not CRAWL_EXIT:
      try:
        page = self.pageQueue.get(False)
        url = "http://www.qiushibaike.com/8hr/page/" + str(page) + "/"
        content = requests.get(url, headers=headers).text
        self.dataQueue.put(content)
      except:
        pass
    print("结束" + self.threadName)
    
# 网页源代码队列是否为空
PARSE_EXIT = False

class ThreadParse(threading.Thread):
  def __init__(self, threadName, dataQueue, localFile, lock):
    super(ThreadParse.self).__init__()
    # 线程名
    self.threadName = threadName
    # 数据队列
    self.dataQueue = dataQueue
    # 保存解析后数据的文件名
    self.localFile = localFile
    # 互斥锁
    self.lock = lock
  
  def run(self):
    print("启动" + self.threadName)
    while not PARSE_EXIT:
      try:
        html = self.dataQueue.get(False)
        self.parse(html)
      except:
        pass
    print("结束" + self.threadName)
  
  
  # 解析 html 文档,获取文档内容
  def parse(self, html):
    text = etree.HTML(html)
    # 模糊查询
    node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
  
    for node in node_list:
        # 用户名 
        username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
        # 图片链接
        image = node.xpath("./li").xpath(".//@src")
        # 取出标题
        title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
        # 点赞数
        like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
        # 评论数
        comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
      
        items = {
          "username": username,
          "title": title,
          "image": image,
          "zan": like,
          "comments": comments
        }
      
        # 打开锁,向文件添加内容,释放锁
        with self.lock:
          self.localFile.write(json.dumps(items, ensure_ascii=False) + "\n")


def main():
  # 页码队列,存储 20 个页码,先进先出
  pageQueue = Queue(20)
  for i in range(1, 21):
    pageQueue.put(i)
  
  # 采集结果的数据队列,参数为空表示不限制
  dataQueue = Queue()
  # 以追加的方式打开本地文件
  localFile = open("duanzi.json", "a")
  # 互斥锁
  lock = threading.Lock()
  
  crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
  # 创建、启动和存储 3 个采集线程
  threadCrawls = []
  for threadName in crawlList:
    thread = ThreadCrawl(threadName, pageQueue, dataQueue)
    thread.start()
    threadCrawls.append(thread)
    
  parseList = ["解析线程1号", "解析线程2号", "解析线程3号"]
  # 创建、启动和存储 3 个解析线程
  threadParses = []
  for threadName in parseList:
    thread = ThreadParse(threadName, dataQueue, localFile, lock)
    thread.start()
    threadParses.append(thread)
    
  # 如果 pageQueue 为空,采集线程退出循环
  while not pageQueue.empty():
    pass
  
  global CRAWL_EXIT
  CRAWL_EXIT = True
  for thread in threadCrawls:
    # 阻塞子线程
    thread.join()
  while not dataQueue.empty:
    pass
  
  global PARSE_EXIT
  PARSE_EXIT = True
  for thread in threadParses:
    thread.join()
    
  with lock:
    # 关闭文件,在文件之前,内容都存在内存中
    localFile.close()

三、协程实现

如果爬虫爬取网页的频率过高,会加重网页服务器的负担,甚至激发服务器的反爬虫机制,将用户的 IP 列入黑名单,所以通常在爬取线程中使用 time.sleep() 方法让线程间隔一小段时间后再继续爬取,一般间隔时间为 1.5~2s。

协程实现爬虫,步骤如下:

① 定义一个负责爬虫的类,所有的爬虫工作完全交由该类负责。

② 使用一个队列 data_queue 保存所有的数据。

③ 创建多个协程任务,每个协程都会使用页码构建完整的网址,访问网址爬取和提取有用的数据,并保存到数据队列中,直到所有网页中的数据提取出来。

④ 将 data_queue 队列中的数据全部取出来,保存到本地文件 duanzi.txt 中。

代码语言:javascript
复制
import time
import requests
from queue import Queue

class Spider(object):
  def __init__(self):
    self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept-Language": "zh-CN,zh;q=0.8"}
    self.base_url = "https://www.qiushibaike.com/8hr/page/"
    # 创建保存数据的队列
    self.data_queue = Queue()
    # 统计数量
    self.count = 0
  
  def send_request(self, url):
    print("[INFO]: 正在爬取" + url)
    html = requests.get(url, headers=self.headers).content
    # 每次请求间隔 1s
    time.sleep(1)
    self.parse_page(html)
    
  def parse_page(self, html):
    html_obj = etree.HTML(html)
    # 模糊查询
    node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
    for node in node_list:
      # 用户名 
      username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
      # 图片链接
      image = node.xpath("./li").xpath(".//@src")
      # 取出标题
      title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
      # 点赞数
      like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
      # 评论数
      comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
        
      items = {
        "username": username,
        "title": title,
        "image": image,
        "zan": like,
        "comments": comments
      }
      self.count += 1
      self.data_queue.put(items)
      
  def start_work(self):
    job_list = []
    for page in range(1, 14):
      # 创建一个协程任务对象
      url = self.base_url + str(page) + "/"
      job = gevent.spawn(self.send_request, url)
      # 保存所有的协程任务
      job_list.append(job)
      
    # joinall() 接受一个列表,将列表中的所有协程任务添加到任务队列里执行
    gevent.joinall(job_list)
    local_file = open("duanzi.txt", "wb+")
    while not self.data_queue.empty():
      content = self.data_queue.get()
      result = str(content).encode("utf-8")
      local_file.write(result + b"\n")
    local_file.close()
    
if __name__ == "__main__":
  spider = Spider()
  spider.start_work()
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-04-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据云团 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档