-
Notifications
You must be signed in to change notification settings - Fork 0
/
pathfinding.py
213 lines (170 loc) · 7.37 KB
/
pathfinding.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
from collections import Counter, deque

import stark_qa
from stark_qa.skb import SKB
def print_neighbors(node: int, skb: "SKB"):
    """Print the number of neighbors of ``node`` followed by the neighbors.

    Args:
        node: Id of the node whose neighborhood is printed.
        skb: Knowledge base; only ``skb.get_neighbor_nodes(node)`` is used.
    """
    neighbors = skb.get_neighbor_nodes(node)
    print(f"Number of neighbors: {len(neighbors)}. Neighbors:")
    print(neighbors)
def dfs(start: int, target: int, skb: stark_qa.skb.SKB, max_depth: int):
    """Run a depth-limited depth-first search from ``start`` towards ``target``.

    Delegates to ``dfs_step`` with a fresh, empty visited set.

    Args:
        start: Node id the search starts from.
        target: Node id to look for.
        skb: Knowledge base handed through to ``dfs_step``.
        max_depth: Depth budget handed through to ``dfs_step``.

    Returns:
        Whatever ``dfs_step`` returns: a list of node ids running from
        ``target`` back to the first hop after ``start`` (``start`` itself is
        not included), or ``None`` when no path was found.
    """
    seen = set()  # one visited set per top-level search
    return dfs_step(start, target, skb, seen, max_depth)
def dfs_step(start: int, end: int, skb: "SKB", visited, max_depth: int):
    """Recursive step of a depth-limited depth-first search.

    Args:
        start: Node to expand in this step; it is added to ``visited``.
        end: Node id being searched for.
        skb: Knowledge base; only ``skb.get_neighbor_nodes(node)`` is used.
        visited: Mutable set of already expanded nodes, shared by all
            recursive calls of one search.
        max_depth: Number of further recursion levels allowed. With ``0`` the
            neighbors of ``start`` are still compared against ``end`` but
            none of them is expanded.

    Returns:
        ``None`` if ``end`` was not found; otherwise the path in *reverse*
        order, beginning with ``end`` and ending with the first hop after
        ``start`` (``start`` itself is not part of the list).

    Note:
        ``visited`` is never reset while backtracking, so a node that was
        first expanded with little remaining depth budget is not expanded
        again via a shorter route. A path within ``max_depth`` can therefore
        be missed.
    """
    visited.add(start)
    for neighbor in skb.get_neighbor_nodes(start):
        if neighbor == end:
            return [neighbor]
        # At the depth limit keep scanning the remaining neighbors for the
        # target instead of giving up after the first one; just do not
        # descend any further.
        if max_depth == 0 or neighbor in visited:
            continue
        path_to_end = dfs_step(neighbor, end, skb, visited, max_depth - 1)
        if path_to_end is not None:
            # The path is built back to front while the recursion unwinds.
            path_to_end.append(neighbor)
            return path_to_end
    return None
def reduce_num_paths(paths: list[list[int]], targets: list[int], limit: int):
    """Drop all paths that lead to targets reached by too many paths.

    A target is removed when more than ``limit`` of the given paths end on
    it (the last element of a path is taken as its end node).

    Args:
        paths: Paths as lists of node ids; every path must be non-empty.
        targets: Node ids that are candidates for removal.
        limit: Maximum number of paths a target may have and still be kept.

    Returns:
        A tuple ``(remaining_paths, targets_to_remove)``: the paths whose end
        node was not removed (in their original order) and the set of
        removed targets. Paths ending on a node that is not listed in
        ``targets`` are always kept.
    """
    # Counter returns 0 for unseen keys, so targets without any path and
    # paths ending outside of ``targets`` need no special handling.
    end_counts = Counter(path[-1] for path in paths)
    targets_to_remove = {
        target for target in targets if end_counts[target] > limit
    }
    remaining_paths = [
        path for path in paths if path[-1] not in targets_to_remove
    ]
    return remaining_paths, targets_to_remove
