首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Python中使用多个但数量有限的线程来处理列表

如何在Python中使用多个但数量有限的线程来处理列表
EN

Stack Overflow用户
提问于 2019-02-09 07:16:20
回答 1查看 101关注 0票数 1

我有一个几千行长的数据帧,其中一列包含两对GPS坐标,我试图用它们来计算这些坐标之间的行驶时间。我有一个函数,它接受这些坐标并返回驾驶时间,它可能需要3-8秒来计算每个条目。因此,整个过程可能需要相当长的时间。我希望能够做的是:使用可能的3-5个线程,迭代通过列表,计算驱动时间,并移动到下一个条目,而其他线程正在完成,并且在进程中创建的线程不超过5个。独立地,我可以运行多个线程,我可以跟踪线程计数,直到允许的最大线程数降到限制以下,直到下一个线程开始,我可以迭代数据帧并计算驱动时间。然而,我很难把它们拼凑在一起。这是我所拥有的经过编辑的精简版本。

代码语言:javascript
运行
复制
import pandas
import threading
import arcgis

class MassFunction:
    #This is intended to keep track of the active threads
    MassFunction.threadCount = 0

    def startThread(functionName,params=None):
        #This kicks off a new thread and should count up to keep track of the threads
        MassFunction.threadCount +=1

        if params is None:
            t = threading.Thread(target=functionName)
        else:
            t = threading.Thread(target=functionName,args=[params])
        t.daemon = True
        t.start()

class GeoAnalysis:
    #This class handles the connection to the ArcGIS services
    def __init__(self):
        super(GeoAnalysis, self).__init__()
        self.my_gis = arcgis.gis.GIS("https://www.arcgis.com", username, pw)

    def drivetimeCalc(self, coordsString):
        #The coords come in as a string, formatted as 'lat_1,long_1,lat_2,long_2'
        #This is the bottleneck of the process, as this calculation/response
        #below takes a few seconds to get a response
        points = coordsString.split(", ")
        route_service_url = self.my_gis.properties.helperServices.route.url
        self.route_layer = arcgis.network.RouteLayer(route_service_url, gis=self.my_gis)
        point_a_to_point_b = "{0}, {1}; {2}, {3}".format(points[1], points[0], points[3], points[2])
        result = self.route_layer.solve(stops=point_a_to_point_b,return_directions=False, return_routes=True,output_lines='esriNAOutputLineNone',return_barriers=False, return_polygon_barriers=False,return_polyline_barriers=False)
        travel_time = result['routes']['features'][0]['attributes']['Total_TravelTime']
        #This is intended to 'remove' one of the active threads 
        MassFunction.threadCount -=1
        return travel_time


class MainFunction:
    #This is to give access to the GeoAnalysis class from this class
    GA = GeoAnalysis()

    def closureDriveTimeCalc(self,coordsList):
        #This is intended to loop in the event that a fifth loop gets started and will prevent additional threads from starting
        while MassFunction.threadCount > 4:
            pass
        MassFunction.startThread(MainFunction.GA.drivetimeCalc,coordsList)

    def driveTimeAnalysis(self,location):
        #This reads a csv file containing a few thousand entries. 
        #Each entry/row contains gps coordinates, which need to be 
        #iterated over to calculate the drivetimes
        locationMemberFile = pandas.read_csv(someFileName)
        #The built-in apply() method in pandas seems to be the
        #fastest way to iterate through the rows

        locationMemberFile['DRIVETIME'] = locationMemberFile['COORDS_COL'].apply(self.closureDriveTimeCalc)

当我现在运行它时,使用VS代码,我可以看到调用堆栈中的线程计数上升到数千,所以我觉得它并不是在等待线程完成并从threadCount值中加/减。任何想法/建议/小贴士都将不胜感激。

编辑:本质上,我的问题是如何取回travel_time值,以便将其放入数据帧中。我目前没有closureDriveTimeCalc函数的返回语句,所以当函数正确运行时,它不会将任何信息发送回apply()方法。

EN

回答 1

Stack Overflow用户

发布于 2019-02-09 09:37:36

我不会在apply中执行此操作,而是使用multiprocessing Pool.map

代码语言:javascript
运行
复制
from multiprocessing import Pool

with Pool(processes=4) as pool:
    locationMemberFile['DRIVETIME'] = pool.map(self.closureDriveTimeCalc, locationMemberFile['COORDS_COL']))
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54601520

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档