首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何通过multiprocessing.Manager().Namespace元素远程访问SyncManager?

如何通过multiprocessing.Manager().Namespace元素远程访问SyncManager?
EN

Stack Overflow用户
提问于 2022-01-02 04:22:07
回答 2查看 94关注 0票数 0

无论我多么难读文档,我都无法理解如何使用SyncManager远程访问多处理名称空间中的项。

请注意下面的代码是最短的,以演示什么是有效的,什么是失败的。我问题的症结在于对register的调用,我无法正确理解如何使用这些调用。

下面的示例包括两个部分(都位于同一个文件mp-example.py中):服务器和客户端。客户端要完成其工作,服务器部分必须运行(在另一个终端/屏幕中)。

首先,工作的代码:

代码语言:javascript
运行
复制
#!/usr/bin/env python3

import os
import sys
import multiprocessing as mp
import multiprocessing.managers as mpm
import queue
import socket
import contextlib
from contextlib import contextmanager

def get_ip():
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
        s.connect(('192.168.17.1', 80))
        return s.getsockname()[0]

def get_free_port():
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s.getsockname()[1]

def startserver(name):
    # Find a free port for the SyncManager
    newportnumber = get_free_port()
    Server = {'address': (get_ip(), newportnumber), 'authkey': b'mptest'}

    # Save data to file
    serverdata_filename = f'mpSERVERDATA.txt'
    with open(serverdata_filename, 'wt', encoding="utf-8") as serverdata_file:
        print(Server, file=serverdata_file)
        print(Server)

    # On the server, we do not need our own IP address
    Server['address'] = ('', newportnumber)

    # Set up server:
    #   define a multiprocessing manager
    #   define a multipprocessing namespace
    #   define a Value item in the namespace
    #   define a Value item outside the namespace
    #   define a multiprocessing syncmanager
    #   start the syncmanager
    #   register both the namespace and the value outside the namespace
    #   set values
    #   initialise a different process
    #   print the values
    #   run the other process & wait for completion
    #   print the values
    #   read standard input & print the values (waiting for a remote process connection)

    m = mp.Manager()
    m.name = 'mp.Manager: ' + name

    with m:
        ns = m.Namespace()
        ns.name = 'm.Namespace: ' + name
        ns.yv = m.Value(int, 0)
        yv = m.Value(int, 0)

        bm = mpm.SyncManager(**Server)
        bm.name = 'mpm.SyncManager: ' + name

        #bm.register('ns', callable=lambda: ns, proxytype=mpm.NamespaceProxy,
        #            exposed=('__getattribute__', '__setattr__', '__delattr__', 'yv')
        #           )
        # The above has not made any difference
        bm.register('ns', callable=lambda: ns, proxytype=mpm.NamespaceProxy)
        bm.register('yv', callable=lambda: yv, proxytype=mpm.ValueProxy)

        with bm:
            yv.value = 99
            ns.yv = 999

            def f(n, y):
                n.yv += 222
                y.value += 33

            proc = mp.Process(target=f, args=(ns, yv))
            print(f'yv={yv}, ns.yv={ns.yv}')
            proc.start()
            proc.join()
            proc.terminate()
            print(f'yv={yv}, ns.yv={ns.yv}, ns={ns}')
            print('Waiting for input (waiting for client), press CTRL-D to end')
            for line in sys.stdin:
                print(f'yv={yv}, ns.yv={ns.yv}, ns={ns}')
                print('Waiting for input (waiting for client), press CTRL-D to end')

elif sys.argv[1] == 'server' and len(sys.argv) == 3:
    startserver(sys.argv[2])
elif sys.argv[1] == 'client' and len(sys.argv) == 3:
    startclient(sys.argv[2])

上面的代码实现了我自己运行时的预期:

代码语言:javascript
运行
复制
> python3 mp-example.py server test
{'address': ('192.168.17.10', 50793), 'authkey': b'mptest'}
yv=Value(<class 'int'>, 99), ns.yv=999
yv=Value(<class 'int'>, 132), ns.yv=1221, ns=Namespace(name='m.Namespace: test', yv=1221)
Waiting for input (waiting for client), press CTRL-D to end

接下来,不工作的bit:

下面的代码工作到一定程度,但在尝试访问名称空间中的值项yv时失败了。

代码:

