Python3 与 C# 并发编程之~ 进程实战篇

1.6.进程间状态共享

应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:

1.共享内存( Value or Array

之前说过 Queue:在 Process之间使用没问题,用到 Pool,就使用 Manager().xxxValueArray,就不太一样了:

看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.pyclass Value(object):    def __init__(self, typecode, value, lock=True):        self._typecode = typecode        self._value = value    def get(self):        return self._value    def set(self, value):        self._value = value    def __repr__(self):        return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value)    value = property(get, set) # 给value设置get和set方法(和value的属性装饰器一样效果)def Array(typecode, sequence, lock=True):    return array.array(typecode, sequence)

Process为例看看怎么用:

from multiprocessing import Process, Value, Arraydef proc_test1(value, array):    print("子进程1", value.value)    array[0] = 10    print("子进程1", array[:])def proc_test2(value, array):    print("子进程2", value.value)    array[1] = 10    print("子进程2", array[:])def main():    try:        value = Value("d", 3.14)  # d 类型,相当于C里面的double        array = Array("i", range(10))  # i 类型,相当于C里面的int        print(type(value))        print(type(array))        p1 = Process(target=proc_test1, args=(value, array))        p2 = Process(target=proc_test2, args=(value, array))        p1.start()        p2.start()        p1.join()        p2.join()        print("父进程", value.value)  # 获取值        print("父进程", array[:])  # 获取值    except Exception as ex:        print(ex)    else:        print("No Except")if __name__ == '__main__':    main()

输出:( ValueArray进程|线程安全的)

<class 'multiprocessing.sharedctypes.Synchronized'><class 'multiprocessing.sharedctypes.SynchronizedArray'>子进程1 3.14子进程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]子进程2 3.14子进程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]父进程 3.14父进程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]No Except

类型方面的对应关系:

typecode_to_type = {    'c': ctypes.c_char,    'u': ctypes.c_wchar,    'b': ctypes.c_byte,    'B': ctypes.c_ubyte,    'h': ctypes.c_short,    'H': ctypes.c_ushort,    'i': ctypes.c_int,    'I': ctypes.c_uint,    'l': ctypes.c_long,    'L': ctypes.c_ulong,    'q': ctypes.c_longlong,    'Q': ctypes.c_ulonglong,    'f': ctypes.c_float,    'd': ctypes.c_double}

这两个类型其实是 ctypes类型,更多的类型可以去 multiprocessing.sharedctypes查看,来张图:

回头解决 GIL的时候会用到 C系列或者 Go系列的共享库(讲线程的时候会说)


关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下 i+=1并不是原子性操作:

from multiprocessing import Process, Valuedef proc_test1(value):    for i in range(1000):        value.value += 1def main():    value = Value("i", 0)    p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)]    # 批量启动    for i in p_list:        i.start()    # 批量资源回收    for i in p_list:        i.join()    print(value.value)if __name__ == '__main__':    main()

输出:(理论上应该是:5×1000=5000)

2153

稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了

# 通用方法def proc_test1(value):    for i in range(1000):        if value.acquire():            value.value += 1        value.release()# 官方案例:(Lock可以使用with托管)def proc_test1(value):    for i in range(1000):        with value.get_lock():            value.value += 1# 更多可以查看:`sharedctypes.SynchronizedBase` 源码

输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)

5000

看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了 Value

def Value(typecode_or_type, *args, lock=True, ctx=None):    '''返回Value的同步包装器'''    obj = RawValue(typecode_or_type, *args)    if lock is False:        return obj    # 默认支持Lock    if lock in (True, None):        ctx = ctx or get_context() # 获取上下文        lock = ctx.RLock() # 获取递归锁    if not hasattr(lock, 'acquire'):         raise AttributeError("%r has no method 'acquire'" % lock)    # 一系列处理    return synchronized(obj, lock, ctx=ctx)def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):    '''返回RawArray的同步包装器'''    obj = RawArray(typecode_or_type, size_or_initializer)    if lock is False:        return obj    # 默认是支持Lock的    if lock in (True, None):        ctx = ctx or get_context() # 获取上下文        lock = ctx.RLock()  # 递归锁属性    # 查看是否有acquire属性    if not hasattr(lock, 'acquire'):        raise AttributeError("%r has no method 'acquire'" % lock)    return synchronized(obj, lock, ctx=ctx)

扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807


2.服务器进程( Manager

官方文档:https://docs.python.org/3/library/multiprocessing.html#managers

有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象

通过返回的经理 Manager()将支持类型 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue

举个简单例子(后面还会再说):(本质其实就是 多个进程通过代理,共同操作服务端内容)

from multiprocessing import Pool, Managerdef test1(d, l):    d[1] = '1'    d['2'] = 2    d[0.25] = None    l.reverse()def test2(d, l):    print(d)    print(l)def main():    with Manager() as manager:        dict_test = manager.dict()        list_test = manager.list(range(10))        pool = Pool()        pool.apply_async(test1, args=(dict_test, list_test))        pool.apply_async(test2, args=(dict_test, list_test))        pool.close()        pool.join()if __name__ == '__main__':    main()

输出:

{1: '1', '2': 2, 0.25: None}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了 “中介”

同步问题依然需要注意一下,举个例子体会一下:

from multiprocessing import Manager, Process, Lockdef test(dict1, lock):    for i in range(100):        with lock:  # 你可以把这句话注释掉,然后就知道为什么加了            dict1["year"] += 1def main():    with Manager() as m:        lock = Lock()        dict1 = m.dict({"year": 2000})        p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)]        for i in p_list:            i.start()        for i in p_list:            i.join()        print(dict1)if __name__ == '__main__':    main()

扩展补充:

  1. multiprocessing.Lock是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。
  2. 大多数可变Python对象(如list,dict,大多数类)不能保证进程中安全,所以它们在进程间共享时需要使用 Manager
  3. 多进程模式的缺点是创建进程的代价大,在 Unix/Linux系统下,用 fork调用还行,在 Windows下创建进程开销巨大。

Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers

WinServer的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)

扩展补充

还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:

  1. 使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。
  2. 使用初始化程序在池创建时传递常规 multiprocessing.Queue()这将使 Queue实例在所有子进程中全局共享

再看一下Pool的 __init__方法:

# processes:进程数# initializer,initargs 初始化进行的操作# maxtaskperchild:每个进程执行task的最大数目# contex:上下文对象def __init__(self, processes=None, initializer=None, initargs=(),                 maxtasksperchild=None, context=None):

第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)

