进程(Process)是操作系统进行资源分配和调度的基本单位,每个进程拥有独立的内存空间、系统资源和执行状态。在Python中,进程管理主要通过标准库multiprocessing实现,该模块突破了GIL(全局解释器锁)的限制,尤其适合CPU密集型任务的并行计算。
Python通过multiprocessing.Process类创建子进程,核心参数包括:
target:指定子进程执行的函数args:以元组形式传递位置参数kwargs:以字典形式传递关键字参数name:自定义进程名称(便于调试)示例代码:直接调用方式
pythonimport multiprocessingimport time def worker(task_id, duration): print(f"进程 {multiprocessing.current_process().pid} 执行任务 {task_id}") time.sleep(duration) return f"任务{task_id}完成" if __name__ == '__main__': processes = [] for i in range(3): p = multiprocessing.Process( target=worker, args=(i, 2), # 位置参数必须为元组 name=f"Worker-{i}" ) processes.append(p) p.start() for p in processes: p.join() # 阻塞主进程直到所有子进程结束继承式调用方式
pythonclass CustomProcess(multiprocessing.Process): def __init__(self, task_id): super().__init__() self.task_id = task_id def run(self): print(f"自定义进程 {self.pid} 处理任务 {self.task_id}") if __name__ == '__main__': p1 = CustomProcess(101) p2 = CustomProcess(102) p1.start() p2.start() p1.join() p2.join()join()会阻塞直到子进程结束,常用于同步控制daemon=True可使子进程随主进程退出而自动终止(适用于后台任务)守护进程示例
pythondef background_task(): while True: print("后台服务运行中...") time.sleep(1) if __name__ == '__main__': p = multiprocessing.Process(target=background_task) p.daemon = True # 必须设置在start()之前 p.start() time.sleep(3) print("主进程退出,守护进程自动终止")方法 | 所属模块 | 返回值类型 | 缓存机制 | 跨平台性 |
|---|---|---|---|---|
os.getpid() | os | int | 无 | ✅ |
os.getppid() | os | int | 无 | ✅ |
Process.pid | multiprocessing | int | 实例化时确定 | ✅ |
Process.ppid() | multiprocessing | int | Windows缓存/POSIX不缓存 | ✅ |
psutil.Process() | psutil | object | 支持多种属性缓存 | ✅ |
推荐方案:
os模块(无需额外依赖)psutil(支持进程名、路径等20+属性)进程树遍历
pythonimport psutil def print_process_tree(pid): try: p = psutil.Process(pid) print(f"PID: {p.pid}, PPID: {p.ppid()}, Name: {p.name()}") for child in p.children(recursive=True): print(f" ├─ Child PID: {child.pid}") except psutil.NoSuchProcess: print(f"进程 {pid} 不存在") if __name__ == '__main__': print_process_tree(os.getpid()) # 打印当前进程树资源监控
pythondef monitor_process(pid, interval=1): p = psutil.Process(pid) while True: try: mem = p.memory_info().rss / 1024**2 # MB cpu = p.cpu_percent(interval=interval) print(f"PID: {pid}, CPU: {cpu:.1f}%, MEM: {mem:.2f}MB") except psutil.NoSuchProcess: breakconcurrent.futures提供统一的线程池/进程池接口,核心组件:
submit()和map()方法方法 | 线程池 | 进程池 | 适用场景 |
|---|---|---|---|
submit(fn, *args) | 异步提交单个任务 | 异步提交单个任务 | 需要获取Future对象时 |
map(fn, iterable) | 同步映射处理可迭代对象 | 同步映射处理可迭代对象 | 批量处理数据时 |
as_completed(fs) | 按完成顺序迭代Future对象 | 按完成顺序迭代Future对象 | 需要结果立即处理时 |
案例:并行计算素数
pythonimport mathfrom concurrent.futures import ProcessPoolExecutor def is_prime(n): if n < 2: return False for i in range(2, int(math.sqrt(n)) + 1): if n % i == 0: return False return True def find_primes(start, end): with ProcessPoolExecutor(max_workers=4) as executor: numbers = range(start, end + 1) # 方法1:使用submit获取Future对象 futures = [executor.submit(is_prime, num) for num in numbers] for future in futures: if future.result(): print(f"发现素数: {future.result()}") # 实际应获取输入参数 # 方法2:使用map直接获取结果(更简洁) results = executor.map(is_prime, numbers) primes = [num for num, res in zip(numbers, results) if res] print(f"{start}-{end}范围内的素数: {primes}") if __name__ == '__main__': find_primes(100, 200)pythondef risky_operation(x): if x == 3: raise ValueError("故意抛出异常") return x * 2 with ProcessPoolExecutor() as executor: try: future = executor.submit(risky_operation, 3) result = future.result(timeout=1) # 设置超时 except ValueError as e: print(f"捕获到异常: {e}") except concurrent.futures.TimeoutError: print("操作超时")机制 | 实现类 | 特点 | 适用场景 |
|---|---|---|---|
队列 | multiprocessing.Queue | 线程/进程安全,支持阻塞操作 | 生产者-消费者模式 |
管道 | multiprocessing.Pipe | 全双工通信,适合点对点 | 父子进程通信 |
共享内存 | multiprocessing.Value | 轻量级,需手动同步 | 高频数据交换 |
Manager对象 | multiprocessing.Manager | 支持复杂数据结构(dict/list) | 跨网络进程通信 |
队列示例
pythondef producer(queue): for i in range(5): queue.put(f"数据-{i}") print(f"生产数据-{i}") def consumer(queue): while True: item = queue.get() if item is None: # 终止信号 break print(f"消费 {item}") if __name__ == '__main__': q = multiprocessing.Queue() p = multiprocessing.Process(target=producer, args=(q,)) c = multiprocessing.Process(target=consumer, args=(q,)) p.start() c.start() p.join() q.put(None) # 发送终止信号 c.join()锁机制示例
pythonlock = multiprocessing.Lock()shared_counter = multiprocessing.Value('i', 0) def increment(): with lock: # 自动获取和释放锁 for _ in range(10000): shared_counter.value += 1 if __name__ == '__main__': processes = [multiprocessing.Process(target=increment) for _ in range(4)] for p in processes: p.start() for p in processes: p.join() print(f"最终计数: {shared_counter.value}") # 应为40000cpu_count()自动设置max_workersfunctools.lru_cacheProcessPoolExecutor的initializer参数初始化资源动态池大小示例
pythonimport osfrom concurrent.futures import ProcessPoolExecutor def process_data(data): return sum(data) if __name__ == '__main__': data_chunks = [[i, i+1, i+2] for i in range(0, 100, 3)] with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor: results = list(executor.map(process_data, data_chunks)) print(f"总结果: {sum(results)}")spawn方式创建进程,必须将进程代码放在if __name__ == '__main__':块中close()/join()场景 | 推荐方案 | 性能考量 |
|---|---|---|
CPU密集型计算 | ProcessPoolExecutor | 充分利用多核,但启动成本高 |
I/O密集型操作 | ThreadPoolExecutor | 轻量级,但受GIL限制 |
混合型任务 | 组合使用线程池+进程池 | 根据任务类型动态分配 |
复杂进程管理 | multiprocessing模块 | 灵活但需要手动处理同步 |
进程监控需求 | psutil库 | 提供丰富的进程信息 |
Python多进程编程的核心在于合理选择进程模型、有效管理进程生命周期,并通过适当的同步机制确保数据一致性。对于现代Python开发,concurrent.futures模块提供了最简洁的并发编程接口,而multiprocessing模块则在需要精细控制时更具优势。在实际项目中,建议结合psutil进行进程监控,使用logging模块记录进程活动,构建健壮的并行计算系统。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。