-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathUCS_analysis.py
294 lines (222 loc) · 12.9 KB
/
UCS_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
from parameters import *
import time
import queue
def UCS_analysis(data, parameters):
    """Benchmark Uniform-Cost Search (UCS) variants over a dataset of images.

    Selects one of three UCS implementations according to ``parameters`` and
    runs it on every image in ``data`` for a number of random start/goal
    pairs, recording the average running time and step count per data bin.

    Parameters
    ----------
    data : dict
        Maps image size -> {standard-deviation bin -> list of image arrays},
        e.g. ``data[15][(0, 0.5)]`` is the list of size-15 images in that bin.
    parameters : dict
        Analysis configuration. Keys used here:
        ``'img_size'``, ``'frontier_type'`` ('list' | 'priority_queue'),
        ``'finding_neighbors'`` ('find_first_before_run' |
        'find_while_running'), ``'heuristic_type'``, optional ``'delta'``
        (defaults to 0.001), and ``'num_points'``.

    Returns
    -------
    list
        One ``AnalysisResults`` object per image size in ``data``, each with
        its ``avg_run_time`` and ``avg_steps_count`` dicts filled in per bin.

    Raises
    ------
    ValueError
        If the (finding_neighbors, frontier_type) combination has no
        matching UCS implementation.
    """
    # ---------------------- 3 UCS variants (UCS is A* with h == 0) ----------------------
    def norm_UCS_list(img_array, start_node, goal_node, img_shape, theta, heuristic):
        """UCS with a plain Python list as the frontier.

        Neighbors are discovered on the fly via ``next_pos_list``. Returns
        the number of "steps" (node expansions plus neighbor checks) taken
        before the goal is reached, for the analysis process. ``heuristic``
        is unused by UCS (f == g); it is kept so every search variant shares
        the same call signature.
        """
        steps_count = 0          # Steps performed so far (analysis metric)
        visited_nodes = set()    # Positions (x, y) already expanded
        frontier = [start_node]  # Frontier initially holds only the start node
        while len(frontier) > 0:
            # Sort in reverse so the lowest-f node sits at the end (timsort)
            frontier = sorted(frontier, reverse=True)
            # Pop the node with the lowest f-score
            actual_node = frontier.pop()
            steps_count += 1  # Count each node expansion
            # Remember the g-score of the node being expanded
            tmp_g_score = actual_node.g
            if actual_node == goal_node:
                return steps_count
            visited_nodes.add(actual_node.position)
            # New frontier candidates; merged into the frontier after the loop
            tmp_visited_nodes = []
            for next_pos in next_pos_list(img_array, actual_node, img_shape, theta):
                steps_count += 1  # Count each neighbor check
                next_node = Node(next_pos)
                # Skip positions that were already expanded
                if next_pos not in visited_nodes:
                    # UCS: f == g, unit step cost
                    next_node.g = tmp_g_score + 1
                    next_node.f = next_node.g
                    next_node.parent = actual_node
                    if next_node in frontier:
                        idx = frontier.index(next_node)
                        # Replace the queued duplicate only if this path is cheaper
                        if frontier[idx] > next_node:
                            frontier.remove(next_node)
                            tmp_visited_nodes.append(next_node)
                    else:
                        tmp_visited_nodes.append(next_node)
            frontier += tmp_visited_nodes
        # Frontier exhausted without reaching the goal
        return steps_count

    def norm_UCS_priority_queue(img_array, start_node, goal_node, img_shape, theta, heuristic):
        """UCS with ``queue.PriorityQueue`` as the frontier.

        Same contract as ``norm_UCS_list``: returns the step count for the
        analysis process; ``heuristic`` is unused (kept for signature parity).
        """
        steps_count = 0        # Steps performed so far (analysis metric)
        visited_nodes = set()  # Positions (x, y) already expanded
        frontier = queue.PriorityQueue()
        frontier.put((0, start_node))
        while not frontier.empty():
            # Entries are (f, node); get() yields the lowest f first
            actual_node = frontier.get()[1]
            steps_count += 1  # Count each node expansion
            tmp_g_score = actual_node.g
            if actual_node == goal_node:
                return steps_count
            visited_nodes.add(actual_node.position)
            for next_pos in next_pos_list(img_array, actual_node, img_shape, theta):
                steps_count += 1  # Count each neighbor check
                next_node = Node(next_pos)
                if next_pos not in visited_nodes:
                    # UCS: f == g, unit step cost
                    next_node.g = tmp_g_score + 1
                    next_node.f = next_node.g
                    next_node.parent = actual_node
                    # Is a node with the same position already queued?
                    # NOTE(review): tuple equality compares f first, so this
                    # can only match a duplicate whose f EQUALS next_node.f,
                    # which makes the replacement branch below effectively
                    # unreachable — confirm intended duplicate semantics.
                    if (next_node.f, next_node) in frontier.queue:
                        idx = frontier.queue.index((next_node.f, next_node))
                        # Replace the queued duplicate only if this path is cheaper
                        if frontier.queue[idx][0] > next_node.f:
                            # Rebuild the queue without the duplicate entry.
                            # Bug fix: the original compared an undefined
                            # name `t` before the first get(); capture the
                            # target entry first, then drain until found.
                            target = frontier.queue[idx]
                            tmp_queue = queue.PriorityQueue()
                            while True:
                                t = frontier.get()
                                if t == target:
                                    break  # drop the duplicate entry
                                tmp_queue.put(t)
                            # Re-put the drained entries back into the frontier
                            while not tmp_queue.empty():
                                frontier.put(tmp_queue.get())
                            frontier.put((next_node.f, next_node))
                    else:
                        frontier.put((next_node.f, next_node))
        # Frontier exhausted without reaching the goal
        return steps_count

    def UCS_list_with_init(grid, img_array, start_node, goal_node, img_shape, theta, heuristic):
        """UCS with a list frontier and a precomputed neighbor ``grid``.

        Identical to ``norm_UCS_list`` except neighbors are read from
        ``grid[x][y]`` (built by ``make_grid`` before timing starts) instead
        of being computed on the fly. Returns the step count for analysis.
        """
        steps_count = 0          # Steps performed so far (analysis metric)
        visited_nodes = set()    # Positions (x, y) already expanded
        frontier = [start_node]  # Frontier initially holds only the start node
        while len(frontier) > 0:
            # Sort in reverse so the lowest-f node sits at the end (timsort)
            frontier = sorted(frontier, reverse=True)
            actual_node = frontier.pop()
            steps_count += 1  # Count each node expansion
            tmp_g_score = actual_node.g
            if actual_node == goal_node:
                return steps_count
            visited_nodes.add(actual_node.position)
            # New frontier candidates; merged into the frontier after the loop
            tmp_visited_nodes = []
            # Neighbors come from the precomputed grid, indexed by position
            for next_pos in grid[actual_node.position[0]][actual_node.position[1]]:
                steps_count += 1  # Count each neighbor check
                next_node = Node(next_pos)
                if next_pos not in visited_nodes:
                    # UCS: f == g, unit step cost
                    next_node.g = tmp_g_score + 1
                    next_node.f = next_node.g
                    next_node.parent = actual_node
                    if next_node in frontier:
                        idx = frontier.index(next_node)
                        # Replace the queued duplicate only if this path is cheaper
                        if frontier[idx] > next_node:
                            frontier.remove(next_node)
                            tmp_visited_nodes.append(next_node)
                    else:
                        tmp_visited_nodes.append(next_node)
            frontier += tmp_visited_nodes
        # Frontier exhausted without reaching the goal
        return steps_count

    # ---------------------- Retrieve configuration from `parameters` ----------------------
    img_size = parameters['img_size']  # List of image sizes (kept for reference)
    # Frontier implementation: 'list' or 'priority_queue'
    frontier_type = parameters['frontier_type']
    # Neighbor strategy: 'find_first_before_run' or 'find_while_running'
    finding_neighbors = parameters['finding_neighbors']
    # Choose the UCS variant for this configuration
    if finding_neighbors == 'find_while_running' and frontier_type == 'list':
        UCS = norm_UCS_list
    elif finding_neighbors == 'find_while_running' and frontier_type == 'priority_queue':
        UCS = norm_UCS_priority_queue
    elif finding_neighbors == 'find_first_before_run' and frontier_type == 'list':
        UCS = UCS_list_with_init
    else:
        # Fail fast with a clear message instead of a NameError at call time
        raise ValueError(
            "Unsupported combination: finding_neighbors={!r}, frontier_type={!r}".format(
                finding_neighbors, frontier_type))
    # Heuristic selection (unused by UCS itself; kept for signature parity
    # with the A* variants). `heuristics` is a module-level mapping.
    global heuristics
    heuristic_type = parameters['heuristic_type']
    heuristic = heuristics[heuristic_type]
    # Delta for heuristic functions (default 0.001); not consumed by UCS here
    delta = parameters.get('delta', 0.001)
    # Number of random (start, goal) pairs to average over
    num_points = parameters['num_points']
    # ----------------------------------- Analysis process -----------------------------------
    analysis_results = []  # One AnalysisResults per image size
    for size in data.keys():
        analysis_results.append(AnalysisResults(size))
        # Random (start, goal) position pairs for this image size
        pair_node_list = random_initialization(num_points, size)
        print("Image size: {}".format(size))
        for data_bin_key in data[size].keys():
            total_time = 0
            total_steps_count = 0
            num_images = len(data[size][data_bin_key])
            for start_node, goal_node in pair_node_list:
                # Wrap the raw positions into Node objects
                start_node = Node(start_node)
                goal_node = Node(goal_node)
                # Iterate over every image in this (size, bin) bucket
                for tmp_data in data[size][data_bin_key]:
                    if finding_neighbors == 'find_first_before_run':
                        # Build the neighbor grid BEFORE timing the search
                        grid = make_grid(tmp_data, (size, size), 2)
                        start_time = time.time()
                        steps_count = UCS(
                            grid, tmp_data, start_node, goal_node, (size, size), 2, heuristic)
                        running_time = time.time() - start_time
                    else:
                        start_time = time.time()
                        steps_count = UCS(tmp_data, start_node,
                                          goal_node, (size, size), 2, heuristic)
                        running_time = time.time() - start_time
                    total_time += running_time
                    total_steps_count += steps_count
            print("Standard deviation range: {}".format(data_bin_key))
            # Average running time over all images and start/goal pairs
            tmp_avg_run_time = total_time / num_images / num_points
            analysis_results[-1].avg_run_time[data_bin_key] = tmp_avg_run_time
            print("Average running time: {} s.".format(tmp_avg_run_time))
            # Average step count (integer division, as in the original metric)
            tmp_avg_steps_count = total_steps_count // num_images // num_points
            analysis_results[-1].avg_steps_count[data_bin_key] = tmp_avg_steps_count
            print("Average steps counts: {} steps.".format(tmp_avg_steps_count))
            print("------------------------------------")
    return analysis_results