Python3 与 C# 并发编程之~ 进程篇上

上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~

如果遇到听不懂的可以看上一次的文章:

1.并发编程~先导篇(上)

2.并发编程~先导篇(下)

Python3 与 C# 并发编程之~ 上篇Net

1.进程篇

官方文档:https://docs.python.org/3/library/multiprocessing.html

1.1.进程(Process)

Python的进程创建非常方便,看个案例:(这种方法通用,fork只适用于Linux系)

import os
# 注意一下,导入的是Process不是process(Class是大写开头)
from multiprocessing import Process

def test(name):
    print("[子进程-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))

def main():
    print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = Process(target=test, args=("萌萌哒", )) # 单个元素的元组表达别忘了(x,)
    p.start()
    p.join()  # 父进程回收子进程资源(内部调用了wait系列方法)

if __name__ == '__main__':
    main()

运行结果:

[父进程]PID:25729,PPID:23434
[子进程-萌萌哒]PID:25730,PPID:25729

创建子进程时,传入一个执行函数和参数,用start()方法来启动进程即可

join()方法是父进程回收子进程的封装(主要是回收僵尸子进程(点我))

其他参数可以参考源码 or 文档,贴一下源码的 init方法:

def__init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)

扩展: name:为当前进程实例的别名

  1. p.is_alive() 判断进程实例p是否还在执行
  2. p.terminate() 终止进程(发 SIGTERM信号)

上面的案例如果用OOP来实现就是这样:(如果不指定方法,默认调Run方法)

import os
from multiprocessing import Process

class My_Process(Process):
    # 重写了Proce类的Init方法
    def __init__(self, name):
        self.__name = name
        Process.__init__(self)  # 调用父类方法

    # 重写了Process类的run()方法
    def run(self):
        print("[子进程-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
                                          os.getppid()))

def main():
    print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = My_Process("萌萌哒") # 如果不指定方法,默认调Run方法
    p.start()
    p.join()  # 父进程回收子进程资源(内部调用了wait系列方法)


if __name__ == '__main__':
    main()

1.1.源码拓展

现在说说里面的一些门道(只像用的可以忽略)

新版本的封装可能多层,这时候可以看看Python3.3.X系列(这个算是Python3早期版本了,很多代码都暴露出来,比较明了直观)

multiprocessing.process.py

# 3.4.x开始,Process有了一个BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
    '''一直等到子进程over'''
    self._check_closed()
    # 断言(False就触发异常,提示就是后面的内容
    # 开发中用的比较多,部署的时候可以python3 -O xxx 去除所以断言
    assert self._parent_pid == os.getpid(), "只能 join 一个子进程"
    assert self._popen is not None, "只能加入一个已启动的进程"
    res = self._popen.wait(timeout) # 本质就是用了我们之前讲的wait系列
    if res is not None:
        _children.discard(self) # 销毁子进程

multiprocessing.popen_fork.py

# 3.4.x开始,在popen_fork文件中(以前是multiprocessing.forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
    if self.returncode is None:
        # 设置超时的一系列处理
        if timeout is not None:
            from multiprocessing.connection import wait
            if not wait([self.sentinel], timeout):
                return None
        # 核心操作
        return self.poll(os.WNOHANG if timeout == 0.0 else 0)
    return self.returncode

# 回顾一下上次说的:os.WNOHANG - 如果没有子进程退出,则不阻塞waitpid()调用
def poll(self, flag=os.WNOHANG):
    if self.returncode is None:
        try:
            # 他的内部调用了waitpid
            pid, sts = os.waitpid(self.pid, flag)
        except OSError as e:
            # 子进程尚未创建
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                self.returncode = os.WEXITSTATUS(sts)
    return self.returncode

关于断言的简单说明:(别泛滥)

如果条件为真,它什么都不做,反之它触发一个带可选错误信息的AssertionError

def test(a, b):
    assert b != 0, "哥哥,分母不能为0啊"
    return a / b

def main():
    test(1, 0)

if __name__ == '__main__':
    main()

结果:

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 2, in test
    assert b != 0, "哥哥,分母不能为0啊"
AssertionError: 哥哥,分母不能为0啊

运行的时候可以指定 -O参数来忽略所以 assert,eg:

python3-O0.assert.py

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 3, in test
    return a / b
ZeroDivisionError: division by zero

扩展:

https://docs.python.org/3/library/unittest.html

https://www.cnblogs.com/shangren/p/8038935.html


1.2.进程池

多个进程就不需要自己手动去管理了,有Pool来帮你完成,先看个案例:

import os
import time
from multiprocessing import Pool  # 首字母大写

