首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python3 与 C# 并发编程之~ 进程实战篇

Python3 与 C# 并发编程之~ 进程实战篇

原创
作者头像
逸鹏
发布2018-09-07 08:31:13
8840
发布2018-09-07 08:31:13
举报
文章被收录于专栏:逸鹏说道逸鹏说道

1.6.进程间状态共享

应该尽量避免进程间状态共享,但需求在那,所以还是得研究,官方推荐了两种方式:

1.共享内存( Value or Array

之前说过 Queue:在 Process之间使用没问题,用到 Pool,就使用 Manager().xxxValueArray,就不太一样了:

看看源码:(Manager里面的Array和Process共享的Array不是一个概念,而且也没有同步机制)

# https://github.com/lotapp/cpython3/blob/master/Lib/multiprocessing/managers.pyclass Value(object):    def __init__(self, typecode, value, lock=True):        self._typecode = typecode        self._value = value    def get(self):        return self._value    def set(self, value):        self._value = value    def __repr__(self):        return '%s(%r, %r)' % (type(self).__name__, self._typecode, self._value)    value = property(get, set) # 给value设置get和set方法(和value的属性装饰器一样效果)def Array(typecode, sequence, lock=True):    return array.array(typecode, sequence)

Process为例看看怎么用:

from multiprocessing import Process, Value, Arraydef proc_test1(value, array):    print("子进程1", value.value)    array[0] = 10    print("子进程1", array[:])def proc_test2(value, array):    print("子进程2", value.value)    array[1] = 10    print("子进程2", array[:])def main():    try:        value = Value("d", 3.14)  # d 类型,相当于C里面的double        array = Array("i", range(10))  # i 类型,相当于C里面的int        print(type(value))        print(type(array))        p1 = Process(target=proc_test1, args=(value, array))        p2 = Process(target=proc_test2, args=(value, array))        p1.start()        p2.start()        p1.join()        p2.join()        print("父进程", value.value)  # 获取值        print("父进程", array[:])  # 获取值    except Exception as ex:        print(ex)    else:        print("No Except")if __name__ == '__main__':    main()

输出:( ValueArray进程|线程安全的)

<class 'multiprocessing.sharedctypes.Synchronized'><class 'multiprocessing.sharedctypes.SynchronizedArray'>子进程1 3.14子进程1 [10, 1, 2, 3, 4, 5, 6, 7, 8, 9]子进程2 3.14子进程2 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]父进程 3.14父进程 [10, 10, 2, 3, 4, 5, 6, 7, 8, 9]No Except

类型方面的对应关系:

typecode_to_type = {    'c': ctypes.c_char,    'u': ctypes.c_wchar,    'b': ctypes.c_byte,    'B': ctypes.c_ubyte,    'h': ctypes.c_short,    'H': ctypes.c_ushort,    'i': ctypes.c_int,    'I': ctypes.c_uint,    'l': ctypes.c_long,    'L': ctypes.c_ulong,    'q': ctypes.c_longlong,    'Q': ctypes.c_ulonglong,    'f': ctypes.c_float,    'd': ctypes.c_double}

这两个类型其实是 ctypes类型,更多的类型可以去 multiprocessing.sharedctypes查看,来张图:

4.ctypes.png
4.ctypes.png

回头解决 GIL的时候会用到 C系列或者 Go系列的共享库(讲线程的时候会说)


关于进程安全的补充说明:对于原子性操作就不用说,铁定安全,但注意一下 i+=1并不是原子性操作:

from multiprocessing import Process, Valuedef proc_test1(value):    for i in range(1000):        value.value += 1def main():    value = Value("i", 0)    p_list = [Process(target=proc_test1, args=(value, )) for i in range(5)]    # 批量启动    for i in p_list:        i.start()    # 批量资源回收    for i in p_list:        i.join()    print(value.value)if __name__ == '__main__':    main()

输出:(理论上应该是:5×1000=5000)

2153

