我有Dijkstra的算法:
# ==========================================================================
# We will create a dictionary to represent the graph
# =============================================================================
graph = {
'a' : {'b':3,'c':4, 'd':7},
'b' : {'c':1,'f':5},
'c' : {'f':6,'d':2},
'd' : {'e':3, 'g':6},
'e' : {'g':3, 'h':4},
'f' : {'e':1, 'h':8},
'g' : {'h':2},
'h' : {'g':2}
}
def dijkstra(graph, start, goal):
shortest_distance = {} # dictionary to record the cost to reach to node. We will constantly update this dictionary as we move along the graph.
track_predecessor = {} # dictionary to keep track of path that led to that node.
unseenNodes = graph.copy() # to iterate through all nodes
infinity = 99999999999 # infinity can be considered a very large number
track_path = [] # dictionary to record as we trace back our journey
# Initially we want to assign 0 as the cost to reach to source node and infinity as cost to all other nodes
for node in unseenNodes:
shortest_distance[node] = infinity
shortest_distance[start] = 0
# The loop will keep running until we have entirely exhausted the graph, until we have seen all the nodes
# To iterate through the graph, we need to determine the min_distance_node every time.
while unseenNodes:
min_distance_node = None
for node in unseenNodes:
if min_distance_node is None:
min_distance_node = node
elif shortest_distance[node] < shortest_distance[min_distance_node]:
min_distance_node = node
# From the minimum node, what are our possible paths
path_options = graph[min_distance_node].items()
# We have to calculate the cost each time for each path we take and only update it if it is lower than the existing cost
for child_node, weight in path_options:
if weight + shortest_distance[min_distance_node] < shortest_distance[child_node]:
shortest_distance[child_node] = weight + shortest_distance[min_distance_node]
track_predecessor[child_node] = min_distance_node
# We want to pop out the nodes that we have just visited so that we dont iterate over them again.
unseenNodes.pop(min_distance_node)
# Once we have reached the destination node, we want trace back our path and calculate the total accumulated cost.
currentNode = goal
while currentNode != start:
try:
track_path.insert(0, currentNode)
currentNode = track_predecessor[currentNode]
except KeyError:
print('Path not reachable')
break
track_path.insert(0, start)
# If the cost is infinity, the node had not been reached.
if shortest_distance[goal] != infinity:
print('Shortest distance is ' + str(shortest_distance[goal]))
print('And the path is ' + str(track_path))
如果我有少量的节点(就像代码中的那样),它可以很好地工作,但是我有一个大约480,000个节点的图,按照我的近似,在这么大的图中,它将需要7.5小时,而这仅仅是一种方式!我怎么能让它工作得更快?例如,在OSM中,它以秒为单位计算!
发布于 2020-01-20 23:38:19
变量(unseenNodes
,currentNode
)应该是snake_case
,而不是camelCase
。
99999999999
不是近无穷远的地方。如果您想使用无穷大,请使用真正的无穷大:float("+inf")
,或者在Python 3.5+中使用math.inf
。
Python的print()
语句自动将任何非字符串对象转换为字符串,以便打印。你不需要这么做。这是:
print('Shortest distance is ' + str(shortest_distance[goal]))
print('And the path is ' + str(track_path))
可以写成:
print('Shortest distance is', shortest_distance[goal])
print('And the path is', track_path)
打字的时间略短一些。更重要的是,它的效率略高一些,因为它不需要创建一个新的string对象并随后重新定位,它是字符串的连接。
如果使用的是Python,则可能需要使用f- 3.6+:
print(f'Shortest distance is {shortest_distance[goal]}')
print(f'And the path is {track_path}')
它直接将值插入字符串中。
您的函数找到路径,打印结果,而不返回任何内容。如果您希望以任何其他方式使用已发现的路径,这是没有用的。函数应该计算结果并返回它。调用者应负责打印结果。
没错,你有很多评论。但是调用方只能猜测函数参数应该是什么,以及函数返回的是什么。您应该用类型提示和"""doc-strings"""
来记录这一点。类似于:
from typing import Any, Mapping, Tuple, List
Node = Any
Edges = Mapping[Node, float]
Graph = Mapping[Node, Edges]
def dijkstra(graph: Graph, start: Node, goal: Node) -> Tuple[float, List]:
"""
Find the shortest distance between two nodes in a graph, and
the path that produces that distance.
The graph is defined as a mapping from Nodes to a Map of nodes which
can be directly reached from that node, and the corresponding distance.
Returns:
A tuple containing
- the distance between the start and goal nodes
- the path as a list of nodes from the start to goal.
If no path can be found, the distance is returned as infinite, and the
path is an empty list.
"""
在此代码中:
min_distance_node = None
for node in unseenNodes:
if min_distance_node is None:
min_distance_node = node
elif shortest_distance[node] < shortest_distance[min_distance_node]:
min_distance_node = node
您一直在查找shortest_distance[min_distance_node]
。在编译语言中,编译器可能能够执行数据流分析,并确定只有在min_distance_node
更改时才需要再次查找值。在Python这样的解释语言中,查找操作可以执行用户定义的代码并更改值,因此必须执行每个查找操作。shortest_distance[min_distance_node]
是两个变量查找加上一个字典索引操作。与以下比较:
min_distance_node = None
min_distance = infinity
for node in unseenNodes:
distance = shortest_distance[node]
if min_distance_node is None:
min_distance_node = node
min_distance = distance
elif distance < min_distance:
min_distance_node = node
min_distance = distance
由于shortest_distance[min_distance_node]
和shortest_distance[node]
的查找较少,这段代码将运行得更快。
但是,查找列表的最小值是非常常见的操作,因此Python有一个内置函数来执行这个操作:min(iterable, *, key, default)
。key
参数用于指定排序函数.在这种情况下,从节点到距离的映射。如果没有剩馀的节点,则可以使用default
来防止ValueError
,在这种情况下,这是不必要的。
min_distance_node = min(unseenNodes, key=lambda node: shortest_distance[node])
同样的道理:
for child_node, weight in path_options:
if weight + shortest_distance[min_distance_node] < shortest_distance[child_node]:
shortest_distance[child_node] = weight + shortest_distance[min_distance_node]
track_predecessor[child_node] = min_distance_node
重复查找shortest_distance[min_distance_node]
;再次,两个变量查找和一个字典索引操作。同样,我们可以将其移出循环:
min_distance = shortest_distance[min_distance_node]
for child_node, weight in path_options:
if weight + min_distance < shortest_distance[child_node]:
shortest_distance[child_node] = weight + min_distance
track_predecessor[child_node] = min_distance_node
查找min_distance_node
的代码:它检查了多少个节点?在玩具图"a"
到"h"
中,在第一次迭代时,它需要搜索8个节点。有48万个节点,就需要搜索48万个节点!在第二次迭代中,已经从unseenNodes
中删除了一个节点,因此它将少搜索一个节点。7节点很好,但479 999节点是大量的节点。
"a"
连接到多少个节点?只有3. min_distance_node
将成为这3个节点之一。搜索剩余的节点(具有无限距离)是不必要的。如果只在算法的每一步中添加可以到达的节点,则搜索空间将从数千个节点减少到几百个节点。
此外,如果按距离按排序顺序维护这些unseenNodes
,则min_distance_node
始终是这个“优先级队列”中的第一个节点,根本不需要搜索unseenNodes
。
在优先级队列中维护不可见的节点很容易,可以通过一个min堆结构来完成,该结构内置于Python (heapq
)中:
from math import inf, isinf
from heapq import heappush, heappop
from typing import Any, Mapping, Tuple, List
Node = Any
Edges = Mapping[Node, float]
Graph = Mapping[Node, Edges]
def dijkstra(graph: Graph, start: Node, goal: Node) -> Tuple[float, List]:
"""
Find the shortest distance between two nodes in a graph, and
the path that produces that distance.
The graph is defined as a mapping from Nodes to a Map of nodes which
can be directly reached from that node, and the corresponding distance.
Returns:
A tuple containing
- the distance between the start and goal nodes
- the path as a list of nodes from the start to goal.
If no path can be found, the distance is returned as infinite, and the
path is an empty list.
"""
shortest_distance = {}
predecessor = {}
heap = []
heappush(heap, (0, start, None))
while heap:
distance, node, previous = heappop(heap)
if node in shortest_distance:
continue
shortest_distance[node] = distance
predecessor[node] = previous
if node == goal:
path = []
while node:
path.append(node)
node = predecessor[node]
return distance, path[::-1]
else:
for successor, dist in graph[node].items():
heappush(heap, (distance + dist, successor, node))
else:
return inf, []
if __name__ == '__main__':
graph = {
'a' : {'b':3, 'c':4, 'd':7},
'b' : {'c':1, 'f':5},
'c' : {'f':6, 'd':2},
'd' : {'e':3, 'g':6},
'e' : {'g':3, 'h':4},
'f' : {'e':1, 'h':8},
'g' : {'h':2},
'h' : {'g':2}
}
distance, path = dijkstra(graph, 'a', 'e')
if isinf(distance):
print("No path")
else:
print(f"Distance = {distance}, path={path}")
你所说的“OSM”是指“开放街道地图”吗?如果是这样,则使用错误的算法。地图节点具有坐标,可用作“提示”,用于指向给定方向的搜索。请参阅一个*搜索算法
https://codereview.stackexchange.com/questions/235898
复制相似问题