创建多少个多线程能得到最优的执行效率?
如果线程数量太多,线程的调度时间可能会超过线程的执行时间;
如果线程的数量太少,则起不到显著提高速度的作用。
一、单线程实现
使用单线程获取网页内容的步骤:
构建网址——>访问网页并获取源代码——>解析源代码——>转成 JSON 格式——>存储到本地文件
import json
import requests
from lxml import etree
# 访问网页的请求头
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept-Language": 'zh-CN,zh;q=0.8'}
# 储存解析后数据的本地文件
local_file = open("duanzi.json", "a")
# 解析 html 字符串,获取需要的信息
def parse_html(html):
text = etree.HTML(html)
# 模糊查询
node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
for node in node_list:
try:
username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
image = node.xpath("./li").xpath(".//@src")
title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
items = {
"username": username,
"title": title,
"image": image,
"zan": like,
"comments": comments
}
# 写入存储的解析后的数据
local_file.write(json.dumps(items, ensure_ascii=False) + "\n")
except:
pass
def main():
for page in range(1, 21):
url = "http://www.qiushibaike.com/8hr/page/" + str(page) + "/"
html = requests.get(url, headers=headers).text
# 解析网页信息
parse_html(html)
# 程序运行入口
if __name__ == "__main__":
main()
二、多线程实现
从单线程爬虫的流程可以看出,全部过程只使用了一个线程,先爬取一个网页,对网页内容进行解析,然后存储,完成整套操作后再开始爬取下一个网页,每个网页依次进行,效率非常慢。
多线程爬虫的流程:
① 使用一个队列 pageQueue 保存要访问的网页页码。
② 同时启动多个采集线程,每个线程都从网页页码队列 pageQueue 中取出一个要访问的页码,构建网址,访问网址并爬取数据。操作完一个网页后再从网页页码队列中取出下一个页码,依次进行,直到所有的页码都已访问完毕。所有的采集线程保存在列表 threadCrawls 中。
③ 使用一个队列 dataQueue 来保存所有的网页代码,每个线程获取到的数据都放入该队列中。
④ 同时启动多个解析线程,每个线程都从网页源代码队列 dataQueue 中取出一个网页源代码,并进行解析,获取想要的数据,并转化为 JSON 格式。解析完成后再取出下一个网页源代码,依次进行,直到多有的源代码都已被取出。将所有的解析线程存储在列表 threadParses 中。
⑤ 将解析得到的 JSON 数据存储在本地文件 duanzi.json 中。
import requests
import threading
# 采集网页页码队列是否为空
CRAWL_EXIT = False
class ThreadCrawl(threading.Thread):
def __init__(self, threadName, pageQueue, dataQueue):
threading.Thread.__init__(self)
# 线程名
self.threadName = threadName
# 页码队列
self.pageQueue = pageQueue
# 数据队列
self.dataQueue = dataQueue
# 请求头
self.headers = '{"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept-Language": "zh-CN,zh;q=0.8"}'
def run(self):
print("启动" + self.threadName)
while not CRAWL_EXIT:
try:
page = self.pageQueue.get(False)
url = "http://www.qiushibaike.com/8hr/page/" + str(page) + "/"
content = requests.get(url, headers=headers).text
self.dataQueue.put(content)
except:
pass
print("结束" + self.threadName)
# 网页源代码队列是否为空
PARSE_EXIT = False
class ThreadParse(threading.Thread):
def __init__(self, threadName, dataQueue, localFile, lock):
super(ThreadParse.self).__init__()
# 线程名
self.threadName = threadName
# 数据队列
self.dataQueue = dataQueue
# 保存解析后数据的文件名
self.localFile = localFile
# 互斥锁
self.lock = lock
def run(self):
print("启动" + self.threadName)
while not PARSE_EXIT:
try:
html = self.dataQueue.get(False)
self.parse(html)
except:
pass
print("结束" + self.threadName)
# 解析 html 文档,获取文档内容
def parse(self, html):
text = etree.HTML(html)
# 模糊查询
node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
for node in node_list:
# 用户名
username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
# 图片链接
image = node.xpath("./li").xpath(".//@src")
# 取出标题
title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
# 点赞数
like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
# 评论数
comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
items = {
"username": username,
"title": title,
"image": image,
"zan": like,
"comments": comments
}
# 打开锁,向文件添加内容,释放锁
with self.lock:
self.localFile.write(json.dumps(items, ensure_ascii=False) + "\n")
def main():
# 页码队列,存储 20 个页码,先进先出
pageQueue = Queue(20)
for i in range(1, 21):
pageQueue.put(i)
# 采集结果的数据队列,参数为空表示不限制
dataQueue = Queue()
# 以追加的方式打开本地文件
localFile = open("duanzi.json", "a")
# 互斥锁
lock = threading.Lock()
crawlList = ["采集线程1号", "采集线程2号", "采集线程3号"]
# 创建、启动和存储 3 个采集线程
threadCrawls = []
for threadName in crawlList:
thread = ThreadCrawl(threadName, pageQueue, dataQueue)
thread.start()
threadCrawls.append(thread)
parseList = ["解析线程1号", "解析线程2号", "解析线程3号"]
# 创建、启动和存储 3 个解析线程
threadParses = []
for threadName in parseList:
thread = ThreadParse(threadName, dataQueue, localFile, lock)
thread.start()
threadParses.append(thread)
# 如果 pageQueue 为空,采集线程退出循环
while not pageQueue.empty():
pass
global CRAWL_EXIT
CRAWL_EXIT = True
for thread in threadCrawls:
# 阻塞子线程
thread.join()
while not dataQueue.empty:
pass
global PARSE_EXIT
PARSE_EXIT = True
for thread in threadParses:
thread.join()
with lock:
# 关闭文件,在文件之前,内容都存在内存中
localFile.close()
三、协程实现
如果爬虫爬取网页的频率过高,会加重网页服务器的负担,甚至激发服务器的反爬虫机制,将用户的 IP 列入黑名单,所以通常在爬取线程中使用 time.sleep() 方法让线程间隔一小段时间后再继续爬取,一般间隔时间为 1.5~2s。
协程实现爬虫,步骤如下:
① 定义一个负责爬虫的类,所有的爬虫工作完全交由该类负责。
② 使用一个队列 data_queue 保存所有的数据。
③ 创建多个协程任务,每个协程都会使用页码构建完整的网址,访问网址爬取和提取有用的数据,并保存到数据队列中,直到所有网页中的数据提取出来。
④ 将 data_queue 队列中的数据全部取出来,保存到本地文件 duanzi.txt 中。
import time
import requests
from queue import Queue
class Spider(object):
def __init__(self):
self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36", "Accept-Language": "zh-CN,zh;q=0.8"}
self.base_url = "https://www.qiushibaike.com/8hr/page/"
# 创建保存数据的队列
self.data_queue = Queue()
# 统计数量
self.count = 0
def send_request(self, url):
print("[INFO]: 正在爬取" + url)
html = requests.get(url, headers=self.headers).content
# 每次请求间隔 1s
time.sleep(1)
self.parse_page(html)
def parse_page(self, html):
html_obj = etree.HTML(html)
# 模糊查询
node_list = text.xpath("//div[recommend-article(@id, 'qiushi_tag')]")
for node in node_list:
# 用户名
username = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('.//span').text
# 图片链接
image = node.xpath("./li").xpath(".//@src")
# 取出标题
title = node.xpath("./li").xpath('./div')[0].xpath("./a").text
# 点赞数
like = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[0].text
# 评论数
comments = node.xpath("./li").xpath('./div')[0].xpath('./div')[0].xpath('./div')[0].xpath(".//span")[3].text
items = {
"username": username,
"title": title,
"image": image,
"zan": like,
"comments": comments
}
self.count += 1
self.data_queue.put(items)
def start_work(self):
job_list = []
for page in range(1, 14):
# 创建一个协程任务对象
url = self.base_url + str(page) + "/"
job = gevent.spawn(self.send_request, url)
# 保存所有的协程任务
job_list.append(job)
# joinall() 接受一个列表,将列表中的所有协程任务添加到任务队列里执行
gevent.joinall(job_list)
local_file = open("duanzi.txt", "wb+")
while not self.data_queue.empty():
content = self.data_queue.get()
result = str(content).encode("utf-8")
local_file.write(result + b"\n")
local_file.close()
if __name__ == "__main__":
spider = Spider()
spider.start_work()