稍微改一下才行:(进程安全:只是提供了安全的方法,并不是什么都不用你操心了

# 通用方法def proc_test1(value):    for i in range(1000):        if value.acquire():            value.value += 1        value.release()# 官方案例:(Lock可以使用with托管)def proc_test1(value):    for i in range(1000):        with value.get_lock():            value.value += 1# 更多可以查看:`sharedctypes.SynchronizedBase` 源码

输出:(关于锁这块,后面讲线程的时候会详说,看看就好【语法的确比C#麻烦点】)

5000

看看源码:(之前探讨如何优雅的杀死子进程,其中就有一种方法使用了 Value

def Value(typecode_or_type, *args, lock=True, ctx=None):    '''返回Value的同步包装器'''    obj = RawValue(typecode_or_type, *args)    if lock is False:        return obj    # 默认支持Lock    if lock in (True, None):        ctx = ctx or get_context() # 获取上下文        lock = ctx.RLock() # 获取递归锁    if not hasattr(lock, 'acquire'):         raise AttributeError("%r has no method 'acquire'" % lock)    # 一系列处理    return synchronized(obj, lock, ctx=ctx)def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):    '''返回RawArray的同步包装器'''    obj = RawArray(typecode_or_type, size_or_initializer)    if lock is False:        return obj    # 默认是支持Lock的    if lock in (True, None):        ctx = ctx or get_context() # 获取上下文        lock = ctx.RLock()  # 递归锁属性    # 查看是否有acquire属性    if not hasattr(lock, 'acquire'):        raise AttributeError("%r has no method 'acquire'" % lock)    return synchronized(obj, lock, ctx=ctx)

扩展部分可以查看这篇文章:http://blog.51cto.com/11026142/1874807


2.服务器进程( Manager

官方文档:https://docs.python.org/3/library/multiprocessing.html#managers

有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象

通过返回的经理 Manager()将支持类型 list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue

举个简单例子(后面还会再说):(本质其实就是 多个进程通过代理,共同操作服务端内容)

from multiprocessing import Pool, Managerdef test1(d, l):    d[1] = '1'    d['2'] = 2    d[0.25] = None    l.reverse()def test2(d, l):    print(d)    print(l)def main():    with Manager() as manager:        dict_test = manager.dict()        list_test = manager.list(range(10))        pool = Pool()        pool.apply_async(test1, args=(dict_test, list_test))        pool.apply_async(test2, args=(dict_test, list_test))        pool.close()        pool.join()if __name__ == '__main__':    main()

输出:

{1: '1', '2': 2, 0.25: None}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络在不同计算机上的进程共享。但是,它们比使用共享内存慢(毕竟有了 “中介”

同步问题依然需要注意一下,举个例子体会一下:

from multiprocessing import Manager, Process, Lockdef test(dict1, lock):    for i in range(100):        with lock:  # 你可以把这句话注释掉,然后就知道为什么加了            dict1["year"] += 1def main():    with Manager() as m:        lock = Lock()        dict1 = m.dict({"year": 2000})        p_list = [Process(target=test, args=(dict1, lock)) for i in range(5)]        for i in p_list:            i.start()        for i in p_list:            i.join()        print(dict1)if __name__ == '__main__':    main()

扩展补充:

  1. multiprocessing.Lock是一个进程安全对象,因此您可以将其直接传递给子进程并在所有进程中安全地使用它。
  2. 大多数可变Python对象(如list,dict,大多数类)不能保证进程中安全,所以它们在进程间共享时需要使用 Manager
  3. 多进程模式的缺点是创建进程的代价大,在 Unix/Linux系统下,用 fork调用还行,在 Windows下创建进程开销巨大。

Manager这块官方文档很详细,可以看看:https://docs.python.org/3/library/multiprocessing.html#managers

WinServer的可以参考这篇 or 这篇埋坑记(Manager一般都是部署在Linux的,Win的客户端不影响)

扩展补充

还记得之前的:无法将multiprocessing.Queue对象传递给Pool方法吗?其实一般都是这两种方式解决的:

  1. 使用Manager需要生成另一个进程来托管Manager服务器。 并且所有获取/释放锁的调用都必须通过IPC发送到该服务器。
  2. 使用初始化程序在池创建时传递常规 multiprocessing.Queue()这将使 Queue实例在所有子进程中全局共享

再看一下Pool的 __init__方法:

# processes:进程数# initializer,initargs 初始化进行的操作# maxtaskperchild:每个进程执行task的最大数目# contex:上下文对象def __init__(self, processes=None, initializer=None, initargs=(),                 maxtasksperchild=None, context=None):

第一种方法不够轻量级,在讲案例前,稍微说下第二种方法:(也算把上面留下的悬念解了)

import osimport timefrom multiprocessing import Pool, Queuedef error_callback(msg):    print(msg)def pro_test1():    print("[子进程1]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    q.put("[子进程1]小明,今晚撸串不?")    # 设置一个简版的重试机制(三次重试)    for i in range(3):        if not q.empty():            print(q.get())            break        else:            time.sleep((i + 1) * 2)  # 第一次1s,第二次4s,第三次6sdef pro_test2():    print("[子进程2]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    print(q.get())    time.sleep(4)  # 模拟一下网络延迟    q.put("[子进程2]不去,我今天约了妹子")def init(queue):    global q    q = queuedef main():    print("[父进程]PPID=%d,PID=%d" % (os.getppid(), os.getpid()))    queue = Queue()    p = Pool(initializer=init, initargs=(queue, ))    p.apply_async(pro_test1, error_callback=error_callback)    p.apply_async(pro_test2, error_callback=error_callback)    p.close()    p.join()if __name__ == '__main__':    main()

输出:(就是在初始化Pool的时候,传了初始化执行的方法并传了参数alizer=init,initargs=(queue,))

[父进程]PPID=13157,PID=24864[子进程1]PPID=24864,PID=24865[子进程2]PPID=24864,PID=24866[子进程1]小明,今晚撸串不?[子进程2]不去,我今天约了妹子real    0m6.105suser    0m0.071ssys     0m0.042s

Win下亦通用(win下没有 os.getgid

5.win.png
5.win.png

1.7.分布式进程的案例

有了 1.6的基础,咱们来个例子练练:

BaseManager的缩略图:

6.缩略.png
6.缩略.png

服务器端代码:

from multiprocessing import Queuefrom multiprocessing.managers import BaseManagerdef main():    # 用来身份验证的    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    get_zhang_queue = Queue()  # 小张消息队列    get_ming_queue = Queue()  # 小明消息队列    # 把Queue注册到网络上, callable参数关联了Queue对象    BaseManager.register("get_zhang_queue", callable=lambda: get_zhang_queue)    BaseManager.register("get_ming_queue", callable=lambda: get_ming_queue)    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    manager = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 运行serve    manager.get_server().serve_forever()if __name__ == '__main__':    main()

客户端代码1:

from multiprocessing.managers import BaseManagerdef main():    """客户端1"""    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    # 注册对应方法的名字(从网络上获取Queue)    BaseManager.register("get_ming_queue")    BaseManager.register("get_zhang_queue")    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 连接到服务器    m.connect()    q1 = m.get_zhang_queue()  # 在自己队列里面留言    q1.put("[小张]小明,老大明天是不是去外地办事啊?")    q2 = m.get_ming_queue()  # 获取小明说的话    print(q2.get())if __name__ == '__main__':    main()

客户端代码2:

from multiprocessing.managers import BaseManagerdef main():    """客户端2"""    key = b"8d969eef6ecad3c29a3a629280e686cf0c3f5d5a86aff3ca12020c923adc6c92"    # 注册对应方法的名字(从网络上获取Queue)    BaseManager.register("get_ming_queue")    BaseManager.register("get_zhang_queue")    # 实例化一个Manager对象。绑定ip+端口, 设置验证秘钥    m = BaseManager(address=("192.168.36.235", 5438), authkey=key)    # 连接到服务器    m.connect()    q1 = m.get_zhang_queue()  # 获取小张说的话    print(q1.get())    q2 = m.get_ming_queue()  # 在自己队列里面留言    q2.put("[小明]这几天咱们终于可以不加班了(>_<)")if __name__ == '__main__':    main()

输出图示:

7.manager.gif
7.manager.gif

服务器运行在Linux的测试:

8.win.png
8.win.png

其实还有一部分内容没说,明天得出去办点事,先到这吧,后面找机会继续带一下


参考文章:

进程共享的探讨:python-sharing-a-lock-between-processes

多进程锁的探讨:trouble-using-a-lock-with-multiprocessing-pool-pickling-error

JoinableQueue扩展:https://www.cnblogs.com/smallmars/p/7093603.html

Python多进程编程:https://www.cnblogs.com/kaituorensheng/p/4445418.html

有深度但需要辩证看的两篇文章:

跨进程对象共享:http://blog.ftofficer.com/2009/12/python-multiprocessing-3-about-queue

关于Queue:http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.6.进程间状态共享
    • 1.共享内存( Value or Array)
      • 2.服务器进程( Manager)
        • 扩展补充
        • 1.7.分布式进程的案例
        相关产品与服务
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档