首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 与 C# 并发编程之~ 进程篇下

Python3 与 C# 并发编程之~ 进程篇下

作者头像
逸鹏
发布2018-08-14 14:33:06
1.4K0
发布2018-08-14 14:33:06
举报
文章被收录于专栏:逸鹏说道逸鹏说道

1.5.进程间通信~PIPE管道通信

这个比较有意思,看个案例:

from multiprocessing import Process, Pipe

def test(w):
    w.send("[子进程]老爸,老妈回来记得喊我一下~")
    msg = w.recv()
    print(msg)

def main():
    r, w = Pipe()
    p1 = Process(target=test, args=(w, ))
    p1.start()
    msg = r.recv()
    print(msg)
    r.send("[父进程]滚犊子,赶紧写作业,不然我得跪方便面!")
    p1.join()

if __name__ == '__main__':
    main()

结果:

老爸,老妈回来记得喊我一下~
滚犊子,赶紧写作业,不然我得跪方便面!
multiprocessing.Pipe源码分析

按照道理应该子进程自己写完自己读了,和上次讲得不一样啊?不急,先看看源码:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def Pipe(self, duplex=True):
    '''返回由管道连接的两个连接对象'''
    from .connection import Pipe
    return Pipe(duplex)

看看 connection.Pipe方法的定义部分,是不是双向通信就看你是否设置 duplex=True

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/connection.py
if sys.platform != 'win32':
    def Pipe(duplex=True):
        '''返回管道两端的一对连接对象'''
        if duplex:
            # 双工内部其实是socket系列(下次讲)
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = Connection(s1.detach())
            c2 = Connection(s2.detach())
        else:
            # 这部分就是我们上次讲的pipe管道
            fd1, fd2 = os.pipe()
            c1 = Connection(fd1, writable=False)
            c2 = Connection(fd2, readable=False)
        return c1, c2
else: 
    def Pipe(duplex=True):
        # win平台的一系列处理
        ......
        c1 = PipeConnection(h1, writable=duplex)
        c2 = PipeConnection(h2, readable=duplex)
        return c1, c2

通过源码知道了,原来双工是通过socket搞的啊~

再看个和原来一样效果的案例:(不用关来关去的了,方便!)

from multiprocessing import Process, Pipe

def test(w):
    # 只能写
    w.send("[子进程]老爸,咱们完了,老妈一直在门口~")

def main():
    r, w = Pipe(duplex=False)
    p1 = Process(target=test, args=(w, ))
    p1.start() # 你把这个放在join前面就直接死锁了
    msg = r.recv() # 只能读
    print(msg)
    p1.join()

if __name__ == '__main__':
    main()

