Python3 与 C# 并发编程之~ 进程篇下

1.5.进程间通信~PIPE管道通信

这个比较有意思,看个案例:

from multiprocessing import Process, Pipe

def test(w):
    w.send("[子进程]老爸,老妈回来记得喊我一下~")
    msg = w.recv()
    print(msg)

def main():
    r, w = Pipe()
    p1 = Process(target=test, args=(w, ))
    p1.start()
    msg = r.recv()
    print(msg)
    r.send("[父进程]滚犊子,赶紧写作业,不然我得跪方便面!")
    p1.join()

if __name__ == '__main__':
    main()

结果:

老爸,老妈回来记得喊我一下~
滚犊子,赶紧写作业,不然我得跪方便面!

multiprocessing.Pipe源码分析

按照道理应该子进程自己写完自己读了,和上次讲得不一样啊?不急,先看看源码:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def Pipe(self, duplex=True):
    '''返回由管道连接的两个连接对象'''
    from .connection import Pipe
    return Pipe(duplex)

看看 connection.Pipe方法的定义部分,是不是双向通信就看你是否设置 duplex=True

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/connection.py
if sys.platform != 'win32':
    def Pipe(duplex=True):
        '''返回管道两端的一对连接对象'''
        if duplex:
            # 双工内部其实是socket系列(下次讲)
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            # 这部分就是我们上次讲的pipe管道
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)
        return c1, c2
else: 
    def Pipe(duplex=True):
        # win平台的一系列处理
        ......
        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)
        return c1, c2

通过源码知道了,原来双工是通过socket搞的啊~

再看个和原来一样效果的案例:(不用关来关去的了,方便!)

from multiprocessing import Process, Pipe

def test(w):
    # 只能写
    w.send("[子进程]老爸,咱们完了,老妈一直在门口~")

def main():
    r, w = Pipe(duplex=False)
    p1 = Process(target=test, args=(w, ))
    p1.start() # 你把这个放在join前面就直接死锁了
    msg = r.recv() # 只能读
    print(msg)
    p1.join()

if __name__ == '__main__':
    main()

输出:(可以思考下为什么 start换个位置就死锁,提示: 阻塞读写

[子进程]老爸,咱们完了,老妈一直在门口~

再举个 Pool的例子,咱们就进入今天的重点了:

from multiprocessing import Pipe, Pool

def proc_test1(conn):
    conn.send("[小明]小张,今天哥们要见一女孩,你陪我呗,我24h等你回复哦~")
    msg = conn.recv()
    print(msg)

def proc_test2(conn):
    msg = conn.recv()
    print(msg)
    conn.send("[小张]不去,万一被我帅气的外表迷倒就坑了~")

def main():
    conn1, conn2 = Pipe()
    p = Pool()
    p.apply_async(proc_test1, (conn1, ))
    p.apply_async(proc_test2, (conn2, ))
    p.close()  # 关闭池,不再接收新任务
    p.join()  # 等待回收,必须先关才能join,不然会异常

if __name__ == '__main__':
    main()

输出:

[小明]小张,今天哥们要见一女孩,你陪我呗,我24h等你回复哦~
[小张]不去,万一被我帅气的外表迷倒就坑了~

pool.join源码分析

看看源码就理解了:看看Pool的join是啥情况?看源码:

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/pool.py
def join(self):
    util.debug('joining pool')
    if self._state == RUN:
        # 没关闭就join,这边就会抛出一个异常
        raise ValueError("Pool is still running")
    elif self._state not in (CLOSE, TERMINATE):
        raise ValueError("In unknown state")
    self._worker_handler.join()
    self._task_handler.join()
    self._result_handler.join()
    for p in self._pool:
        p.join() # 循环join回收

在pool的 __init__的方法中,这几个属性:

self._processes = processes # 指定的进程数
self._pool = [] # 列表
self._repopulate_pool() # 给列表append内容的方法

将池进程的数量增加到指定的数量,join的时候会使用这个列表

def _repopulate_pool(self):
    # 指定进程数-当前进程数,差几个补几个
    for i in range(self._processes - len(self._pool)):
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer,
                               self._initargs, self._maxtasksperchild,
                               self._wrap_exception)
                        )
        self._pool.append(w) # 重点来了
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True # pool退出后,通过pool创建的进程都会退出
        w.start()
        util.debug('added worker')

1.5.进程间通信~Queue管道通信(常用)

一步步的设局,从底层的的 pipe()-> os.pipe-> PIPE,现在终于到 Queue了,心酸啊,明知道上面两个项目

里面基本上不会用,但为了你们能看懂源码,说了这么久 %>_<%其实以后当我们从 Queue说到 MQRPC之后,现在

讲得这些进程间通信( IPC)也基本上不会用了,但本质你得清楚,我尽量多分析点源码,这样你们以后看开源项目压力会很小

