Python学习(十)---- python中的进程与协程

原文地址: https://blog.csdn.net/fgf00/article/details/52790360 编辑:智能算法,欢迎关注! 上期我们一起学习了python中的线程的相关知识

Python学习(九)---- python中的线程

今天我们继续深入,一起学习python中的进程和协程相关知识。

目录

1. 多进程

2. 协程

1 多进程

多进程:进程之间是独立的, python的线程是用的操作系统的原生线程、python的进程也是用的操作系统的原生进程。 原生进程是由操作系统去维护的,python只是通过C代码库去起了一个进程,真正进程的管理还是通过操作系统去完成的。 操作系统的进程管理是没有全局解释器锁的,进程只是是独立的,根本不需要锁的概念。

1.1 多进程的基本语法

进程:资源的集合,至少包含一个线程 python使用多核运算,使用python多进程

多进程和多线程的使用基本是一样的

1import multiprocessing
2muitiprocessing.Process
1import multiprocessing
 2import threading
 3import time
 4
 5def thread_run(i,n):
 6    print("在进程%s的线程%s"%(i,n))
 7
 8def run(i):
 9    print("进程:%s "%i)
10    time.sleep(1)
11    for n in range(2):
12        t = threading.Thread(target=thread_run,args=(i,n))
13        t.start()
14
15if __name__ == '__main__':  # 这个必须要有
16    for i in range(4):
17        p = multiprocessing.Process(target=run,args=(i,))
18        p.start()

如果我想取我的进程号,怎么去取呢?

1from multiprocessing import Process
 2import os
 3
 4def info(title):  # 打印进程信息
 5    print(title)
 6    print('module name:', __name__)  # 模块名
 7    print('parent process:', os.getppid())  # 父进程ID
 8    print('process id:', os.getpid())  # 进程ID
 9    print("\n")
10
11def f(name):
12    info('\033[31;1mcalled from child process function f\033[0m')  # 打印子进程信息
13    print('hello', name)
14
15if __name__ == '__main__':
16    info('\033[32;1mmain process line\033[0m')  # 打印当前进程信息
17    p = Process(target=f, args=('FGF',))  # 子进程
18    p.start()
19    # p.join()

1.2 进程间数据交互

前面提到进程间内存是独立的,但是想要访问,怎么办呢? 有下面几种方式:(万变不离其宗,需要个中间件(翻译))

  • 队列 Queues 使用方法跟threading里的queue差不多
1from multiprocessing import Process, Queue
 2
 3def f(qq):
 4    qq.put([42, None, 'hello'])  # 子进程中放数据
 5
 6if __name__ == '__main__':
 7    q = Queue()  # 定义一个Queue
 8    p = Process(target=f, args=(q,))
 9    p.start()  # 启动子进程
10    print(q.get())  # 主进程获取数据并打印
11    p.join()

如果把线程queue传给子进程,传不了,那么父进程的Queue是怎么传递的? 看上去像数据共享,实际上是克隆了一个Queue,把自己的Queue克隆了一份交给了子进程。 但是为了数据共享,子进程会把Queue pickle序列化到一个中间的地方,中间位置再把数据反序列化给其他进程。

  • 管道 Pipes 类似socket、如电话线,一人在这头,一人在那头
1from multiprocessing import Process, Pipe
 2
 3def f(conn):
 4    conn.send([42, None, 'hello from child'])
 5    conn.send([42, None, 'hello from child2'])
 6    print("from parent:",conn.recv())
 7    conn.close()
 8
 9if __name__ == '__main__':
10    parent_conn, child_conn = Pipe()  # 名字自定义
11    p = Process(target=f, args=(child_conn,))
12    p.start()
13    print(parent_conn.recv())  # prints [42, None, 'hello from child']
14    print(parent_conn.recv())  # prints [42, None, 'hello from child2']
15    parent_conn.send("[42, None, 'hello']") # prints "[42, None, 'hello']"
16    p.join()
  • 数据共享 Managers 上面两种方式只是实现了数据的传递,还没有实现数据的共享,如实现数据共享,就要用到Managers。
1from multiprocessing import Process,Manager
 2import os
 3
 4def f(dict1,list1):
 5    dict1[os.getpid()] = os.getpid()  # 往字典里放当前PID
 6    list1.append(os.getpid())  # 往列表里放当前PID
 7    print(list1)
 8
 9if __name__ == "__main__":
10    with Manager() as manager:
11        d = manager.dict()  # 生成一个字典,可在多个进程间共享和传递
12        l = manager.list(range(5)) #生成一个列表,可在多个进程间共享和传递
13        p_list = []  # 存进程列表
14        for i in range(10):
15            p = Process(target=f,args=(d,l))
16            p.start()
17            p_list.append(p)
18        for res in p_list:  # 等待结果
19            res.join()
20        print('\n%s' %d)