def test(name):
    print("[子进程-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)

def main():
    print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool(5) # 设置最多5个进程(不设置就默认为CPU核数)
    for i in range(10):
        # 异步执行
        p.apply_async(test, args=(i, )) # 同步用apply(如非必要不建议用)
    p.close() # 关闭池,不再加入新任务
    p.join() # 等待所有子进程执行完毕回收资源(join可以指定超时时间,eg:`p.join(1)`)
    print("over")

if __name__ == '__main__':
    main()

图示:(join可以指定超时时间,eg: p.join(1)

调用 join()之前必须先调用 close(),调用 close()之后就不能继续添加新的 Process


1.3.源码拓展

验证一下Pool的默认大小是CPU的核数,看源码:

multiprocessing.pool.py

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
    def __init__(self, processes=指定的进程数,...):
        if processes is None:
            processes = os.cpu_count() or 1 # os.cpu_count() ~ CPU的核数

源码里面 apply_async方法,是有回调函数(callback)的

def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result

来看个例子:(和JQ很像)

import os
import time
from multiprocessing import Pool  # 首字母大写

def test(name):
    print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)
    return name

def error_test(name):
    print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    raise Exception("[子进程%s]啊,我挂了~" % name)

def callback(result):
    """成功之后的回调函数"""
    print("[子进程%s]执行完毕" % result)  # 没有返回值就为None

def error_callback(msg):
    """错误之后的回调函数"""
    print(msg)

def main():
    print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool()  # CPU默认核数
    for i in range(5):
        # 搞2个出错的看看
        if i > 2:
            p.apply_async(
                error_test,
                args=(i, ),
                callback=callback,
                error_callback=error_callback)  # 异步执行
        else:
            # 异步执行,成功后执行callback函数(有点像jq)
            p.apply_async(test, args=(i, ), callback=callback)
    p.close()  # 关闭池,不再加入新任务
    p.join()  # 等待所有子进程执行完毕回收资源
    print("over")

if __name__ == '__main__':
    main()

输出:

[父进程]PID=12348,PPID=10999
[子进程0]PID=12349,PPID=12348
[子进程2]PID=12351,PPID=12348
[子进程1]PID=12350,PPID=12348
[子进程3]PID=12352,PPID=12348
[子进程4]PID=12352,PPID=12348
[子进程3]啊,我挂了~
[子进程4]啊,我挂了~
[子进程0]执行完毕
[子进程2]执行完毕
[子进程1]执行完毕
over

常用的就普及完了,这些比较简单,过几天有下篇继续深入

原文发布于微信公众号 - 我为Net狂(dotNetCrazy)

原文发表时间:2018-08-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏流媒体

dll生成和使用

1072
来自专栏深度学习之tensorflow实战篇

Python 用OPEN读文件报错 ,路径以及r

Python 中 ‘unicodeescape’ codec can’t decode bytes in position XXX: trun错误解决方案 ...

3449
来自专栏机器学习从入门到成神

关于hibernate中对象的三种状态分析

一、首先Hibernate中对象的状态有三种:瞬态、游离态和持久态,三种状态转化的方法都是通过session来调用,瞬态到持久态的方法有save()、saveO...

2241
来自专栏GreenLeaves

一、源代码-面向CLR的编译器-托管模块-(元数据&IL代码)

本文脉络图如下: ? 1、CLR(Common Language Runtime)公共语言运行时简介 ? (1)、公共语言运行时是一种可由多种编程语言一起使用的...

31610
来自专栏java一日一条

关于 ASP.NET 内存缓存你需要知道的 10 点

缓存机制的主要目的是提高应用程序的性能。作为 ASP.NET 开发人员,你可能会意识到 ASP.NET Web 窗体以及 ASP.NET MVC 可以使用 Ca...

782
来自专栏difcareer的技术笔记

Dalvik虚拟机原理及Xposed hook原理

这块知识本身是挺多的,网上有对应的源码分析,本文尽量从不分析代码的角度来把原理阐述清楚。

2281
来自专栏欧阳大哥的轮子

深入iOS系统底层之CPU寄存器介绍

计算机是一种数据处理设备,它由CPU和内存以及外部设备组成。CPU负责数据处理,内存负责存储,外部设备负责数据的输入和输出,它们之间通过总线连接在一起。CPU内...

1843
来自专栏大内老A

跨域资源共享(CORS)在ASP.NET Web API中是如何实现的?

在《通过扩展让ASP.NET Web API支持W3C的CORS规范》中,我们通过自定义的HttpMessageHandler自行为ASP.NET Web AP...

31510
来自专栏GreenLeaves

Common.Logging源码解析一

Common.Logging是Apache下的一个开源日志接口组件,主要用于切换不同的日志库,因为当前流行的日志库有很多向log4j、log4net(log4j...

19210
来自专栏大内老A

WCF技术剖析之二十五: 元数据(Metadata)架构体系全景展现[WS标准篇]

元数据实际上是服务终结点的描述,终结点由地址(Address)、绑定(Binding)和契约(Contract)经典的ABC三要素组成。认真阅读过《WCF技术剖...

3999

扫码关注云+社区

领取腾讯云代金券