def rels_to_unknown(start: int, skb: "SKB", type_of_unknown: str) -> list[list[int]]:
    """Collect one-hop paths from ``start`` to neighbors of a given type.

    Args:
        start: Node id whose direct neighbors are inspected.
        skb: Knowledge base; uses ``skb.get_neighbor_nodes(start)`` and
            ``skb.node_info[neighbor]['type']``.
        type_of_unknown: Node type a neighbor must have to be included.

    Returns:
        A list of two-element paths ``[start, neighbor]``, one per matching
        neighbor, in the order the neighbors are reported.
    """
    return [
        [start, neighbor]
        for neighbor in skb.get_neighbor_nodes(start)
        if skb.node_info[neighbor]['type'] == type_of_unknown
    ]
def bfs_all_shortest_paths(start: int, targets: list[int], skb: "SKB", max_depth: int):
    """Breadth-first search for shortest paths from ``start`` to several targets.

    The search proceeds layer by layer in a single queue. A target is
    considered done once the layer in which it was first reached has been
    processed completely; the search stops when every target is done, the
    queue runs empty, or ``max_depth`` is exceeded. Progress is reported
    with ``print``.

    Args:
        start: Node id the search starts from.
        targets: Node ids to find. The argument is not modified; duplicate
            ids are treated as a single target.
        skb: Knowledge base; only ``skb.get_neighbor_nodes(node)`` is used.
        max_depth: Nodes at this depth are still checked against the targets
            but are not expanded any further.

    Returns:
        List of paths (lists of node ids starting with ``start`` and ending
        on a target), without duplicates, in the order they were found.

    Note:
        A node is expanded only the first time it is dequeued and target
        nodes are never expanded, so alternative routes that run through an
        already expanded node or through a target are not continued.
    """
    # Queue for BFS that stores (node, path) tuples
    queue = deque([(start, [start])])
    visited = set()
    # A set instead of a list: duplicated target ids must not keep a target
    # "open" after it was reached, otherwise longer paths to it get reported.
    remaining_targets = set(targets)
    paths_found = []

    # counters to track search depth
    depth = 0
    num_next_layer_nodes = 0
    targets_reached_at_this_depth = set()

    while queue:
        if depth > max_depth:
            print("Max depth reached. Terminating breadth-first search.")
            break

        node, path = queue.popleft()

        if node in remaining_targets:
            # the same path can be queued more than once; record it once
            if path not in paths_found:
                paths_found.append(path)
            visited.add(node)  # do not follow paths through a target
            targets_reached_at_this_depth.add(node)

        # expand the node unless the last layer is reached
        if depth < max_depth and node not in visited:
            visited.add(node)
            for neighbor in skb.get_neighbor_nodes(node):
                if neighbor not in visited:
                    queue.append((neighbor, path + [neighbor]))
                    num_next_layer_nodes += 1

        # The current layer is finished as soon as the queue holds nothing
        # but the nodes that were enqueued for the next layer.
        if len(queue) == num_next_layer_nodes:
            num_next_layer_nodes = 0
            remaining_targets -= targets_reached_at_this_depth
            print(f"{depth=} completed, queue length={len(queue)}.")
            if not remaining_targets:
                print("all shortest path to all targets found. Terminating search.")
                return paths_found
            targets_reached_at_this_depth = set()
            depth += 1

    # Queue is empty (or max depth exceeded) before all targets were reached
    print("End of search queue reached.")
    return paths_found
