我需要一些关于如何加快代码速度的提示,因为到目前为止,对于更大的输入来说,速度非常慢。代码的工作方式是,文件Loc_Circle_50U.txt包含50辆车辆的真实位置,而ns3_files中的文件包含一些错误位置。我分析了这些作为误差存储的差异,并根据误差和车辆的速度,计算它们是否有可能发生碰撞。时间被分成1毫秒的时隙。
测试文件在这里共享。 sumo_file = ['Loc_Circle_50U.txt']
是一个全局文件,解压后大小约为 25 MB,ns3_files
中列出的文件一个接一个地运行。目前,我为ns3_files
附加的是一个较小的文件,较大的文件大约为 30 MB。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from joblib import Parallel, delayed
# Input data sets: one SUMO ground-truth trace and several ns3 logs that
# contain the (error-afflicted) received positions.
ns3_files = ['EQ_100p_1Hz_50U.txt','EQ_100p_5Hz_50U.txt','EQ_100p_10Hz_50U.txt',
'EQ_100p_20Hz_50U.txt','EQ_100p_30Hz_50U.txt','EQ_100p_40Hz_50U.txt','EQ_100p_50Hz_50U.txt','EQ_100p_100Hz_50U.txt']
sumo_file = ['Loc_Circle_50U.txt']
sumo_df = pd.read_csv(sumo_file[0], delim_whitespace = True)
# Columns the SUMO ground-truth trace must provide.
SUMO_REQUIRED = {'Time', 'VID', 'Pos(X)', 'Pos(Y)', 'Vel(X)', 'Vel(Y)'}
for qqq in sumo_file:
    # NOTE(review): sumo_df above already holds sumo_file[0]; the re-read is
    # kept only to preserve the original per-file "analyzing" output.
    rr = pd.read_csv(qqq, delim_whitespace=True)
    print("analyzing file ", qqq)
    # A set-subset test replaces the original six-level nested-if pyramid;
    # "is OK" is printed only when every required column is present.
    if SUMO_REQUIRED <= set(rr.columns):
        print("sumo file ", qqq, " is OK")
# Columns each ns3 log must provide.
# NOTE(review): this list validates 'RecvPos(X)/(Y)', but the analysis code
# below reads 'RecvVel(X)/(Y)', which is never checked here — confirm the
# logs actually carry those velocity columns.
NS3_REQUIRED = {'PId', 'TxId', 'RxId', 'GenTime', 'RecTime', 'Delay',
                'TxTruePos(X)', 'TxHeadPos(X)', 'TxTruePos(Y)', 'TxHeadPos(Y)',
                'Error(m)', 'Tx_HeadVelX', 'Tx_HeadVelY',
                'RecvPos(X)', 'RecvPos(Y)'}
for qqq in ns3_files:
    rr = pd.read_csv(qqq, delim_whitespace=True)
    print("analyzing file ", qqq)
    # A set-subset test replaces the original fifteen-level nested-if pyramid.
    if NS3_REQUIRED <= set(rr.columns):
        print("ns3 file ", qqq, " is OK")
