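I have a dataframe that is a few thousand rows long. One column contains two pairs of GPS coordinates, which I am trying to use to calculate the drive time between those coordinates. I have a function that takes the coordinates and returns the drive time, and it can take 3-8 seconds per entry, so the whole process can take quite a while. What I would like to do is use perhaps 3-5 threads to iterate through the list, calculate the drive time, and move on to the next entry while the other threads are finishing, without ever creating more than 5 threads. Independently, I have each piece working: I can run multiple threads, I can keep track of the thread count and wait until it drops below the allowed maximum before starting the next thread, and I can iterate over the dataframe and calculate the drive times. However, I am having trouble piecing it all together. Here is an edited, pared-down version of what I have.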
import pandas
import threading
import arcgis

class MassFunction:
    #This is intended to keep track of the active threads
    threadCount = 0

    def startThread(functionName,params=None):
        #This kicks off a new thread and should count up to keep track of the threads
        MassFunction.threadCount +=1
        if params is None:
            t = threading.Thread(target=functionName)
        else:
            t = threading.Thread(target=functionName,args=[params])
        t.daemon = True
        t.start()

class GeoAnalysis:
    #This class handles the connection to the ArcGIS services
    def __init__(self):
        super(GeoAnalysis, self).__init__()
        self.my_gis = arcgis.gis.GIS("https://www.arcgis.com", username, pw)

    def drivetimeCalc(self, coordsString):
        #The coords come in as a string, formatted as 'lat_1,long_1,lat_2,long_2'
        #This is the bottleneck of the process, as this calculation/response
        #below takes a few seconds to get a response
        points = coordsString.split(", ")
        route_service_url = self.my_gis.properties.helperServices.route.url
        self.route_layer = arcgis.network.RouteLayer(route_service_url, gis=self.my_gis)
        point_a_to_point_b = "{0}, {1}; {2}, {3}".format(points[1], points[0], points[3], points[2])
        result = self.route_layer.solve(stops=point_a_to_point_b,
                                        return_directions=False, return_routes=True,
                                        output_lines='esriNAOutputLineNone',
                                        return_barriers=False, return_polygon_barriers=False,
                                        return_polyline_barriers=False)
        travel_time = result['routes']['features'][0]['attributes']['Total_TravelTime']
        #This is intended to 'remove' one of the active threads
        MassFunction.threadCount -=1
        return travel_time

class MainFunction:
    #This is to give access to the GeoAnalysis class from this class
    GA = GeoAnalysis()

    def closureDriveTimeCalc(self,coordsList):
        #This is intended to loop in the event that a fifth loop gets started and will prevent additional threads from starting
        while MassFunction.threadCount > 4:
            pass
        MassFunction.startThread(MainFunction.GA.drivetimeCalc,coordsList)

    def driveTimeAnalysis(self,location):
        #This reads a csv file containing a few thousand entries.
        #Each entry/row contains gps coordinates, which need to be
        #iterated over to calculate the drivetimes
        locationMemberFile = pandas.read_csv(someFileName)
        #The built-in apply() method in pandas seems to be the
        #fastest way to iterate through the rows
        locationMemberFile['DRIVETIME'] = locationMemberFile['COORDS_COL'].apply(self.closureDriveTimeCalc)

When I run this now in VS Code, I can see the thread count in the call stack climb into the thousands, so I suspect it is not waiting for threads to finish and is not adding to or subtracting from the threadCount value as intended. Any ideas, suggestions, or tips would be much appreciated.
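Edit: Essentially, my question is how to get the travel_time value back so that it can be placed into the dataframe. I currently have no return statement in the closureDriveTimeCalc function, so although the function runs correctly, it does not send any information back to the apply() method.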
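For reference, the throttling and result collection described above could be sketched roughly as follows. This is only an illustration under stated assumptions: `fake_drive_time` is a hypothetical stand-in for `drivetimeCalc` (it just sleeps briefly), and `run_all` is an invented helper name. A `threading.BoundedSemaphore` replaces the hand-rolled counter and busy-wait loop, and each worker stores its value under the row's position so it can be read back afterwards, since a `Thread` discards its target's return value.

```python
import threading
import time

MAX_THREADS = 5  # assumed upper limit, taken from the question


def fake_drive_time(coords):
    # Hypothetical stand-in for drivetimeCalc: pretend to wait on a service.
    time.sleep(0.05)
    return float(len(coords))


def run_all(coords_list):
    slots = threading.BoundedSemaphore(MAX_THREADS)
    results = {}
    results_lock = threading.Lock()

    def worker(index, coords):
        try:
            value = fake_drive_time(coords)
            with results_lock:
                results[index] = value
        finally:
            # Always free the slot, even if the calculation raised.
            slots.release()

    threads = []
    for index, coords in enumerate(coords_list):
        slots.acquire()  # blocks while MAX_THREADS workers are still running
        t = threading.Thread(target=worker, args=(index, coords))
        t.start()
        threads.append(t)

    for t in threads:
        t.join()  # wait for the remaining workers before reading results

    # Failed rows come back as None; order matches the input.
    return [results.get(i) for i in range(len(coords_list))]
```

Because the returned list is in input order, it could be assigned to a dataframe column in one step instead of going through apply().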
Posted on 2019-02-09 09:37:36
I would not do this inside apply; I would use multiprocessing Pool.map instead:
from multiprocessing import Pool

with Pool(processes=4) as pool:
    locationMemberFile['DRIVETIME'] = pool.map(self.closureDriveTimeCalc, locationMemberFile['COORDS_COL'])

https://stackoverflow.com/questions/54601520
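Since each call mostly waits on a network response, a thread pool is a possible alternative to a process pool here. The following is a minimal sketch using the standard library's `concurrent.futures.ThreadPoolExecutor`, not the approach from the answer above. `fake_drive_time` and `drive_times` are hypothetical names introduced for illustration; the real function would be the one that calls the routing service and returns `travel_time`. Whether a single ArcGIS connection can safely be shared between threads is an assumption that would need checking.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def fake_drive_time(coords_string):
    # Hypothetical stand-in for GeoAnalysis.drivetimeCalc: sleeps briefly
    # instead of calling a routing service, then returns a dummy number.
    time.sleep(0.05)
    lat1, lon1, lat2, lon2 = (float(p) for p in coords_string.split(","))
    return abs(lat2 - lat1) + abs(lon2 - lon1)


def drive_times(coords, max_workers=5):
    # At most max_workers calls run at once; executor.map yields results
    # in the same order as the inputs, so they line up with the rows.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(fake_drive_time, coords))


coords_col = ["0,0,1,1", "1,1,1,3", "2,2,2,2"]
times = drive_times(coords_col)
```

With the dataframe from the question, the equivalent call would presumably look like `locationMemberFile['DRIVETIME'] = drive_times(locationMemberFile['COORDS_COL'])`, provided the mapped function returns the travel time rather than starting its own thread.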