深入Python多进程通信原理与实战——图文

继上节使用原生多进程并行运行,基于Redis作为消息队列完成了圆周率的计算,本节我们使用原生操作系统消息队列来替换Redis。

文件

使用文件进行通信是最简单的一种通信方式,子进程将结果输出到临时文件,父进程从文件中读出来。文件名使用子进程的进程id来命名。进程随时都可以通过os.getpid()来获取自己的进程id。

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子进程开始计算
            with open("%d" % os.getpid(), "w") as f:
                f.write(str(s))
            sys.exit(0)  # 子进程结束
    sums = []
    for pid in pids:
        os.waitpid(pid, 0)  # 等待子进程结束
        with open("%d" % pid, "r") as f:
            sums.append(float(f.read()))
        os.remove("%d" % pid)  # 删除通信的文件
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

输出

3.14159262176

管道pipe

管道是Unix进程间通信最常用的方法之一,它通过在父子进程之间开通读写通道来进行双工交流。我们通过os.read()和os.write()来对文件描述符进行读写操作,使用os.close()关闭描述符。

上图为单进程的管道

上图为父子进程分离后的管道

# coding: utf-8

import os
import sys
import math


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        r, w = os.pipe()
        pid = os.fork()
        if pid > 0:
            childs[pid] = r  # 将子进程的pid和读描述符存起来
            os.close(w)  # 父进程关闭写描述符,只读
        else:
            os.close(r)  # 子进程关闭读描述符,只写
            s = slice(mink, maxk)  # 子进程开始计算
            os.write(w, str(s))
            os.close(w)  # 写完了,关闭写描述符
            sys.exit(0)  # 子进程结束
    sums = []
    for pid, r in childs.items():
        sums.append(float(os.read(r, 1024)))
        os.close(r)  # 读完了,关闭读描述符
        os.waitpid(pid, 0)  # 等待子进程结束
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

输出

3.14159262176

无名套接字socketpair

我们知道跨网络通信免不了要通过套接字进行通信,但是本例的多进程是在同一个机器上,用不着跨网络,使用普通套接字进行通信有点浪费。

上图为单进程的socketpair

上图为父子进程分离后的socketpair

为了解决这个问题,Unix系统提供了无名套接字socketpair,不需要端口也可以创建套接字,父子进程通过socketpair来进行全双工通信。

socketpair返回两个套接字对象,一个用于读一个用于写,它有点类似于pipe,只不过pipe返回的是两个文件描述符,都是整数。所以写起代码形式上跟pipe几乎没有什么区别。

我们使用sock.send()和sock.recv()来对套接字进行读写,通过sock.close()来关闭套接字对象。

# coding: utf-8

import os
import sys
import math
import socket


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    childs = {}
    unit = n / 10
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        rsock, wsock = socket.socketpair()
        pid = os.fork()
        if pid > 0:
            childs[pid] = rsock
            wsock.close()
        else:
            rsock.close()
            s = slice(mink, maxk)  # 子进程开始计算
            wsock.send(str(s))
            wsock.close()
            sys.exit(0)  # 子进程结束
    sums = []
    for pid, rsock in childs.items():
        sums.append(float(rsock.recv(1024)))
        rsock.close()
        os.waitpid(pid, 0)  # 等待子进程结束
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

输出

3.14159262176

OS消息队列

操作系统也提供了跨进程的消息队列对象可以让我们直接使用,只不过python没有默认提供包装好的api来直接使用。我们必须使用第三方扩展来完成OS消息队列通信。第三方扩展是通过使用Python包装的C实现来完成的。

OS消息队列有两种形式,一种是posix消息队列,另一种是systemv消息队列,有些操作系统两者都支持,有些只支持其中的一个,比如macos仅支持systemv消息队列,我本地的python的docker镜像是debian linux,它仅支持posix消息队列。

posix消息队列 我们先使用posix消息队列来完成圆周率的计算,posix消息队列需要提供一个唯一的名称,它必须是/开头。close()方法仅仅是减少内核消息队列对象的引用,而不是彻底关闭它。unlink()方法才能彻底销毁它。O_CREAT选项表示如果不存在就创建。向队列里塞消息使用send方法,收取消息使用receive方法,receive方法返回一个tuple,tuple的第一个值是消息的内容,第二个值是消息的优先级。之所以有优先级,是因为posix消息队列支持消息的排序,在send方法的第二个参数可以提供优先级整数值,默认为0,越大优先级越高。

# coding: utf-8

import os
import sys
import math
from posix_ipc import MessageQueue as Queue


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    q = Queue("/pi", flags=os.O_CREAT)
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子进程开始计算
            q.send(str(s))
            q.close()
            sys.exit(0)  # 子进程结束
    sums = []
    for pid in pids:
        sums.append(float(q.receive()[0]))
        os.waitpid(pid, 0)  # 等待子进程结束
    q.close()
    q.unlink()  # 彻底销毁队列
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

输出

3.14159262176

systemv消息队列 systemv消息队列和posix消息队列用起来有所不同。systemv的消息队列是以整数key作为名称,如果不指定,它就创建一个唯一的未占用的整数key。它还提供消息类型的整数参数,但是不支持消息优先级。

# coding: utf-8

import os
import sys
import math
import sysv_ipc
from sysv_ipc import MessageQueue as Queue


def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s


def pi(n):
    pids = []
    unit = n / 10
    q = Queue(key=None, flags=sysv_ipc.IPC_CREX)
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子进程开始计算
            q.send(str(s))
            sys.exit(0)  # 子进程结束
    sums = []
    for pid in pids:
        sums.append(float(q.receive()[0]))
        os.waitpid(pid, 0)  # 等待子进程结束
    q.remove()  # 销毁消息队列
    return math.sqrt(sum(sums) * 8)