prediction = 0 # 0 means no prediction and 1 is prediction enabled
def calc_error(c): # pass the ns3 dataframe cleaned of all nan values.
    """Return (avg_error, collision) for one sender/receiver pair.

    Parameters
    ----------
    c : pandas.DataFrame
        ns3 log rows for a single Tx/Rx pair, cleaned of all NaN values.

    Returns
    -------
    tuple (float, int)
        avg_error  - mean of the slot-wise position errors.
        collision  - 1 if the mean slot-wise TTC error exceeds 6.43,
                     else 0 (same threshold as the original code).

    Relies on the module-level globals ``sumo_df`` (ground truth) and
    ``prediction`` (0/1 dead-reckoning flag).
    """
    # BUG FIX: sort_values returns a new frame; the original call discarded
    # the sorted result, so rows were processed in their incoming order.
    c = c.sort_values("RecTime").reset_index(drop=True)
    nrows = c.shape[0]
    error = []      # slot-wise position error
    ttc_error = []  # slot-wise time-to-collision error
    sender = c.loc[0, "TxId"]    # constant throughout the pair
    receiver = c.loc[0, "RxId"]  # same as above

    def _ttc(err_val, rel_vel):
        # Zero relative velocity means the positional error never closes
        # the gap; treated as zero TTC error (original convention).
        return err_val / rel_vel if rel_vel != 0 else 0

    # PERF: extract each vehicle's ground-truth rows once and index them by
    # Time so every per-slot lookup is a hash lookup instead of a boolean
    # mask over the entire SUMO file (the original rebuilt masks for every
    # BSM and every 1 ms slot). Assumes (VID, Time) pairs are unique in the
    # SUMO trace — TODO confirm.
    df_actual = sumo_df[sumo_df["VID"] == sender].set_index("Time")
    df_recv = sumo_df[sumo_df["VID"] == receiver].set_index("Time")

    if nrows == 1:  # only 1 message exchanged
        row = c.loc[0]
        error_x_val = abs(row["TxTruePos(X)"] - row["TxHeadPos(X)"])
        error_y_val = abs(row["TxTruePos(Y)"] - row["TxHeadPos(Y)"])
        # NOTE(review): mixes the header velocity (sending instant) with the
        # receiver velocity at reception — confirm this is intended.
        rel_vel_x = abs(row["Tx_HeadVelX"] - row["RecvVel(X)"])
        rel_vel_y = abs(row["Tx_HeadVelY"] - row["RecvVel(Y)"])
        ttc_error.append(max(_ttc(error_x_val, rel_vel_x),
                             _ttc(error_y_val, rel_vel_y)))
        error.append(row["Error(m)"])
    else:  # more than 1 packet exchanged
        for k in range(nrows - 1):  # one iteration per BSM reception
            current_bsm = c.loc[k]
            next_bsm = c.loc[k + 1]
            # Number of 1 ms slots strictly between the two receptions.
            slots = int(next_bsm["RecTime"] - current_bsm["RecTime"] - 1)
            current_time = current_bsm["RecTime"]
            # Error at the reception instant: true vs header position.
            error_x_val = abs(current_bsm["TxTruePos(X)"] - current_bsm["TxHeadPos(X)"])
            error_y_val = abs(current_bsm["TxTruePos(Y)"] - current_bsm["TxHeadPos(Y)"])
            error.append(math.sqrt(error_x_val ** 2 + error_y_val ** 2))
            rel_vel_x = abs(df_actual.at[current_time, "Vel(X)"] - current_bsm["RecvVel(X)"])
            rel_vel_y = abs(df_actual.at[current_time, "Vel(Y)"] - current_bsm["RecvVel(Y)"])
            ttc_error.append(max(_ttc(error_x_val, rel_vel_x),
                                 _ttc(error_y_val, rel_vel_y)))
            # Header info only: the error is from the receiver's perspective
            # and the receiver sees nothing but the BSM header.
            x_pos_BSM = current_bsm["TxHeadPos(X)"]
            y_pos_BSM = current_bsm["TxHeadPos(Y)"]
            x_speed_BSM = current_bsm["Tx_HeadVelX"]
            y_speed_BSM = current_bsm["Tx_HeadVelY"]
            for j in range(slots):  # every 1 ms slot between 2 receptions
                # Slots are milliseconds, speeds m/s, hence the 0.001 factor.
                x_pos_predicted = x_pos_BSM + prediction * (x_speed_BSM * (j + 1)) * 0.001
                y_pos_predicted = y_pos_BSM + prediction * (y_speed_BSM * (j + 1)) * 0.001
                t = current_time + (j + 1)
                error_x_val = abs(x_pos_predicted - df_actual.at[t, "Pos(X)"])
                error_y_val = abs(y_pos_predicted - df_actual.at[t, "Pos(Y)"])
                # BUG FIX: the original appended the *squared* distance here
                # (no sqrt), inconsistent with every other error sample.
                error.append(math.sqrt(error_x_val ** 2 + error_y_val ** 2))
                rel_vel_x = abs(df_actual.at[t, "Vel(X)"] - df_recv.at[t, "Vel(X)"])
                rel_vel_y = abs(df_actual.at[t, "Vel(Y)"] - df_recv.at[t, "Vel(Y)"])
                ttc_error.append(max(_ttc(error_x_val, rel_vel_x),
                                     _ttc(error_y_val, rel_vel_y)))
        # Add the last packet's details (the loop stops one row early).
        last = c.loc[nrows - 1]
        error.append(last["Error(m)"])
        current_time = last["RecTime"]
        error_x_val = abs(last["TxTruePos(X)"] - last["TxHeadPos(X)"])
        error_y_val = abs(last["TxTruePos(Y)"] - last["TxHeadPos(Y)"])
        rel_vel_x = abs(df_actual.at[current_time, "Vel(X)"] - last["RecvVel(X)"])
        rel_vel_y = abs(df_actual.at[current_time, "Vel(Y)"] - last["RecvVel(Y)"])
        ttc_error.append(max(_ttc(error_x_val, rel_vel_x),
                             _ttc(error_y_val, rel_vel_y)))

    avg_error = np.mean(error)
    collision = 1 if np.mean(ttc_error) > 6.43 else 0
    return (avg_error, collision)