欢迎批评指正~

引入案例

from multiprocessing import Process, Queue

def test(q):
    q.put("[子进程]老爸,我出去嗨了")
    print(q.get())

def main():
    q = Queue()
    p = Process(target=test, args=(q, ))
    p.start()
    msg = q.get()
    print(msg)
    q.put("[父进程]去吧比卡丘~")
    p.join()

if __name__ == '__main__':
    main()

输出:( getput默认是阻塞等待的)

[子进程]老爸,我出去嗨了
[父进程]去吧比卡丘~

源码拓展

先看看 Queue的初始化方法:(不指定大小就是最大队列数)

# 队列类型,使用PIPE,缓存,线程
class Queue(object):
    # ctx = multiprocessing.get_context("xxx")
    # 上下文总共3种:spawn、fork、forkserver(扩展部分会提一下)
    def __init__(self, maxsize=0, *, ctx):
        # 默认使用最大容量
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize  # 指定队列大小
        # 创建了一个PIPE匿名管道(单向)
        self._reader, self._writer = connection.Pipe(duplex=False)
        # `multiprocessing/synchronize.py > Lock`
        self._rlock = ctx.Lock()  # 进程锁(读)【非递归】
        self._opid = os.getpid()  # 获取PID
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()  # 进程锁(写)【非递归】
        # Semaphore信号量通常用于保护容量有限的资源
        # 控制信号量,超了就异常
        self._sem = ctx.BoundedSemaphore(maxsize)
        # 不忽略PIPE管道破裂的错误
        self._ignore_epipe = False 
        # 线程相关操作
        self._after_fork()
        # 向`_afterfork_registry`字典中注册
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

关于 getput是阻塞的问题,看下源码探探究竟:

q.get():收消息

def get(self, block=True, timeout=None):
    # 默认情况是阻塞(lock加锁)
    if block and timeout is None:
        with self._rlock:
            res = self._recv_bytes()
        self._sem.release()  # 信号量+1
    else:
        if block:
            deadline = time.monotonic() + timeout
        # 超时抛异常
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                timeout = deadline - time.monotonic()
                # 不管有没有内容都去读,超时就抛异常
                if not self._poll(timeout):
                    raise Empty
            elif not self._poll():
                raise Empty
            # 接收字节数据作为字节对象
            res = self._recv_bytes()
            self._sem.release()  # 信号量+1
        finally:
            # 释放锁
            self._rlock.release()
    # 释放锁后,重新序列化数据
    return _ForkingPickler.loads(res)

queue.put():发消息

def put(self, obj, block=True, timeout=None):
        # 如果Queue已经关闭就抛异常
        assert not self._closed, "Queue {0!r} has been closed".format(self)
        # 记录信号量的锁
        if not self._sem.acquire(block, timeout):
            raise Full  # 超过数量,抛个异常
        # 条件变量允许一个或多个线程等待,直到另一个线程通知它们
        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

非阻塞 get_nowaitput_nowait本质其实也是调用了 getput方法:

def get_nowait(self):
    return self.get(False)

def put_nowait(self, obj):
    return self.put(obj, False)

进程间通信1

说这么多不如来个例子看看:

from multiprocessing import Queue

def main():
    q = Queue(3)  # 只能 put 3条消息
    q.put([1, 2, 3, 4])  # put一个List类型的消息
    q.put({"a": 1, "b": 2})  # put一个Dict类型的消息
    q.put({1, 2, 3, 4})  # put一个Set类型的消息

    try:
        # 不加timeout,就一直阻塞,等消息队列有空位才能发出去
        q.put("再加条消息呗", timeout=2)
    # Full(Exception)是空实现,你可以直接用Exception
    except Exception:
        print("消息队列已满,队列数%s,当前存在%s条消息" % (q._maxsize, q.qsize()))

    try:
        # 非阻塞,不能put就抛异常
        q.put_nowait("再加条消息呗")  # 相当于q.put(obj,False)
    except Exception:
        print("消息队列已满,队列数%s,当前存在%s条消息" % (q._maxsize, q.qsize()))

    while not q.empty():
        print("队列数:%s,当前存在%s条消息 内容%s" % (q._maxsize, q.qsize(), q.get_nowait()))

    print("队列数:%s,当前存在:%s条消息" % (q._maxsize, q.qsize()))

if __name__ == '__main__':
    main()

输出:

消息队列已满,队列数3,当前存在3条消息
消息队列已满,队列数3,当前存在3条消息
队列数:3,当前存在3条消息 内容[1, 2, 3, 4]
队列数:3,当前存在2条消息 内容{'a': 1, 'b': 2}
队列数:3,当前存在1条消息 内容{1, 2, 3, 4}
队列数:3,当前存在:0条消息

