虽然Python的多处理库已成功用于广泛的应用程序,但在本博文中发现它不适用于几个重要的应用程序类,包括数值数据处理,有状态计算和昂贵的初始化计算。主要有两个原因:
https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing
Ray是一个快速,简单的框架,用于构建和运行解决这些问题的分布式应用程序。Ray利用 Apache Arrow实现高效的数据处理,并为分布式计算提供任务和角色抽象。
https://github.com/ray-project/ray
https://arrow.apache.org/
这篇博客文章对三个工作负载进行了基准测试,这三个工作负载不易用Python多处理表示,并比较Ray,Python多处理和串行Python代码。请注意始终与优化的单线程代码进行比较非常重要。
在这些基准,Ray是10-30倍比串行Python的速度更快,5-25x比多处理更快,5-15x比这两个还要快的大型机器上。
在具有48个物理内核机器,Ray是9倍比Python多快和28X比单线程的Python更快。描绘了误差条,但在某些情况下,误差条太小而无法看到。下面提供了复制这些数字的代码。工作负载按比例缩放到内核数量,因此在更多内核上完成了更多工作(这就是为什么串行Python在更多内核上需要更长时间)
基准测试使用m5实例类型在EC2上运行(m5.large用于1个物理内核,m5.24xlarge用于48个物理内核)。此处提供了运行所有基准的代码。此帖子中包含缩写的片段。主要区别在于完整的基准测试包括1)计时和打印代码,2)用于预热Ray对象存储的代码,以及3)用于使基准测试适应较小机器的代码。
https://gist.github.com/robertnishihara/2b81595abd4f50a049767a040ce435ab
基准1:数字数据
许多机器学习,科学计算和数据分析工作负载大量使用大量数据。例如,阵列可以表示大图像或数据集,并且应用程序可能希望具有多个任务来分析图像。有效处理数值数据至关重要。
每次通过for循环低于需要0.84s与Ray,7.5S与Python多重处理,和24S具有串行的Python(48个物理内核)。这种性能差距解释了为什么可以在Ray之上构建像Modin这样的库,而不是在其他库之上。
Ray的代码如下所示。
import numpy as npimport psutilimport rayimport scipy.signal num_cpus = psutil.cpu_count(logical=False) ray.init(num_cpus=num_cpus) @ray.remotedef f(image, random_filter): # Do some image processing. return scipy.signal.convolve2d(image, random_filter)[::5, ::5] filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)] # Time the code below. for _ in range(10): image = np.zeros((3000, 3000)) image_id = ray.put(image) ray.get([f.remote(image_id, filters[i]) for i in range(num_cpus)])
使用Ray的玩具图像处理示例的代码
通过调用ray.put(image),大型数组存储在共享内存中,并且可以由所有工作进程访问,而无需创建副本。这不仅适用于数组,也适用于包含数组的对象(如数组列表)。
当工作人员执行f任务时,结果将再次存储在共享内存中。然后,当脚本调用时ray.get([...]),它会创建由共享内存支持的numpy数组,而不必反序列化或复制值。
通过使用Apache Arrow作为底层数据布局和序列化格式以及Plasma共享内存对象存储,可以实现这些优化。
Python多处理代码如下所示。
from multiprocessing import Poolimport numpy as npimport psutilimport scipy.signal num_cpus = psutil.cpu_count(logical=False) def f(args): image, random_filter = args # Do some image processing. return scipy.signal.convolve2d(image, random_filter)[::5, ::5] pool = Pool(num_cpus) filters = [np.random.normal(size=(4, 4)) for _ in range(num_cpus)] # Time the code below. for _ in range(10): image = np.zeros((3000, 3000)) pool.map(f, zip(num_cpus * [image], filters))
使用多处理的玩具图像处理示例的代码
这里的不同之处在于Python多处理在进程之间传递大型对象时使用pickle来序列化大对象。这种方法要求每个进程创建自己的数据副本,这增加了大量的内存使用以及昂贵的反序列化的开销,Ray通过使用Apache Arrow数据布局进行零拷贝序列化以及Plasma存储来避免这种情况。
基准2:有状态计算
需要在许多小型工作单元之间共享大量“状态”的工作负载是另一类工作负载,这些工作负载对Python多处理提出了挑战。这种模式非常普遍,用它来说明玩具流处理应用程序。
在具有48个物理内核的机器上,Ray 比Python多处理快6 倍,比单线程Python快17倍。Python多处理在少于24个内核上的性能不超过单线程Python。工作负载按比例缩放到内核数量,因此在更多内核上完成了更多工作(这就是为什么串行Python在更多内核上需要更长时间)
State通常封装在Python类中,而Ray提供了一个actor抽象,以便可以在并行和分布式设置中使用类。相比之下,Python多处理并不提供并行化Python类的自然方式,因此用户通常需要在map调用之间传递相关状态。这种策略在实践中很难实现(许多Python变量不容易序列化),并且当它工作时它可能很慢。
下面是一个玩具示例,它使用并行任务一次处理一个文档,提取每个单词的前缀,并在最后返回最常见的前缀。前缀计数存储在actor状态中,并由不同的任务进行变异。
本例将3.2S与Ray,21S与Python多重处理,和54S具有串行的Python(48个物理内核)。
该Ray的版本如下所示。
from collections import defaultdictimport numpy as npimport psutilimport ray num_cpus = psutil.cpu_count(logical=False) ray.init(num_cpus=num_cpus) @ray.remoteclass StreamingPrefixCount(object): def __init__(self): self.prefix_count = defaultdict(int) self.popular_prefixes = set() def add_document(self, document): for word in document: for i in range(1, len(word)): prefix = word[:i] self.prefix_count[prefix] += 1 if self.prefix_count[prefix] > 3: self.popular_prefixes.add(prefix) def get_popular(self): return self.popular_prefixes streaming_actors = [StreamingPrefixCount.remote() for _ in range(num_cpus)] # Time the code below. for i in range(num_cpus * 10): document = [np.random.bytes(20) for _ in range(10000)] streaming_actors[i % num_cpus].add_document.remote(document) # Aggregate all of the results.results = ray.get([actor.get_popular.remote() for actor in streaming_actors])popular_prefixes = set()for prefixes in results: popular_prefixes |= prefixes
使用Ray的玩具流处理示例的代码
Ray在这里表现很好,因为Ray的抽象适合手头的问题。此应用程序需要一种在分布式设置中封装和变异状态的方法,并且actor符合该法案。
在多处理版本如下所示。
from collections import defaultdictfrom multiprocessing import Poolimport numpy as npimport psutil num_cpus = psutil.cpu_count(logical=False) def accumulate_prefixes(args): running_prefix_count, running_popular_prefixes, document = args for word in document: for i in range(1, len(word)): prefix = word[:i] running_prefix_count[prefix] += 1 if running_prefix_count[prefix] > 3: running_popular_prefixes.add(prefix) return running_prefix_count, running_popular_prefixes pool = Pool(num_cpus) running_prefix_counts = [defaultdict(int) for _ in range(4)]running_popular_prefixes = [set() for _ in range(4)] for i in range(10): documents = [[np.random.bytes(20) for _ in range(10000)] for _ in range(num_cpus)] results = pool.map( accumulate_prefixes, zip(running_prefix_counts, running_popular_prefixes, documents)) running_prefix_counts = [result[0] for result in results] running_popular_prefixes = [result[1] for result in results] popular_prefixes = set()for prefixes in running_popular_prefixes: popular_prefixes |= prefixes
使用多处理的玩具流处理示例的代码
这里的挑战是pool.map执行无状态函数,这意味着pool.map您希望在另一个pool.map调用中使用的一个调用中生成的任何变量需要从第一个调用返回并传递给第二个调用。对于小对象,这种方法是可以接受的,但是当需要共享大的中间结果时,传递它们的成本是令人望而却步的(请注意,如果变量在线程之间共享,但是因为它们正在跨进程边界共享,必须使用像pickle这样的库将变量序列化为一个字节串。
因为它必须通过如此多的状态,所以多处理版本看起来非常笨拙,并且最终只能实现比串行Python更小的加速。实际上,您不会编写这样的代码,因为您根本不会使用Python多处理进行流处理。相反,您可能使用专用的流处理框架。此示例显示Ray非常适合构建此类框架或应用程序。
需要注意的是,有许多方法可以使用Python多处理。在这个例子中进行比较,Pool.map因为它提供了最接近的API比较。通过启动不同的进程并在它们之间设置多个多处理队列,应该可以在此示例中实现更好的性能,但这会导致复杂和脆弱的设计。
基准3:昂贵的初始化
与前面的示例相比,许多并行计算不一定要求在任务之间共享中间计算,但无论如何都要从中受益。当初始化状态昂贵时,甚至无状态计算也可以从共享状态中受益。
下面是一个示例,其中要从磁盘加载已保存的神经网络并使用它来并行分类一堆图像。
在具有48个物理内核的机器上,Ray 比Python多处理快25 倍,比单线程Python快13倍。在这个例子中,Python多处理并不优于单线程Python。描绘了误差条,但在某些情况下,误差条太小而无法看到。工作负载按比例缩放到核心数,因此在更多核心上完成了更多工作。在此基准测试中,“串行”Python代码实际上通过TensorFlow使用多个线程。Python多处理代码的可变性来自于从磁盘重复加载模型的可变性,而其他方法则不需要这样做
这个例子使用Ray 需要5s,使用Python多处理需要126s,使用串行Python需要64s(在48个物理内核上)。在这种情况下,串行Python版本使用许多内核(通过TensorFlow)来并行化计算,因此它实际上不是单线程的。
假设最初通过运行以下内容来创建模型。
import tensorflow as tf mnist = tf.keras.datasets.mnist.load_data()x_train, y_train = mnist[0]x_train = x_train / 255.0model = tf.keras.models.Sequential([ tf.keras.layers.Flatten(input_shape=(28, 28)), tf.keras.layers.Dense(512, activation=tf.nn.relu), tf.keras.layers.Dropout(0.2), tf.keras.layers.Dense(10, activation=tf.nn.softmax)])model.compile( optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])# Train the model.model.fit(x_train, y_train, epochs=1)# Save the model to disk.filename = '/tmp/model'model.save(filename)
用于将神经网络模型保存到磁盘的代码
现在希望加载模型并使用它来分类一堆图像。分批进行此操作,因为在应用程序中,图像可能并非全部同时可用,并且图像分类可能需要与数据加载并行完成。
该Ray的版本如下所示。
import psutilimport rayimport sysimport tensorflow as tf num_cpus = psutil.cpu_count(logical=False) ray.init(num_cpus=num_cpus) filename = '/tmp/model' @ray.remoteclass Model(object): def __init__(self, i): # Pin the actor to a specific core if we are on Linux to prevent # contention between the different actors since TensorFlow uses # multiple threads. if sys.platform == 'linux': psutil.Process().cpu_affinity([i]) # Load the model and some data. self.model = tf.keras.models.load_model(filename) mnist = tf.keras.datasets.mnist.load_data() self.x_test = mnist[1][0] / 255.0 def evaluate_next_batch(self): # Note that we reuse the same data over and over, but in a # real application, the data would be different each time. return self.model.predict(self.x_test) actors = [Model.remote(i) for i in range(num_cpus)] # Time the code below. # Parallelize the evaluation of some test data.for j in range(10): results = ray.get([actor.evaluate_next_batch.remote() for actor in actors])
使用Ray的玩具分类示例代码
加载模型的速度很慢,只想做一次。Ray版本通过在actor的构造函数中加载模型一次来摊销此成本。如果需要将模型放在GPU上,那么初始化将更加昂贵。
多处理版本较慢,因为它需要在每个映射调用中重新加载模型,因为映射函数被假定为无状态。
在多版本如下所示。请注意在某些情况下,可以使用initializer参数来实现此目的multiprocessing.Pool。但是,这仅限于每个进程初始化相同的设置,并且不允许不同的进程执行不同的设置功能(例如,加载不同的神经网络模型),并且不允许不同的任务针对不同的工人。
from multiprocessing import Poolimport psutilimport sysimport tensorflow as tf num_cpus = psutil.cpu_count(logical=False) filename = '/tmp/model' def evaluate_next_batch(i): # Pin the process to a specific core if we are on Linux to prevent # contention between the different processes since TensorFlow uses # multiple threads. if sys.platform == 'linux': psutil.Process().cpu_affinity([i]) model = tf.keras.models.load_model(filename) mnist = tf.keras.datasets.mnist.load_data() x_test = mnist[1][0] / 255.0 return model.predict(x_test) pool = Pool(num_cpus) for _ in range(10): pool.map(evaluate_next_batch, range(num_cpus))
使用多处理的玩具分类示例的代码
在所有这些示例中看到的是,Ray的性能不仅来自其性能优化,还来自于适合于手头任务的抽象。有状态计算对于许多应用程序很重要,并且将有状态计算强制转换为无状态抽象是有代价的。
运行基准测试
在运行这些基准测试之前,需要安装以下内容。
pip install numpy psutil ray scipy tensorflow
然后通过运行这些脚本可以重现上面的所有数字。
如果在安装时遇到问题psutil,请尝试使用Anaconda Python。
原始基准测试使用m5实例类型在EC2上运行(m5.large用于1个物理内核,m5.24xlarge用于48个物理内核)。
要使用正确的配置在AWS或GCP上启动实例,可以使用Ray自动调节器并运行以下命令。
ray up config.yaml
一个例子config.yaml是在这里提供(用于起动m5.4xlarge实例)。
https://gist.github.com/robertnishihara/2b81595abd4f50a049767a040ce435ab#file-config-yaml
更多关于Ray
虽然这篇博文主要关注Ray和Python多处理之间的基准测试,但是这种比较具有挑战性,因为这些库并不是很相似。差异包括以下内容:
更多相关链接如下:
https://github.com/ray-project/ray
https://ray.readthedocs.io/en/latest/
https://stackoverflow.com/questions/tagged/ray