-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAsearch.py
84 lines (60 loc) · 2.39 KB
/
Asearch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import matplotlib.pyplot as plt
#import geopandas as gpd
import heapq
import networkx as nx
import pickle
import random
import math
#from shapely.geometry import Point
# Load the candidate nodes for the search.
# SECURITY NOTE: pickle.load can execute arbitrary code while unpickling;
# only load "valid_nodes.pkl" when it comes from a trusted source.
with open("valid_nodes.pkl", "rb") as file:
    valid_nodes = pickle.load(file)
# print("Valid nodes:", valid_nodes)

# Randomly select start and goal nodes from the valid nodes.
# NOTE: the two draws are independent, so start and goal may be the same
# node; random.choice raises IndexError if valid_nodes is empty.
start_node = random.choice(valid_nodes)
goal_node = random.choice(valid_nodes)
print(f"Start Node: {start_node}")
print(f"Goal Node: {goal_node}")
# Define the heuristic function
def heuristic(node, goal, flood_penalty):
    """Estimate the remaining cost from ``node`` to ``goal``.

    The estimate is the Euclidean distance between the two coordinate
    sequences plus the flood penalty recorded for ``node``.

    Args:
        node: Coordinates of the node being evaluated.
        goal: Coordinates of the goal node; must have the same number of
            dimensions as ``node`` (a requirement of ``math.dist``).
        flood_penalty: Mapping from node to an extra cost. Nodes without an
            entry get a penalty of 0. ``None`` or an empty mapping means
            "no penalties at all".

    Returns:
        The distance to the goal plus the node's flood penalty.
    """
    base_cost = math.dist(node, goal)
    # Tolerate a missing penalty table instead of failing on None.get(...).
    flood_risk = flood_penalty.get(node, 0) if flood_penalty else 0
    return base_cost + flood_risk
# Load the graph that the search runs on (presumably a networkx graph, since
# the code below calls G.neighbors and G.edges -- confirm against the file
# that produced it).
# SECURITY NOTE: pickle.load can execute arbitrary code while unpickling;
# only load "graph.gpickle" when it comes from a trusted source.
with open("graph.gpickle", "rb") as file:
    G = pickle.load(file)
# Define the edge cost function
def edge_cost(u, v):
    """Return the ``'length'`` cost of travelling from ``u`` to ``v`` in ``G``.

    On a simple graph this is the edge's ``'length'`` attribute. On a
    multigraph, where several parallel edges may join the same pair of
    nodes, the smallest ``'length'`` among them is returned, since that is
    the cheapest way to make the hop.

    Args:
        u: Node the edge starts from.
        v: Node the edge leads to.

    Returns:
        The ``'length'`` value stored on the (cheapest) edge.

    Raises:
        KeyError: If the edge data has no ``'length'`` attribute.
    """
    if G.is_multigraph():
        # G[u][v] maps edge key -> attribute dict for each parallel edge;
        # G.edges[u, v] would fail here because a multigraph needs a key.
        return min(data['length'] for data in G[u][v].values())
    # Get the length attribute of the edge
    return G.edges[u, v]['length']
# Define the A* search algorithm
def a_star_search(start, goal, flood_penalty):
    """Search ``G`` for a path from ``start`` to ``goal`` using A*.

    Edge costs come from ``edge_cost`` and the ordering of the frontier
    comes from ``heuristic`` (which receives ``flood_penalty``). The search
    stops the first time the goal is taken off the frontier.

    NOTE(review): the flood penalty is only part of the heuristic, not of
    the accumulated path cost, so the returned cost is the sum of edge
    costs along the path; with non-zero penalties the path found is not
    guaranteed to be the cheapest one -- confirm this is intended.

    Args:
        start: Start node; must be a tuple.
        goal: Goal node; must be a tuple.
        flood_penalty: Penalty table passed through to ``heuristic``.

    Returns:
        A ``(path, cost)`` pair. ``path`` is the list of nodes from
        ``start`` to ``goal`` inclusive and ``cost`` is the summed edge
        cost along it. If the goal cannot be reached the result is
        ``([], float("inf"))``.

    Raises:
        ValueError: If ``start`` or ``goal`` is not a tuple.
    """
    if not isinstance(start, tuple) or not isinstance(goal, tuple):
        raise ValueError("Start and goal nodes must be tuples")

    # Priority queue for A* search, holding (priority, node) entries.
    queue = [(0, start)]
    g_costs = {start: 0}
    parents = {start: None}
    # Priority of the most recent push for each node. A node is pushed again
    # whenever a cheaper route to it is found, which leaves older, more
    # expensive entries in the heap; this table lets us recognise them.
    latest_priority = {start: 0}

    while queue:
        current_cost, current_node = heapq.heappop(queue)

        if current_node == goal:
            # Reconstruct path by walking the parent links back to start.
            path = []
            while current_node is not None:
                path.append(current_node)
                current_node = parents[current_node]
            return path[::-1], g_costs[goal]

        # Skip stale entries: a cheaper entry for this node was pushed later
        # and, having a lower priority, has already been expanded, so
        # expanding again could not improve any neighbor.
        if current_cost > latest_priority[current_node]:
            continue

        for neighbor in G.neighbors(current_node):
            new_cost = g_costs[current_node] + edge_cost(current_node, neighbor)
            if neighbor not in g_costs or new_cost < g_costs[neighbor]:
                g_costs[neighbor] = new_cost
                parents[neighbor] = current_node
                priority = new_cost + heuristic(neighbor, goal, flood_penalty)
                latest_priority[neighbor] = priority
                heapq.heappush(queue, (priority, neighbor))

    # Frontier exhausted without reaching the goal.
    return [], float("inf")
# Trial run of the search, kept for later testing.
# Each entry pairs a node's coordinate tuple with its flood penalty.
# TODO: replace the placeholder coordinates with actual node coordinates.
flood_penalty = dict(
    [
        ((34.052235, -118.243683), 10),
        ((40.712776, -74.005974), 15),
    ]
)

path, cost = a_star_search(
    start_node,
    goal_node,
    flood_penalty=flood_penalty,
)

# Report the route that was found and what it costs.
print("Shortest Path is :", path)
print("Total Cost is :", cost)