# Per-file aggregates appended by start_process().
# NOTE(review): joblib's default backend runs workers in separate processes,
# so each worker mutates its *own copy* of these lists — they are not shared
# across files; verify the accumulation behaves as intended.
overall_errors = [] # to store error per file
overall_collisions = [] # to store collision per file
def start_process(fil):
    """Analyze one ns3 log file end to end.

    Parameters
    ----------
    fil : str
        Path of the ns3 log to process.

    Side effects: appends the file's mean error and collision rate to the
    module-level lists ``overall_errors`` / ``overall_collisions`` and
    writes a summary to ``parallel_error_collision_P.txt``.
    """
    print("File ", fil, " started")
    b = pd.read_csv(fil, delim_whitespace=True)
    b = b.sort_values(['RecTime']).reset_index(drop=True)
    average_errors = []      # error for every Tx/Rx pair in this file
    average_collisions = []  # collision flag (0/1) for every pair
    # PERF: a single groupby visits each (receiver, sender) pair once,
    # instead of rescanning the whole frame with a boolean mask per pair.
    # Pair order may differ from the original nested loops, but only the
    # averages below are used, so the results are unchanged.
    for (_receiver, _sender), c in b.groupby(['RxId', 'TxId'], sort=False):
        c = c.reset_index(drop=True)
        avg_error, collision = calc_error(c)
        average_errors.append(avg_error)
        average_collisions.append(collision)
    average_error = np.average(average_errors)
    average_collision = np.average(average_collisions)
    print("File ", fil, " completed")
    overall_collisions.append(average_collision)
    overall_errors.append(average_error)
    # BUG FIX: the original called open(..., "a") inside every print() and
    # never closed the handles; open the report once with a context manager.
    with open("parallel_error_collision_P.txt", "a") as out:
        print("for file ", fil, file=out)
        if prediction == 0:
            print("no prediction result follows with prediction flag =", prediction, file=out)
        else:
            print("prediction assisted result follows with prediction flag =", prediction, file=out)
        print("overall_collisions = ", overall_collisions, file=out)
        print("overall_errors = ", overall_errors, "\n", file=out)
Parallel(n_jobs=len(ns3_files))(delayed(start_process)(fil) for fil in ns3_files)
发布于 2020-04-02 12:43:08
我看到了一些可以帮助你改进你的程序的东西。
这段代码非常密集、很长,而且组织得不好,因此很难阅读和理解。作为第一步,我建议提取出更小的函数,例如计算误差值的部分。每个函数都应该短小、文档完善且可测试。
不要像在这段代码中那样依赖于间接式的代码和函数声明,而是将事物收集到一个main
函数中。有一种方法可以做到:
if __name__ == "__main__":
    # Gather the script's configuration and entry-point logic in one place
    # instead of scattering it through module scope.
    prediction = 0 # 0 means no prediction and 1 is prediction enabled
    ns3_files = ['EQ_100p_1Hz_50U.txt','EQ_100p_5Hz_50U.txt','EQ_100p_10Hz_50U.txt',
                 'EQ_100p_20Hz_50U.txt','EQ_100p_30Hz_50U.txt','EQ_100p_40Hz_50U.txt',
                 'EQ_100p_50Hz_50U.txt','EQ_100p_100Hz_50U.txt']
    sumo_file = ['Loc_Circle_50U.txt']
    sumo_df = pd.read_csv(sumo_file[0], delim_whitespace = True)
    # Required ground-truth columns for the SUMO trace file.
    sumo_headers = {'Time', 'VID', 'Pos(X)', 'Pos(Y)', 'Vel(X)', 'Vel(Y)'}
    if validate_headers(sumo_df, sumo_headers):
        print("sumo file ", sumo_file[0], " is OK")
    # NOTE(review): a list comprehension used purely for its side effects;
    # a plain for-loop would express the intent more clearly.
    [start_process(fil, sumo_df) for fil in ns3_files]
Python广泛使用数据结构,如列表、字典和集合。如果您发现自己正在编写长if
结构,如下面的代码所示,请停止!几乎总有更好的方法来做这件事。例如,您将看到在上面的示例代码中有一个validate_headers
函数。这是定义
def validate_headers(df, headerset):
    """Return True when every name in *headerset* appears in df's columns."""
    return headerset.issubset(df.columns)
它在两个集合上使用 <= 运算符来判断一个集合是否是另一个集合的子集(注意 <= 表示"子集或相等",并非严格子集)。在这种情况下,我们要确保所有必需的字段都存在于 dataframe 中,因此传入该 dataframe 和一组表头名称。很简单!
原始代码加载每个文件的全部数据只是为了验证表头,然后又再次加载它来实际处理数据。这毫无意义,既浪费时间又浪费内存。
pandas
迭代pandas
数据,逐行使用索引是处理数据的最慢的方法。相反,您应该寻求使用矢量化。例如,如果您想为dataframe中的每一行创建一个新的列errx
,您可以这样写:
df['errx'] = abs(df['TxTruePos(X)'] - df['TxHeadPos(X)'])
使用矢量化操作是 pandas 预期的使用方式,与使用for
循环相比非常有效。但是,如果您发现您还需要更高的性能,则可以使用numpy
,您已经包括了它,但没有充分利用它。我还没有对所有这些进行验证,但似乎大部分的计算都是多余的。例如,程序计算一个error_x_val
,但是ns3_data
文件似乎已经有这样一个列了。如果实际包含所需的数据,则使用它而不是重新计算。如果没有,我建议将其从数据框架中删除,如果没有用的话。可以这样做:
del df['Error(X)']
https://codereview.stackexchange.com/questions/239697
复制相似问题