专栏首页YG小书屋python 进程池异步调用与进程间通信

python 进程池异步调用与进程间通信

1、类包含不能序列化的属性时,多进程异步执行失败

import multiprocessing
import os
import random
import sys

class A:
    pool = None

    def __init__(self):
        self.pool = multiprocessing.Pool(3)
    def execute(self,dirs):
        pid=0
        try:
            fn = "log"+dirs.split('\\')[-1]
            fn = open(fn, "w")
            print("aa", file=fn)
            print("start to exec..." + str(pid), file=fn)

        except Exception as e:
            import logging
            logging.exception(e)
            print("error:%s"%e)
        return pid

    def _callback(self,pid):
        print(str(os.getpid())+"   "+str(pid))

    def start(self):
        
        print(str(os.getpid()))
        dirs = ["D:\Python2to3", "D:\eclipse", "D:\gitworkspace"]
        for i in range(3):
            try:
                self.pool.apply_async(self.execute, (dirs[i],), callback=self._callback)
            except Exception as e:
                import logging
                logging.exception(e)

        self.pool.close()
        self.pool.join()

if __name__ == "__main__":
    A().start()

执行上述代码时,多进程无法执行,因为A中包含了无法序列化的pool(进程池)。多进程调用self.execute方法时会将类本身的内容,即self全部序列化传给另外一个进程,pool无法序列化,因此多进程执行失败。只需将pool的初始化放入start函数中即可执行。

import multiprocessing
import os
import random
import sys

class A:
    def execute(self,dirs):
        pid=0
        try:
            fn = "log"+dirs.split('\\')[-1]
            fn = open(fn, "w")
            print("aa", file=fn)
            print("start to exec..." + str(pid), file=fn)

        except Exception as e:
            import logging
            logging.exception(e)
            print("error:%s"%e)
        return pid

    def _callback(self,pid):
        print(str(os.getpid())+"   "+str(pid))

    def start(self):
        
        print(str(os.getpid()))
        dirs = ["D:\Python2to3", "D:\eclipse", "D:\gitworkspace"]
        pool = multiprocessing.Pool(3)
        for i in range(3):
            try:
                pool.apply_async(self.execute, (dirs[i],), callback=self._callback)
            except Exception as e:
                import logging
                logging.exception(e)

        pool.close()
        pool.join()

if __name__ == "__main__":
    A().start()

2、多进程通信时只能使用一层的dict,两层时更改第二层不起作用

一层dict:

from multiprocessing import Pool, Manager

def chid_proc(test_dict, i):
    test_dict[i] = i * i

if __name__ == '__main__':
    #td = {'a':1}
    td = Manager().dict()
    td['a'] = 1
    pool = Pool(3)
    for i in range(0, 3):
        pool.apply_async(chid_proc, args=(td, i))
    pool.close()
    pool.join()
    print (td)
输出为:
{'a': 1, 0: 0, 2: 4, 1: 1}

二层dict:

from multiprocessing import Pool, Manager

def chid_proc(test_dict, i):
    test_dict["a"][i] = i * i

if __name__ == '__main__':
    #td = {'a':1}
    td = Manager().dict()
    td['a'] = {"b":1,"c":1}
    pool = Pool(3)
    for i in range(0, 3):
        pool.apply_async(chid_proc, args=(td, i))
    pool.close()
    pool.join()
    print (td)
输出为:
{'a': {'b': 1, 'c': 1}}

想着可能是因为第二层不是Manager().dict()导致的,将第二层改为Manager().dict()。

from multiprocessing import Pool, Manager

def chid_proc(test_dict, i):
    test_dict["a"][i] = i * i

if __name__ == '__main__':
    #td = {'a':1}
    td = Manager().dict()
    a = Manager().dict()
    a["b"]=1
    td['a'] = a
    pool = Pool(3)
    for i in range(0, 3):
        pool.apply_async(chid_proc, args=(td, i))
    pool.close()
    pool.join()
    print (td)
    print (td["a"])
输出为:
{'a': <DictProxy object, typeid 'dict' at 0x2970ac8>}
{'b': 1, 0: 0, 2: 4, 1: 1}

从上述例子可以看出,多进程间用多层map通信时,每层都要配置其为Manager().dict()。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 用装饰者模式封装数据库操作

    对于python编程人员来说,经常会用pymysql操作数据库。利用sql语句操作数据库时经常会有些额外的操作,比如说打印sql语句,记录sql查询时间,统计业...

    YG
  • 【转】架构漫谈(八):从架构的角度看如何写好代码

    原文链接 本文首发于 InfoQ 旗下垂直社群聊聊架构(微信号 archtime)。

    YG
  • Hadoop Streaming 读ORC文件

    hadoop Streaming的处理流程是先通过inputFormat读出输入文件内容,将其传递mapper,再将mapper返回的key,value传给re...

    YG
  • 一篇文章带你解读从初级运维工程师到资深运维专家的学习路线

    对于一个不了解运维究竟是做什么的同学,可能或多或少的有听过相关话题的讨论和经过一番搜索所知有了短浅的认识。原来运维就是每天很苦逼的在机房工作,甚至可能干着一些搬...

    杰哥的IT之旅
  • AI分析文本信息以改善人际关系

    一个令人误解的文本就会让你与朋友,重要的人或同事陷入麻烦。即使是连续发短信也很有风险,研究表明,大多数接收信息者在44%的时间内都无法区分讽刺和严肃。

    AiTechYun
  • UIWebview与OC交互以及加载失败

    Simulator Screen Shot 2016年4月16日 00.27.57.png

    Python疯子
  • 基础知识 | 每日一面(9)

    读者:对于没有初始化的变量的初始值可以作怎样的假定?如果一个全局变量初始值为 “零”, 它可否作为空指针或浮点零?

    C语言入门到精通
  • 给一个 pool 创建一个快照

    命令:ceph osd pool mksnap {pool-name} {snap-name}

    院长技术
  • 穿越十年后看互联网+:家电行业的金矿在哪里?

    现在市场上炒得火热的智能家居未来出路在何方?做智能家居的创业者应该注意哪些机会?传统家电厂商又到底如何借助互联网进行转型?本文以智能空调为例,用故事的形式,提前...

    华章科技
  • keras 两种训练模型方式详解fit和fit_generator(节省内存)

    自动生成数据还可以继承keras.utils.Sequence,然后写自己的生成数据类:

    砸漏

扫码关注云+社区

领取腾讯云代金券