代码语言:javascript
运行
复制
def startclient(name):
    # retrieve server identification from file
    serverdata_filename = f'mpSERVERDATA.txt'
    with open(serverdata_filename, 'rt', encoding="utf-8") as serverdata_file:
        Server = eval(serverdata_file.read())
        print(Server)

    bm = mpm.SyncManager(**Server)
    bm.register('yv', proxytype=mpm.ValueProxy)
    bm.register('ns', proxytype=mpm.NamespaceProxy)
    bm.connect()

    yv = bm.yv()
    print(f'yv={yv.value}')
    yv.value += 55
    print(f'yv={yv.value}')

    ns = bm.ns()
    print(f'ns={ns}')
    print(f'ns.yv={ns.yv}')
    ns.yv += 666
    print(f'ns.yv={ns.yv}')

输出:

代码语言:javascript
运行
复制
> python3 mp-example.py client test
{'address': ('192.168.17.10', 50793), 'authkey': b'mptest'}
yv=627
yv=682
['_Client', '__class__', '__deepcopy__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_address_to_local', '_after_fork', '_authkey', '_callmethod', '_close', '_connect', '_decref', '_exposed_', '_getvalue', '_id', '_idset', '_incref', '_manager', '_mutex', '_owned_by_manager', '_serializer', '_tls', '_token']
Help on method _getvalue in module multiprocessing.managers:

_getvalue() method of multiprocessing.managers.NamespaceProxy instance
    Get a copy of the value of the referent

None
ns=<NamespaceProxy object, typeid 'Namespace' at 0x7f5d05689a20>
Traceback (most recent call last):
  File "mp-example.py", line 333, in <module>
    startclient(sys.argv[2])
  File "mp-example.py", line 282, in startclient
    print(f'ns.yv={ns.yv}')
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 1060, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/usr/lib/python3.6/multiprocessing/managers.py", line 772, in _callmethod
    raise convert_to_error(kind, result)
AttributeError: 'NamespaceProxy' object has no attribute 'yv'

问题:

要访问客户机中的ns.yv,我需要做什么?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-01-06 08:53:54

编辑-综合解决方案:

我通过编写一个新的RemoteSyncManager类来封装multiprocessing.managers.SyncManager()实例来解决我的大部分挫折。

我的这个库确实简化了SyncManager在本地和远程进程中的使用。

请转到我的GitHub存储库PythonLibraries-Multiprocessing-RemoteSyncManager了解详细信息。

我的第一个解决办法是:

最后,我成功地设置了一个托管多处理名称空间,该名称空间与远程连接同步。

但是这个设置拒绝与管理的dict一起工作,它的工作方式类似于Booboo的建议,上面。

有没有任何方法可以在远程共享的名称空间中定义和保持整个dict的同步,而不必在每次操作中重新分配dict?

代码:

代码语言:javascript
运行
复制
if __name__ == '__main__':
    if sys.argv[1] == 'Server' and len(sys.argv) == 2:
        #localmanager = mp.Manager()
        #with localmanager:
        q1 = mp.Queue(1)
        q2 = mp.Queue(1)
        localnamespace = mpm.Namespace()
    
        class syncmanager(mpm.SyncManager):
            pass
        syncmanager.register('get_q1', callable=lambda: q1)
        syncmanager.register('get_q2', callable=lambda: q2)
        syncmanager.register('get_ns', callable=lambda: localnamespace, proxytype=mpm.NamespaceProxy)
    
        remotemanager = syncmanager(address = ('', 56789), authkey = b'mypassword')
        remotemanager.start()
    
        with remotemanager:
            ns = remotemanager.get_ns()
            #ns.sampledict = remotemanager.dict()
            ns.sampledict = {}
            #ns.sampledict = localmanager.dict()
            ns.sampledict['testabc'] = 1234567
            ns.sampledict['TESTABC'] = 9876
            d = {}
            d['testabc'] = 1234567
            d['TESTABC'] = 9876
            ns.sampledict2 = d
    
            print(f'Server ns = {ns}')
    
            print(q2.get())
            q1.put(f'Server ns = {ns}')
    
            ns.sampledict['testabc'] = 1
            ns.sampleserverint = 98789
            print(f'Server ns = {ns}')
    
            print(q2.get())
            q1.put(f'Server ns = {ns}')
    
    elif sys.argv[1] == 'Client' and len(sys.argv) == 2:
        class syncmanager(mpm.SyncManager):
            pass
        syncmanager.register('get_q1')
        syncmanager.register('get_q2')
        syncmanager.register('get_ns', proxytype=mpm.NamespaceProxy)
    
        remotemanager = syncmanager(address = ('', 56789), authkey = b'mypassword')
        remotemanager.connect()
    
        q1 = remotemanager.get_q1()
        q2 = remotemanager.get_q2()
        ns = remotemanager.get_ns()
    
        q2.put('ready')
        msg = q1.get()
        print(msg)
        print(f'Client ns = {ns}')
    
        ns.sampledict['TESTABC'] = 9
        ns.sampleclientint = 7887
    
        q2.put('next.1')
        msg = q1.get()
        print(msg)
        print(f'Client ns = {ns}')
