前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过 multiprocessing 实现 python 多进程

通过 multiprocessing 实现 python 多进程

作者头像
用户3147702
发布2022-06-27 13:31:28
7330
发布2022-06-27 13:31:28
举报
文章被收录于专栏:小脑斧科技博客

1. 引言

此前一系列文章中,我们介绍了 Python 的threading 包中的一系列工具。 python 的线程 Python 线程同步(一) — 竞争条件与线程锁 python 线程同步(二) — 条件对象 python 线程同步(三) — 信号量 python 线程同步(四) — 事件对象与栅栏

threading 包为 Python 提供了线程模型,而 multiprocessing 包则为另一种并发模型 — 多进程模型提供了强大的解决方案。 multiprocessing 与 threading 十分相似,他提供了基本的进程对象类以及功能强大的进程同步工具,同时,multiprocessing 还提供了进程池的封装类 Pool。

2. 多进程 vs 多线程

此前我们介绍了 Python 中的 GIL 锁,受此影响,Python 每一个时刻只能调度一个线程,这意味着并发并没有真的在进行。 而多进程则不同,多进程并发的模式中,由于进程间严格的隔离,他们得以真正的并行执行。 同时,Python 多进程让多核 CPU 得以被利用。

但相比多线程机制,多进程的模式也存在一些缺点和不足:

  1. 进程切换更为耗时
  2. 进程间通信相比线程间共享的数据更为复杂

3. multiprocessing 提供的方法

multiprocessing 提供的方法

方法

描述

active_children

返回当前进程存活的子进程的列表

cpu_count

返回系统的 CPU 数量,但并不是当前进程可用的数量,len(os.sched_getaffinity(0)) 方法获取的是当前进程可用的数量

current_process

获取当前进程的 Process 对象

get_all_start_methods

返回支持的启动方法的列表,该列表的首项即为默认选项,包括我们后面即将要介绍的 ’fork’, ’spawn’ 和 ’forkserver’

get_context

返回进程上下文 Context 对象

get_start_method

获取当前启动进程的启动方法,’fork’ , ’spawn’ , ’forkserver’ 或者 None(如果没有设置)

set_executable

设置在启动子进程时使用的 Python 解释器路径,例如:set_executable(os.path.join(sys.exec_prefix, ’pythonw.exe’))

set_start_method

设置启动子进程的方法 ’fork’ , ’spawn’ 或者 ’forkserver’

4. Process 类与子进程创建

你会发现 Process 类与 Thread 类十分相似,他们都通过 start 方法启动并开始执行 run 方法的内容,同时,join 用来阻塞等待某个进程完成执行。 但是不同的是,这些方法只能由被调用进程的父进程来调用。

4.1. Process 类成员

4.1.1. 类成员属性

  • name — 进程名
  • daemon — 布尔值,是否是守护进程
  • pid — 进程 id
  • exitcode — 进程退出时的退出码,如果被信号终止,则返回信号值的相反数,进程未退出前该值为 None
  • authkey — 进程身份秘钥,字节字符串,当 multiprocessing 初始化时,主进程使用 os.urandom() 分配一个随机字符串,创建 Process 对象时,子进程继承父进程的身份秘钥
  • sentinel — 系统对象的数字句柄,在 UNIX 上是一个文件描述符

4.1.2. 类成员方法

  • run — 进程的具体活动
  • start — 启动进程活动
  • join — 等待进程执行结束或超时
  • is_alive — 判断进程是否存活
  • terminate — 终止进程,在 UNIX 环境中,通过给进程发送 SIGTERM 信号实现,在 Windows 环境中,通过 TerminateProcess 方法实现,被终止进程的子进程将不会被一起终止

需要注意的是,正如我们上面所说,start() 、 join() 、 is_alive() 、 terminate() 和 exitcode 方法只能由创建进程对象的进程调用。

4.2. 示例 — 通过 Process 类创建进程

4.2.1. 通过继承 Process 类实现子进程创建

代码语言:javascript
复制
import logging
from multiprocessing import Process
from time import sleep, ctime

class myProcess(Process):
    def __init__(self, nsec):
        super().__init__()
        self.nsec = nsec

    def run(self):
        logging.info('start_sleep[%s]' % self.nsec)
        sleep(self.nsec + 1)
        logging.info('end_sleep[%s]' % self.nsec)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
    logging.info('start at %s'% ctime())
    processes = list()
    for i in range(5):
        t = myProcess(i)
        processes.append(t)

    for process in processes:
        process.start()

    for process in processes:
        process.join()
    logging.info('end at %s' % ctime())

打印出了:

2019-05-22 10:02:24,705 - INFO: start at Wed May 22 10:02:24 2019 2019-05-22 10:02:24,739 - INFO: start_sleep[2] 2019-05-22 10:02:24,740 - INFO: start_sleep[1] 2019-05-22 10:02:24,742 - INFO: start_sleep[4] 2019-05-22 10:02:24,743 - INFO: start_sleep[0] 2019-05-22 10:02:24,743 - INFO: start_sleep[3] 2019-05-22 10:02:25,745 - INFO: end_sleep[0] 2019-05-22 10:02:26,746 - INFO: end_sleep[1] 2019-05-22 10:02:27,747 - INFO: end_sleep[2] 2019-05-22 10:02:28,748 - INFO: end_sleep[3] 2019-05-22 10:02:29,749 - INFO: end_sleep[4] 2019-05-22 10:02:29,750 - INFO: end at Wed May 22 10:02:29 2019

