
文章目录
在多进程编程中,数据传递和内存管理是影响性能的关键因素。本文通过对比两种多进程实现方式(共享内存和普通多进程),分析它们在内存使用和性能上的差异,并提供实验数据支持。
在多进程编程中,数据传递通常有两种方式:
我们通过一个简单的实验来对比这两种方式的性能和内存使用情况。
np.ones(10000000, dtype=np.int64))。multiprocessing.shared_memory 创建共享内存。Lock)确保操作的互斥性。Queue)收集子进程的处理结果。以下是实验的完整代码:
import multiprocessing
import numpy as np
import time
import psutil
import os
from multiprocessing import shared_memory, Lock, Queue
def get_process_memory():
"""获取当前进程的内存使用情况"""
process = psutil.Process(os.getpid())
memory_info = process.memory_info()
return memory_info.rss / 1024 / 1024 # 转换为MB
def worker(shm_name, shape, dtype, lock, add_value):
"""
子进程通过共享内存名称连接共享内存,并在获取互斥锁后对数据进行安全修改。
"""
# 打印子进程内存使用
print(f"进程 {multiprocessing.current_process().name} 内存使用: {get_process_memory():.2f} MB")
# 连接已有的共享内存块
existing_shm = shared_memory.SharedMemory(name=shm_name)
# 利用共享内存创建一个 NumPy 数组视图
arr = np.ndarray(shape, dtype=dtype, buffer=existing_shm.buf)
# 通过锁确保操作的互斥性,避免多个进程同时修改数据
with lock:
# 此处为示例:每个进程对数组所有元素加上一个常数 add_value
print(f"进程 {multiprocessing.current_process().name} 正在修改数据")
arr += add_value
# 打印子进程修改后内存使用
print(f"进程 {multiprocessing.current_process().name} 修改后内存使用: {get_process_memory():.2f} MB")
existing_shm.close()
def normal_worker(data, output_queue, add_value):
"""
普通多进程方式的工作函数,直接接收数据而不是通过输入队列
"""
# 打印子进程内存使用
print(f"进程 {multiprocessing.current_process().name} 内存使用: {get_process_memory():.2f} MB")
try:
# 直接处理传入的数据
print(f"进程 {multiprocessing.current_process().name} 正在修改数据")
result = data + add_value
print(f"进程 {multiprocessing.current_process().name} 修改完成")
output_queue.put(result) # 将结果放入输出队列
print(f"进程 {multiprocessing.current_process().name} 结果已放入队列")
except Exception as e:
print(f"进程 {multiprocessing.current_process().name} 出错: {e}")
# 打印子进程修改后内存使用
print(f"进程 {multiprocessing.current_process().name} 修改后内存使用: {get_process_memory():.2f} MB")
print(f"进程 {multiprocessing.current_process().name} 正在退出")
def run_shared_memory_version(data, num_processes=5):
"""
运行共享内存版本的测试
"""
print(f"主进程内存使用 (共享内存版本开始): {get_process_memory():.2f} MB")
start_time = time.time()
# 创建共享内存块
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_array = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_array[:] = data[:] # 初始化共享内存中的数据
lock = Lock()
processes = []
for i in range(num_processes):
p = multiprocessing.Process(
target=worker,
args=(shm.name, data.shape, data.dtype, lock, i+1)
)
processes.append(p)
p.start()
for p in processes:
p.join()
result = np.copy(shared_array)
# 清理共享内存
shm.close()
shm.unlink()
end_time = time.time()
print(f"主进程内存使用 (共享内存版本结束): {get_process_memory():.2f} MB")
return result, end_time - start_time
def run_normal_version(data, num_processes=5):
"""
运行普通多进程版本的测试
"""
print(f"主进程内存使用 (普通版本开始): {get_process_memory():.2f} MB")
start_time = time.time()
output_queue = Queue()
processes = []
# 启动所有进程
for i in range(num_processes):
p = multiprocessing.Process(
target=normal_worker,
args=(data, output_queue, i+1)
)
processes.append(p)
p.start()
print(f"启动进程 {i+1}")
# 立即开始收集结果,不等待进程完成
results = []
for i in range(num_processes):
print(f"正在等待第 {i+1} 个结果...")
result = output_queue.get() # 这会阻塞直到有数据可用
print(f"收到第 {i+1} 个结果")
results.append(result)
# 收集完结果后,等待所有进程完成
for p in processes:
p.join()
print("所有进程都已完成")
final_result = sum(results) - data * (num_processes-1) # 计算最终结果
end_time = time.time()
print(f"主进程内存使用 (普通版本结束): {get_process_memory():.2f} MB")
return final_result, end_time - start_time
if __name__ == '__main__':
# 首先安装必要的包
# 可以在终端运行: pip install psutil
print(f"初始主进程内存使用: {get_process_memory():.2f} MB")
# 创建测试数据
data = np.ones(10000000, dtype=np.int64) # 使用更大的数组来测试性能差异
print("初始数据大小:", data.nbytes / 1024 / 1024, "MB")
# 运行共享内存版本
print("\n运行共享内存版本...")
shared_result, shared_time = run_shared_memory_version(data)
print("共享内存版本耗时:", shared_time, "秒")
print("共享内存版本结果:", shared_result[:10], "...") # 只显示前10个元素
# 运行普通多进程版本
print("\n运行普通多进程版本...")
normal_result, normal_time = run_normal_version(data)
print("普通多进程版本耗时:", normal_time, "秒")
print("普通多进程版本结果:", normal_result[:10], "...") # 只显示前10个元素
assert np.array_equal(shared_result, normal_result), "结果不一致!"
print("结果一致性检查通过!")
print("\n性能对比:")
print(f"共享内存版本比普通多进程版本快 {normal_time/shared_time:.2f} 倍")
print(f"内存使用总结: 共享内存版本可以节省内存,因为所有进程共享同一块内存")
版本 | 主进程内存 (结束) (MB) |
|---|---|
共享内存版本 | 182.00 |
普通多进程版本 | 565.17 |
版本 | 耗时 (秒) | 性能提升倍数 |
|---|---|---|
共享内存版本 | 0.29 | - |
普通多进程版本 | 3.30 | 慢 11.4 倍 |
通过对比共享内存和普通多进程的实现方式,我们发现共享内存版本在内存使用和性能上具有显著优势。 对于需要处理大规模数据或多进程协作的场景,推荐使用共享内存来优化性能和内存使用。