import osimport timefrom multiprocessing import Pool, Queuedef error_callback(msg):    print(msg)def pro_test1():    print("[子进程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    q.put("[子进程1]小明,今晚撸串不?")    # 设置一个简版的重试机制(三次重试)    for i in range(3):        if not q.empty():            print(q.get())            break        else:            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6sdef pro_test2():    print("[子进程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    print(q.get())    time.sleep(4)  # 模拟一下网络延迟    q.put("[子进程2]不去,我今天约了妹子")def init(queue):    global q    q = queuedef main():    print("[父进程]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    queue = Queue()    p = Pool(initializer=init, initargs=(queue, ))    p.apply_async(pro_test1, error_callback=error_callback)    p.apply_async(pro_test2, error_callback=error_callback)    p.close()    p.join()if __name__ == '__main__':    main()

输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数alizer=init,initargs=(queue,))

[父进程]PPID=13157,PID=24864[子进程1]PPID=24864,PID=24865[子进程2]PPID=24864,PID=24866[子进程1]小明,今晚撸串不?[子进程2]不去,我今天约了妹子real    0m6.105suser    0m0.071ssys     0m0.042s

Win下亦通用(win下没有 os.getgid


1.7.分布式进程的案例

有了 1.6的基础,咱们来个例子练练:

BaseManager的缩略图:

服务器端代码:

from multiprocessing import Queuefrom multiprocessing.managers import BaseManagerdef main():    # 用来身份验证的    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    get_zhang_queue = Queue()  # 小张消息队列    get_ming_queue = Queue()  # 小明消息队列    # 把Queue注册到网络上, callable参数关联了Queue对象    BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue)    BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue)    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    manager = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 运行serve    manager.get_server().serve_forever()if __name__ == '__main__':    main()

客户端代码1:

from multiprocessing.managers import BaseManagerdef main():    """客户端1"""    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    # 注册对应方法的名字(从网络上获取Queue)    BaseManager.register("get_ming_queue")    BaseManager.register("get_zhang_queue")    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 连接到服务器    m.connect()    q1 = m.get_zhang_queue()  # 在自己队列里面留言    q1.put("[小张]小明,老大明天是不是去外地办事啊?")    q2 = m.get_ming_queue()  # 获取小明说的话    print(q2.get())if __name__ == '__main__':    main()

客户端代码2:

from multiprocessing.managers import BaseManagerdef main():    """客户端2"""    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    # 注册对应方法的名字(从网络上获取Queue)    BaseManager.register("get_ming_queue")    BaseManager.register("get_zhang_queue")    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 连接到服务器    m.connect()    q1 = m.get_zhang_queue()  # 获取小张说的话    print(q1.get())    q2 = m.get_ming_queue()  # 在自己队列里面留言    q2.put("[小明]这几天咱们终于可以不加班了(>_<)")if __name__ == '__main__':    main()

输出图示:

服务器运行在Linux的测试:

其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下


参考文章:

进程共享的探讨:python-sharing-a-lock-between-processes

多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error

JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html

Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html

有深度但需要辩证看的两篇文章:

跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue

关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏CSDN技术头条

一图读懂JVM架构解析

本文阐述了JVM的构成和组件,配图清晰易懂,是学习Java开发者的入门必读文章。 每个Java开发人员都知道字节码经由JRE(Java运行时环境)执行。但他们或...

2258
来自专栏新智元

Python 3.7.0 来了!

【新智元导读】Python官网静悄悄地发布了一条大消息:正式发布 Python 3.7.0!同时发布的还有Python 3.6.6稳定版。官网刚刚更新了可下载文...

2150
来自专栏腾讯IVWEB团队的专栏

nodejs 中错误捕获的一些最佳实践

本文为翻译文章,原文比较长,感觉也有点啰嗦,所以根据个人理解猜测梳理出本文。

6460
来自专栏orientlu

FreeRTOS 消息队列

上面这几中方式中, 除了消息通知, 其他几种实现都是基于消息队列。消息队列作为主要的通信方式, 支持在任务间, 任务和中断间传递消息内容。 这一章介绍 Fre...

4842
来自专栏Coco的专栏

【基础进阶】URL详解与URL编码

2029
来自专栏北京马哥教育

linux bash环境变量简单总结

一.环境变量简介 Linux是一个多用户的操作系统。每个用户登录系统后,都会有一个专用的运行环境。通常每个用户默认的环境都 是相同的,这个默认环境实际...

3546
来自专栏Python爬虫与算法进阶

Python中的小魔法(二)

01 函数 局部变量 x = 66 def func(x): print('x等于', x) x = 6 print('局部变量x改变...

2864
来自专栏IMWeb前端团队

nodejs中错误捕获的一些最佳实践

本文作者:IMWeb yisbug 原文出处:IMWeb社区 未经同意,禁止转载 本文内容大部分来自 https://www.joyent.com/...

2226
来自专栏Linyb极客之路

从Java内存模型角度理解安全初始化

如大家所知,Java代码在编译和运行的过程中会对代码有很多意想不到且不受开发人员控制的操作:

1113
来自专栏王亚昌的专栏

strace命令解析

strace常用于跟踪和分析进程执行时中系统调用和耗时以及占用cpu的比例,常用的格式如下:

1371

扫码关注云+社区

领取腾讯云代金券