要不要加锁呢,不用加锁,Managers默认就帮你处理了,内部有锁控制。

  • 进程里面也有一个锁 进程不是内存独立的么,要锁还有毛用?来看一下:
1from multiprocessing import Process, Lock
 2
 3def f(l, i):
 4    l.acquire()  # acquire一把锁
 5    try:
 6        print('hello world', i)
 7    finally:
 8        l.release()
 9
10if __name__ == '__main__':
11    lock = Lock()  # 生成锁实例
12    for num in range(10):
13        Process(target=f, args=(lock, num)).start()

因为屏幕共享,会出现打印乱的问题。所以加锁

1.3 进程池

创建一个子进程就是克隆一份父进程空间给子进程,开销非常大。假如父进程空间1G,创建几个子进程内存空间就占满了,所以有进程池的限制。 同一时间有多少进程在运行。 线程是没有线程池的,(你可以自己搞:通过信号量搞线程池)

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

进程池中有两个方法:

  • apply: 同步、串行
  • apply_async: 异步、并行
1def Foo(i):
 2    time.sleep(2)
 3    print("\033[31min process %s\033[0m"%os.getpid())
 4    return i
 5
 6def Bar(arg):
 7    print("--> ecex done:", arg, os.getpid())  # 回调
 8    # 回调函数:通过PID,可见是主进程调用的,不是子进程调用的
 9
10if __name__ == "__main__":  # windows下面必须有这句
11    pool = Pool(processes=4)  # 允许进程池同时放入4个进程
12    print("主进程:%s\n%s"%(os.getpid(),'*'*22))
13
14    for i in range(10):
15        pool.apply_async(func=Foo, args=(i,), callback=Bar)  # 回调,参数为前面函数的返回结果
16        # pool.apply(func=Foo, args=(i,))        串行
17        # pool.apply_async(func=Foo, args=(i,)) 并行
18    pool.close()  # 一定先关闭进程池再join等待已运行的结束,自己试试区别
19    pool.join()  # 进程池中进程执行完毕后在关闭。如果注释,那么程序直接关闭

2 协程

2.1 协程介绍

协程,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程。

协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

线程的切换,会保存到CPU的寄存器里。 CPU感觉不到协程的存在,协程是用户自己控制的。

之前通过yield做的生产者消费者模型,就是协程,在单线程下实现并发效果。

协程的好处:

  • 无需线程上下文切换的开销
  • 无需数据操作锁定及同步的开销
  • 方便切换控制流,简化编程模型
  • 高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。

缺点:

  • 无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
  • 进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序

2.2 使用yield实现协程操作

1import time
 2
 3def consumer(name):
 4    print("--->starting eating baozi...")
 5    while True:
 6        new_baozi = yield
 7        print("[%s] is eating baozi %s" % (name,new_baozi))
 8        # time.sleep(1)
 9
10def producer():
11    r = con.__next__()
12    r = con2.__next__()
13    n = 0
14    while n < 5:
15        n +=1
16        print("\033[32;1m[producer]\033[0m is making baozi %s" %n )
17        con.send(n)
18        con2.send(n)
19        time.sleep(1)
20
21if __name__ == '__main__':
22    con = consumer("c1")  # 第一次调用只是生成器,next的时候才回生成
23    con2 = consumer("c2")
24    p = producer()

为了保证并发效果,在什么时候切换呢?遇到IO操作就切换。 但什么时候切回去呢?IO操作完了就切回去,但是程序是怎么实现的呢?

2.3 Greenlet 一个封装好的协程

1from greenlet import greenlet
 2
 3def test1():
 4    print(12)
 5    gr2.switch()
 6    print(34)
 7    gr2.switch()
 8
 9def test2():
10    print(56)
11    gr1.switch()
12    print(78)
13
14gr1 = greenlet(test1)
15gr2 = greenlet(test2)
16gr1.switch()

2.4 Gevent 自动切换

Greenlet 手动切换;Gevent 自动切换,封装了Greenlet Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

1import gevent
 2
 3def foo():
 4    print("Running in foo")
 5    gevent.sleep(2)
 6    print("swich to foo again")
 7# 来回切换,直到sleep结束
 8def bar():
 9    print("Running in bar")
10    gevent.sleep(1)
11    print("swich back to bar")
12
13def func3():
14    print("Running in func3")
15    gevent.sleep(0)  # 只触发一次切换操作
16    print("swich func3 again")
17
18gevent.joinall([
19    gevent.spawn(foo),  # 生成
20    gevent.spawn(bar),
21    gevent.spawn(func3),
22])
  • 协程gevent并发爬网页
