前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >浅谈 python multiprocessing(多进程)下如何共享变量

浅谈 python multiprocessing(多进程)下如何共享变量

作者头像
用户1177713
发布2018-02-24 16:28:18
3.4K0
发布2018-02-24 16:28:18
举报
文章被收录于专栏:数据之美数据之美

1、问题:

群中有同学贴了如下一段代码,问为何 list 最后打印的是空值?

代码语言:javascript
复制
from multiprocessing import Process, Manager
import os

manager = Manager()
vip_list = []
#vip_list = manager.list()

def testFunc(cc):
    vip_list.append(cc)
    print 'process id:', os.getpid()

if __name__ == '__main__':
    threads = []

    for ll in range(10):
        t = Process(target=testFunc, args=(ll,))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print vip_list

其实如果你了解 python 的多线程模型,GIL 问题,然后了解多线程、多进程原理,上述问题不难回答,不过如果你不知道也没关系,跑一下上面的代码你就知道是什么问题了。

代码语言:javascript
复制
python aa.py
process id: 632
process id: 635
process id: 637
process id: 633
process id: 636
process id: 634
process id: 639
process id: 638
process id: 641
process id: 640
------------------------
process id: 619
[]

将第 6 行注释开启,你会看到如下结果:

代码语言:javascript
复制
process id: 32074
process id: 32073
process id: 32072
process id: 32078
process id: 32076
process id: 32071
process id: 32077
process id: 32079
process id: 32075
process id: 32080
------------------------
process id: 32066
[3, 2, 1, 7, 5, 0, 6, 8, 4, 9]

2、python 多进程共享变量的几种方式:

(1)Shared memory:

Data can be stored in a shared memory map using Value or Array. For example, the following code

http://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes

代码语言:javascript
复制
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print num.value
    print arr[:]

结果:

代码语言:javascript
复制
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
(2)Server process:

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array. 代码见开头的例子。

http://docs.python.org/2/library/multiprocessing.html#managers

3、多进程的问题远不止这么多:数据的同步

看段简单的代码:一个简单的计数器:

代码语言:javascript
复制
from multiprocessing import Process, Manager
import os

manager = Manager()
sum = manager.Value('tmp', 0)

def testFunc(cc):
    sum.value += cc

if __name__ == '__main__':
    threads = []

    for ll in range(100):
        t = Process(target=testFunc, args=(1,))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print sum.value

结果:

代码语言:javascript
复制
------------------------
process id: 17378
97

也许你会问:WTF?其实这个问题在多线程时代就存在了,只是在多进程时代又杯具重演了而已:Lock!

代码语言:javascript
复制
from multiprocessing import Process, Manager, Lock
import os

lock = Lock()
manager = Manager()
sum = manager.Value('tmp', 0)


def testFunc(cc, lock):
    with lock:
        sum.value += cc


if __name__ == '__main__':
    threads = []

    for ll in range(100):
        t = Process(target=testFunc, args=(1, lock))
        t.daemon = True
        threads.append(t)

    for i in range(len(threads)):
        threads[i].start()

    for j in range(len(threads)):
        threads[j].join()

    print "------------------------"
    print 'process id:', os.getpid()
    print sum.value

这段代码性能如何呢?跑跑看,或者加大循环次数试一下。。。

再来看个多进程共享变量的例子:该脚本可以在集群中批量执行任意命令并返回结果。

代码语言:javascript
复制
#!/usr/bin/env python
# coding=utf-8
import sys

reload(sys)
sys.setdefaultencoding('utf-8')

import rpyc
from pyUtil import *
from multiprocessing import Pool as ProcessPool
from multiprocessing import Manager

hostDict = {
    '192.168.1.10': 11111
}

manager = Manager()
localResultDict = manager.dict()


def rpc_client(host_port_cmd):
    host = host_port_cmd[0]
    port = host_port_cmd[1]
    cmd = host_port_cmd[2]
    c = rpyc.connect(host, port)
    result = c.root.exposed_execCmd(cmd)
    localResultDict[host] = result
    c.close()


def exec_cmd(cmd_str):
    host_port_list = []
    for (host, port) in hostDict.items():
        host_port_list.append((host, port, cmd_str))

    pool = ProcessPool(len(hostDict))
    results = pool.map(rpc_client, host_port_list)
    pool.close()
    pool.join()
    for ip in localResultDict.keys():
        print ip + ":\t" + localResultDict[ip]

if __name__ == "__main__":

    if len(sys.argv) == 2 and sys.argv[1] != "-h":
        print "======================"
        print "    Your command is:\t" + sys.argv[1]
        print "======================"
        cmd_str = sys.argv[1]
    else:
        print "Cmd Error!"
        sys.exit(1)

    exec_cmd(cmd_str)

需要注意的是 manager.dict() 在遍历时一定要使用 .keys() 方法,否则会抛异常:

代码语言:javascript
复制
Traceback (most recent call last):
  File "test.py", line 83, in <module>
    exec_cmd(cmd_str)
  File "test.py", line 57, in exec_cmd
    for ip in localResultDict:
  File "<string>", line 2, in __getitem__
  File "/opt/soft/python-2.7.10/lib/python2.7/multiprocessing/managers.py", line 774, in _callmethod
    raise convert_to_error(kind, result)
KeyError: 0

4、最后的建议:

Note that usually sharing data between processes may not be the best choice, because of all the synchronization issues; an approach involving actors exchanging messages is usually seen as a better choice. See also Python documentation: As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes. However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

5、Refer:

[0] 理解Python并发编程一篇就够了 - 线程篇

http://www.dongwm.com/archives/%E4%BD%BF%E7%94%A8Python%E8%BF%9B%E8%A1%8C%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B-%E7%BA%BF%E7%A8%8B%E7%AF%87/

[1] multiprocessing — Process-based “threading” interface

https://docs.python.org/2/library/multiprocessing.html

[2] Manager()出错,不知道为什么

http://m.newsmth.net/article/Python/100915

[3] Can't iterate over multiprocessing.managers.DictProxy

http://bugs.python.org/issue9733

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、问题:
  • 2、python 多进程共享变量的几种方式:
    • (1)Shared memory:
      • (2)Server process:
      • 3、多进程的问题远不止这么多:数据的同步
      • 4、最后的建议:
      • 5、Refer:
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档