前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用 Python 实现并行计算

用 Python 实现并行计算

作者头像
老齐
发布2021-10-11 17:09:46
7.5K0
发布2021-10-11 17:09:46
举报
文章被收录于专栏:老齐教室老齐教室

:不少学过点编程语言的人,都会抱怨 Python 语言的程序执行速度慢,因此对学习和使用此语言嗤之以鼻。暂且不论程序的执行速度是否是开发者追求的唯一目标(有意对此进行争论的,请参阅人民邮电出版社出版的《编程的原则》一书),单就提升 Python 计算速度而言,并行计算是一个重要的选项。本文即为这方面的入门资料。


Python 非常适合训练机器学习模型、进行数值计算、以及快速开发验证性的模型等。使用 Python ,所需要的辅助工具和也依赖项都很少。在执行这些任务时,你还希望尽可能多地使用底层硬件,以便获得更高的速度。Python 代码的并行化可以实现这一目标。但是,使用标准的 CPython 则无法充分使用底层硬件的计算能力,因为全局解释器锁(GIL)会阻止多个线程同时运行字节码。

本文汇总了一些用 Python 代码实现并行计算的常见方法,包括:

  • 基于进程的并行计算
  • 使用专用库实现并行计算
  • IPython 中的并行计算
  • 用第三方库 Ray 实现并行计算

对于每种实现并行计算的技术,本文都列出了一些优点和缺点,并展示了代码示例,以帮助你了解其使用情况。

并行化 Python 代码

有几种常见的方法可以让 Python 代码实现并行运行——可以说成“并行化”。例如启动多个应用程序实例或启动某个脚本来并行执行程序。若不需要在并行的进程之间交换数据时,这种方法非常有用。否则,在进程之间共享数据会在聚合数据时显著降低运算性能。

在同一个进程中启动多个线程可以更有效地在作业之间共享数据。在这种情况下,基于线程的并行化可以将一些工作转移到后台。然而,CPython 实现的全局解释器锁(GIL)阻止了字节码在多个线程中同时运行。

下面示例中的函数模拟了复杂计算(旨在模拟激活函数)。

代码语言:javascript
复制
import math
import numpy as np
from timebudget import timebudget

iterations_count = round(1e7)

def complex_operation(input_index):
    print("Complex operation. Input index: {:2d}".format(input_index))
    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

为了更直观地计算时间,将函数 complex_operation() 执行多次。将输入的数据划分为几个子集,然后对这些子集并行计算。

