-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathAbidirectional.py
166 lines (131 loc) · 5.3 KB
/
Abidirectional.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import networkx as nx
import matplotlib.pyplot as plt
import geopandas as gpd
import pickle
import heapq
import math
import random
# Bidirectional A* search with risk factor
# Still need to adpat code with the new heuristic
# Constants for critical thresholds
CRITICAL_DEPTH = 0.3 # Critical depth threshold for safe passage in meters
CRITICAL_VELOCITY = 3 # Critical velocity threshold for safe passage in m/s
with open("graph.gpickle", "rb") as file:
G = pickle.load(file)
def euclidean_distance(x1, y1, x2, y2):
"""
Calculate Euclidean distance between two points.
"""
return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
def risk(depth, velocity):
"""
Calculate the risk factor for a flooded area based on depth and velocity.
"""
depth_risk = depth / CRITICAL_DEPTH
velocity_risk = (velocity / CRITICAL_VELOCITY) ** 2
return depth_risk + velocity_risk
def heuristic(node, goal, flood_penalty):
"""
Heuristic with risk factor and Euclidean distance.
"""
# Coordinates for current node and goal node
x1, y1 = node
x2, y2 = goal
# Base heuristic using distance
base_cost = euclidean_distance(x1, y1, x2, y2)
# Risk factor and flood penalty
depth = G.nodes[node].get("depth", 0)
velocity = G.nodes[node].get("velocity", 0)
risk_factor = risk(depth, velocity)
flood_risk = flood_penalty.get(node, 0)
return base_cost + risk_factor + flood_risk
def edge_cost(u, v):
"""
Cost of traveling between nodes u and v.
"""
length = G.edges[u, v].get("length", 1)
return length
def bidirectional_a_star(G, start, goal, flood_penalty):
# Priority queues for forward and reverse search
forward_queue = [(0, start)]
reverse_queue = [(0, goal)]
# Costs and paths for forward and reverse search
forward_costs = {start: 0}
reverse_costs = {goal: 0}
forward_parents = {start: None}
reverse_parents = {goal: None}
# Visited nodes
forward_visited = set()
reverse_visited = set()
meeting_node = None
best_cost = float("inf")
while forward_queue and reverse_queue:
# Forward search step
f_cost, f_node = heapq.heappop(forward_queue)
if f_node in reverse_visited:
total_cost = forward_costs[f_node] + reverse_costs[f_node]
if total_cost < best_cost:
best_cost = total_cost
meeting_node = f_node
break # Found meeting point
forward_visited.add(f_node)
for neighbor in G.neighbors(f_node):
new_cost = forward_costs[f_node] + edge_cost(f_node, neighbor)
if neighbor not in forward_costs or new_cost < forward_costs[neighbor]:
forward_costs[neighbor] = new_cost
forward_parents[neighbor] = f_node
priority = new_cost + heuristic(neighbor, goal, flood_penalty)
heapq.heappush(forward_queue, (priority, neighbor))
# Reverse search step
r_cost, r_node = heapq.heappop(reverse_queue)
if r_node in forward_visited:
total_cost = reverse_costs[r_node] + forward_costs[r_node]
if total_cost < best_cost:
best_cost = total_cost
meeting_node = r_node
break # Found meeting point
reverse_visited.add(r_node)
for neighbor in G.neighbors(r_node):
new_cost = reverse_costs[r_node] + edge_cost(r_node, neighbor)
if neighbor not in reverse_costs or new_cost < reverse_costs[neighbor]:
reverse_costs[neighbor] = new_cost
reverse_parents[neighbor] = r_node
priority = new_cost + heuristic(neighbor, start, flood_penalty)
heapq.heappush(reverse_queue, (priority, neighbor))
# Reconstruct path
path = []
if meeting_node:
# Forward path from start to meeting point
node = meeting_node
while node:
path.append(node)
node = forward_parents[node]
path.reverse()
# Reverse path from meeting point to goal
node = reverse_parents[meeting_node]
while node:
path.append(node)
node = reverse_parents[node]
return path, best_cost
with open("valid_nodes.pkl", "rb") as file:
valid_nodes = pickle.load(file)
# print("Valid nodes:", valid_nodes)
# Randomly select start and goal nodes from the valid nodes
start_node = random.choice(valid_nodes)
goal_node = random.choice(valid_nodes)
print(f"Start Node: {start_node}")
print(f"Goal Node: {goal_node}")
flood_penalty = {
(34.052235, -118.243683): 10, # Replace with actual coordinates
(40.712776, -74.005974): 15,
}
path, cost = bidirectional_a_star(G,start_node, goal_node, flood_penalty)
print("Shortest Path is :", path)
print("Total Cost is :", cost)
# Need to set up visualization for the graph and code (do the plotting)
# Start testing code for written output then with gragh visualization
# Need to do bidirectional without risk factor
#Challenges:
#Integrating risk factor equation into heuristic
#Using the gerographic coordinates in the code , changing paramters
# Not sure how to visualize yet