在线程中,访问一些全局变量,加锁是一个经常的过程。如果你是想把一些数据存储到某个队列中,那么Python内置了一个线程安全的模块叫做queue
模块。Python中的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后入先出)队列LifoQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么都做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。相关的函数如下:
import threading
import requests
from lxml import etree
from urllib import request
import os
import re
from queue import Queue
class Producer(threading.Thread):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
}
def __init__(self,page_queue,img_queue,*args,**kwargs):
super(Producer, self).__init__(*args,**kwargs)
self.page_queue = page_queue
self.img_queue = img_queue
def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
self.parse_page(url)
def parse_page(self,url):
response = requests.get(url,headers=self.headers)
text = response.text
html = etree.HTML(text)
imgs = html.xpath("//div[@class='page-content text-center']//a//img")
for img in imgs:
if img.get('class') == 'gif':
continue
img_url = img.xpath(".//@data-original")[0]
suffix = os.path.splitext(img_url)[1]
alt = img.xpath(".//@alt")[0]
alt = re.sub(r'[,。??,/\\·]','',alt)
img_name = alt + suffix
self.img_queue.put((img_url,img_name))
class Consumer(threading.Thread):
def __init__(self,page_queue,img_queue,*args,**kwargs):
super(Consumer, self).__init__(*args,**kwargs)
self.page_queue = page_queue
self.img_queue = img_queue
def run(self):
while True:
if self.img_queue.empty():
if self.page_queue.empty():
return
img = self.img_queue.get(block=True)
url,filename = img
request.urlretrieve(url,'images/'+filename)
print(filename+' 下载完成!')
def main():
page_queue = Queue(100)
img_queue = Queue(500)
for x in range(1,101):
url = "http://www.doutula.com/photo/list/?page=%d" % x
page_queue.put(url)
for x in range(5):
t = Producer(page_queue,img_queue)
t.start()
for x in range(5):
t = Consumer(page_queue,img_queue)
t.start()
if __name__ == '__main__':
main()
Python自带的解释器是CPython
。CPython
解释器的多线程实际上是一个假的多线程(在多核CPU中,只能利用一核,不能利用多核)。同一时刻只有一个线程在执行,为了保证同一时刻只有一个线程在执行,在CPython
解释器中有一个东西叫做GIL(Global Intepreter Lock)
,叫做全局解释器锁。这个解释器锁是有必要的。因为CPython
解释器的内存管理不是线程安全的。当然除了CPython
解释器,还有其他的解释器,有些解释器是没有GIL
锁的,见下面:
Jython
:用Java实现的Python解释器。不存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/JythonIronPython
:用.net
实现的Python解释器。不存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/IronPythonPyPy
:用Python
实现的Python解释器。存在GIL锁。更多详情请见:https://zh.wikipedia.org/wiki/PyPy
GIL虽然是一个假的多线程。但是在处理一些IO操作(比如文件读写和网络请求)还是可以在很大程度上提高效率的。在IO操作上建议使用多线程提高效率。在一些CPU计算操作上不建议使用多线程,而建议使用多进程
import requests
from lxml import etree
import threading
from queue import Queue
import csv
class BSSpider(threading.Thread):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
}
def __init__(self,page_queue,joke_queue,*args,**kwargs):
super(BSSpider, self).__init__(*args,**kwargs)
self.base_domain = 'http://www.budejie.com'
self.page_queue = page_queue
self.joke_queue = joke_queue
def run(self):
while True:
if self.page_queue.empty():
break
url = self.page_queue.get()
response = requests.get(url, headers=self.headers)
text = response.text
html = etree.HTML(text)
descs = html.xpath("//div[@class='j-r-list-c-desc']")
for desc in descs:
jokes = desc.xpath(".//text()")
joke = "\n".join(jokes).strip()
link = self.base_domain+desc.xpath(".//a/@href")[0]
self.joke_queue.put((joke,link))
print('='*30+"第%s页下载完成!"%url.split('/')[-1]+"="*30)
class BSWriter(threading.Thread):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'
}
def __init__(self, joke_queue, writer,gLock, *args, **kwargs):
super(BSWriter, self).__init__(*args, **kwargs)
self.joke_queue = joke_queue
self.writer = writer
self.lock = gLock
def run(self):
while True:
try:
joke_info = self.joke_queue.get(timeout=40)
joke,link = joke_info
self.lock.acquire()
self.writer.writerow((joke,link))
self.lock.release()
print('保存一条')
except:
break
def main():
page_queue = Queue(10)
joke_queue = Queue(500)
gLock = threading.Lock()
fp = open('bsbdj.csv', 'a',newline='', encoding='utf-8')
writer = csv.writer(fp)
writer.writerow(('content', 'link'))
for x in range(1,11):
url = 'http://www.budejie.com/text/%d' % x
page_queue.put(url)
for x in range(5):
t = BSSpider(page_queue,joke_queue)
t.start()
for x in range(5):
t = BSWriter(joke_queue,writer,gLock)
t.start()
if __name__ == '__main__':
main()
END
想一起交流的伙伴可以加我微信哦(左),欢迎关注我的公众号(右)