原文地址: https://blog.csdn.net/fgf00/article/details/52790360 编辑:智能算法,欢迎关注! 上期我们一起学习了python中的线程的相关知识
今天我们继续深入,一起学习python中的进程和协程相关知识。
目录
1. 多进程
2. 协程
多进程:进程之间是独立的, python的线程是用的操作系统的原生线程、python的进程也是用的操作系统的原生进程。 原生进程是由操作系统去维护的,python只是通过C代码库去起了一个进程,真正进程的管理还是通过操作系统去完成的。 操作系统的进程管理是没有全局解释器锁的,进程只是是独立的,根本不需要锁的概念。
进程:资源的集合,至少包含一个线程 python使用多核运算,使用python多进程
多进程和多线程的使用基本是一样的
1import multiprocessing
2muitiprocessing.Process
1import multiprocessing
2import threading
3import time
4
5def thread_run(i,n):
6 print("在进程%s的线程%s"%(i,n))
7
8def run(i):
9 print("进程:%s "%i)
10 time.sleep(1)
11 for n in range(2):
12 t = threading.Thread(target=thread_run,args=(i,n))
13 t.start()
14
15if __name__ == '__main__': # 这个必须要有
16 for i in range(4):
17 p = multiprocessing.Process(target=run,args=(i,))
18 p.start()
如果我想取我的进程号,怎么去取呢?
1from multiprocessing import Process
2import os
3
4def info(title): # 打印进程信息
5 print(title)
6 print('module name:', __name__) # 模块名
7 print('parent process:', os.getppid()) # 父进程ID
8 print('process id:', os.getpid()) # 进程ID
9 print("\n")
10
11def f(name):
12 info('\033[31;1mcalled from child process function f\033[0m') # 打印子进程信息
13 print('hello', name)
14
15if __name__ == '__main__':
16 info('\033[32;1mmain process line\033[0m') # 打印当前进程信息
17 p = Process(target=f, args=('FGF',)) # 子进程
18 p.start()
19 # p.join()
前面提到进程间内存是独立的,但是想要访问,怎么办呢? 有下面几种方式:(万变不离其宗,需要个中间件(翻译))
1from multiprocessing import Process, Queue
2
3def f(qq):
4 qq.put([42, None, 'hello']) # 子进程中放数据
5
6if __name__ == '__main__':
7 q = Queue() # 定义一个Queue
8 p = Process(target=f, args=(q,))
9 p.start() # 启动子进程
10 print(q.get()) # 主进程获取数据并打印
11 p.join()
如果把线程queue传给子进程,传不了,那么父进程的Queue是怎么传递的? 看上去像数据共享,实际上是克隆了一个Queue,把自己的Queue克隆了一份交给了子进程。 但是为了数据共享,子进程会把Queue pickle序列化到一个中间的地方,中间位置再把数据反序列化给其他进程。
1from multiprocessing import Process, Pipe
2
3def f(conn):
4 conn.send([42, None, 'hello from child'])
5 conn.send([42, None, 'hello from child2'])
6 print("from parent:",conn.recv())
7 conn.close()
8
9if __name__ == '__main__':
10 parent_conn, child_conn = Pipe() # 名字自定义
11 p = Process(target=f, args=(child_conn,))
12 p.start()
13 print(parent_conn.recv()) # prints [42, None, 'hello from child']
14 print(parent_conn.recv()) # prints [42, None, 'hello from child2']
15 parent_conn.send("[42, None, 'hello']") # prints "[42, None, 'hello']"
16 p.join()
1from multiprocessing import Process,Manager
2import os
3
4def f(dict1,list1):
5 dict1[os.getpid()] = os.getpid() # 往字典里放当前PID
6 list1.append(os.getpid()) # 往列表里放当前PID
7 print(list1)
8
9if __name__ == "__main__":
10 with Manager() as manager:
11 d = manager.dict() # 生成一个字典,可在多个进程间共享和传递
12 l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递
13 p_list = [] # 存进程列表
14 for i in range(10):
15 p = Process(target=f,args=(d,l))
16 p.start()
17 p_list.append(p)
18 for res in p_list: # 等待结果
19 res.join()
20 print('\n%s' %d)
要不要加锁呢,不用加锁,Managers默认就帮你处理了,内部有锁控制。
1from multiprocessing import Process, Lock
2
3def f(l, i):
4 l.acquire() # acquire一把锁
5 try:
6 print('hello world', i)
7 finally:
8 l.release()
9
10if __name__ == '__main__':
11 lock = Lock() # 生成锁实例
12 for num in range(10):
13 Process(target=f, args=(lock, num)).start()
因为屏幕共享,会出现打印乱的问题。所以加锁
创建一个子进程就是克隆一份父进程空间给子进程,开销非常大。假如父进程空间1G,创建几个子进程内存空间就占满了,所以有进程池的限制。 同一时间有多少进程在运行。 线程是没有线程池的,(你可以自己搞:通过信号量搞线程池)
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。
进程池中有两个方法:
1def Foo(i):
2 time.sleep(2)
3 print("\033[31min process %s\033[0m"%os.getpid())
4 return i
5
6def Bar(arg):
7 print("--> ecex done:", arg, os.getpid()) # 回调
8 # 回调函数:通过PID,可见是主进程调用的,不是子进程调用的
9
10if __name__ == "__main__": # windows下面必须有这句
11 pool = Pool(processes=4) # 允许进程池同时放入4个进程
12 print("主进程:%s\n%s"%(os.getpid(),'*'*22))
13
14 for i in range(10):
15 pool.apply_async(func=Foo, args=(i,), callback=Bar) # 回调,参数为前面函数的返回结果
16 # pool.apply(func=Foo, args=(i,)) 串行
17 # pool.apply_async(func=Foo, args=(i,)) 并行
18 pool.close() # 一定先关闭进程池再join等待已运行的结束,自己试试区别
19 pool.join() # 进程池中进程执行完毕后在关闭。如果注释,那么程序直接关闭
协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
线程的切换,会保存到CPU的寄存器里。 CPU感觉不到协程的存在,协程是用户自己控制的。
之前通过yield做的生产者消费者模型,就是协程,在单线程下实现并发效果。
协程的好处:
缺点:
1import time
2
3def consumer(name):
4 print("--->starting eating baozi...")
5 while True:
6 new_baozi = yield
7 print("[%s] is eating baozi %s" % (name,new_baozi))
8 # time.sleep(1)
9
10def producer():
11 r = con.__next__()
12 r = con2.__next__()
13 n = 0
14 while n < 5:
15 n +=1
16 print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
17 con.send(n)
18 con2.send(n)
19 time.sleep(1)
20
21if __name__ == '__main__':
22 con = consumer("c1") # 第一次调用只是生成器,next的时候才回生成
23 con2 = consumer("c2")
24 p = producer()
为了保证并发效果,在什么时候切换呢?遇到IO操作就切换。 但什么时候切回去呢?IO操作完了就切回去,但是程序是怎么实现的呢?
1from greenlet import greenlet
2
3def test1():
4 print(12)
5 gr2.switch()
6 print(34)
7 gr2.switch()
8
9def test2():
10 print(56)
11 gr1.switch()
12 print(78)
13
14gr1 = greenlet(test1)
15gr2 = greenlet(test2)
16gr1.switch()
Greenlet 手动切换;Gevent 自动切换,封装了Greenlet Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
1import gevent
2
3def foo():
4 print("Running in foo")
5 gevent.sleep(2)
6 print("swich to foo again")
7# 来回切换,直到sleep结束
8def bar():
9 print("Running in bar")
10 gevent.sleep(1)
11 print("swich back to bar")
12
13def func3():
14 print("Running in func3")
15 gevent.sleep(0) # 只触发一次切换操作
16 print("swich func3 again")
17
18gevent.joinall([
19 gevent.spawn(foo), # 生成
20 gevent.spawn(bar),
21 gevent.spawn(func3),
22])
1from urllib import request
2import gevent, time
3# 注意!:Gevent检测不到urllib的io操作,还是串行的,让它知道就需要打补丁
4from gevent import monkey
5monkey.patch_all() # 把当前程序的所有IO操作给我做单独的做上标记
6
7def f(url):
8 print("Get %s" %url)
9 resp = request.urlopen(url)
10 data = resp.read()
11 # with open("url.html", 'wb') as f:
12 # f.write(data)
13 print("%d bytes received from %s" %(len(data), url))
14
15print("异步时间统计中……") # 协程实现
16async_start_time = time.time()
17gevent.joinall([
18 gevent.spawn(f, "https://www.python.org"),
19 gevent.spawn(f, "https://www.yahoo.com"),
20 gevent.spawn(f, "https://github.com"),
21])
22print("\033[32;1m异步cost:\033[0m",time.time()-async_start_time)
23#------------------------以下只为对比效果---------------------------
24print("同步步时间统计中……")
25urls = [
26 "https://www.python.org",
27 "https://www.yahoo.com",
28 "https://github.com",
29]
30start_time = time.time()
31for url in urls:
32 f(url)
33print("\033[32;1m同步cost:\033[0m",time.time()-start_time)
1import sys
2import socket
3import time
4import gevent
5from gevent import socket,monkey
6monkey.patch_all()
7
8def server(port):
9 s = socket.socket()
10 s.bind(('0.0.0.0', port))
11 s.listen(500)
12 while True:
13 cli, addr = s.accept() # 每个连接起一个协程
14 gevent.spawn(handle_request, cli)
15
16def handle_request(conn):
17 try:
18 while True:
19 data = conn.recv(1024)
20 print("recv:", data)
21 conn.send(data)
22 if not data:
23 conn.shutdown(socket.SHUT_WR) # 类似break
24 except Exception as ex:
25 print(ex)
26 finally:
27 conn.close()
28if __name__ == '__main__':
29 server(8001)
客户端:
1import socket
2
3HOST = 'localhost' # The remote host
4PORT = 8001 # The same port as used by the server
5s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
6s.connect((HOST, PORT))
7while True:
8 msg = bytes(input(">>> "),encoding="utf8")
9 s.sendall(msg)
10 data = s.recv(1024)
11 #print(data)
12
13 print('Received', repr(data)) # 内置方法repr:格式化输出
14s.close()
声明:本文系网络转载,版权归原作者所有。如涉及版权,请联系删除!