Python学习-python中的进程与协程

今天我们继续深入,一起学习python中的进程和协程相关知识。

目录

1. 多进程

2. 协程

1 多进程

多进程:进程之间是独立的,

python的线程是用的操作系统的原生线程、python的进程也是用的操作系统的原生进程。

原生进程是由操作系统去维护的,python只是通过C代码库去起了一个进程,真正进程的管理还是通过操作系统去完成的。

操作系统的进程管理是没有全局解释器锁的,进程只是是独立的,根本不需要锁的概念。

1.1 多进程的基本语法

进程:资源的集合,至少包含一个线程

python使用多核运算,使用python多进程

多进程和多线程的使用基本是一样的

1importmultiprocessing

2muitiprocessing.Process

1importmultiprocessing

2importthreading

3importtime

4

5defthread_run(i,n):

6print("在进程%s的线程%s"%(i,n))

7

8defrun(i):

9print("进程:%s "%i)

10time.sleep(1)

11forninrange(2):

12t = threading.Thread(target=thread_run,args=(i,n))

13t.start()

14

15if__name__ =='__main__':# 这个必须要有

16foriinrange(4):

17p = multiprocessing.Process(target=run,args=(i,))

18p.start()

如果我想取我的进程号,怎么去取呢?

1.2 进程间数据交互

前面提到进程间内存是独立的,但是想要访问,怎么办呢?

有下面几种方式:(万变不离其宗,需要个中间件(翻译))

队列 Queues

使用方法跟threading里的queue差不多

1frommultiprocessingimportProcess, Queue

2

3deff(qq):

4qq.put([42,None,'hello'])# 子进程中放数据

5

6if__name__ =='__main__':

7q = Queue()# 定义一个Queue

8p = Process(target=f, args=(q,))

9p.start()# 启动子进程

10print(q.get())# 主进程获取数据并打印

11p.join()

如果把线程queue传给子进程,传不了,那么父进程的Queue是怎么传递的?

看上去像数据共享,实际上是克隆了一个Queue,把自己的Queue克隆了一份交给了子进程。

但是为了数据共享,子进程会把Queue pickle序列化到一个中间的地方,中间位置再把数据反序列化给其他进程。

管道 Pipes

类似socket、如电话线,一人在这头,一人在那头

1frommultiprocessingimportProcess, Pipe

2

3deff(conn):

4conn.send([42,None,'hello from child'])

5conn.send([42,None,'hello from child2'])

6print("from parent:",conn.recv())

7conn.close()

8

9if__name__ =='__main__':

10parent_conn, child_conn = Pipe()# 名字自定义

11p = Process(target=f, args=(child_conn,))

12p.start()

13print(parent_conn.recv())# prints [42, None, 'hello from child']

14print(parent_conn.recv())# prints [42, None, 'hello from child2']

15parent_conn.send("[42, None, 'hello']")# prints "[42, None, 'hello']"

16p.join()

数据共享 Managers

上面两种方式只是实现了数据的传递,还没有实现数据的共享,如实现数据共享,就要用到Managers。

1frommultiprocessingimportProcess,Manager

2importos

3

4deff(dict1,list1):

5dict1[os.getpid()] = os.getpid()# 往字典里放当前PID

6list1.append(os.getpid())# 往列表里放当前PID

7print(list1)

8

9if__name__ =="__main__":

10withManager()asmanager:

11d = manager.dict()# 生成一个字典,可在多个进程间共享和传递

12l = manager.list(range(5))#生成一个列表,可在多个进程间共享和传递

13p_list = []# 存进程列表

14foriinrange(10):

15p = Process(target=f,args=(d,l))

16p.start()

17p_list.append(p)

18forresinp_list:# 等待结果

19res.join()

20print('\n%s'%d)

要不要加锁呢,不用加锁,Managers默认就帮你处理了,内部有锁控制。

进程里面也有一个锁

进程不是内存独立的么,要锁还有毛用?来看一下:

1frommultiprocessingimportProcess, Lock

2

3deff(l, i):

4l.acquire()# acquire一把锁

5try:

6print('hello world', i)

7finally:

8l.release()

9

10if__name__ =='__main__':

11lock = Lock()# 生成锁实例

12fornuminrange(10):

13Process(target=f, args=(lock, num)).start()

因为屏幕共享,会出现打印乱的问题。所以加锁

1.3 进程池

创建一个子进程就是克隆一份父进程空间给子进程,开销非常大。假如父进程空间1G,创建几个子进程内存空间就占满了,所以有进程池的限制。

同一时间有多少进程在运行。

线程是没有线程池的,(你可以自己搞:通过信号量搞线程池)

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

apply: 同步、串行

apply_async: 异步、并行

1defFoo(i):

2time.sleep(2)

3print("\033[31min process %s\033[0m"%os.getpid())

4returni

5

6defBar(arg):

7print("--> ecex done:", arg, os.getpid())# 回调

8# 回调函数:通过PID,可见是主进程调用的,不是子进程调用的

9

10if__name__ =="__main__":# windows下面必须有这句

11pool = Pool(processes=4)# 允许进程池同时放入4个进程

12print("主进程:%s\n%s"%(os.getpid(),'*'*22))

13

14foriinrange(10):