输出:(可以思考下为什么 start换个位置就死锁,提示: 阻塞读写

[子进程]老爸,咱们完了,老妈一直在门口~

再举个 Pool的例子,咱们就进入今天的重点了:

from multiprocessing import Pipe, Pool

def proc_test1(conn):
    conn.send("[小明]小张,今天哥们要见一女孩,你陪我呗,我24h等你回复哦~")
    msg = conn.recv()
    print(msg)

def proc_test2(conn):
    msg = conn.recv()
    print(msg)
    conn.send("[小张]不去,万一被我帅气的外表迷倒就坑了~")

def main():
    conn1, conn2 = Pipe()
    p = Pool()
    p.apply_async(proc_test1, (conn1, ))
    p.apply_async(proc_test2, (conn2, ))
    p.close()  # 关闭池,不再接收新任务
    p.join()  # 等待回收,必须先关才能join,不然会异常

if __name__ == '__main__':
    main()

输出:

[小明]小张,今天哥们要见一女孩,你陪我呗,我24h等你回复哦~
[小张]不去,万一被我帅气的外表迷倒就坑了~
pool.join源码分析

看看源码就理解了:看看Pool的join是啥情况?看源码:

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/pool.py
def join(self):
    util.debug('joining pool')
    if self._state == RUN:
        # 没关闭就join,这边就会抛出一个异常
        raise ValueError("Pool is still running")
    elif self._state not in (CLOSE, TERMINATE):
        raise ValueError("In unknown state")
    self._worker_handler.join()
    self._task_handler.join()
    self._result_handler.join()
    for p in self._pool:
        p.join() # 循环join回收

在pool的 __init__的方法中,这几个属性:

self._processes = processes # 指定的进程数
self._pool = [] # 列表
self._repopulate_pool() # 给列表append内容的方法

将池进程的数量增加到指定的数量,join的时候会使用这个列表

def _repopulate_pool(self):
    # 指定进程数-当前进程数,差几个补几个
    for i in range(self._processes - len(self._pool)):
        w = self.Process(target=worker,
                         args=(self._inqueue, self._outqueue,
                               self._initializer,
                               self._initargs, self._maxtasksperchild,
                               self._wrap_exception)
                        )
        self._pool.append(w) # 重点来了
        w.name = w.name.replace('Process', 'PoolWorker')
        w.daemon = True # pool退出后,通过pool创建的进程都会退出
        w.start()
        util.debug('added worker')

1.5.进程间通信~Queue管道通信(常用)

一步步的设局,从底层的的 pipe()-> os.pipe-> PIPE,现在终于到 Queue了,心酸啊,明知道上面两个项目

里面基本上不会用,但为了你们能看懂源码,说了这么久 %>_<%其实以后当我们从 Queue说到 MQRPC之后,现在

讲得这些进程间通信( IPC)也基本上不会用了,但本质你得清楚,我尽量多分析点源码,这样你们以后看开源项目压力会很小

欢迎批评指正~

引入案例
from multiprocessing import Process, Queue

def test(q):
    q.put("[子进程]老爸,我出去嗨了")
    print(q.get())

def main():
    q = Queue()
    p = Process(target=test, args=(q, ))
    p.start()
    msg = q.get()
    print(msg)
    q.put("[父进程]去吧比卡丘~")
    p.join()

if __name__ == '__main__':
    main()

输出:( getput默认是阻塞等待的)

[子进程]老爸,我出去嗨了
[父进程]去吧比卡丘~
源码拓展

先看看 Queue的初始化方法:(不指定大小就是最大队列数)

# 队列类型,使用PIPE,缓存,线程
class Queue(object):
    # ctx = multiprocessing.get_context("xxx")
    # 上下文总共3种:spawn、fork、forkserver(扩展部分会提一下)
    def __init__(self, maxsize=0, *, ctx):
        # 默认使用最大容量
        if maxsize <= 0:
            from .synchronize import SEM_VALUE_MAX as maxsize
        self._maxsize = maxsize  # 指定队列大小
        # 创建了一个PIPE匿名管道(单向)
        self._reader, self._writer = connection.Pipe(duplex=False)
        # `multiprocessing/synchronize.py > Lock`
        self._rlock = ctx.Lock()  # 进程锁(读)【非递归】
        self._opid = os.getpid()  # 获取PID
        if sys.platform == 'win32':
            self._wlock = None
        else:
            self._wlock = ctx.Lock()  # 进程锁(写)【非递归】
        # Semaphore信号量通常用于保护容量有限的资源
        # 控制信号量,超了就异常
        self._sem = ctx.BoundedSemaphore(maxsize)
        # 不忽略PIPE管道破裂的错误
        self._ignore_epipe = False 
        # 线程相关操作
        self._after_fork()
        # 向`_afterfork_registry`字典中注册
        if sys.platform != 'win32':
            register_after_fork(self, Queue._after_fork)

关于 getput是阻塞的问题,看下源码探探究竟:

q.get():收消息

def get(self, block=True, timeout=None):
    # 默认情况是阻塞(lock加锁)
    if block and timeout is None:
        with self._rlock:
            res = self._recv_bytes()
        self._sem.release()  # 信号量+1
    else:
        if block:
            deadline = time.monotonic() + timeout
        # 超时抛异常
        if not self._rlock.acquire(block, timeout):
            raise Empty
        try:
            if block:
                timeout = deadline - time.monotonic()
                # 不管有没有内容都去读,超时就抛异常
                if not self._poll(timeout):
                    raise Empty
            elif not self._poll():
                raise Empty
            # 接收字节数据作为字节对象
            res = self._recv_bytes()
            self._sem.release()  # 信号量+1
        finally:
            # 释放锁
            self._rlock.release()
    # 释放锁后,重新序列化数据
    return _ForkingPickler.loads(res)

queue.put():发消息

def put(self, obj, block=True, timeout=None):
        # 如果Queue已经关闭就抛异常
        assert not self._closed, "Queue {0!r} has been closed".format(self)
        # 记录信号量的锁
        if not self._sem.acquire(block, timeout):
            raise Full  # 超过数量,抛个异常
        # 条件变量允许一个或多个线程等待,直到另一个线程通知它们
        with self._notempty:
            if self._thread is None:
                self._start_thread()
            self._buffer.append(obj)
            self._notempty.notify()

非阻塞 get_nowaitput_nowait本质其实也是调用了 getput方法:

def get_nowait(self):
    return self.get(False)

def put_nowait(self, obj):
    return self.put(obj, False)
进程间通信1

说这么多不如来个例子看看:

from multiprocessing import Queue

def main():
    q = Queue(3)  # 只能 put 3条消息
    q.put([1, 2, 3, 4])  # put一个List类型的消息
    q.put({"a": 1, "b": 2})  # put一个Dict类型的消息
    q.put({1, 2, 3, 4})  # put一个Set类型的消息

    try:
        # 不加timeout,就一直阻塞,等消息队列有空位才能发出去
        q.put("再加条消息呗", timeout=2)
    # Full(Exception)是空实现,你可以直接用Exception
    except Exception:
        print("消息队列已满,队列数%s,当前存在%s条消息" % (q._maxsize, q.qsize()))

    try:
        # 非阻塞,不能put就抛异常
        q.put_nowait("再加条消息呗")  # 相当于q.put(obj,False)
    except Exception:
        print("消息队列已满,队列数%s,当前存在%s条消息" % (q._maxsize, q.qsize()))

    while not q.empty():
        print("队列数:%s,当前存在%s条消息 内容%s" % (q._maxsize, q.qsize(), q.get_nowait()))

    print("队列数:%s,当前存在:%s条消息" % (q._maxsize, q.qsize()))

if __name__ == '__main__':
    main()

输出:

消息队列已满,队列数3,当前存在3条消息
消息队列已满,队列数3,当前存在3条消息
队列数:3,当前存在3条消息 内容[1, 2, 3, 4]
队列数:3,当前存在2条消息 内容{'a': 1, 'b': 2}
队列数:3,当前存在1条消息 内容{1, 2, 3, 4}
队列数:3,当前存在:0条消息

补充说明一下:

  1. q._maxsize 队列数(尽量不用 _开头的属性和方法)
  2. q.qsize()查看当前队列中存在几条消息
  3. q.full()查看是否满了
  4. q.empty()查看是否为空

再看个简单点的子进程间通信:(铺垫demo)

import os
import time
from multiprocessing import Process, Queue

def pro_test1(q):
    print("[子进程1]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
    q.put("[子进程1]小明,今晚撸串不?")

    # 设置一个简版的重试机制(三次重试)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子进程2]PPID=%d,PID=%d,GID=%d"%(os.getppid(), os.getpid(), os.getgid()))
    print(q.get())
    time.sleep(4)  # 模拟一下网络延迟
    q.put("[子进程2]不去,我今天约了妹子")

def main():
    queue = Queue()
    p1 = Process(target=pro_test1, args=(queue, ))
    p2 = Process(target=pro_test2, args=(queue, ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main()

输出:( time python35.queue2.py

[子进程1]PPID=15220,PID=15221,GID=1000
[子进程2]PPID=15220,PID=15222,GID=1000
[子进程1]小明,今晚撸串不?
[子进程2]不去,我今天约了妹子

real    0m6.087s
user    0m0.053s
sys    0m0.035s

进程间通信2

多进程基本上都是用 pool,可用上面说的 Queue方法怎么报错了?

import os
import time
from multiprocessing import Pool, Queue

def error_callback(msg):
    print(msg)

def pro_test1(q):
    print("[子进程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子进程1]小明,今晚撸串不?")

    # 设置一个简版的重试机制(三次重试)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子进程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模拟一下网络延迟
    q.put("[子进程2]不去,我今天约了妹子")

def main():
    print("[父进程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    queue = Queue()
    p = Pool()
    p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
    p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

输出:(队列对象不能在父进程与子进程间通信)

[父进程]PPID=4223,PID=32170,GID=1000
Queue objects should only be shared between processes through inheritance
Queue objects should only be shared between processes through inheritance

real    0m0.183s
user    0m0.083s
sys    0m0.012s

下面会详说,先看一下正确方式:(队列换了一下,其他都一样 Manager().Queue()

import os
import time
from multiprocessing import Pool, Manager

def error_callback(msg):
    print(msg)

def pro_test1(q):
    print("[子进程1]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    q.put("[子进程1]小明,今晚撸串不?")

    # 设置一个简版的重试机制(三次重试)
    for i in range(3):
        if not q.empty():
            print(q.get())
            break
        else:
            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6s

def pro_test2(q):
    print("[子进程2]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                           os.getgid()))
    print(q.get())
    time.sleep(4)  # 模拟一下网络延迟
    q.put("[子进程2]不去,我今天约了妹子")

def main():
    print("[父进程]PPID=%d,PID=%d,GID=%d" % (os.getppid(), os.getpid(),
                                          os.getgid()))
    queue = Manager().Queue()
    p = Pool()
    p.apply_async(pro_test1, args=(queue, ), error_callback=error_callback)
    p.apply_async(pro_test2, args=(queue, ), error_callback=error_callback)
    p.close()
    p.join()

if __name__ == '__main__':
    main()

输出:

[父进程]PPID=4223,PID=31329,GID=1000
[子进程1]PPID=31329,PID=31335,GID=1000
[子进程2]PPID=31329,PID=31336,GID=1000
[子进程1]小明,今晚撸串不?
[子进程2]不去,我今天约了妹子

real    0m6.134s
user    0m0.133s
sys    0m0.035s

进程拓展

官方参考:https://docs.python.org/3/library/multiprocessing.html

1.上下文系
  1. spawn:(Win默认,Linux下也可以用【>=3.4】)
    1. 父进程启动一个新的python解释器进程。
    2. 子进程只会继承运行进程对象run()方法所需的那些资源。
    3. 不会继承父进程中不必要的文件描述符和句柄。
    4. 与使用fork或forkserver相比,使用此方法启动进程相当慢。
    5. 可在Unix和Windows上使用。Windows上的默认设置。
  2. fork:(Linux下默认)
    1. 父进程用于os.fork()分叉Python解释器。
    2. 子进程在开始时与父进程相同(这时候内部变量之类的还没有被修改)
    3. 父进程的所有资源都由子进程继承(用到多线程的时候可能有些问题)
    4. 仅适用于Unix。Unix上的默认值。
  3. forkserver:(常用)
    1. 当程序启动并选择forkserver start方法时,将启动服务器进程。
    2. 从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程。
    3. fork服务器进程是单线程的,因此它可以安全使用os.fork()。没有不必要的资源被继承。
    4. 可在Unix平台上使用,支持通过Unix管道传递文件描述符。

这块官方文档很详细,贴下官方的2个案例:

通过 multiprocessing.set_start_method(xxx)来设置启动的上下文类型

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn') # 不要过多使用
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

输出:( set_start_method不要过多使用)

hello

real    0m0.407s
user    0m0.134s
sys        0m0.012s

如果你把设置启动上下文注释掉:(消耗的总时间少了很多)

real    0m0.072s
user    0m0.057s
sys        0m0.016s

也可以通过 multiprocessing.get_context(xxx)获取指定类型的上下文

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

输出:( get_context在Python源码里用的比较多,so=>也建议大家这么用)

hello

real    0m0.169s
user    0m0.146s
sys    0m0.024s

从结果来看,总耗时也少了很多


2.日记系列

说下日记相关的事情:

先看下 multiprocessing里面的日记记录:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/context.py
def log_to_stderr(self, level=None):
    '''打开日志记录并添加一个打印到stderr的处理程序'''
    from .util import log_to_stderr
    return log_to_stderr(level)

更多 Loging模块内容可以看官方文档:https://docs.python.org/3/library/logging.html

这个是内部代码,看看即可:

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/util.py
def log_to_stderr(level=None):
    '''打开日志记录并添加一个打印到stderr的处理程序'''
    # 全局变量默认是False
    global _log_to_stderr
    import logging

    # 日记记录转换成文本
    formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
    # 一个处理程序类,它将已适当格式化的日志记录写入流
    handler = logging.StreamHandler()  # 此类不会关闭流,因为用到了sys.stdout|sys.stderr
    # 设置格式:'[%(levelname)s/%(processName)s] %(message)s'
    handler.setFormatter(formatter)

    # 返回`multiprocessing`专用的记录器
    logger = get_logger()
    # 添加处理程序
    logger.addHandler(handler)

    if level:
        # 设置日记级别
        logger.setLevel(level)
    # 现在log是输出到stderr的
    _log_to_stderr = True
    return _logger

Logging之前也有提过,可以看看:https://www.cnblogs.com/dotnetcrazy/p/9333792.html#2.装饰器传参的扩展(可传可不传)

来个案例:

import logging
from multiprocessing import Process, log_to_stderr

def test():
    print("test")

def start_log():
    # 把日记输出定向到sys.stderr中
    logger = log_to_stderr()
    # 设置日记记录级别
    # 敏感程度:DEBUG、INFO、WARN、ERROR、CRITICAL
    print(logging.WARN == logging.WARNING)  # 这两个是一样的
    level = logging.INFO
    logger.setLevel(level)  # 设置日记级别(一般都是WARN)

    # 自定义输出
    # def log(self, level, msg, *args, **kwargs):
    logger.log(level, "我是通用格式")  # 通用,下面的内部也是调用的这个
    logger.info("info 测试")
    logger.warning("warning 测试")
    logger.error("error 测试")

def main():
    start_log()
    # 做的操作都会被记录下来
    p = Process(target=test)
    p.start()
    p.join()

if __name__ == '__main__':
    main()

输出:

True
[INFO/MainProcess] 我是通用格式
[INFO/MainProcess] info 测试
[WARNING/MainProcess] warning 测试
[ERROR/MainProcess] error 测试
[INFO/Process-1] child process calling self.run()
test
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

3.进程5态

之前忘记说了~现在快结尾了,补充一下进程5态:(来个草图)

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-11,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我为Net狂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.5.进程间通信~PIPE管道通信
    • multiprocessing.Pipe源码分析
      • pool.join源码分析
      • 1.5.进程间通信~Queue管道通信(常用)
        • 引入案例
          • 源码拓展
            • 进程间通信1
              • 进程间通信2
              • 进程拓展
                • 1.上下文系
                  • 2.日记系列
                    • 3.进程5态
                    相关产品与服务
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档