4.3. 通过创建 Process 对象实现子进程创建

代码语言:javascript
复制
import logging
from multiprocessing import Process
from time import sleep, ctime

def sleep_func(pindex):
    logging.info('start_sleep[%s]' % pindex)
    sleep(pindex + 1)
    logging.info('end_sleep[%s]' % pindex)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s: %(message)s')
    logging.info('start at %s' % ctime())
    processes = list()
    for i in range(5):
        process = Process(target=sleep_func, args=[i])
        processes.append(process)

    for process in processes:
        process.start()

    for process in processes:
        process.join()
    logging.info('end at %s' % ctime())

打印出了:

2019-05-22 10:07:30,340 - INFO: start at Wed May 22 10:07:30 2019 2019-05-22 10:07:30,346 - INFO: start_sleep[2] 2019-05-22 10:07:30,348 - INFO: start_sleep[3] 2019-05-22 10:07:30,350 - INFO: start_sleep[0] 2019-05-22 10:07:30,351 - INFO: start_sleep[4] 2019-05-22 10:07:30,349 - INFO: start_sleep[1] 2019-05-22 10:07:31,353 - INFO: end_sleep[0] 2019-05-22 10:07:32,353 - INFO: end_sleep[1] 2019-05-22 10:07:33,354 - INFO: end_sleep[2] 2019-05-22 10:07:34,353 - INFO: end_sleep[3] 2019-05-22 10:07:35,353 - INFO: end_sleep[4] 2019-05-22 10:07:35,354 - INFO: end at Wed May 22 10:07:35 2019

可以看到,multiprocessing 的用法与 threading 中的用法简直是一模一样。

5. 进程的启动方法

根据不同的平台,multiprocessing 有三种启动进程的方法:

  1. spawn — 父进程启动一个新的Python解释器进程。子进程只会继承那些运行进程对象的 run() 方法所需的资源,父进程中非必须的文件描述符和句柄则不会被继承,与另两种方法相比,这个方法启动进程非常慢,是 windows 上的默认设置,也可用在 Unix 中
  2. fork — 通过 os.fork() 方法创建子进程,子进程在开始时与父进程完全相同,会继承父进程中的所有资源,只能用于 Unix,是 Unix 系统中的默认方式
  3. forkserver — 启动服务器进程,并从此刻开始,每当需要一个新进程时,父进程就会连接到服务器并请求它分叉一个新进程,从而避免父进程中的资源被继承,只能用于 Unix 环境中

通过 multiprocessing.set_start_method 方法,可以设置不同的启动方法:

代码语言:javascript
复制
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

5.1. 注意

需要注意的是,在程序中 set_start_method() 不应该被多次调用,不同上下文启动的进程可能是不兼容的,比如使用 fork 上下文创建的锁不能传递给使用 spawn 或 forkserver 启动方法启动的进程。

6. 通过多进程处理 CPU 密集型运算

下面我们来对比一下多进程、多线程运行 CPU 密集型任务的耗时情况:

代码语言:javascript
复制
import time
from multiprocessing import Process
from threading import Thread

def count(x, y):
    # 使程序完成50万计算
    c = 0
    while c < 500000:
        c += 1
        x += x
        y += y

if __name__ == '__main__':
    t = time.time()
    for x in range(10):
        count(1, 1)
    print("Line", time.time() - t)

    counts = []
    t = time.time()
    for x in range(10):
        thread = Thread(target=count, args=(1, 1))
        counts.append(thread)
        thread.start()
    for thread in counts:
        thread.join()
    print("Threading", time.time() - t)

    counts = []
    t = time.time()
    for x in range(10):
        process = Process(target=count, args=(1, 1))
        counts.append(process)
        process.start()
    for process in counts:
        process.join()
    print("Multiprocess", time.time() - t)

程序非常简单,我们分别进行 50 万次计算,得到结果如下:

Line 69.52206325531006 Threading 55.799378633499146 Multiprocess 44.240989685058594

多进程运行确实有着性能优势,但也没有我们想象中那么大。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-05-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 小脑斧科技博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 引言
  • 2. 多进程 vs 多线程
  • 3. multiprocessing 提供的方法
  • 4. Process 类与子进程创建
    • 4.1. Process 类成员
      • 4.1.1. 类成员属性
      • 4.1.2. 类成员方法
    • 4.2. 示例 — 通过 Process 类创建进程
      • 4.2.1. 通过继承 Process 类实现子进程创建
    • 4.3. 通过创建 Process 对象实现子进程创建
    • 5. 进程的启动方法
      • 5.1. 注意
      • 6. 通过多进程处理 CPU 密集型运算
      相关产品与服务
      云服务器
      云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档