补充说明一下:

  1. q._maxsize 队列数(尽量不用 _开头的属性和方法)
  2. q.qsize()查看当前队列中存在几条消息
  3. q.full()查看是否满了
  4. q.empty()查看是否为空

再看个简单点的子进程间通信:(铺垫demo)

import os
import time
from multiprocessing import Process, Queue

def pro_test1(q):
    print("[子进程1]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
    q.put("[子进程1]小明,今晚撸串不?")

    # 设置一个简版的重试机制(三次重试)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子进程2]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
    print(q.get())
    time.sleep(4)  # 模拟一下网络延迟
    q.put("[子进程2]不去,我今天约了妹子")

def main():
    queue = Queue()
    p1 = Process(target=pro_test1, args=(queue, ))
    p2 = Process(target=pro_test2, args=(queue, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main()

输出:( time python35.queue2.py

[子进程1]PPID=15220,PID=15221,GID=1000
[子进程2]PPID=15220,PID=15222,GID=1000
[子进程1]小明,今晚撸串不?
[子进程2]不去,我今天约了妹子

real    0m6.087s
user    0m0.053s
sys    0m0.035s

进程间通信2

多进程基本上都是用 pool,可用上面说的 Queue方法怎么报错了?

import os
import time
from multiprocessing import Pool, Queue

def error_callback(msg):
    print(msg)

def pro_test1(q):
    print("[子进程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子进程1]小明,今晚撸串不?")

    # 设置一个简版的重试机制(三次重试)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子进程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模拟一下网络延迟
    q.put("[子进程2]不去,我今天约了妹子")

def main():
    print("[父进程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    queue = Queue()
    p = Pool()
    p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
    p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

输出:(队列对象不能在父进程与子进程间通信)

[父进程]PPID=4223,PID=32170,GID=1000
Queue objects should only be shared between processes through inheritance
Queue objects should only be shared between processes through inheritance

real    0m0.183s
user    0m0.083s
sys    0m0.012s

下面会详说,先看一下正确方式:(队列换了一下,其他都一样 Manager().Queue()

import os
import time
from multiprocessing import Pool, Manager

def error_callback(msg):
    print(msg)

def pro_test1(q):
    print("[子进程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子进程1]小明,今晚撸串不?")

    # 设置一个简版的重试机制(三次重试)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子进程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模拟一下网络延迟
    q.put("[子进程2]不去,我今天约了妹子")

def main():
    print("[父进程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    queue = Manager().Queue()
    p = Pool()
    p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
    p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

输出:

[父进程]PPID=4223,PID=31329,GID=1000
[子进程1]PPID=31329,PID=31335,GID=1000
[子进程2]PPID=31329,PID=31336,GID=1000
[子进程1]小明,今晚撸串不?
[子进程2]不去,我今天约了妹子

real    0m6.134s
user    0m0.133s
sys    0m0.035s

进程拓展

官方参考:https://docs.python.org/3/library/multiprocessing.html

1.上下文系

  1. spawn:(Win默认,Linux下也可以用【>=3.4】)
    1. 父进程启动一个新的python解释器进程。
    2. 子进程只会继承运行进程对象run()方法所需的那些资源。
    3. 不会继承父进程中不必要的文件描述符和句柄。
    4. 与使用fork或forkserver相比,使用此方法启动进程相当慢。
    5. 可在Unix和Windows上使用。Windows上的默认设置。
  2. fork:(Linux下默认)
    1. 父进程用于os.fork()分叉Python解释器。
    2. 子进程在开始时与父进程相同(这时候内部变量之类的还没有被修改)
    3. 父进程的所有资源都由子进程继承(用到多线程的时候可能有些问题)
    4. 仅适用于Unix。Unix上的默认值。
  3. forkserver:(常用)
    1. 当程序启动并选择forkserver start方法时,将启动服务器进程。
    2. 从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。
    3. fork服务器进程是单线程的,因此它可以安全使用os.fork()。没有不必要的资源被继承。
    4. 可在Unix平台上使用,支持通过Unix管道传递文件描述符。

这块官方文档很详细,贴下官方的2个案例:

通过 multiprocessing.set_start_method(xxx)来设置启动的上下文类型

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn') # 不要过多使用
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

输出:( set_start_method不要过多使用)

hello

real    0m0.407s
user    0m0.134s
sys        0m0.012s

如果你把设置启动上下文注释掉:(消耗的总时间少了很多)

real    0m0.072s
user    0m0.057s
sys        0m0.016s

也可以通过 multiprocessing.get_context(xxx)获取指定类型的上下文

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

输出:( get_context在Python源码里用的比较多,so=>也建议大家这么用)

hello

real    0m0.169s
user    0m0.146s
sys    0m0.024s

从结果来看,总耗时也少了很多


2.日记系列

说下日记相关的事情:

先看下 multiprocessing里面的日记记录:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def log_to_stderr(self, level=None):
    '''打开日志记录并添加一个打印到stderr的处理程序'''
    from .util import log_to_stderr
    return log_to_stderr(level)

更多 Loging模块内容可以看官方文档:https://docs.python.org/3/library/logging.html

这个是内部代码,看看即可:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/util.py
def log_to_stderr(level=None):
    '''打开日志记录并添加一个打印到stderr的处理程序'''
    # 全局变量默认是False
    global _log_to_stderr
    import logging

    # 日记记录转换成文本
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    # 一个处理程序类,它将已适当格式化的日志记录写入流
    handler = logging.StreamHandler()  # 此类不会关闭流,因为用到了sys.stdout|sys.stderr
    # 设置格式:'[%(levelname)s/%(processName)s] %(message)s'
    handler.setFormatter(formatter)

    # 返回`multiprocessing`专用的记录器
    logger = get_logger()
    # 添加处理程序
    logger.addHandler(handler)

    if level:
        # 设置日记级别
        logger.setLevel(level)
    # 现在log是输出到stderr的
    _log_to_stderr = True
    return _logger

Logging之前也有提过,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.装饰器传参的扩展(可传可不传)

来个案例:

import logging
from multiprocessing import Process, log_to_stderr

def test():
    print("test")

def start_log():
    # 把日记输出定向到sys.stderr中
    logger = log_to_stderr()
    # 设置日记记录级别
    # 敏感程度:DEBUG、INFO、WARN、ERROR、CRITICAL
    print(logging.WARN == logging.WARNING)  # 这两个是一样的
    level = logging.INFO
    logger.setLevel(level)  # 设置日记级别(一般都是WARN)

    # 自定义输出
    # def log(self, level, msg, *args, **kwargs):
    logger.log(level, "我是通用格式")  # 通用,下面的内部也是调用的这个
    logger.info("info 测试")
    logger.warning("warning 测试")
    logger.error("error 测试")

def main():
    start_log()
    # 做的操作都会被记录下来
    p = Process(target=test)
    p.start()
    p.join()

if __name__ == '__main__':
    main()

输出:

True
[INFO/MainProcess] 我是通用格式
[INFO/MainProcess] info 测试
[WARNING/MainProcess] warning 测试
[ERROR/MainProcess] error 测试
[INFO/Process-1] child process calling self.run()
test
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

3.进程5态

之前忘记说了~现在快结尾了,补充一下进程5态:(来个草图)

原文发布于微信公众号 - 我为Net狂(dotNetCrazy)

原文发表时间:2018-08-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏极客日常

kubernetes源码阅读笔记:理清 kube-apiserver 的源码主线

我最近开始研究 kubernetes 源码,希望将阅读笔记记录下来,分享阅读思路和心得,更好的理解 kubernetes,这是第一篇,从 kube-apiser...

45130
来自专栏Java架构师学习

成为顶尖程序员不得不经历的面试题

一、数据结构与算法基础 · 说一下几种常见的排序算法和分别的复杂度。 · 用Java写一个冒泡排序算法 · 描述一下链式存储结构。 · 如何遍历一棵二叉树? ·...

461110
来自专栏黑泽君的专栏

day53_BOS项目_05

定区可以将取派员、分区、客户信息关联到一起。 页面:WEB-INF/pages/base/decidedzone.jsp

9240
来自专栏DannyHoo的专栏

iOS开发中内存泄漏检测工具--MLeaksFinder

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u010105969/article/details/...

39720
来自专栏酷玩时刻

PC 微信扫码登陆

网站应用微信登录是基于OAuth2.0协议标准构建的微信OAuth2.0授权登录系统。进一步了解OAuth2.0-----理解OAuth2.0 官方介绍资料

1.9K40
来自专栏编程思想之路

WiFiAp探究实录--功能实现与源码分析

Android虐我千百遍,我待Android如初恋。 ——————编辑于2017-08-02——————— wifi热点说的是wifiAp相...

1.7K90
来自专栏java达人

使用Redis做MyBatis的二级缓存

使用Redis做MyBatis的二级缓存  通常为了减轻数据库的压力,我们会引入缓存。在Dao查询数据库之前,先去缓存中找是否有要找的数据,如果有则用缓存中的数...

46750
来自专栏Laoqi's Linux运维专列

python3–内置模块

53860
来自专栏晓晨的专栏

ASP.NET Core 依赖注入(DI)简介

65640
来自专栏有趣的django

35.Django2.0文档

第四章 模板  1.标签 (1)if/else {% if %} 标签检查(evaluate)一个变量,如果这个变量为真(即,变量存在,非空,不是布尔值假),系...

594100

扫码关注云+社区

领取腾讯云代金券