首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 与 C# 并发编程之~ 进程篇上

Python3 与 C# 并发编程之~ 进程篇上

作者头像
逸鹏
发布2018-08-14 14:31:48
5430
发布2018-08-14 14:31:48
举报
文章被收录于专栏:逸鹏说道逸鹏说道

上次说了很多Linux下进程相关知识,这边不再复述,下面来说说Python的并发编程,如有错误欢迎提出~

如果遇到听不懂的可以看上一次的文章:

1.并发编程~先导篇(上)

2.并发编程~先导篇(下)

Python3 与 C# 并发编程之~ 上篇Net

1.进程篇

官方文档:https://docs.python.org/3/library/multiprocessing.html

1.1.进程(Process)

Python的进程创建非常方便,看个案例:(这种方法通用,fork只适用于Linux系)

import os
# 注意一下,导入的是Process不是process(Class是大写开头)
from multiprocessing import Process

def test(name):
    print("[子进程-%s]PID:%d,PPID:%d" % (name, os.getpid(), os.getppid()))

def main():
    print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = Process(target=test, args=("萌萌哒", )) # 单个元素的元组表达别忘了(x,)
    p.start()
    p.join()  # 父进程回收子进程资源(内部调用了wait系列方法)

if __name__ == '__main__':
    main()

运行结果:

[父进程]PID:25729,PPID:23434
[子进程-萌萌哒]PID:25730,PPID:25729

创建子进程时,传入一个执行函数和参数,用start()方法来启动进程即可

join()方法是父进程回收子进程的封装(主要是回收僵尸子进程(点我))

其他参数可以参考源码 or 文档,贴一下源码的 init方法:

def__init__(self,group=None,target=None,name=None,args=(),kwargs={},*,daemon=None)

扩展: name:为当前进程实例的别名

  1. p.is_alive() 判断进程实例p是否还在执行
  2. p.terminate() 终止进程(发 SIGTERM信号)

上面的案例如果用OOP来实现就是这样:(如果不指定方法,默认调Run方法)

import os
from multiprocessing import Process

class My_Process(Process):
    # 重写了Proce类的Init方法
    def __init__(self, name):
        self.__name = name
        Process.__init__(self)  # 调用父类方法

    # 重写了Process类的run()方法
    def run(self):
        print("[子进程-%s]PID:%d,PPID:%d" % (self.__name, os.getpid(),
                                          os.getppid()))

def main():
    print("[父进程]PID:%d,PPID:%d" % (os.getpid(), os.getppid()))
    p = My_Process("萌萌哒") # 如果不指定方法,默认调Run方法
    p.start()
    p.join()  # 父进程回收子进程资源(内部调用了wait系列方法)


if __name__ == '__main__':
    main()

1.1.源码拓展

现在说说里面的一些门道(只像用的可以忽略)

新版本的封装可能多层,这时候可以看看Python3.3.X系列(这个算是Python3早期版本了,很多代码都暴露出来,比较明了直观)

multiprocessing.process.py

# 3.4.x开始,Process有了一个BaseProcess
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/process.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/process.py
def join(self, timeout=None):
    '''一直等到子进程over'''
    self._check_closed()
    # 断言(False就触发异常,提示就是后面的内容
    # 开发中用的比较多,部署的时候可以python3 -O xxx 去除所以断言
    assert self._parent_pid == os.getpid(), "只能 join 一个子进程"
    assert self._popen is not None, "只能加入一个已启动的进程"
    res = self._popen.wait(timeout) # 本质就是用了我们之前讲的wait系列
    if res is not None:
        _children.discard(self) # 销毁子进程

multiprocessing.popen_fork.py

# 3.4.x开始,在popen_fork文件中(以前是multiprocessing.forking.py)
# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/popen_fork.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/popen_fork.py
def wait(self, timeout=None):
    if self.returncode is None:
        # 设置超时的一系列处理
        if timeout is not None:
            from multiprocessing.connection import wait
            if not wait([self.sentinel], timeout):
                return None
        # 核心操作
        return self.poll(os.WNOHANG if timeout == 0.0 else 0)
    return self.returncode

# 回顾一下上次说的:os.WNOHANG - 如果没有子进程退出,则不阻塞waitpid()调用
def poll(self, flag=os.WNOHANG):
    if self.returncode is None:
        try:
            # 他的内部调用了waitpid
            pid, sts = os.waitpid(self.pid, flag)
        except OSError as e:
            # 子进程尚未创建
            # e.errno == errno.ECHILD == 10
            return None
        if pid == self.pid:
            if os.WIFSIGNALED(sts):
                self.returncode = -os.WTERMSIG(sts)
            else:
                assert os.WIFEXITED(sts), "Status is {:n}".format(sts)
                self.returncode = os.WEXITSTATUS(sts)
    return self.returncode

