前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【代码】Python多线程执行并且按原本顺序返回[详细注释]

【代码】Python多线程执行并且按原本顺序返回[详细注释]

作者头像
小锋学长生活大爆炸
发布2023-03-01 14:08:09
1.4K0
发布2023-03-01 14:08:09
举报

简单记录一下,免得下次找不到,还得重写。

先看一下效果,然后直接上代码,并且每行都配注释。

代码语言:javascript
复制
# 导入线程池的包
from concurrent.futures import ThreadPoolExecutor, as_completed


# 整个的任务函数,方便调用。也可以拆开写
def extract_append_audio_features(extract_type='age', max_workers=16):
    # feature_csv任务队列,可以理解为数组
    datas_num = len(feature_csv)
    # 不知道为什么tqdm没显示,所以这里搞个锁
    compute_cnt_lock = threading.Lock()
    # 与compute_cnt_lock锁配合,用来计当前已执行的总数
    compute_cnt = 0
    
    # 这是真正要被线程池调用的函数,id用来表示不同的线程
    def func(id, start_index, end_index):
        # 说明变量是从外部获得的
        nonlocal compute_cnt
        # 用来存处理的结果
        feature_data = []
        # 根据参数确定当前线程中执行数组中的哪部分
        for path in feature_csv[start_index:end_index]:
            t1 = time.time()
            # 执行一些操作
            data = do_feature(y)
            # 将操作结果存下来
            feature_data.append(data)
            # 获取线程锁,并将计数+1
            with compute_cnt_lock:
                compute_cnt += 1
            print(f'\r完成进度[{id}=>{round(time.time()-t1, 3)}s]: {compute_cnt}/{datas_num}', end='')
        # 全部完成后,返回本次处理的内容
        return id, feature_data
    
    # 这个函数用来根据指定的线程数,自动分配线程资源
    def allocate_threads(thread_num=5):
        # 计算每个线程会包含数组中的多少个元素
        gap = (datas_num // thread_num)+1
        # 创建线程池对象
        pool = ThreadPoolExecutor(max_workers=max_workers)
        threads_pool = []
        futures = []
        results = {}
        # 开始根据指定的线程数,为不同线程分配资源
        for id in range(thread_num):
            # 第id个线程在数组中元素的起始位置和结束位置
            start_index = id * gap
            end_index = (id+1) * gap
            # 如果数组越界,就让他等于数组最大值
            start_index = start_index if start_index < datas_num else datas_num
            end_index = end_index if end_index < datas_num else datas_num
            # print(f'线程{id}分配内容: {start_index} - {end_index}')
            # 分配完成,提交任务,并启动线程池
            future = pool.submit(func, id, start_index, end_index)
            futures.append(future)
        print('>> 线程全部启动')
    
        # 等待线程池完成,并获取返回值
        for future in as_completed(futures):
            id, feature_data = future.result()
            results[id] = feature_data
        print('\n>> 线程全部完成')
        
        # 根据分配的id顺序,重组结果
        new_feature_data = []
        for i in range(len(results)):
            new_feature_data.extend(results[i])

        # 返回最终的排完序的结果
        return new_feature_data
        

    # 调用资源分配函数,并获取结果
    new_feature_data = allocate_threads(max_workers)
    # 根据结果,可以直接return,也可以执行进一步的操作。
    # xxxx 
    # xxxx 


# 调用演示
extract_append_audio_features(extract_type='age', max_workers=16)
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-02-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档