Python:怎么多处理输出问题?

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (103)

我正在使用多进程运行以下代码。这一切都很好,除了产出似乎比它应该是少。下面是一个完整的例子。

import pandas as pd
import multiprocessing
from multiprocessing import Pool, cpu_count
from functools import partial
import timeit
import numpy as np

prng = 1234
cpu_cores = cpu_count()-1


temp_df1 = pd.DataFrame({'trip_id':[22186702,22186703,22186704,26777219,26777220,26777221,26777222,26777223],
            'tour_id':[13525325,13525325,13525325,13525328,13525328,13525328,13525328,13525328],
            'start_time':[8,0,0,10.92,0,0,0,0],
            'ttime_mins':[3.810553,4.649286,2.917499,5.415158,3.800613,1.829472,1.829472,8.643289],
            'arrival_time':[8.063509,0,0,11.010253,0,0,0,0],
            'weight_column':['HBO_outbound','HBO_outbound','HBO_inbound','HBO_outbound','HBO_outbound','NHB_outbound','NHB_inbound','HBM_inbound']})

第二,时间采样数据的名称和功能在多处理中运行。

time_dist = pd.DataFrame({'Time':[8,9,10,11,12,13,14],
            'HBO_outbound':[1573,419,339,544,600,453,100],
            'HBO_inbound':[1573,419,339,544,100,953,800],
            'HBM_outbound':[1573,419,339,544,640,463,90],
            'HBM_inbound':[1573,419,339,544,320,453,100],
            'WBO_outbound':[1573,419,339,544,600,453,100],
            'WBO_inbound':[1573,419,339,544,450,803,190],
            'NHB_outbound':[1573,419,339,544,901,543,290],
            'NHB_inbound':[1573,419,339,544,863,453,330]})

results_frow = []
result_list_final = []

def func(df, time_dist_df):
    """

    """
    for i in range(0, df.shape[0]):
        if i == 0:
            start_time = df['start_time'].iloc[i]
            arrival_time = df['arrival_time'].iloc[i]
            tour_id = df['tour_id'].iloc[i]
            results_frow.append(start_time)
            results_frow.append(arrival_time)
            results_frow.append(tour_id)

        else:
            tour_id = df['tour_id'].iloc[i]
            arrival_time_prev = results_frow[-2]
            time_dist1 = time_dist.loc[time_dist['Time'] >= arrival_time_prev]
            weight_column = df['weight_column'].iloc[i]

            # sample a time and calculate a new arrival time as a result
            if len(time_dist1) > 0:
                start_time = time_dist1.sample(n=1, weights=time_dist1[weight_column], replace=True, random_state=prng)
                start_time = start_time[['Time']].values  ###
                start_time = start_time[0][0]    
            else:
                start_time = results_frow[-2]

            newarrival_time = start_time + df['ttime_mins'].iloc[i] / 60
            results_frow.append(start_time)
            results_frow.append(newarrival_time)
            results_frow.append(tour_id)

    return results_frow

现在运行多处理并收集结果:

def collect_results(result_list):
    return pd.DataFrame({'start_time': result_list[0::3],
                  'arrival_time': result_list[1::3],
                  'tour_id': result_list[2::3]})

# create list of grouped dataframes
grplist = []
for name, group in temp_df1.groupby('tour_id'):
    grplist.append(group)

# use partial to fix the second argument in the function so that multiprocessing does not have an issue
func_partial = partial(func, time_dist_df = time_dist)

if __name__ == '__main__':
    start = timeit.default_timer()
    pool = multiprocessing.Pool(processes=cpu_cores)
    result_list = pool.map(func_partial, grplist)
    result_list_final = result_list[1]

    results_df = collect_results(result_list_final) #### Here lies the problem. Instead of getting back 8 rows, I am only getting back 5 i.e. the last group in the grplist
    stop = timeit.default_timer()
    total_time = stop - start
    print("It took a total of %sec" %total_time)
    results_df.to_csv(r"c:/stimes_parallelized.csv", index=False)

    pool.close()
    pool.join()
提问于
用户回答回答于

这里有多个问题:

1)为什么在导入pool时导入多进程:

from multiprocessing import Pool, cpu_count

这意味着你导入了2次多处理,同时也导入了pool:

import multiprocessing

因此,替换:

pool = multiprocessing.Pool(processes=cpu_cores)

通过:

pool = Pool(processes=cpu_cores)

2)你并没有在描述中提到你的python的版本,你想让我们如何回答,而我们不知道该使用哪个版本

3)我想要解决这个问题,你必须冻结freeze_support多处理的支持

from multiprocessing import freeze_support

它的用途如下:

if __name__ == '__main__':
    freeze_support()

扫码关注云+社区

领取腾讯云代金券