专栏首页逸鹏说道Python3 与 C# 并发编程之~ 进程实战篇
原创

Python3 与 C# 并发编程之~ 进程实战篇

1.6.进程间状态共享

应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:

1.共享内存( Value or Array

之前说过 Queue:在 Process之间使用没问题,用到 Pool,就使用 Manager().xxxValueArray,就不太一样了:

看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.pyclass Value(object):    def __init__(self, typecode, value, lock=True):        self._typecode = typecode        self._value = value    def get(self):        return self._value    def set(self, value):        self._value = value    def __repr__(self):        return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value)    value = property(get, set) # 给value设置get和set方法(和value的属性装饰器一样效果)def Array(typecode, sequence, lock=True):    return array.array(typecode, sequence)

Process为例看看怎么用:

from multiprocessing import Process, Value, Arraydef proc_test1(value, array):    print("子进程1", value.value)    array[0] = 10    print("子进程1", array[:])def proc_test2(value, array):    print("子进程2", value.value)    array[1] = 10    print("子进程2", array[:])def main():    try:        value = Value("d", 3.14)  # d 类型,相当于C里面的double        array = Array("i", range(10))  # i 类型,相当于C里面的int        print(type(value))        print(type(array))        p1 = Process(target=proc_test1, args=(value, array))        p2 = Process(target=proc_test2, args=(value, array))        p1.start()        p2.start()        p1.join()        p2.join()        print("父进程", value.value)  # 获取值        print("父进程", array[:])  # 获取值    except Exception as ex:        print(ex)    else:        print("No Except")if __name__ == '__main__':    main()

输出:( ValueArray进程|线程安全的)

<class 'multiprocessing.sharedctypes.Synchronized'><class 'multiprocessing.sharedctypes.SynchronizedArray'>子进程1 3.14子进程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]子进程2 3.14子进程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]父进程 3.14父进程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]No Except

类型方面的对应关系:

typecode_to_type = {    'c': ctypes.c_char,    'u': ctypes.c_wchar,    'b': ctypes.c_byte,    'B': ctypes.c_ubyte,    'h': ctypes.c_short,    'H': ctypes.c_ushort,    'i': ctypes.c_int,    'I': ctypes.c_uint,    'l': ctypes.c_long,    'L': ctypes.c_ulong,    'q': ctypes.c_longlong,    'Q': ctypes.c_ulonglong,    'f': ctypes.c_float,    'd': ctypes.c_double}

这两个类型其实是 ctypes类型,更多的类型可以去 multiprocessing.sharedctypes查看,来张图:

回头解决 GIL的时候会用到 C系列或者 Go系列的共享库(讲线程的时候会说)


关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下 i+=1并不是原子性操作:

from multiprocessing import Process, Valuedef proc_test1(value):    for i in range(1000):        value.value += 1def main():    value = Value("i", 0)    p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)]    # 批量启动    for i in p_list:        i.start()    # 批量资源回收    for i in p_list:        i.join()    print(value.value)if __name__ == '__main__':    main()

输出:(理论上应该是:5×1000=5000)

2153

稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了

# 通用方法def proc_test1(value):    for i in range(1000):        if value.acquire():            value.value += 1        value.release()# 官方案例:(Lock可以使用with托管)def proc_test1(value):    for i in range(1000):        with value.get_lock():            value.value += 1# 更多可以查看:`sharedctypes.SynchronizedBase` 源码

输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)

5000

看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了 Value

def Value(typecode_or_type, *args, lock=True, ctx=None):    '''返回Value的同步包装器'''    obj = RawValue(typecode_or_type, *args)    if lock is False:        return obj    # 默认支持Lock    if lock in (True, None):        ctx = ctx or get_context() # 获取上下文        lock = ctx.RLock() # 获取递归锁    if not hasattr(lock, 'acquire'):         raise AttributeError("%r has no method 'acquire'" % lock)    # 一系列处理    return synchronized(obj, lock, ctx=ctx)def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):    '''返回RawArray的同步包装器'''    obj = RawArray(typecode_or_type, size_or_initializer)    if lock is False:        return obj    # 默认是支持Lock的    if lock in (True, None):        ctx = ctx or get_context() # 获取上下文        lock = ctx.RLock()  # 递归锁属性    # 查看是否有acquire属性    if not hasattr(lock, 'acquire'):        raise AttributeError("%r has no method 'acquire'" % lock)    return synchronized(obj, lock, ctx=ctx)

扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807


2.服务器进程( Manager

官方文档:https://docs.python.org/3/library/multiprocessing.html#managers

有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象

通过返回的经理 Manager()将支持类型 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue

举个简单例子(后面还会再说):(本质其实就是 多个进程通过代理,共同操作服务端内容)

from multiprocessing import Pool, Managerdef test1(d, l):    d[1] = '1'    d['2'] = 2    d[0.25] = None    l.reverse()def test2(d, l):    print(d)    print(l)def main():    with Manager() as manager:        dict_test = manager.dict()        list_test = manager.list(range(10))        pool = Pool()        pool.apply_async(test1, args=(dict_test, list_test))        pool.apply_async(test2, args=(dict_test, list_test))        pool.close()        pool.join()if __name__ == '__main__':    main()

输出:

{1: '1', '2': 2, 0.25: None}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了 “中介”

同步问题依然需要注意一下,举个例子体会一下:

from multiprocessing import Manager, Process, Lockdef test(dict1, lock):    for i in range(100):        with lock:  # 你可以把这句话注释掉,然后就知道为什么加了            dict1["year"] += 1def main():    with Manager() as m:        lock = Lock()        dict1 = m.dict({"year": 2000})        p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)]        for i in p_list:            i.start()        for i in p_list:            i.join()        print(dict1)if __name__ == '__main__':    main()

扩展补充:

  1. multiprocessing.Lock是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。
  2. 大多数可变Python对象(如list,dict,大多数类)不能保证进程中安全,所以它们在进程间共享时需要使用 Manager
  3. 多进程模式的缺点是创建进程的代价大,在 Unix/Linux系统下,用 fork调用还行,在 Windows下创建进程开销巨大。

Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers

WinServer的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)

扩展补充

还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:

  1. 使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。
  2. 使用初始化程序在池创建时传递常规 multiprocessing.Queue()这将使 Queue实例在所有子进程中全局共享

再看一下Pool的 __init__方法:

# processes:进程数# initializer,initargs 初始化进行的操作# maxtaskperchild:每个进程执行task的最大数目# contex:上下文对象def __init__(self, processes=None, initializer=None, initargs=(),                 maxtasksperchild=None, context=None):

第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)