print pi(10000000)

输出

3.14159262176

共享内存

共享内存也是非常常见的多进程通信方式,操作系统负责将同一份物理地址的内存映射到多个进程的不同的虚拟地址空间中。进而每个进程都可以操作这份内存。考虑到物理内存的唯一性,它属于临界区资源,需要在进程访问时搞好并发控制,比如使用信号量。我们通过一个信号量来控制所有子进程的顺序读写共享内存。我们分配一个8字节double类型的共享内存用来存储极限的和,每次从共享内存中读出来时,要使用struct进行反序列化(unpack),将新的值写进去之前也要使用struct进行序列化(pack)。每次读写操作都需要将读写指针移动到内存开头位置(lseek)。

# coding: utf-8
import os
import sys
import math
import struct
import posix_ipc
from posix_ipc import Semaphore
from posix_ipc import SharedMemory as Memory
def slice(mink, maxk):
    s = 0.0
    for k in range(mink, maxk):
        s += 1.0/(2*k+1)/(2*k+1)
    return s
def pi(n):
    pids = []
    unit = n / 10
    sem_lock = Semaphore("/pi_sem_lock", flags=posix_ipc.O_CREX, initial_value=1)  # 使用一个信号量控制多个进程互斥访问共享内存
    memory = Memory("/pi_rw", size=8, flags=posix_ipc.O_CREX)
    os.lseek(memory.fd, 0, os.SEEK_SET)  # 初始化和为0.0的double值
    os.write(memory.fd, struct.pack('d', 0.0))
    for i in range(10):  # 分10个子进程
        mink = unit * i
        maxk = mink + unit
        pid = os.fork()
        if pid > 0:
            pids.append(pid)
        else:
            s = slice(mink, maxk)  # 子进程开始计算
            sem_lock.acquire()
            try:
                os.lseek(memory.fd, 0, os.SEEK_SET)
                bs = os.read(memory.fd, 8)  # 从共享内存读出来当前值
                cur_val, = struct.unpack('d', bs)  # 反序列化,逗号不能少
                cur_val += s  # 加上当前进程的计算结果
                bs = struct.pack('d', cur_val) # 序列化
                os.lseek(memory.fd, 0, os.SEEK_SET)
                os.write(memory.fd, bs)  # 写进共享内存
                memory.close_fd()
            finally:
                sem_lock.release()
            sys.exit(0)  # 子进程结束
    sums = []
    for pid in pids:
        os.waitpid(pid, 0)  # 等待子进程结束
    os.lseek(memory.fd, 0, os.SEEK_SET)
    bs = os.read(memory.fd, 8)  # 读出最终这结果
    sums, = struct.unpack('d', bs)  # 反序列化
    memory.close_fd()  # 关闭共享内存
    memory.unlink()  # 销毁共享内存
    sem_lock.unlink()  #  销毁信号量
    return math.sqrt(sums * 8)
print pi(10000000)

输出

3.14159262176

原文发布于微信公众号 - 码洞(codehole)

原文发表时间:2018-05-29

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android干货

Python IO编程

循环读取文件内容,一般读取文件内容一次读取完,内存是不够的,就要实现一次次少量数据读取

12620
来自专栏python学习路

八、线程和进程 什么是线程(thread)?什么是进程(process)? 线程和进程的区别?Python GIL(Global Interpreter Lock)全局解释器锁

什么是线程(thread)? 线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一...

49970
来自专栏决胜机器学习

《Redis设计与实现》读书笔记(二十八) ——Redis集群节点结构与槽分配

《Redis设计与实现》读书笔记(二十八) ——Redis集群节点结构与槽分配 (原创内容,转载请注明来源,谢谢) 一、概述 redis集群是...

46760
来自专栏Java帮帮-微信公众号-技术文章全总结

01.线程状态/创建/启动

01.线程状态/创建/启动 多线程作为Java中很重要的一个知识点,在此还是有必要总结一下的。 一.线程的生命周期及五种基本状态 关于Java中线程的生命周期,...

40770
来自专栏小白安全

代码审计工具 Cobra 源码分析

0x00 前言 @0r3ak 师傅向我推荐了一款代码审计工具Cobra(wufeifei/cobra),该工具基于Python开发,可以针对多种语言的源...

49670
来自专栏风中追风

java类的加载过程和类加载器的分析

我们知道,我们写的java代码保存的格式是 .java, java文件被编译后会转换为字节码,字节码可以在任何平台通过java虚拟机来运行,这也是java能够跨...

55780
来自专栏偏前端工程师的驿站

Java魔法堂:打包知识点之jar

一、前言                                    通过eclipse导出jar包十分方便快捷,但作为码农岂能满足GUI的便捷呢?所...

24870
来自专栏Python爬虫与算法进阶

Python函数超时,用装饰器解决

我们在自定义一个函数后,会调用这个函数来完成我们想要的功能。 就拿爬虫来举例,你发送请求,服务器给你响应,但是有可能服务器没有给你任何数据,无论是他识别了爬虫、...

36020
来自专栏数据结构与算法

22:紧急措施

22:紧急措施 总时间限制: 1000ms 内存限制: 65536kB描述 近日,一些热门网站遭受黑客入侵,这些网站的账号、密码及email的数据惨遭泄露。你...

39380
来自专栏闪电gogogo的专栏

Python初学——多线程Threading

接着上篇继续跟着沫凡小哥学Python啦 1.1 什么是多线程 Threading 多线程可简单理解为同时执行多个任务。 多进程和多线程都可以执行多个任务,线程...

22650

扫码关注云+社区

领取腾讯云代金券