关于断言的简单说明:(别泛滥)

如果条件为真,它什么都不做,反之它触发一个带可选错误信息的AssertionError

def test(a, b):
    assert b != 0, "哥哥,分母不能为0啊"
    return a / b

def main():
    test(1, 0)

if __name__ == '__main__':
    main()

结果:

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 2, in test
    assert b != 0, "哥哥,分母不能为0啊"
AssertionError: 哥哥,分母不能为0啊

运行的时候可以指定 -O参数来忽略所以 assert,eg:

python3-O0.assert.py

Traceback (most recent call last):
  File "0.assert.py", line 11, in <module>
    main()
  File "0.assert.py", line 7, in main
    test(1, 0)
  File "0.assert.py", line 3, in test
    return a / b
ZeroDivisionError: division by zero

扩展:

https://docs.python.org/3/library/unittest.html

https://www.cnblogs.com/shangren/p/8038935.html


1.2.进程池

多个进程就不需要自己手动去管理了,有Pool来帮你完成,先看个案例:

import os
import time
from multiprocessing import Pool  # 首字母大写

def test(name):
    print("[子进程-%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)

def main():
    print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool(5) # 设置最多5个进程(不设置就默认为CPU核数)
    for i in range(10):
        # 异步执行
        p.apply_async(test, args=(i, )) # 同步用apply(如非必要不建议用)
    p.close() # 关闭池,不再加入新任务
    p.join() # 等待所有子进程执行完毕回收资源(join可以指定超时时间,eg:`p.join(1)`)
    print("over")

if __name__ == '__main__':
    main()

图示:(join可以指定超时时间,eg: p.join(1)

调用 join()之前必须先调用 close(),调用 close()之后就不能继续添加新的 Process


1.3.源码拓展

验证一下Pool的默认大小是CPU的核数,看源码:

multiprocessing.pool.py

# https://github.com/python/cpython/blob/3.7/Lib/multiprocessing/pool.py
# https://github.com/lotapp/cpython3/tree/master/Lib/multiprocessing/pool.py
class Pool(object):
    def __init__(self, processes=指定的进程数,...):
        if processes is None:
            processes = os.cpu_count() or 1 # os.cpu_count() ~ CPU的核数

源码里面 apply_async方法,是有回调函数(callback)的

def apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):
    if self._state != RUN:
        raise ValueError("Pool not running")
    result = ApplyResult(self._cache, callback, error_callback)
    self._taskqueue.put(([(result._job, 0, func, args, kwds)], None))
    return result

来看个例子:(和JQ很像)

import os
import time
from multiprocessing import Pool  # 首字母大写

def test(name):
    print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    time.sleep(1)
    return name

def error_test(name):
    print("[子进程%s]PID=%d,PPID=%d" % (name, os.getpid(), os.getppid()))
    raise Exception("[子进程%s]啊,我挂了~" % name)

def callback(result):
    """成功之后的回调函数"""
    print("[子进程%s]执行完毕" % result)  # 没有返回值就为None

def error_callback(msg):
    """错误之后的回调函数"""
    print(msg)

def main():
    print("[父进程]PID=%d,PPID=%d" % (os.getpid(), os.getppid()))
    p = Pool()  # CPU默认核数
    for i in range(5):
        # 搞2个出错的看看
        if i > 2:
            p.apply_async(
                error_test,
                args=(i, ),
                callback=callback,
                error_callback=error_callback)  # 异步执行
        else:
            # 异步执行,成功后执行callback函数(有点像jq)
            p.apply_async(test, args=(i, ), callback=callback)
    p.close()  # 关闭池,不再加入新任务
    p.join()  # 等待所有子进程执行完毕回收资源
    print("over")

if __name__ == '__main__':
    main()

输出:

[父进程]PID=12348,PPID=10999
[子进程0]PID=12349,PPID=12348
[子进程2]PID=12351,PPID=12348
[子进程1]PID=12350,PPID=12348
[子进程3]PID=12352,PPID=12348
[子进程4]PID=12352,PPID=12348
[子进程3]啊,我挂了~
[子进程4]啊,我挂了~
[子进程0]执行完毕
[子进程2]执行完毕
[子进程1]执行完毕
over

常用的就普及完了,这些比较简单,过几天有下篇继续深入

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-08-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 我为Net狂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.进程篇
    • 1.1.进程(Process)
      • 1.1.源码拓展
        • 1.2.进程池
          • 1.3.源码拓展
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档