下面调用函数 complex_operation() 的代码中,将其多次执行( input 的区间是 0~10 ),并使用 timebudget 包来度量执行时间( pip install timebudget

代码语言:javascript
复制
@timebudget
def run_complex_operations(operation, input):
    for i in input:
        operation(i)

input = range(10)
run_complex_operations(complex_operation, input)

执行上述程序,输出结果如下:

代码语言:javascript
复制
Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  7
Complex operation. Input index:  8
Complex operation. Input index:  9
run_complex_operations took 34.495sec

如你所见,在本文中使用的笔记本电脑上执行这段代码大约花了 34.5 秒。这是没有采用任何并行化技术的执行结果,下面就让我们看看如何用并行化方式优化。

基于进程的并行计算

第一种方法是基于进程的并行。使用这种方法,可以同时(即“并发”)启动多个进程,这样,它们就可以并发地执行计算。

从 Python 3开始,标准库中已经有了实现多进程的模块 multiprocessing ,用它可以非常便捷地实现多进程进程并发。multiprocessing 模块中的 Pool 类,能自动将输入划分为若干个子集,并将这些子集分配给多个进程。

在前述代码中,使用 Pool 启动 10 个进程,完整代码如下:

代码语言:javascript
复制
import math
import numpy as np
from timebudget import timebudget
from multiprocessing import Pool

iterations_count = round(1e7)

def complex_operation(input_index):
    print("Complex operation. Input index: {:2d}".format(input_index))
    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

@timebudget
def run_complex_operations(operation, input, pool):
    pool.map(operation, input)
    # for i in input:
    #     operation(i)

# input = range(10)
# run_complex_operations(complex_operation, input)

processes_count = 10

if __name__ == '__main__':
    processes_pool = Pool(processes_count)
    run_complex_operations(complex_operation, range(10), processes_pool)

每个进程同时执行 complex_operations() 函数,因此,从理论上讲,这些代码可以将总的执行时间减少 10 倍。然而,试试并非如此。以下是译者的执行结果(在翻译本文的时候,译者将所有代码重新执行,在文中显示的是译者的执行结果):

代码语言:javascript
复制
Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  7
Complex operation. Input index:  8
Complex operation. Input index:  9
run_complex_operations took 10.645sec

与之前的运行结果比较,并没有将执行时间缩短 10 倍,其原因有多方面,首先要考察的是本地计算机中 CPU 的数量,它决定了最大进程数。

代码语言:javascript
复制
>>> import os
>>> print('Number of CPUs in the system: {}'.format(os.cpu_count()))
Number of CPUs in the system: 8

os 模块中的 os.cpu_count() 函数能得到本地计算机中 CPU 的数量。

另外一个导致上述程序没有如预想那样大幅度降低运算时间的原因,跟程序汇总的计算量较小也有关系。这是因为进程之间必须通过进程间通信机制实现通信,这些计算开销,对于比较小的计算任务而言,并行计算通常比 Python 编写的普通程序所执行的串行计算更慢。

总结基于进程的并行计算的优劣:

优点

劣势

应用简单

性能不如 Ray (关于 Ray ,见后续)

摆脱了 GIL 限制

因共享数据而降低性能

对结果的聚合需要手动实现

利用专用库

NumPy 等专用于计算的库可以在许多计算上不受 GIL 的限制,于是就能用进程和其他技术实现并行计算。下面就介绍将 NumPy 用于并行计算的方式。

为了比较使用 Numpy 与否在计算中的差异,需要编写如下函数。

代码语言:javascript
复制
def complex_operation_numpy(input_index):
      print(f"Complex operation (numpy). Input index: {input_index:2d}")

      data = np.ones(iterations_count)
      np.exp(data) * np.sinh(data)

函数中使用 NumPy 的 np.exp()np.sinh() 两个函数对输入数据执行计算。然后,使用进程池执行 complex_operation()complex_operation_numpy() 函数各十次,以比较它们的性能。

代码语言:javascript
复制
processes_count = 10
input = range(10)

if __name__ == '__main__':
    processes_pool = Pool(processes_count)
    print(‘Without NumPy’)
    run_complex_operations(complex_operation, input, processes_pool)
    print(‘NumPy’)
    run_complex_operations(complex_operation_numpy, input, processes_pool)

以下为执行结果:

代码语言:javascript
复制
Without Numpy
Complex operation. Input index:  0
Complex operation. Input index:  1
Complex operation. Input index:  2
Complex operation. Input index:  3
Complex operation. Input index:  4
Complex operation. Input index:  5
Complex operation. Input index:  6
Complex operation. Input index:  8
Complex operation. Input index:  7
Complex operation. Input index:  9
run_complex_operations took 11.874sec
Numpy
Complex operation (numpy). Input index:  1
Complex operation (numpy). Input index:  2
Complex operation (numpy). Input index:  3
Complex operation (numpy). Input index:  4
Complex operation (numpy). Input index:  0
Complex operation (numpy). Input index:  5
Complex operation (numpy). Input index:  6
Complex operation (numpy). Input index:  7
Complex operation (numpy). Input index:  8
Complex operation (numpy). Input index:  9
run_complex_operations took 845.87ms

NumPy 使性能得到了大幅度提升,846ms vs 12s 。之所 NumPy 能更快,其原因是其中的大多数处理都是向量化的。向量化实际上使底层代码可以“并行化”,因为该操作可以一次计算多个数组元素,而不是一次遍历一个数组元素。

NumPy 的优点

NumPy 的劣势

简单易用

对结果的聚合需要手动实现

多数 NumPy 计算不受 GIL 限制,但不是全部

有限的数值计算

支持向量化

自定义算法比较麻烦

###使用 IPython 的并行计算包

IPython 是数据科学研究者使用的一个工具,能够实现交互式操作,后来被更名为 Jupyter (参阅《跟老齐学 Python:数据分析》)。除了这些之外,它还提供了一个用于并行计算的包“IPython Parallel”,安装方法如下:

代码语言:javascript
复制
pip install ipyparallel

的官方网站:https://ipyparallel.readthedocs.io/en/latest/ 。

IPython Parallel 有很多优点,其中最令人神往的可能是它允许以交互的方式开发、执行和监视并行应用程序。

一种使用 IPython Parallel 的方式是参考官方文档中的样式,在 Jupyter 中直接调用。

下面演示的是另外一种方式。首先准备好代码,如下所示(文件名称 parallelprocess.py

代码语言:javascript
复制
import math
import numpy as np
from timebudget import timebudget
import ipyparallel as ipp

iterations_count = round(1e7)

def complex_operation(input_index):
    print("Complex operation. Input index: {:2d}".format(input_index))

    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

def complex_operation_numpy(input_index):
    print("Complex operation (numpy). Input index: {:2d}".format(input_index))

    data = np.ones(iterations_count)
    np.exp(data) * np.sinh(data)

@timebudget
def run_complex_operations(operation, input, pool):
    pool.map(operation, input)

client_ids = ipp.Client()
pool = client_ids[:]

input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input, pool)
print('NumPy')
run_complex_operations(complex_operation_numpy, input, pool)

然后打开一个终端,输入如下 ipcluster 命令(是在命令行状态):

代码语言:javascript
复制
% ipcluster start -n 10
2021-09-17 13:21:24.805 [IPClusterStart] Starting ipcluster with [daemonize=False]
2021-09-17 13:21:25.898 [IPClusterStart] Starting 10 engines with <class 'ipyparallel.cluster.launcher.LocalEngineSetLauncher'>
2021-09-17 13:21:56.945 [IPClusterStart] Engines appear to have started successfully

出现上述 Engines appear to have started successfully 提示之后,再打开一个终端,执行前述程序文件,如下所示:

代码语言:javascript
复制
% python parallelprocess.py
Without NumPy
run_complex_operations took 8.18ms
NumPy
run_complex_operations took 6.76ms

上述结果显示,对于使用和不使用 NumPy 两种情况下下,均用 IPython Parallel 进行并行处理,运算速度远远快于前述两种条件下的执行结果。

IPython 的有点

IPython 的劣势

支持并行和分布计算

适用于较短的作业内容

能用于 Jupyter notebook

如果要执行过程的输出,需要额外的配置

配置简单

Ray

Ray 是一款实现并行和分布计算的第三方库,它具有快速、简单的特点,可以轻松地扩展应用程序,并适用于最先进的机器学习库。使用 Ray,还是像以往那样运行 Python 代码,只需要做很小的改动。

下面会简要介绍 Ray 是如何轻松地并行化普通的 Python 代码的,但需要注意的是,Ray 及其生态系统也可以轻松地并行化其他库,如 scikit-learn,XGBoost, LightGBM, PyTorch, 等等。

首先要安装 Ray :

代码语言:javascript
复制
pip install ray

然后在前面的 parallelprocess.py 文件基础上进行修改,最后的完整代码如下(并命名为 rayprocess.py 文件)

代码语言:javascript
复制
import math
import numpy as np
from timebudget import timebudget
import ray

iterations_count = round(1e7)

@ray.remote
def complex_operation(input_index):
    print(f"Complex operation. Input index: {input_index:2d}")
    [math.exp(i) * math.sinh(i) for i in [1] * iterations_count]

@ray.remote
def complex_operation_numpy(input_index):
    print(f"Complex operation (numpy). Input index: {input_index:2d}")
    data = np.ones(iterations_count)
    np.exp(data) * np.sinh(data)

@timebudget
def run_complex_operations(operation, input):
    ray.get([operation.remote(i) for i in input])

ray.init()

input = range(10)
print('Without NumPy')
run_complex_operations(complex_operation, input)
print('NumPy')
run_complex_operations(complex_operation_numpy, input)

其中 ray.init() 的作用是启动所有相关的 Ray 进程。默认情况下,Ray 为每个 CPU 核创建一个进程。如果希望在集群上运行 Ray ,则需要传入一个类似于ray.init(address='insertAddressHere') 的集群地址。

用装饰器 @ray.remote 装饰一个普通的 Python 函数,从而实现创建一个 Ray 任务。这个操作可以在笔记本电脑 CPU 核之间(或 Ray 集群)实现任务调度。

在最后一步中,以 @timebudget 装饰 run_complex_operations() 函数,在 Ray 的调用时间内执行这些函数。

执行此程序后,会得到一个类似于下面的输出:

代码语言:javascript
复制
% python rayprocess.py 
/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/ray/_private/services.py:238: UserWarning: Not all Ray Dashboard dependencies were found. To use the dashboard please install Ray using `pip install ray[default]`. To disable this message, set RAY_DISABLE_IMPORT_WARNING env var to '1'.
  warnings.warn(warning_message)
Without NumPy
(pid=9351) Complex operation. Input index:  7
(pid=9352) Complex operation. Input index:  5
(pid=9353) Complex operation. Input index:  6
(pid=9354) Complex operation. Input index:  1
(pid=9356) Complex operation. Input index:  2
(pid=9358) Complex operation. Input index:  0
(pid=9355) Complex operation. Input index:  3
(pid=9357) Complex operation. Input index:  4
(pid=9351) Complex operation. Input index:  8
(pid=9358) Complex operation. Input index:  9
run_complex_operations took 12.731sec
NumPy
(pid=9351) Complex operation (numpy). Input index:  1
(pid=9352) Complex operation (numpy). Input index:  7
(pid=9353) Complex operation (numpy). Input index:  3
(pid=9354) Complex operation (numpy). Input index:  5
(pid=9356) Complex operation (numpy). Input index:  2
(pid=9358) Complex operation (numpy). Input index:  0
(pid=9355) Complex operation (numpy). Input index:  4
(pid=9357) Complex operation (numpy). Input index:  6
(pid=9351) Complex operation (numpy). Input index:  9
(pid=9354) Complex operation (numpy). Input index:  8
run_complex_operations took 858.52ms

结果中显示了对于当前 Ray 任务而言的、使用和不使用 NumPy 的运行时间。这里似乎没有体现出 Ray 相对于前述其他并行计算方法的优势,这是因为我们在上面演示的属于小量的计算任务,如果遇到更大的业务,Ray 的优势就会非常显著,如下图所示。

在下面的表格中,对 Ray 给予简要总结。

Ray 的优点

Ray 的劣势

支持并行计算和分布计算

针对更大型的业务才会有显著效果

可以在 Jupyter 上使用

能够应用于现有的常见机器学习和神经网络库

整合了多个 Ray 库,如 RLlib(用于强化学习)、Ray Tune(超参数调优)、Ray Serve(可伸缩模式)

结论

有多种方法可以让 Python 程序实现并行化执行,并且本文还介绍了它们的一些优缺点。并行化的代码通常会带来一些开销;并行化的好处在较大的业务中更明显,而不是在本文中的简短计算中。

特别是在处理典型的基于人工智能的任务时,你必须对你的模型进行重复的微调。在这种情况下,Ray 提供了最好的支持,因为它拥有丰富的生态系统、自动伸缩、容错和远程服务等能力。

参考文献

https://www.anyscale.com/blog/parallelizing-python-code

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 老齐教室 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 并行化 Python 代码
    • 基于进程的并行计算
      • 利用专用库
        • Ray
        • 结论
        • 参考文献
        相关产品与服务
        GPU 云服务器
        GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档