15pool.apply_async(func=Foo, args=(i,), callback=Bar)# 回调,参数为前面函数的返回结果

16# pool.apply(func=Foo, args=(i,)) 串行

17# pool.apply_async(func=Foo, args=(i,)) 并行

18pool.close()# 一定先关闭进程池再join等待已运行的结束,自己试试区别

19pool.join()# 进程池中进程执行完毕后在关闭。如果注释,那么程序直接关闭

2 协程

2.1 协程介绍

协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

线程的切换,会保存到CPU的寄存器里。

CPU感觉不到协程的存在,协程是用户自己控制的。

之前通过yield做的生产者消费者模型,就是协程,在单线程下实现并发效果。

协程的好处:

无需线程上下文切换的开销

无需数据操作锁定及同步的开销

方便切换控制流,简化编程模型

高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。

进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

2.2 使用yield实现协程操作

1importtime

2

3defconsumer(name):

4print("--->starting eating baozi...")

5whileTrue:

6new_baozi =yield

7print("[%s] is eating baozi %s"% (name,new_baozi))

8# time.sleep(1)

9

10defproducer():

11r = con.__next__()

12r = con2.__next__()

13n =

14whilen

15n +=1

16print("\033[32;1m[producer]\033[0m is making baozi %s"%n )

17con.send(n)

18con2.send(n)

19time.sleep(1)

20

21if__name__ =='__main__':

22con = consumer("c1")# 第一次调用只是生成器,next的时候才回生成

23con2 = consumer("c2")

24p = producer()

为了保证并发效果,在什么时候切换呢?遇到IO操作就切换。

但什么时候切回去呢?IO操作完了就切回去,但是程序是怎么实现的呢?

2.3 Greenlet 一个封装好的协程

1fromgreenletimportgreenlet

2

3deftest1():

4print(12)

5gr2.switch()

6print(34)

7gr2.switch()

8

9deftest2():

10print(56)

11gr1.switch()

12print(78)

13

14gr1 = greenlet(test1)

15gr2 = greenlet(test2)

16gr1.switch()

2.4 Gevent 自动切换

Greenlet 手动切换;Gevent 自动切换,封装了Greenlet

Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet

它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

1importgevent

2

3deffoo():

4print("Running in foo")

5gevent.sleep(2)

6print("swich to foo again")

7# 来回切换,直到sleep结束

8defbar():

9print("Running in bar")

10gevent.sleep(1)

11print("swich back to bar")

12

13deffunc3():

14print("Running in func3")

15gevent.sleep()# 只触发一次切换操作

16print("swich func3 again")

17

18gevent.joinall([

19gevent.spawn(foo),# 生成

20gevent.spawn(bar),

21gevent.spawn(func3),

22])

协程gevent并发爬网页

1from urllib import request

2import gevent,time

3# 注意!:Gevent检测不到urllib的io操作,还是串行的,让它知道就需要打补丁

4from gevent import monkey

5monkey.patch_all() # 把当前程序的所有IO操作给我做单独的做上标记

6

7def f(url):

8print("Get %s"%url)

9resp = request.urlopen(url)

10data = resp.read()

11# withopen("url.html",'wb') as f:

12# f.write(data)

13print("%d bytes received from %s"%(len(data), url))

14

15print("异步时间统计中……") # 协程实现

16async_start_time =time.time()

17gevent.joinall([

18gevent.spawn(f,"https://www.python.org"),

19gevent.spawn(f,"https://www.yahoo.com"),

20gevent.spawn(f,"https://github.com"),

21])

22print("\033[32;1m异步cost:\033[0m",time.time()-async_start_time)

23#------------------------以下只为对比效果---------------------------

24print("同步步时间统计中……")

25urls = [

26"https://www.python.org",

27"https://www.yahoo.com",

28"https://github.com",

29]

30start_time =time.time()

31forurlinurls:

32f(url)

33print("\033[32;1m同步cost:\033[0m",time.time()-start_time)

通过gevent实现单线程下的多socket并发

服务端:

1importsys

2importsocket

3importtime

4importgevent

5fromgeventimportsocket,monkey

6monkey.patch_all()

7

8defserver(port):

9s = socket.socket()

10s.bind(('0.0.0.0', port))

11s.listen(500)

12whileTrue:

13cli, addr = s.accept()# 每个连接起一个协程

14gevent.spawn(handle_request, cli)

15

16defhandle_request(conn):

17try:

18whileTrue:

19data = conn.recv(1024)

20print("recv:", data)

21conn.send(data)

22ifnotdata:

23conn.shutdown(socket.SHUT_WR)# 类似break

24exceptExceptionasex:

25print(ex)

26finally:

27conn.close()

28if__name__ =='__main__':

29server(8001)

客户端:

1import socket

2

3HOST = 'localhost'# The remote host

4PORT = 8001# The same port as used by the server

5s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

6s.connect((HOST, PORT))

7while True:

8msg = bytes(input(">>> "),encoding="utf8")

9s.sendall(msg)

10data = s.recv(1024)

11#print(data)

12

13print('Received', repr(data))# 内置方法repr:格式化输出

14s.close()

声明:本文系网络转载,版权归原作者所有。如涉及版权,请联系删除!

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180910B09CNG00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券