def bfs_all_shortest_paths_not_faster(start: int, targets: list[int], skb: "SKB", max_depth: int):
    """Layer-by-layer variant of the breadth-first shortest-path search.

    Each depth level gets its own queue. Levels ``0 .. max_depth - 1`` are
    checked against the targets and expanded; the level at ``max_depth`` is
    only checked. A target is considered done after the level in which it
    was first reached. Progress is reported with ``print``.

    Args:
        start: Node id the search starts from.
        targets: Node ids to find. The argument is not modified; duplicate
            ids are treated as a single target.
        skb: Knowledge base; only ``skb.get_neighbor_nodes(node)`` is used.
        max_depth: Depth of the last level that is checked for targets.

    Returns:
        List of paths (lists of node ids starting with ``start`` and ending
        on a target) in the order they were found. Paths are not
        de-duplicated.
    """
    # Queue for BFS that stores (node, path) tuples
    next_queue = deque([(start, [start])])
    visited = set()
    # A set instead of a list: duplicated target ids must not keep a target
    # "open" after it was reached, otherwise longer paths to it get reported.
    remaining_targets = set(targets)
    paths_found = []

    for depth in range(max_depth):
        queue = next_queue
        next_queue = deque()
        targets_reached_at_this_depth = set()
        while queue:
            node, path = queue.popleft()
            if node in remaining_targets:
                paths_found.append(path)
                visited.add(node)  # do not follow paths through a target
                targets_reached_at_this_depth.add(node)
            # expand each node only the first time it is seen
            if node not in visited:
                visited.add(node)
                for neighbor in skb.get_neighbor_nodes(node):
                    if neighbor not in visited:
                        next_queue.append((neighbor, path + [neighbor]))

        remaining_targets -= targets_reached_at_this_depth
        print(f"{depth=} completed, queue length={len(next_queue)}.")
        if not remaining_targets:
            print("All shortest path to all targets found. Terminating search.")
            return paths_found

    # on max depth, don't add anything new to queue, just check for targets reached
    depth = max_depth
    targets_reached_at_this_depth = set()
    for node, path in next_queue:
        if node in remaining_targets:
            paths_found.append(path)
            targets_reached_at_this_depth.add(node)
    remaining_targets -= targets_reached_at_this_depth
    print(f"{depth=} completed.")
    if not remaining_targets:
        print("All shortest path to all targets found. Terminating search.")
        return paths_found

    print("Max depth reached. Terminating breadth-first search.")
    return paths_found
def find_edge_type(u: int, v: int, skb: stark_qa.skb.SKB):
    """Return the first relation type under which ``v`` is a neighbor of ``u``.

    Relation types are tried in the order given by ``skb.rel_type_lst()``;
    for each one ``skb.get_neighbor_nodes(u, edge_type)`` is queried.

    Args:
        u: Node id whose neighbors are queried.
        v: Node id looked for among the neighbors of ``u``.
        skb: Knowledge base providing ``rel_type_lst`` and
            ``get_neighbor_nodes``.

    Returns:
        The matching relation type, or ``None`` if ``v`` is not a neighbor of
        ``u`` under any relation type.
    """
    matching_types = (
        rel_type
        for rel_type in skb.rel_type_lst()
        if v in skb.get_neighbor_nodes(u, rel_type)
    )
    # The generator is lazy, so the lookup stops at the first match.
    return next(matching_types, None)
def get_target_neighbors_of_certain_type(node_ids: list[int], max_path_to_unknowns: int, type_of_unknown: str, skb: SKB):
    """Collect one-hop paths from the given nodes to neighbors of one type.

    For every node in ``node_ids`` the paths returned by ``rels_to_unknown``
    are gathered. If more than ``max_path_to_unknowns`` paths come together,
    only the paths ending on the most frequently reached neighbor(s) are
    kept.

    Args:
        node_ids: Node ids whose neighbors are inspected.
        max_path_to_unknowns: Number of paths above which the result is
            filtered.
        type_of_unknown: Node type the neighbors must have.
        skb: Knowledge base handed through to ``rels_to_unknown``.

    Returns:
        A tuple ``(paths, truncated)``. ``truncated`` is ``True`` when the
        filter was applied. The filtered list is not cut to
        ``max_path_to_unknowns`` and may still be longer than that.
    """
    paths = []
    for node_id in node_ids:
        paths += rels_to_unknown(node_id, skb, type_of_unknown)

    truncated = len(paths) > max_path_to_unknowns
    if truncated:
        # Count how many paths end on each neighbor and keep only the paths
        # to the neighbor(s) with the highest count.
        neighbor_frequency = Counter(path[-1] for path in paths)
        max_connection_cnt = max(neighbor_frequency.values())
        paths = [
            path
            for path in paths
            if neighbor_frequency[path[-1]] == max_connection_cnt
        ]
    return paths, truncated