After writing Python for many years, I still find that concurrent programming is a topic you cannot avoid. In this post I want to share some of the lessons and techniques I have picked up in engineering practice.
The GIL has made Python multithreading controversial, but that does not mean Python is unsuited to concurrent programming. The key is to choose the right concurrency approach for the characteristics of the task.
Let's start with the most basic multithreading example:
import threading
import time

def worker():
    print(f'Thread {threading.current_thread().name} started')
    time.sleep(2)  # simulate a time-consuming operation
    print(f'Thread {threading.current_thread().name} finished')

threads = []
for i in range(3):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

for t in threads:
    t.join()
This snippet shows the basics of creating and managing threads. In real projects, however, threads often need to share data, and that is where thread synchronization mechanisms come in.
Thread safety is a common pitfall. Consider this example:
from threading import Thread, Lock

counter = 0
lock = Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # protect the shared resource with a lock
            counter += 1

threads = [Thread(target=increment) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f'Final result: {counter}')
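For contrast, here is a minimal sketch of the same counter without the lock. `counter += 1` is not an atomic operation, so increments from different threads can interleave and lose updates; whether a particular run actually prints a wrong total depends on the interpreter version and on timing, but the code is incorrect either way.

from threading import Thread

counter = 0

def increment_unsafe():
    global counter
    for _ in range(100000):
        counter += 1  # read-modify-write with no lock: updates can be lost

threads = [Thread(target=increment_unsafe) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
print(f'Result without a lock: {counter}')  # may come out below 400000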
For CPU-bound tasks, multiprocessing is the better choice. ProcessPoolExecutor makes multiprocess programming straightforward:
from concurrent.futures import ProcessPoolExecutor

def compute_intensive_task(n):
    return sum(i * i for i in range(n))

def main():
    numbers = [10**5, 10**6, 10**5, 10**6]
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute_intensive_task, numbers))
    print(f'Results: {results}')

if __name__ == '__main__':
    main()
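executor.map returns results in input order; when you would rather handle each result as soon as its task finishes, submit combined with as_completed is a useful variant. A minimal sketch (compute_intensive_task is redefined here so the snippet runs on its own):

from concurrent.futures import ProcessPoolExecutor, as_completed

def compute_intensive_task(n):
    return sum(i * i for i in range(n))

def main():
    numbers = [10**5, 10**6, 10**5, 10**6]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit returns a Future right away; as_completed yields futures as they finish
        futures = {executor.submit(compute_intensive_task, n): n for n in numbers}
        for future in as_completed(futures):
            print(f'n={futures[future]} -> {future.result()}')

if __name__ == '__main__':
    main()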
Asynchronous programming is another powerful tool. asyncio provides elegant coroutine support (the example below also uses the third-party aiohttp library for HTTP requests):
import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://example.com',
        'http://example.org',
        'http://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

if __name__ == '__main__':
    start = time.time()
    results = asyncio.run(main())
    print(f'Elapsed: {time.time() - start:.2f}s')
On the topic of performance, it is also worth covering the sensible use of thread pools and process pools. This is a pattern I use often:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def io_bound_task(x):
    time.sleep(1)  # simulate an IO operation
    return x * x

def cpu_bound_task(x):
    return sum(i * i for i in range(x))

def mixed_workload():
    # thread pool for IO-bound tasks
    with ThreadPoolExecutor(max_workers=4) as executor:
        io_results = list(executor.map(io_bound_task, range(10)))
    # process pool for CPU-bound tasks
    with ProcessPoolExecutor(max_workers=4) as executor:
        cpu_results = list(executor.map(cpu_bound_task, range(10)))
    return io_results, cpu_results
A few optimization points I have found in practice:
Size pools sensibly: for CPU-bound work, roughly one worker per CPU core; for IO-bound work the pool can be larger (see the sketch after this list)
Keep task granularity moderate: tasks that are too fine add scheduling overhead, while tasks that are too coarse hurt load balancing
Release resources carefully: context managers are the recommended way
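As a minimal sketch of the first point, pool sizes can be derived from os.cpu_count() instead of being hard-coded; the IO-pool multiplier below is an illustrative assumption, not a fixed rule:

import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None

# CPU-bound work: roughly one worker per core
process_pool = ProcessPoolExecutor(max_workers=cpu_count)

# IO-bound work: workers mostly wait, so a larger pool is reasonable;
# the 5x factor here is only an illustrative assumption
thread_pool = ThreadPoolExecutor(max_workers=cpu_count * 5)

# shut the pools down when done (or manage them with context managers)
thread_pool.shutdown()
process_pool.shutdown()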
To push performance further, it is sometimes necessary to mix several concurrency approaches:
import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(x):
    # same CPU-bound helper as above, redefined so this snippet runs on its own
    return sum(i * i for i in range(x))

async def cpu_bound(executor, x):
    # hand the CPU-bound work to the process pool without blocking the event loop
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, cpu_bound_task, x)

async def main():
    with ProcessPoolExecutor() as executor:
        tasks = [cpu_bound(executor, i) for i in range(10)]
        results = await asyncio.gather(*tasks)
    return results

if __name__ == '__main__':
    print(asyncio.run(main()))
This approach combines the asyncio event loop with a process pool: the event loop keeps servicing IO-bound work while the CPU-heavy parts are spread across multiple cores.
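To make that concrete, here is a minimal sketch (using asyncio.sleep as a stand-in for real IO) that awaits IO-bound coroutines and offloaded CPU-bound work in a single gather call:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_task(x):
    return sum(i * i for i in range(x))

async def io_task(x):
    await asyncio.sleep(1)  # stand-in for a real network or disk wait
    return x

async def main():
    with ProcessPoolExecutor() as executor:
        loop = asyncio.get_running_loop()
        io_coros = [io_task(i) for i in range(5)]
        cpu_futures = [loop.run_in_executor(executor, cpu_bound_task, 10**5)
                       for _ in range(5)]
        # the event loop services the coroutines while the worker
        # processes grind through the computations in parallel
        results = await asyncio.gather(*io_coros, *cpu_futures)
    return results

if __name__ == '__main__':
    print(asyncio.run(main()))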
Finally, a practical technique: use a queue as a task queue to implement the producer-consumer pattern:
from queue import Queue
from threading import Thread
import time

def producer(queue):
    for i in range(10):
        queue.put(i)
        time.sleep(0.1)

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'Processing task: {item}')
        queue.task_done()

def main():
    q = Queue()
    producer_thread = Thread(target=producer, args=(q,))
    consumer_threads = [Thread(target=consumer, args=(q,)) for _ in range(3)]
    producer_thread.start()
    for t in consumer_threads:
        t.start()
    producer_thread.join()
    for _ in consumer_threads:
        q.put(None)  # send one termination signal per consumer
    for t in consumer_threads:
        t.join()

if __name__ == '__main__':
    main()
Python concurrency is a deep topic, and becoming fluent with these tools and patterns will help you write efficient, stable programs. What matters most is understanding where each approach fits and applying it flexibly.
Remember: premature optimization is the root of all evil. Get the code correct and maintainable first, and only optimize deliberately once a real bottleneck shows up. After all, the fastest code is the code that never has to run.