-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcomparison_ivar_obstacle_1teams.py
99 lines (72 loc) · 2.91 KB
/
comparison_ivar_obstacle_1teams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from multiprocessing import Pool
from typing import Optional, Callable
from tqdm import tqdm
from python.algorithm import MapfAlgorithm
from python.benchmarks.extensions_25percent_3teams import read_from_file
from python.benchmarks.graph_times import graph_results
from python.benchmarks.inmatch_vs_prematch_75percent_1teams import output_data
import pathlib
from python.benchmarks.parse_map import MapParser
from python.benchmarks.run_with_timeout import run_with_timeout
from python.mstar.rewrite import Config, MatchingStrategy
from python.mstar.rewrite.config import GigaByte
from python.solvers.configurable_mstar_solver import ConfigurableMStar
# Number of worker processes handed to the multiprocessing pool.
processes = 2

# Name of the benchmark; also the name of the sub-directory (next to this
# file) that holds the problem batches and receives the result files.
name = "comparison_obstacle_1teams_maps"

# Absolute path of the directory that contains this script.
this_dir = pathlib.Path(__file__).absolute().parent
def run(solver: Callable[[], MapfAlgorithm], bm_name: str):
    """Run ``solver`` on every problem batch and write the results to disk.

    Batches are the sub-directories of ``this_dir / name``. They are handled
    in order of increasing agent count (``len(problem.goals)`` of the first
    problem in each batch). After each batch the results gathered so far are
    written to a per-agent-count part file, so an interrupted run can pick up
    from those part files the next time it is started.

    Args:
        solver: Zero-argument factory; called once for every batch that is
            actually solved, and the returned algorithm is passed to
            ``run_with_timeout``.
        bm_name: Benchmark label, used in the results file name and in the
            progress messages.

    Returns:
        A ``(results_path, bm_name)`` tuple, where ``results_path`` is
        ``results_<bm_name>.txt`` inside the batch directory.
    """
    batchdir = this_dir / name
    parser = MapParser(batchdir)

    fname = batchdir / f"results_{bm_name}.txt"

    # A finished results file means the whole benchmark was already run.
    if fname.exists():
        print(f"data exists for {bm_name}")
        return fname, bm_name

    # num agents : solutions
    results: dict[int, list[Optional[float]]] = {}

    all_problems = [
        [parsed[1] for parsed in parser.parse_batch(batch.name)]
        for batch in batchdir.iterdir()
        if batch.is_dir()
    ]
    all_problems.sort(key=lambda batch: len(batch[0].goals))

    with Pool(processes) as p:
        for problems in tqdm(all_problems):
            num_agents = len(problems[0].goals)

            partname = pathlib.Path(str(fname) + f".{num_agents}agents")

            # Reuse the partial results of an earlier (interrupted) run.
            if partname.exists():
                print(f"found data for part {num_agents}")
                results[num_agents] = read_from_file(partname, num_agents)
                continue

            # Results for one agent fewer. This may be missing when the
            # smallest batch has more than one agent or when agent counts are
            # not consecutive; indexing the dict directly would raise a
            # KeyError in that case.
            previous = results.get(num_agents - 1)

            # Skip the batch only when the previous agent count is known and
            # produced no result at all; otherwise run the solver.
            if (
                num_agents <= 1
                or previous is None
                or any(result is not None for result in previous)
            ):
                # 2 * 60 is presumably a two-minute timeout in seconds --
                # confirm against run_with_timeout.
                sols_inmatch = run_with_timeout(p, solver(), problems, 2 * 60)
                tqdm.write(f"{bm_name} with {num_agents} agents: {sols_inmatch}")
                results[num_agents] = sols_inmatch
            else:
                results[num_agents] = [None] * len(problems)

            # Checkpoint everything gathered so far under this batch's part file.
            output_data(partname, results)

    tqdm.write(str(results))

    output_data(fname, results)

    return fname, bm_name
def main():
    """Benchmark the configured M* solver and hand the result to ``graph_results``.

    ``run`` is called once with the label ``"M*"``. The ``(path, label)``
    tuple it returns is forwarded to ``graph_results`` together with the
    output location ``this_dir / name / name``.
    """

    def make_solver():
        # Build a fresh Config and solver on every call; ``run`` invokes this
        # factory each time it needs a solver.
        settings = Config(
            operator_decomposition=True,
            precompute_paths=False,
            precompute_heuristic=True,
            collision_avoidance_table=False,
            recursive=False,
            matching_strategy=MatchingStrategy.SortedPruningPrematch,
            max_memory_usage=3 * GigaByte,
            debug=False,
            report_expansions=False,
        )
        return ConfigurableMStar(settings)

    result_files: list[tuple[pathlib.Path, str]] = [run(make_solver, "M*")]

    output_base = this_dir / name / f"{name}"
    graph_results(*result_files, output_base, save=True, bounds=False)
# Run the benchmark only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()