前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python Windows下分布式进程的坑(分布式进程的一个简单例子)

Python Windows下分布式进程的坑(分布式进程的一个简单例子)

作者头像
Steve Wang
发布2018-02-05 17:41:20
2.1K0
发布2018-02-05 17:41:20
举报
文章被收录于专栏:从流域到海域从流域到海域

下面这个例子基于”廖雪峰的Python教程:分布式进程”原例在Linux上运行,直接在Windows上运行会出现错误,下面是针对原例进行的改进,使之能成功运行。 https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431929340191970154d52b9d484b88a7b343708fcc60000#0 博主也对代码注释作了更精确的改进。 原例在Windows下你会遇到的问题:

_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001940C172EA0>: attribute lookup <lambda> on __main__ failed

#原因 Win下callable不能以 lambda表达式赋值
This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module:
#需要使用Python常用的 if __name__ == '__main__':来进行是不是主module的判断
[WinError 10061] No connection could be made because the target machine actively refused it
#使用的主机地址和端口号有错误 需要修正 windows下address不能为空
OSError: [WinError 10049] The requested address is not valid in its context
#使用的主机地址和端口号有错误 需要修正 发送QueueManager 和 接收QueueManager没有使用相同的地址或端口号
        This probably means that you are on Windows and you have
        forgotten to use the proper idiom in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce a Windows executable.
        #windows 运行分布式进程需要先启动 freeze_support()

        #freeze_support()"冻结"为时生成 Windows 可执行文件
        #原因是Windows没有直接的fork()
        #Window是通过创建一个新的过程代码,在子进程运行来模拟fork() 
        #由于代码是在技术无关的进程中运行的,所以它必须在运行之前交付
        #它传递的方式首先是被pickle,然后通过管道从原始进程发送到新进程
        #另外,这个新进程被告知它必须运行通过管道传递的代码通过传递
        #freeze_support() 函数的任务是检查它正在运行的进程是否应该通过管道或不运行代码。
Traceback (most recent call last):
  File ...
    raise convert_to_error(kind, result)
queue.Empty
#task_worker开始工作时 task_master还没有在队列里添加任务

正确而完整的改进例子

#task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

#send queue 发送队列
task_queue = queue.Queue()
#receiver queue 接收队列
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

#注册2个queue到网络上 使用callable和匿名函数关联了Queue对象
'''仅适用Linux Windows下callable不能使用lambda表达式赋值
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
'''
def return_task_queue():
    global task_queue
    return task_queue
def return_result_queue():
    global result_queue
    return result_queue

def runf():
    QueueManager.register('get_task_queue', callable=return_task_queue)
    QueueManager.register('get_result_queue', callable=return_result_queue)
    #绑定端口5000,设置验证密码'abc'
    manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') 
    #Linux下address留空等于本机 Windows下不能留空 127.0.0.0即本机的地址
    #启动Queue
    manager.start()
    #通过网络获取Queue对象
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    #开启示例任务
    for i in range(10):
        n = random.randint(0, 10000)
        print('Put task %d to run...' %n)
        task.put(n)
    #读取任务结果
    print('Try to get results...')
    for i in range(10):
        r = result.get(timeout=10)
        print('Results: %s' %r)
    manager.shutdown()
    print('master has been shoutdown')

if __name__ == '__main__':
    freeze_support()
    runf()
#task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
#创建QueueManager
class QueueManager(BaseManager):
    pass
#该Manager从网络上获取Queue 因此只使用名字注册
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
#连接到运行task_master的机器 Server (127.0.0.0是本机地址 使用时应改掉)
server_addr = '127.0.0.1'
print('Try to connect to server %s...' %server_addr)
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
#获取Queue对象
task = m.get_task_queue()
result = m.get_result_queue()
#从task_queue获取任务 记过写入result_queue
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('we get %s' %n)
        print('now we calculate %s * %s' % (n, n))
        r = n * n
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task_queue is empty, all work have done')

print('worker quit')

在2个CMD窗口下同时运行2个脚本(间隔不能超过10秒),结果如下:

C:\Users\hongze>python task_master.py
Put task 5495 to run...
Put task 1701 to run...
Put task 7110 to run...
Put task 5385 to run...
Put task 6355 to run...
Put task 7144 to run...
Put task 5190 to run...
Put task 6259 to run...
Put task 5349 to run...
Put task 2931 to run...
Try to get results...
Results: 30195025
Results: 2893401
Results: 50552100
Results: 28998225
Results: 40386025
Results: 51036736
Results: 26936100
Results: 39175081
Results: 28611801
Results: 8590761
master has been shoutdown

C:\Users\hongze>

C:\Users\hongze>python task_worker.py
Try to connect to server 127.0.0.1...
we get 5495
now we calculate 5495 * 5495
we get 1701
now we calculate 1701 * 1701
we get 7110
now we calculate 7110 * 7110
we get 5385
now we calculate 5385 * 5385
we get 6355
now we calculate 6355 * 6355
we get 7144
now we calculate 7144 * 7144
we get 5190
now we calculate 5190 * 5190
we get 6259
now we calculate 6259 * 6259
we get 5349
now we calculate 5349 * 5349
we get 2931
now we calculate 2931 * 2931
worker quit

C:\Users\hongze>
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年12月27日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 正确而完整的改进例子
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档