A Discussion of Python's Concurrency Options

Original article by mariolu. Last modified 2024-01-31 09:21:29.

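Python provides three concurrency options: multiprocessing, threading, and asyncio. As the names suggest, these are multiple processes, multiple threads, and asynchronous I/O. But do you know which scenarios each one suits, and what their strengths and weaknesses are?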

1. Multiple processes with multiprocessing

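multiprocessing is a package that supports spawning processes using an API similar to that of the threading module. It offers both local and remote concurrency, and it effectively sidesteps the global interpreter lock (GIL) by using subprocesses instead of threads. As a result, it can make full use of the multiple processors on a given machine.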

1.1 Usage:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())  # print the parent process id
    print('process id:', os.getpid())  # print the current process id

def f(name):
    info('function f')  # print info about the current (child) process
    print('hello', name)

if __name__ == '__main__':
    info('main line')  # print info about the main process
    p = Process(target=f, args=('bob',))  # create the child process object
    p.start()  # start the child process
    p.join()  # wait for the child process to finish

Here is another way to use multiprocessing: handling multiple inputs with map.

The Pool here is created with 5 worker processes, and f(1), f(2) and f(3) are then executed in parallel.

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

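Because multiple processes can make full use of multiple CPU cores, here is a practical example. It launches its child processes with the subprocess module.

There are two files, logcolor.py and tailf.py.

logcolor.py filters log lines by level and prints WARNING, ERROR and CRITICAL lines to the screen, each in its own color.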

#!/usr/bin/env python

ESC = chr(0x1B)

RESET = ESC + '[0m'

BLACK     = ESC + '[30m'
RED       = ESC + '[31m'
GREEN     = ESC + '[32m'
YELLOW    = ESC + '[33m'
BLUE      = ESC + '[34m'
MAGENTA   = ESC + '[35m'
CYAN      = ESC + '[36m'
WHITE     = ESC + '[37m'
BGBLACK     = ESC + '[40m'
BGRED       = ESC + '[41m'
BGGREEN     = ESC + '[42m'
BGYELLOW    = ESC + '[43m'
BGBLUE      = ESC + '[44m'
BGMAGENTA   = ESC + '[45m'
BGCYAN      = ESC + '[46m'
BGWHITE     = ESC + '[47m'

def process(line):
    if line.startswith('WARNING'):  # log record starts with WARNING
        return YELLOW + line + RESET  # yellow text

    if line.startswith('ERROR'):  # log record starts with ERROR
        return BGRED + BLACK + line + RESET  # black text on a red background

    if line.startswith('CRITICAL'):  # log record starts with CRITICAL
        return BGCYAN + BLACK + line + RESET  # black text on a cyan background

    return line

def logcolor():
    try:
        while True:
            line = input()  # read one line from stdin
            print(process(line))  # print the line with its color codes
    except KeyboardInterrupt:
        return
    except EOFError:
        return

if __name__ == '__main__':
    logcolor()

tailf.py monitors the log output of a.log. It starts process p1 to read the contents of a.log, and at the same time starts process p2 to print what p1 reads in color.

import os
import subprocess

def tail():
    cmd = ['tail', '-100f', 'a.log']

    # Only pipe through logcolor.py when stdout (fd 1) is a terminal
    pipe_stdout = subprocess.PIPE if os.isatty(1) else None
    p1 = subprocess.Popen(  # p1 runs tail and follows a.log
        cmd, stdout=pipe_stdout)  # p1's stdout goes to a pipe that becomes p2's input
    if pipe_stdout:
        p2 = subprocess.Popen([  # p2 colors the lines read from a.log
            'python3', './logcolor.py'],
            stdin=p1.stdout)   # p2's stdin is connected to p1's stdout
        p1.stdout.close()
        p2.wait()
    p1.wait()

if __name__ == '__main__':
    tail()
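
The p1-to-p2 wiring above can be tried without a log file. The following is a minimal sketch, assuming two throwaway child scripts run with the current interpreter in place of tail and logcolor.py. The names PRODUCER, FILTER and run_pipeline are hypothetical and not part of the original files.

```python
import subprocess
import sys

# Hypothetical stand-ins for `tail -100f a.log` and logcolor.py.
PRODUCER = "print('INFO start'); print('WARNING disk low'); print('ERROR failed')"
FILTER = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    if line.startswith(('WARNING', 'ERROR')):\n"
    "        sys.stdout.write(line)\n"
)

def run_pipeline():
    # p1 writes to a pipe instead of the terminal.
    p1 = subprocess.Popen([sys.executable, "-c", PRODUCER],
                          stdout=subprocess.PIPE)
    # p2 reads p1's output as its stdin; p2's own output is captured as text.
    p2 = subprocess.Popen([sys.executable, "-c", FILTER],
                          stdin=p1.stdout, stdout=subprocess.PIPE, text=True)
    # Close the parent's copy of the pipe so only p2 holds the read end.
    p1.stdout.close()
    output, _ = p2.communicate()
    p1.wait()
    return output

if __name__ == "__main__":
    print(run_pipeline(), end="")
```

Closing p1.stdout in the parent mirrors the original tailf.py: once the parent no longer holds the read end, p1 is notified if p2 exits early.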

Result: the WARNING, ERROR and CRITICAL lines from a.log appear in color on the terminal.

2. Multiple threads with threading

Using threading is fairly simple: create a threading.Thread instance, call start(), and finally call join().

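# Fragment: JobHandler defines __call__, so an instance can serve as the thread target.
# i is the worker index supplied by the surrounding code.
job_handler = JobHandler()
job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
job_worker_thread.daemon = True  # a daemon thread does not keep the interpreter alive
job_worker_thread.start()  # start the thread
job_worker_thread.join()  # wait for the thread to finish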
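
The fragment above depends on names defined elsewhere. As a self-contained illustration of the same create, start, join sequence with a callable object as the target, here is a minimal sketch. The Greeter class and run_workers function are hypothetical names chosen for this example.

```python
import threading

class Greeter:
    """Callable object usable as a Thread target (hypothetical example class)."""

    def __init__(self):
        self.results = []

    def __call__(self, name):
        # Runs inside the worker thread.
        self.results.append("hello " + name)

def run_workers(names):
    greeter = Greeter()
    threads = [
        threading.Thread(target=greeter, args=(name,), name="Worker-{}".format(i))
        for i, name in enumerate(names)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait until every thread has finished
    # Thread scheduling order is not guaranteed, so sort for a stable result.
    return sorted(greeter.results)

if __name__ == "__main__":
    print(run_workers(["bob", "alice"]))
```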

Here is a practical example that processes Jobs concurrently.

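import logging
import threading
import traceback
from queue import Queue

# Assumption: the original imolog logger is not shown; a stdlib logger stands in for it.
imolog = logging.getLogger(__name__)
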
class Job(object):
    def __init__(self):
        self.next_job = None

    def set_next_job(self, next_job):
        self.next_job = next_job

    def handle(self, job_handler):
        r, out = self.do()
        if out:
            out = (self.__class__.__name__, out)
        else:
            out = tuple()
        if r is not None:
            r = '>'.join([self.__class__.__name__, r])
        else:
            if self.next_job is not None:
                self.next_job.trigger(job_handler)

        return r, out

    def trigger(self, job_handler):
        job_handler.add_next_job(self)

    def do(self):  # run the job; subclasses must implement this
        raise NotImplementedError


class JobWrapper(Job):
    def __init__(self, prefix, current_job, next_job):
        super(JobWrapper, self).__init__()
        self.prefix = prefix
        self.current_job = current_job
        self.set_next_job(next_job)

    def handle(self, job_handler):
        self.current_job.set_next_job(self.next_job)
        job_res = self.current_job.handle(job_handler)
        if job_res is not None:
            r, out = job_res

            if out:
                out = (">".join([self.prefix, out[0]]), out[1])
            else:
                out = tuple()

            if r is not None:
                r = '>'.join([self.prefix, r])

            return r, out
        else:
            return None


class ChainedJobs(Job):
    def __init__(self, *jobs):
        super(ChainedJobs, self).__init__()
        self.jobs = jobs

    def handle(self, job_handler):
        next_job = self.next_job
        for current_job in self.jobs[::-1]:
            next_job = JobWrapper(self.__class__.__name__, current_job, next_job)
        job_handler.add_next_job(next_job)  # queue the head of the chained jobs


class ParallelJobs(Job):
    def __init__(self, *jobs):
        super(ParallelJobs, self).__init__()
        self.jobs = list(jobs)  # a list rather than a tuple, so add_job can append

    def add_job(self, job):
        self.jobs.append(job)

    class _NotifyJob(Job):  # _NotifyJob subclasses Job and overrides trigger
        def __init__(self, remain, next_job):
            super(ParallelJobs._NotifyJob, self).__init__()
            self.set_next_job(next_job)
            self.remain = remain
            self.lock = threading.Lock()

        def trigger(self, job_handler):
            continue_next_job = False
            with self.lock:
                self.remain -= 1  # one parallel job has finished; when remain reaches 0 the next job may run
                if self.remain == 0:
                    continue_next_job = True

            if continue_next_job and self.next_job:
                self.next_job.trigger(job_handler)

    def handle(self, job_handler):
        notify_job = ParallelJobs._NotifyJob(len(self.jobs), self.next_job)
        for job in self.jobs:
            job.set_next_job(notify_job)
            job.trigger(job_handler)
        return None


class JobHandler(object):
    def __init__(self, job_mgr, handler_id):
        self.q = job_mgr.job_queue
        self.outs, self.errors = [], []

    def add_next_job(self, next_job):
        self.q.put_nowait(next_job)

    def __call__(self, *args, **kwargs):
        while True:
            j = self.q.get()
            try:
                jres = j.handle(self)
                if jres is not None:
                    r, out = jres
                    if r is not None:
                        self.errors.append(r)
                    if out:
                        self.outs.append(out)
            except Exception as e:
                imolog.error("Unknown exception thrown in handler:" + str(e))
                traceback.print_exc()
            finally:
                self.q.task_done()

    def get_errors(self):
        return self.errors

    def get_outs(self):
        return self.outs


class JobManager(object):
    def __init__(self):
        self.job_queue = Queue()
        self.job_handlers = []

    def start(self):  # start 32 worker threads
        for i in range(1, 33):
            job_handler = JobHandler(self, i)
            job_worker_thread = threading.Thread(None, job_handler, "JobWorker-{}".format(i))
            job_worker_thread.daemon = True
            job_worker_thread.start()
            self.job_handlers.append(job_handler)  # all workers are daemon threads

    def wait(self):
        self.job_queue.join()

    def collect_errors(self):
        es = []
        for job_handler in self.job_handlers:
            es.extend(job_handler.get_errors())
        return es

    def collect_outs(self):
        outs = []
        for job_handler in self.job_handlers:
            outs.extend(job_handler.get_outs())
        return outs

    def add_job(self, job):
        self.job_queue.put_nowait(job)
        
if __name__ == '__main__':
    job_manager = JobManager()
    job_manager.start()
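
The core of the example above is a pool of daemon worker threads pulling from a shared Queue, with Queue.join() used to wait for completion. A minimal, self-contained sketch of just that pattern follows, assuming a hypothetical job (squaring numbers) and the hypothetical names worker and square_all.

```python
import threading
from queue import Queue

def worker(q, results, lock):
    while True:
        n = q.get()  # blocks until a job is available
        try:
            with lock:
                results.append(n * n)
        finally:
            q.task_done()  # tells Queue.join() that this job is finished

def square_all(numbers, num_workers=4):
    q = Queue()
    results, lock = [], threading.Lock()
    for i in range(num_workers):
        t = threading.Thread(target=worker, args=(q, results, lock),
                             name="JobWorker-{}".format(i), daemon=True)
        t.start()
    for n in numbers:
        q.put_nowait(n)
    q.join()  # returns once every queued job has been marked done
    return sorted(results)

if __name__ == "__main__":
    print(square_all([3, 1, 2]))
```

The workers never exit on their own, which is why they are marked as daemon threads: they are discarded when the main thread ends, as in JobManager.start() above.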

3. The asyncio approach

Python uses await to wait for a coroutine to finish. Coroutines are declared with async def.

import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Any, Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(  # coroutines are declared with async def
    executor: Optional[Executor],
    func: Callable[..., T],
    /,
    *args: Any,
    **kwargs: Any,
) -> T:
    """
    Run `func(*args, **kwargs)` asynchronously, using an executor.

    If the executor is None, use the default ThreadPoolExecutor.
    """
    return await asyncio.get_running_loop().run_in_executor(
        executor,
        partial(func, *args, **kwargs),
    )

# Example usage for running `print` in a thread.
async def main():
    await run_in_executor(None, print, "O" * 100_000)

asyncio.run(main())
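
To show several coroutines making progress concurrently on one thread, here is a minimal sketch. It assumes asyncio.sleep as a stand-in for a slow network call; the names fetch and main are hypothetical.

```python
import asyncio

async def fetch(name, delay):
    # asyncio.sleep stands in for slow I/O; at this await the coroutine
    # yields control back to the event loop so other coroutines can run.
    await asyncio.sleep(delay)
    return name

async def main():
    # gather runs both coroutines concurrently and returns their results
    # in the order the coroutines were passed in, not completion order.
    return await asyncio.gather(fetch("a", 0.2), fetch("b", 0.1))

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because both waits overlap, the total time should be close to the longer delay rather than the sum of the two.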

4. Comparing the three approaches

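  • multiprocessing is about CPU computation. For example, you can run shell commands through subprocess.Popen, or dispatch commands to many machines. With an 8-core CPU you can split a CPU-bound job into 8 subtasks and combine the results at the end, for a speedup of up to 8x in the ideal case. Multiprocessing is best suited to CPU-bound tasks.
  • threading does not need multiple CPUs. Suppose your application sends many HTTP requests and then waits for each network response before continuing. While one thread is waiting, the CPU can run other threads. Threading is well suited to I/O-bound tasks.
  • asyncio is an efficient way to handle concurrent tasks on a single thread. It provides concurrency rather than parallelism: your code decides where and when a context switch can happen (at each await), whereas with threading the operating system and the interpreter decide when to switch threads.
  • CPython (the main Python implementation) still has the global interpreter lock, so threads are not the first choice for CPU-bound work. That is why multiprocessing is often recommended over threading. However, some problems do not need to be split into that many pieces, especially when the pieces would have to communicate across processes. That is why multiprocessing is not always recommended over threading.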
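
The idea of splitting a CPU-bound job into one piece per core and combining the results can be sketched as follows. This is a minimal sketch under stated assumptions: the workload (counting primes), the helper names count_primes and split, and the choice of 8 workers are all hypothetical, and the actual speedup depends on the machine and on how evenly the chunks divide the work.

```python
import math
from multiprocessing import Pool

def count_primes(bounds):
    """Count primes n with lo <= n < hi by trial division."""
    lo, hi = bounds
    count = 0
    for n in range(max(lo, 2), hi):
        if all(n % d for d in range(2, math.isqrt(n) + 1)):
            count += 1
    return count

def split(lo, hi, parts):
    """Split [lo, hi) into at most `parts` contiguous (start, end) chunks."""
    step = max(1, -(-(hi - lo) // parts))  # ceiling division
    return [(start, min(start + step, hi)) for start in range(lo, hi, step)]

if __name__ == "__main__":
    chunks = split(0, 20000, 8)
    with Pool(8) as pool:
        # Each chunk is counted in a separate process; the partial
        # counts are then summed in the parent.
        print(sum(pool.map(count_primes, chunks)))
```

Equal-width chunks are not equal-cost here, since larger numbers take longer to test, so the ideal 8x figure should be read as an upper bound.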

The following pseudocode-style snippet shows how to decide which concurrency approach fits your scenario.

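io_bound = True       # example value: is the workload I/O bound?
io_very_slow = False  # example value: is the I/O very slow?

if io_bound:
    if io_very_slow:
        print("Use Asyncio")
    else:
        print("Use Threads")
else:
    print("Multi Processing")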

So you should think about it like this:

  • CPU Bound => Multi Processing
  • I/O Bound, Fast I/O, Limited Number of Connections => Multi Threading
  • I/O Bound, Slow I/O, Many connections => Asyncio

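I/O bound means the application spends much of its time talking to slow I/O devices, such as network connections, disks, printers, or an event loop that sleeps for some period. For this kind of blocking workload you can choose threading or asyncio. If the I/O is very slow, cooperative multitasking (asyncio) is the better choice, because it helps avoid excessive resource contention, deadlocks and race conditions.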
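
For the "fast I/O, limited number of connections" case, a thread pool lets the waits overlap. The following is a minimal sketch, assuming time.sleep as a stand-in for a blocking network request; fake_request, fetch_all and the site names are hypothetical.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_request(url):
    # time.sleep stands in for waiting on the network; the GIL is
    # released while a thread sleeps, so other threads can run.
    time.sleep(0.2)
    return "response from " + url

def fetch_all(urls):
    with ThreadPoolExecutor(max_workers=max(1, len(urls))) as pool:
        # Executor.map returns results in the same order as the inputs.
        return list(pool.map(fake_request, urls))

if __name__ == "__main__":
    urls = ["site-{}".format(i) for i in range(5)]
    start = time.perf_counter()
    fetch_all(urls)
    print("elapsed:", round(time.perf_counter() - start, 2), "seconds")
```

With one thread per request, the elapsed time should be close to a single wait rather than five waits in a row.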

5. Performance comparison:

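The table below compares the running time of the test function, run twice, without concurrency and under each of the three concurrency approaches:

concurrency        execute time
none               60.39s
multiprocessing    53.61s
threading          48.23s
asyncio            47.92s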

# "process_test.py"

from multiprocessing import Process
from threading import Thread
import time
import asyncio
start_time = time.time()

def test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)
    
async def async_test():
    num = 100000
    primes = 0
    for i in range(2, num + 1):
        for j in range(2, i):
            if i % j == 0:
                break
        else:
            primes += 1
    print(primes)
    
async def call_tests():
    tasks = []

    for _ in range(0, 2): # 2 asyncio tasks
        tasks.append(async_test())

    await asyncio.gather(*tasks)

if __name__ == "__main__": # This is needed to run processes on Windows
    process_list = []

    test()
    test()

    print("non-concurrency:", round((time.time() - start_time), 2), "seconds") # non-concurrency
    
    start_time = time.time()


    process_list = []

    for _ in range(0, 2): # 2 processes
        process = Process(target=test)
        process_list.append(process)

    for process in process_list:
        process.start()

    for process in process_list:
        process.join()
        
    print("multi-processing:", round((time.time() - start_time), 2), "seconds") # multi-processing

    start_time = time.time()

    thread_list = []

    for _ in range(0, 2): # 2 threads
        thread = Thread(target=test)
        thread_list.append(thread)
    
    for thread in thread_list:
        thread.start()

    for thread in thread_list:
        thread.join()

    print("threading:", round((time.time() - start_time), 2), "seconds") # threading
    
    start_time = time.time()

    asyncio.run(call_tests())

    print("asyncio:", round((time.time() - start_time), 2), "seconds") # asyncio
    


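Original content statement: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.

In case of infringement, contact cloudcommunity@tencent.com for removal.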