票数 0
EN

Stack Overflow用户

发布于 2022-01-03 19:04:23

我也很努力地解决这个问题--很难找到你所遵循的解决方案,它既不复杂,也不低效。顺便说一句,您正在尝试将代理返回到共享内存值,该值已经可以共享,而不需要代理,而且如果您的目标只是提供单个值,则不需要将它放在共享内存中。

一种更简单的方法,它不要求您创建两个管理器进程,这样您就可以创建一个代理对象,然后在启动它之前将其注册到另一个管理器中,这就是定义您自己的自定义类。您通常会创建您自己的multiprocessing.managers.BaseManager子类,您将使用它注册新的托管类(当然,如果您的客户端还需要已经在SyncManager中注册的类,例如一个dict,那么您将使用它的子类)。

下面的程序演示如何创建单例Namespace等效类型。我注释掉了您的IP地址,并使用127.0.0.1进行测试。我还把你给eval的电话换成了一个更安全的替代品。

代码语言:javascript
运行
复制
#!/usr/bin/env python3

import os
import sys
import multiprocessing as mp
from multiprocessing.managers import BaseManager, NamespaceProxy
import socket
import contextlib
from ast import literal_eval
from threading import Thread


def get_ip():
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s:
        #s.connect(('192.168.17.1', 80))
        s.connect(('127.0.0.1', 80))
        return s.getsockname()[0]

def get_free_port():
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        return s.getsockname()[1]

class MyNamespaceManager(BaseManager):
    pass

class MyNamespace:
    pass

# Singleton:
singleton = MyNamespace()

def get_singleton():
    return singleton

def run_server(server):
    server.serve_forever()

def startserver():
    # Find a free port for the SyncManager
    newportnumber = get_free_port()
    Server = {'address': (get_ip(), newportnumber), 'authkey': b'mptest'}

    # Save data to file
    serverdata_filename = f'mpSERVERDATA.txt'
    with open(serverdata_filename, 'wt', encoding="utf-8") as serverdata_file:
        print(Server, file=serverdata_file)
        print(Server)

    # On the server, we do not need our own IP address
    Server['address'] = ('', newportnumber)

    # Singleton
    MyNamespaceManager.register('MyNamespace', callable=get_singleton, proxytype=NamespaceProxy)

    manager = MyNamespaceManager(**Server)
    """
    manager.start()
    input('Hit enter to end: ')
    manager.shutdown()
    """
    server = manager.get_server()
    Thread(target=run_server, args=(server,)).start()
    try:
        input('Hit enter to end: ')
    except KeyboardInterrupt:
        pass
    os.unlink(serverdata_filename)
    server.stop_event.set() # The server stops and calls sys.exit()

def startclient():
    # retrieve server identification from file
    serverdata_filename = f'mpSERVERDATA.txt'
    with open(serverdata_filename, 'rt', encoding="utf-8") as serverdata_file:
        Server = literal_eval(serverdata_file.read())
        print(Server)

    MyNamespaceManager.register('MyNamespace', proxytype=NamespaceProxy)
    manager = MyNamespaceManager(**Server)
    manager.connect()

    yv1 = manager.MyNamespace()
    yv1.value = 99
    yv1.value += 55
    yv1.x = 9

    yv2 = manager.MyNamespace()
    print(f'yv2.value={yv2.value}')
    print(f'yv2.x={yv2.x}')


if __name__ == '__main__':

    if len(sys.argv) == 2 and sys.argv[1] == 'server':
        startserver()
    else:
        startclient()

客户打印:

代码语言:javascript
运行
复制
{'address': ('127.0.0.1', 51211), 'authkey': b'mptest'}
yv2.value=154
yv2.x=9
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70553591

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档