1from urllib import request
 2import gevent, time
 3# 注意!:Gevent检测不到urllib的io操作,还是串行的,让它知道就需要打补丁
 4from gevent import monkey
 5monkey.patch_all()  # 把当前程序的所有IO操作给我做单独的做上标记
 6
 7def f(url):
 8    print("Get %s" %url)
 9    resp = request.urlopen(url)
10    data = resp.read()
11    # with open("url.html", 'wb') as f:
12    #     f.write(data)
13    print("%d bytes received from %s" %(len(data), url))
14
15print("异步时间统计中……")  # 协程实现
16async_start_time = time.time()
17gevent.joinall([
18    gevent.spawn(f, "https://www.python.org"),
19    gevent.spawn(f, "https://www.yahoo.com"),
20    gevent.spawn(f, "https://github.com"),
21])
22print("\033[32;1m异步cost:\033[0m",time.time()-async_start_time)
23#------------------------以下只为对比效果---------------------------
24print("同步步时间统计中……")
25urls = [
26    "https://www.python.org",
27    "https://www.yahoo.com",
28    "https://github.com",
29]
30start_time = time.time()
31for url in urls:
32    f(url)
33print("\033[32;1m同步cost:\033[0m",time.time()-start_time)
  • 通过gevent实现单线程下的多socket并发 服务端:
1import sys
 2import socket
 3import time
 4import gevent
 5from gevent import socket,monkey
 6monkey.patch_all()
 7
 8def server(port):
 9    s = socket.socket()
10    s.bind(('0.0.0.0', port))
11    s.listen(500)
12    while True:
13        cli, addr = s.accept()  # 每个连接起一个协程
14        gevent.spawn(handle_request, cli)
15
16def handle_request(conn):
17    try:
18        while True:
19            data = conn.recv(1024)
20            print("recv:", data)
21            conn.send(data)
22            if not data:
23                conn.shutdown(socket.SHUT_WR)  # 类似break
24    except Exception as  ex:
25        print(ex)
26    finally:
27        conn.close()
28if __name__ == '__main__':
29    server(8001)

客户端:

1import socket
 2
 3HOST = 'localhost'    # The remote host
 4PORT = 8001           # The same port as used by the server
 5s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6s.connect((HOST, PORT))
 7while True:
 8    msg = bytes(input(">>> "),encoding="utf8")
 9    s.sendall(msg)
10    data = s.recv(1024)
11    #print(data)
12
13    print('Received', repr(data))  # 内置方法repr:格式化输出
14s.close()

声明:本文系网络转载,版权归原作者所有。如涉及版权,请联系删除!

原文发布于微信公众号 - 智能算法(AI_Algorithm)

原文发表时间:2018-09-10

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏信安之路

看我如何收集全网IP的whois信息

今天给大家分享几个脚本,看看如何收集全网whois信息。首先了解一下whois.py这个基本程序。

1520
来自专栏码生

django 学习笔记三

拦截到url 后,通过拦截到的URL和 request 做一些不同的响应(response)

1072
来自专栏nummy

Tornado入门(二)【异步和阻塞IO】

实时Web应用通常针对每个用户创建持久连接,对于传统的同步服务器,这意味着需要给每个用户单独创建一个线程,这样做的代价非常高。

852
来自专栏乐百川的学习频道

Flask 快速入门

Flask是一个Python编写的Web 微框架,让我们可以使用Python语言快速实现一个网站或Web服务。本文参考自Flask官方文档,大部分代码引用自官方...

32110
来自专栏数据结构与算法

22:紧急措施

22:紧急措施 总时间限制: 1000ms 内存限制: 65536kB描述 近日,一些热门网站遭受黑客入侵,这些网站的账号、密码及email的数据惨遭泄露。你...

3868
来自专栏前端萌媛的成长之路

JavaScript模块化发展

3283
来自专栏电光石火

运行jar包找不到主类

在正确编译好java程序之后,打包,然后在命令行输入java -jar ,却弹出找不到或无法加载主类。 一检查,我的path环境变量都是对的呀?jav...

4919
来自专栏Linyb极客之路

工作流引擎之activiti流程定义

ProcessDefinition(流程定义)就是一个流程的步骤说明,比如我们接下来要说的这个流程,申请人王三发起提交申请,李四作为部门经理进行审批,审...

1483
来自专栏贾老师の博客

常用 Bash Shell 整理

1014
来自专栏FreeBuf

Node.js中的内存泄漏分析

内存泄漏(Memory Leak)指由于疏忽或错误造成程序未能释放已经不再使用的内存的情况。如果内存泄漏的位置比较关键,那么随着处理的进行可能持有越来越多的无用...

4685

扫码关注云+社区

领取腾讯云代金券