首页
学习
活动
专区
圈层
工具
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Python并发编程 | 多线程多进程 | 性能优化指南

写了这么多年Python,并发编程始终是个绕不开的话题。今天跟大家分享一下我在工程实践中总结的一些经验和技巧。

GIL的存在让Python多线程饱受争议,但这不代表Python就不适合并发编程。关键是要根据任务特点选择合适的并发方案。

来看个最基础的多线程示例:

import threading

import time

def worker():

print(f'线程 {threading.current_thread().name} 开始执行')

time.sleep(2) # 模拟耗时操作

print(f'线程 {threading.current_thread().name} 执行完成')

threads = []

for i in range(3):

t = threading.Thread(target=worker)

threads.append(t)

t.start()

for t in threads:

t.join()

这段代码展示了创建和管理线程的基本方法。但在实际项目中,我们经常需要线程之间共享数据,这就需要用到线程同步机制。

线程安全是个大坑,看这个例子:

from threading import Thread, Lock

counter = 0

lock = Lock()

def increment():

global counter

for _ in range(100000):

with lock: # 使用锁保护共享资源

counter += 1

threads = [Thread(target=increment) for _ in range(4)]

for t in threads: t.start()

for t in threads: t.join()

print(f'最终结果: {counter}')

对于CPU密集型任务,多进程才是更好的选择。ProcessPoolExecutor让多进程编程变得简单:

from concurrent.futures import ProcessPoolExecutor

import math

def compute_intensive_task(n):

return sum(i * i for i in range(n))

def main():

numbers = [10**5, 10**6, 10**5, 10**6]

with ProcessPoolExecutor(max_workers=4) as executor:

results = list(executor.map(compute_intensive_task, numbers))

print(f'计算结果: {results}')

if __name__ == '__main__':

main()

异步编程是另一个强大的工具。asyncio提供了优雅的协程支持:

import asyncio

import aiohttp

import time

async def fetch_url(session, url):

async with session.get(url) as response:

return await response.text()

async def main():

urls = [

'http://example.com',

'http://example.org',

'http://example.net'

]

async with aiohttp.ClientSession() as session:

tasks = [fetch_url(session, url) for url in urls]

results = await asyncio.gather(*tasks)

return results

if __name__ == '__main__':

start = time.time()

results = asyncio.run(main())

print(f'耗时: {time.time() - start:.2f}秒')

说到性能优化,还得提一下线程池和进程池的合理使用。这是我常用的模式:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import time

def io_bound_task(x):

time.sleep(1) # 模拟IO操作

return x * x

def cpu_bound_task(x):

return sum(i * i for i in range(x))

def mixed_workload():

# IO密集任务用线程池

with ThreadPoolExecutor(max_workers=4) as executor:

io_results = list(executor.map(io_bound_task, range(10)))

# CPU密集任务用进程池

with ProcessPoolExecutor(max_workers=4) as executor:

cpu_results = list(executor.map(cpu_bound_task, range(10)))

return io_results, cpu_results

实践中发现几个优化要点:

合理设置池大小,一般CPU密集型任务设为CPU核心数,IO密集型可以设大一点

任务粒度要适中,太细会有过多调度开销,太粗会影响负载均衡

注意资源释放,推荐使用上下文管理器

为了提升性能,有时候需要混合使用多种并发方案:

import asyncio

from concurrent.futures import ProcessPoolExecutor

from functools import partial

async def cpu_bound(executor, x):

loop = asyncio.get_running_loop()

return await loop.run_in_executor(executor, cpu_bound_task, x)

async def main():

with ProcessPoolExecutor() as executor:

loop = asyncio.get_running_loop()

tasks = [

cpu_bound(executor, i) for i in range(10)

]

results = await asyncio.gather(*tasks)

return results

这种方案把asyncio的事件循环和进程池结合起来,既能处理好IO密集型任务,又能充分利用多核CPU。

最后分享个实用技巧:用queue做任务队列,实现生产者消费者模式:

from queue import Queue

from threading import Thread

import time

def producer(queue):

for i in range(10):

queue.put(i)

time.sleep(0.1)

def consumer(queue):

while True:

item = queue.get()

if item is None:

break

print(f'处理任务: {item}')

queue.task_done()

def main():

q = Queue()

producer_thread = Thread(target=producer, args=(q,))

consumer_threads = [Thread(target=consumer, args=(q,)) for _ in range(3)]

producer_thread.start()

for t in consumer_threads:

t.start()

producer_thread.join()

for _ in consumer_threads:

q.put(None) # 发送终止信号

for t in consumer_threads:

t.join()

if __name__ == '__main__':

main()

Python并发编程博大精深,熟练掌握这些工具和模式,能帮你写出高效稳定的程序。重要的是理解每种方案的适用场景,灵活运用。

记住,过早优化是万恶之源。先保证代码正确性和可维护性,真有性能瓶颈再针对性优化。毕竟,最快的代码是根本不用执行的代码。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/Oc0gza2NBvM9C4CM--ufDv2Q0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券