一. 线程初识
问题:为什么多个线程不能同时使用一个python解释器呢? 这是因为在Python中有一种垃圾回收机制,当一个value的引用计数为0之后,就会被python的垃圾回收机制所清空掉。但是python的垃圾回收机制其实也是通过一个线程来执行的,如果可以同时调用解释器,这就会出现这样一个问题:如果我赋值了一个操作a = [1, 2, 3]的时候,当我这个线程还没有执行这个操作,只是创建了一个值[1, 2, 3]的时候,突然python解释器把垃圾回收机制的线程给执行了,这是垃圾回收机制就会发现这个值[1, 2, 3]当前引用计数还是0呢,就直接清掉了,但是此时我还没有来得及给a赋值呢,这就出现了数据错乱的问题。 # This lock is necessary mainly because CPython’s memory management is not thread-safe. # 意思是CPython的内存管理机制(垃圾回收机制)不是线程安全的,因此我们不能让python线程同时去调用python解释器。
三. 进程池与线程池
在cmd中执行输入命令 【tasklist |findstr pyt】 会发现出现了5个python进程,其中有一个是主线程,另外四个就是进程池内的进程数。
线程池的使用和进程池是一样的,只是导入的名称不一样。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from threading import current_thread
import os
import random
import time
def task1(x):
print('%s is running..' % x)
time.sleep(random.randint(1,3))
if __name__ == '__main__':
# 创建一个线程池,线程池的大小可以通过参数进行传递, 如果不指定,默认是cpu的核数 * 5
process_pool = ThreadPoolExecutor(4)
for i in range(20):
# 当第一次执行submit之后,就会额外的创建出4个线程等待着执行任务
# 因此当我们执行了代码之后就会看到一下打印出了四行内容,之后就是执行完一个任务再进行下一个任务,最多的线程数是5个
# 其中有一个线程是主线程
process_pool.submit(task1, i)
# time.sleep(2)
四.同步vs异步 阻塞vs非阻塞
五. 异步+回调机制
案例:写一个简单的爬虫案例,来详细的分析一下异步和回调机制
首先,我们先以多进程的方式写一个同步的程序,用来爬取网页上的信息
import requests
import time
import os
import random
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""获取网页的信息"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模拟下载时间
if response.status_code == 200:
return response.text
def parse(data):
"""解析数据"""
time.sleep(0.2) # 模拟解析时间
print('%s 解析长度为%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# 每执行一个任务,都通过result去获得相应的数据,然后通过parse去解析数据
data = pool.submit(get, url).result()
parse(data)
问题:我们发现虽然说能够实现基本的功能,但是太慢了,每次都要等待一个任务全部完成获得返回值之后才会去执行下面的代码,为了提升效率,我们考虑可以把同步的方式转换成异步。因此我们需要将name里面的内容转换成下面的样子。
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
obj_l = []
for url in urls:
obj = pool.submit(get, url)
obj_l.append(obj)
# 等待进程池内的任务全部完成之后才去执行下面的代码
pool.shutdown()
# 此时url的内容都已经下载完成并且保存在对象obj_l列表中,我们通过parse就可以解析了
for obj in obj_l:
parse(obj.result())
问题: 虽然说效率有所提升但是依然存在一些问题
基于上面的问题,我们可以将解析过程放在get函数里面,在一个任务下载结束之后就会立马的进行解析数据,可以解决问题1,而且对于解析数据而言都是通过下载数据相同的进程进行解析的,可以解决第二个问题。因此我们的代码可以修改成下面这个样子。
import requests
import time
import os
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""获取网页的信息"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模拟下载时间
if response.status_code == 200:
# 返回值我们也不需要,直接去调用parse解析就可以了
parse(response.text)
def parse(data):
"""解析数据"""
time.sleep(0.2)
print('%s 解析长度为%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
pool.submit(get, url)
问题: 这样写的话我们会发现下载内容和解析内容的代码被写在一块了,而且还是被同一个进程执行,这就和我们之前所讲的生产者和消费者模型相悖了,如果我们想让两个函数解耦合,我们可以在get函数中将结果返回回来,然后在主进程中接收,并执行解析函数。此时我们就需要用到回调函数了。
import requests
import time
import os
from concurrent.futures import ProcessPoolExecutor
def get(url):
"""获取网页的信息"""
print('%s get %s' % (os.getpid(), url))
response = requests.get(url)
time.sleep(0.5) # 模拟下载时间
if response.status_code == 200:
# 返回值我们也不需要,直接去调用parse解析就可以了
return response.text
def parse(data):
"""解析数据"""
time.sleep(0.2)
print('%s 解析长度为%s' % (os.getpid(), len(data)))
if __name__ == '__main__':
# 创建一个进程池,设置进程池的数量为4
pool = ProcessPoolExecutor(4)
# 这是我们需要爬取的url
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# 通过submit将对象obj扔到进程池中
obj = pool.submit(get, url)
# 通过函数add_done_callback函数将传递的函数parse绑定到对象上obj
# 当对象obj所执行的任务一旦完成并获得一个返回值的时候就会自动的去调用parse函数,并将对象当做参数传递进去
# 这种方式和get方法中直接解析数据所实现的效果都是差不多的,只是说解决了耦合的问题,从某种意义上讲,有时候解析的时间还要更长一点
# 注意: parse函数是全部都是通过主进程进行调用的,这也就解释了回调的意思,我来调用你,结果给我之后就应该就由我来解析
obj.add_done_callback(parse)
多线程和多进程的方式都是一样的,唯一不同的地方就在于回调函数是哪个线程空闲哪个线程去执行回调函数
六. 线程queue, Event