进程之间通信
进程之间通信:使用队列
queue模块中的队列,只能用于一个进程中,各个线程之间进行通信
进程模块中的Queue:可以用于多个进程之间进行通信
⚠️注意点:使用的时候要使用参数传递到各个进程任务中
from multiprocessing import Queue
from multiprocessing import Process, Queue
# Shared queue, pre-loaded with five integers before the workers start.
q = Queue()
for value in range(5):
    q.put(value)
def work1(q):
    """Drain items from the shared queue, printing each one.

    Fix: the original `while not q.empty(): q.get()` is racy when two
    processes consume the same queue — another consumer can take the last
    item between the empty() check and the blocking get(), hanging this
    process forever. Use non-blocking get and stop on queue.Empty (EAFP).

    :param q: a multiprocessing.Queue (or queue.Queue) to consume from
    """
    from queue import Empty  # multiprocessing.Queue raises queue.Empty too

    while True:
        try:
            item = q.get_nowait()
        except Empty:
            break
        print(f'work1获取数据{item}')
def work2(q):
    """Drain items from the shared queue, printing each one.

    Fix: same race as work1 — empty() followed by a blocking get() can hang
    when another process steals the last item in between. Non-blocking get
    plus catching queue.Empty terminates cleanly instead.

    :param q: a multiprocessing.Queue (or queue.Queue) to consume from
    """
    from queue import Empty  # multiprocessing.Queue raises queue.Empty too

    while True:
        try:
            item = q.get_nowait()
        except Empty:
            break
        print(f'work2获取数据{item}')
if __name__ == '__main__':
    # Both consumer processes receive the same queue via args — the queue
    # must be passed explicitly; child processes do not share module globals.
    p1 = Process(target=work1, args=(q,))
    p2 = Process(target=work2, args=(q,))
    for proc in (p1, p2):
        proc.start()
work1获取数据0 work1获取数据1 work1获取数据2 work1获取数据3 work1获取数据4
通俗的理解
在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己决定
协程和线程差异
在实现多任务时,线程切换从系统层面看远不止保存和恢复CPU上下文这么简单。操作系统为了程序运行的高效性,给每个线程配备了自己的缓存Cache等数据,切换时操作系统还要做这些数据的保存和恢复操作,所以线程的切换比较耗性能。而协程的切换只是单纯地操作CPU的上下文,所以一秒钟切换上百万次,性能也扛得住。
# Two equivalent ways to build a generator of 0..9:
# a generator expression, and a generator function.
g1 = (i for i in range(10))


def gen():
    """Generator function equivalent to the genexpr above."""
    yield from range(10)


g2 = gen()
print(g1)
print(g2)
<generator object <genexpr> at 0x104b2e040>
<generator object gen at 0x104b2e120>
def work1():
    """Yield 0..4; print a progress line each time the caller resumes us."""
    count = 0
    while count < 5:
        yield count
        # runs only after the consumer calls next() again
        print(f'work1:{count}')
        count += 1
def work2():
    """Yield 0..4; print a progress line each time the caller resumes us."""
    count = 0
    while count < 5:
        yield count
        # runs only after the consumer calls next() again
        print(f'work2:{count}')
        count += 1
def work3():
    """Yield 0..4; print a progress line each time the caller resumes us."""
    count = 0
    while count < 5:
        yield count
        # runs only after the consumer calls next() again
        print(f'work3:{count}')
        count += 1
# Cooperative round-robin: advance each generator one step per cycle until
# the first one is exhausted (its StopIteration ends the whole loop).
g1 = work1()
g2 = work2()
g3 = work3()
try:
    while True:
        for gen_obj in (g1, g2, g3):
            next(gen_obj)
except StopIteration:
    pass
work1:0 work2:0 work3:0 work1:1 work2:1 work3:1 work1:2 work2:2 work3:2 work1:3 work2:3 work3:3 work1:4
$ pip install greenlet
import time
from greenlet import greenlet
def test1():
    """Print 0..4, explicitly handing control to g2 after each print."""
    for n in range(5):
        print(n)
        g2.switch()       # transfer control to the other greenlet
        time.sleep(0.1)
def test2():
    """Print 0..4, explicitly handing control back to g1 after each print."""
    for n in range(5):
        print(n)
        g1.switch()       # transfer control back to the first greenlet
        time.sleep(0.1)
# Wrap each function in a greenlet; nothing runs until switch() is called.
g1 = greenlet(test1)
g2 = greenlet(test2)
# Transfer control to g1 first; test1/test2 then ping-pong via explicit
# switch() calls — the developer decides every switch point.
g1.switch()
0 0 1 1 2 2 3 3 4 4
gevent其原理就是当一个greenlet遇到IO(input output输入输出,比如网络,文件操作等)操作时,比如访问网络,就自动切换到其他greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent自动切换协程,保证总有greenlet在运行,而不是等待IO
$ pip install gevent
等待需要使用gevent.sleep(0.1)
import gevent
def work1():
    """Print five progress lines, yielding to other greenlets between each."""
    for step in range(5):
        print(f'work1:{step}')
        gevent.sleep(0.1)  # cooperative yield point for gevent's scheduler
def work2():
    """Print five progress lines, yielding to other greenlets between each."""
    for step in range(5):
        print(f'work2:{step}')
        gevent.sleep(0.1)  # cooperative yield point for gevent's scheduler