import osimport timefrom multiprocessing import Pool, Queuedef error_callback(msg):    print(msg)def pro_test1():    print("[子进程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    q.put("[子进程1]小明,今晚撸串不?")    # 设置一个简版的重试机制(三次重试)    for i in range(3):        if not q.empty():            print(q.get())            break        else:            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6sdef pro_test2():    print("[子进程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    print(q.get())    time.sleep(4)  # 模拟一下网络延迟    q.put("[子进程2]不去,我今天约了妹子")def init(queue):    global q    q = queuedef main():    print("[父进程]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    queue = Queue()    p = Pool(initializer=init, initargs=(queue, ))    p.apply_async(pro_test1, error_callback=error_callback)    p.apply_async(pro_test2, error_callback=error_callback)    p.close()    p.join()if __name__ == '__main__':    main()

输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数alizer=init,initargs=(queue,))

[父进程]PPID=13157,PID=24864[子进程1]PPID=24864,PID=24865[子进程2]PPID=24864,PID=24866[子进程1]小明,今晚撸串不?[子进程2]不去,我今天约了妹子real    0m6.105suser    0m0.071ssys     0m0.042s

Win下亦通用(win下没有 os.getgid


1.7.分布式进程的案例

有了 1.6的基础,咱们来个例子练练:

BaseManager的缩略图:

服务器端代码:

from multiprocessing import Queuefrom multiprocessing.managers import BaseManagerdef main():    # 用来身份验证的    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    get_zhang_queue = Queue()  # 小张消息队列    get_ming_queue = Queue()  # 小明消息队列    # 把Queue注册到网络上, callable参数关联了Queue对象    BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue)    BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue)    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    manager = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 运行serve    manager.get_server().serve_forever()if __name__ == '__main__':    main()

客户端代码1:

from multiprocessing.managers import BaseManagerdef main():    """客户端1"""    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    # 注册对应方法的名字(从网络上获取Queue)    BaseManager.register("get_ming_queue")    BaseManager.register("get_zhang_queue")    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 连接到服务器    m.connect()    q1 = m.get_zhang_queue()  # 在自己队列里面留言    q1.put("[小张]小明,老大明天是不是去外地办事啊?")    q2 = m.get_ming_queue()  # 获取小明说的话    print(q2.get())if __name__ == '__main__':    main()

客户端代码2:

from multiprocessing.managers import BaseManagerdef main():    """客户端2"""    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    # 注册对应方法的名字(从网络上获取Queue)    BaseManager.register("get_ming_queue")    BaseManager.register("get_zhang_queue")    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 连接到服务器    m.connect()    q1 = m.get_zhang_queue()  # 获取小张说的话    print(q1.get())    q2 = m.get_ming_queue()  # 在自己队列里面留言    q2.put("[小明]这几天咱们终于可以不加班了(>_<)")if __name__ == '__main__':    main()

输出图示:

服务器运行在Linux的测试:

其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下


参考文章:

进程共享的探讨:python-sharing-a-lock-between-processes

多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error

JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html

Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html

有深度但需要辩证看的两篇文章:

跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue

关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 用Python、NetCore、Shell分别开发一个Ubuntu版的定时提醒(附NetCore跨平台的两种发布方式)

    平时经常用定时提醒来提高工作效率,到了Linux。。。。蒙圈了,以前C#写的不能跨平台啊,于是就有了这篇文章~(有些人喜欢用番茄工作法,这个算是个福利了)

    逸鹏
  • 2.并发编程~先导篇(下)

    代码实例:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Linux/进程...

    逸鹏
  • 附加文件时候的提示“无法重新生成日志,原因是数据库关闭时存在打开的事务/用户,该数据库没有检查点或者该数据库是只读的 ”

    【SQLServer】【恢复挂起的解决方案】附加文件时候的提示“无法重新生成日志,原因是数据库关闭时存在打开的事务/用户,该数据库没有检查点或者该数据库是只读的...

    逸鹏
  • 终于让minicef把Brackets这个编辑器跑起来了

    重大突破,miniblink终于把Brackets编辑器跑起来了!!不知道Brackets的可以搜索下,现在貌似还是有人在用。之前一直没跑起,也没啥错误报出。...

    龙泉寺扫地僧
  • Python基础

    py3study
  • Linux进程及作业管理

    一、进程查看及其管理工具 ps命令:报告当前进程的快照信息 ps - report a snapshot of the current processes....

    小小科
  • redisson的MultiLock连锁

    Redis based distributed RedissonMultiLock object groups multiple RLock objects a...

    IT云清
  • Mysql-Innodb : 从一个字节到整个数据库表了解物理存储结构和逻辑存储结构

       一块原生的(Raw)物理磁盘,可以把他看成一个字节一个字节单元组成的物理存储介质

    执生
  • 重磅整理---Android进程保活组件

    导语 Android进程保活的文章很多,但是基本没有一个完整的工程化的东西。所以在这里整理主流的保活方案,将其工程化到github上供大家直接使用。 ...

    MelonTeam
  • 从CPU管理到进程的引入

    为什么要管理CPU,这是因为在“上古时代”,CPU是计算机硬件之中最昂贵的资源。因此提高CPU利用率是很有必要的。我们知道只要给CPU的PC一个地址,CPU就能...

    zy010101

扫码关注云+社区

领取腾讯云代金券