# Spawn both workers as greenlets, then block the main greenlet until
# each has finished (otherwise the script would exit immediately).
g1 = gevent.spawn(work1)
g2 = gevent.spawn(work2)
for task in (g1, g2):
    task.join()
work1:0 work2:0 work1:1 work2:1 work1:2 work2:2 work1:3 work2:3 work1:4 work2:4
修改work2的等待时间为0.4秒
import gevent
def work1():
    """Print five progress lines, yielding to other greenlets between each."""
    for step in range(5):
        print(f'work1:{step}')
        gevent.sleep(0.1)  # cooperative yield point
def work2():
    """Print five progress lines, pausing longer (0.4s) than work1 does."""
    for step in range(5):
        print(f'work2:{step}')
        gevent.sleep(0.4)  # longer pause — work1 gets more turns in between
# Spawn both workers, then wait for each before the main greenlet exits.
g1 = gevent.spawn(work1)
g2 = gevent.spawn(work2)
for task in (g1, g2):
    task.join()
work1:0 work2:0 work1:1 work1:2 work1:3 work2:1 work1:4 work2:2 work2:3 work2:4
如果需要使用time.sleep
但是要完成切换需要使用猴子补丁
注意:猴子补丁在多线程任务时会出现异常!!!!!
# Monkey-patch blocking stdlib calls (time.sleep, socket, ...) so gevent can
# switch greenlets when they block. NOTE(review): the notes above warn this
# misbehaves with multithreaded code; patch as early as possible.
from gevent import monkey
monkey.patch_all()
import time
import gevent
# Patch stdlib blocking calls (including time.sleep) into gevent-cooperative
# ones, so the plain time.sleep below still yields to other greenlets.
from gevent import monkey
monkey.patch_all()
def work1():
    """Print five progress lines, sleeping 0.1s between each.

    With monkey.patch_all() applied, time.sleep becomes a cooperative
    yield point instead of blocking the whole process.
    """
    for step in range(5):
        print(f'work1:{step}')
        time.sleep(0.1)
def work2():
    """Print five progress lines, sleeping 0.4s between each.

    The longer (patched) sleep means work1 runs several steps for each
    step of work2 when both are scheduled by gevent.
    """
    for step in range(5):
        print(f'work2:{step}')
        time.sleep(0.4)
# Spawn both workers and wait for them; join() keeps the main greenlet
# alive until the children are done.
g1 = gevent.spawn(work1)
g2 = gevent.spawn(work2)
for task in (g1, g2):
    task.join()
import time
import requests
import gevent
def usetime(func):
    """Decorator: print the wall-clock duration of each call to *func*.

    Fixes over the original:
    - the wrapper now returns func's result instead of discarding it;
    - functools.wraps preserves func's __name__/__doc__ on the wrapper.

    :param func: the callable to time
    :return: wrapped callable with identical signature and return value
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f'耗时{end_time - start_time}')
        return result

    return wrapper
def work1(url):
    """Send 100 GET requests to *url*, yielding to other greenlets between them.

    :param url: target URL for the repeated requests
    """
    for attempt in range(100):
        requests.get(url)
        gevent.sleep(0.01)  # cooperative yield between requests
        print(f'work1发送第{attempt}次请求')
def work2(url):
    """Send 100 GET requests to *url*, yielding to other greenlets between them.

    :param url: target URL for the repeated requests
    """
    for attempt in range(100):
        requests.get(url)
        gevent.sleep(0.01)  # cooperative yield between requests
        print(f'work2发送第{attempt}次请求')
@usetime
def main():
    """Run both request workers concurrently and wait for completion.

    gevent.spawn forwards extra positional/keyword args to the target,
    so the URL can be passed either way.
    """
    g1 = gevent.spawn(work1, url='https://www.baidu.com')
    g2 = gevent.spawn(work2, 'https://www.baidu.com')
    for task in (g1, g2):
        task.join()
if __name__ == '__main__':
    main()  # one measured run: 32.37710213661194 s for 2x100 requests
close():关闭进程池,该进程池不再接收新的任务
import os
import time
from multiprocessing import Pool
def work1():
    """Print five progress lines tagged with the current process id.

    The pid in the output shows which pool worker ran this task.
    """
    for step in range(5):
        print(f'进程ID:{os.getpid()},work1:{step}')
        time.sleep(0.1)
if __name__ == '__main__':
    # Pool with five worker processes; only three tasks are submitted,
    # so at most three workers actually run anything.
    po = Pool(5)
    for _ in range(3):
        po.apply_async(work1)  # non-blocking submission
    po.close()  # stop accepting new tasks
    po.join()   # wait for all submitted tasks to finish
进程ID:64232,work1:0 进程ID:64233,work1:0 进程ID:64234,work1:0 进程ID:64232,work1:1 进程ID:64233,work1:1 进程ID:64234,work1:1 进程ID:64232,work1:2 进程ID:64234,work1:2 进程ID:64233,work1:2 进程ID:64232,work1:3 进程ID:64234,work1:3 进程ID:64233,work1:3 进程ID:64232,work1:4 进程ID:64234,work1:4 进程ID:64233,work1:4
from multiprocessing import Manager
# NOTE(review): presumably shown because a plain multiprocessing.Queue cannot
# be handed to Pool tasks, while a Manager().Queue() proxy can — confirm.
q1 = Manager().Queue()
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import threadpool
◆ ◆ ◆ ◆ ◆