import argparse
import logging
import os
import shutil
import sys
import warnings
from multiprocessing import Pool

import GPUtil
from booster.utils import logging_sep
from tqdm import tqdm

from ovis.utils.dbutils import FileLockedTinyDB, requeue_experiments
from ovis.utils.manager import snapshot_dir, read_experiment_json_file, get_abs_paths, retrieve_exp_and_run
from ovis.utils.utils import Header


def run_manager():
"""
Run a set of experiments defined as a json file in `experiments/` using mutliprocessing.
This script is a quick & dirty implementation of a queue system using `filelock` and `tinydb`.
The manager creates a snapshot of the library to ensure consistency between runs. You may update the snapshot
manually using `--update_lib` or update the experiment file using `--update_exp`.
In that case starting a new manager using `--resume` will append potential new experiments to the database.
Use `--rf` to delete an existing experiment. Example:
```bash
python manager.py --exp gaussian-mixture-model --max_gpus 4 --processes 2
```
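    For example, to resume an experiment after editing `experiments/gaussian-mixture-model.json`
    (a hypothetical combination of the flags defined below):

    ```bash
    python manager.py --exp gaussian-mixture-model --resume --update_exp
    ```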
    Check the experiment status using `dbutils.py` (see `python dbutils.py --help` for details about requeuing exps):

    ```bash
    python dbutils.py --exp gaussian-mixture-model --check
    ```
    Troubleshooting: if `FileLockedTinyDB` exits without properly removing the `.lock` file, you may need to delete
    it manually using `--purge_lock`. Use this carefully, as it may alter ongoing database transactions.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--script', default='run.py',
                        help='script name')
    parser.add_argument('--root', default='runs/',
                        help='experiment directory')
    parser.add_argument('--data_root', default='data/',
                        help='data directory')
    parser.add_argument('--exp', default='gaussian-mixture-model', type=str,
                        help='experiment id pointing to a .json file in experiments/')
    parser.add_argument('--max_gpus', default=8, type=int,
                        help='maximum number of GPUs')
    parser.add_argument('--max_load', default=0.5, type=float,
                        help='only use GPUs with load < `max_load`')
    parser.add_argument('--max_memory', default=0.01, type=float,
                        help='only use GPUs with memory < `max_memory`')
    parser.add_argument('--processes', default=1, type=int,
                        help='number of processes per GPU')
    parser.add_argument('--rf', action='store_true',
                        help='delete the previous experiment')
    parser.add_argument('--resume', action='store_true',
                        help='resume an experiment')
    parser.add_argument('--update_exp', action='store_true',
                        help='update the snapshot experiment file')
    parser.add_argument('--update_lib', action='store_true',
                        help='update the entire snapshot')
    parser.add_argument('--max_jobs', default=-1, type=int,
                        help='maximum jobs per thread (stop after `max_jobs` jobs)')
    parser.add_argument('--requeue_level', default=1, type=int,
                        help='[db] requeue level {0: nothing, 1: aborted_by_user/not_found, 2: failed, 100: not completed}')
    parser.add_argument('--purge_lock', action='store_true',
                        help='purge the `.lock` file [use only if the `.lock` file has not been properly removed]')
    opt = parser.parse_args()
    # get the list of devices
    device_ids = GPUtil.getAvailable(order='memory',
                                     limit=opt.max_gpus,
                                     maxLoad=opt.max_load,
                                     maxMemory=opt.max_memory,
                                     includeNan=False,
                                     excludeID=[],
                                     excludeUUID=[])
    if len(device_ids):
        device_ids = [f"cuda:{d}" for d in device_ids]
    else:
        device_ids = ['cpu']

    # total number of processes
    processes = opt.processes * len(device_ids)
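    # Illustration (not part of the original script): if GPUs 0 and 2 satisfy the load/memory constraints,
    # `device_ids == ['cuda:0', 'cuda:2']`; with `--processes 2` the manager then runs 2 * 2 = 4 workers in total.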
    # get absolute paths to the logging directories
    exps_root, exp_root, exp_data_root = get_abs_paths(opt.root, opt.exp, opt.data_root)
    if os.path.exists(exp_root):
        warnings.warn(f"logging directory `{exp_root}` already exists.")

        if not opt.resume:
            if opt.rf:
                warnings.warn(f"Deleting existing logging directory `{exp_root}`.")
                shutil.rmtree(exp_root)
            else:
                sys.exit()
    # copy the library to the `snapshot` directory
    if not opt.resume:
        shutil.copytree('./', snapshot_dir(exp_root),
                        ignore=shutil.ignore_patterns('.*', '*.git', 'runs', 'reports', 'data', '__pycache__'))

    if opt.update_lib:
        # move the original lib out of the way
        shutil.move(snapshot_dir(exp_root), f"{snapshot_dir(exp_root)}-saved")
        # copy the current lib into the snapshot directory
        shutil.copytree('./', snapshot_dir(exp_root),
                        ignore=shutil.ignore_patterns('.*', '*.git', 'runs', 'reports', 'data', '__pycache__', '*.psd'))

    # update the snapshot experiment file
    if opt.update_exp:
        _exp_file = f'experiments/{opt.exp}.json'
        shutil.copyfile(_exp_file, os.path.join(snapshot_dir(exp_root), _exp_file))

    # move into the snapshot directory to ensure consistency between runs (the lib will be loaded from `./lib_snapshot/`)
    os.chdir(snapshot_dir(exp_root))
    # logging
    logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler()])
    logger = logging.getLogger('exp-manager')

    # log config
    print(logging_sep("="))
    logger.info(f"Available devices: {device_ids}")
    logger.info(f"Experiment id = {opt.exp}, running {opt.processes} processes/device, logdir = {exp_root}")
    print(logging_sep())
    # read the experiment file
    experiment_args = read_experiment_json_file(opt.exp)

    # replace the default `script` argument if specified in the experiment file
    if "script" in experiment_args.keys():
        opt.script = experiment_args.pop("script")

    # define the arguments for each run
    args = experiment_args['args']  # retrieve the list of arguments
    args = [experiment_args['global'] + " " + a for a in args]  # prepend the `global` arguments
    args = [f"--exp {opt.exp} --root {exps_root} --data_root {exp_data_root} {a}" for a in
            args]  # prepend the run-specific parameters

    # expand the `parameters` grid: each entry multiplies the list of runs by the number of values
    if "parameters" in experiment_args.keys():
        for _arg, values in experiment_args["parameters"].items():
            _args = []
            for v in values:
                if isinstance(v, bool):
                    if v:
                        _args += [f"--{_arg} " + a for a in args]
                    else:
                        _args += args
                else:
                    _args += [f"--{_arg} {v} " + a for a in args]
            args = _args
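    # Illustration only (the keys match the parsing above, the values are hypothetical): given an experiment file
    #   {"global": "--epochs 100", "args": ["--seed 1", "--seed 2"], "parameters": {"use_baseline": [true, false]}}
    # the expansion yields four runs, e.g.
    #   --use_baseline --exp <exp> --root <exps_root> --data_root <exp_data_root> --epochs 100 --seed 1
    # plus the same commands without `--use_baseline` for the `false` value.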
    # remove the lock file manually
    if opt.purge_lock:
        FileLockedTinyDB(exp_root).purge()
    # write all experiments to the `tinydb` database guarded by a `filelock`
    with FileLockedTinyDB(exp_root) as db:
        query = db.query()
        # add missing exps (when using `--resume` + `--update_exp`)
        n_added = 0
        for a in args:
            if not db.contains(query.arg == a):
                db.insert({'arg': a, 'queued': True, 'job_id': 'none'})
                n_added += 1
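    # Each record has the shape {'arg': <full command-line string>, 'queued': <bool>, 'job_id': <str>}.
    # Presumably, `retrieve_exp_and_run` (imported above) pops a queued record, marks it as running and launches
    # `opt.script` with the stored arguments; this is an assumption based on the fields inserted here.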
    # check the database status and requeue failed experiments if requested
    if opt.script == 'run.py':  # only handled for `run.py` because matching exps relies on `get_run_parser`
        with Header("Status"):
            requeue_experiments(exp_root, opt.requeue_level)

    # count the queued and total experiments
    with FileLockedTinyDB(exp_root) as db:
        n_queued_exps = db.count(query.queued == True)
        n_exps = len(db)

    # remaining queued experiments
    logger.info(f"Queued experiments: {n_queued_exps} / {n_exps}. Newly added: {n_added}")
    # run processes in parallel (spawning `processes` workers)
    pool = Pool(processes=processes)
    job_args = [{"opt": opt, "exp_root": exp_root, "devices": device_ids} for _ in range(n_queued_exps)]
    if opt.max_jobs > 0:
        job_args = job_args[:opt.max_jobs * processes]
    logger.info(f"Max. number of jobs = {len(job_args)}, processes = {processes} [{len(device_ids)} devices]")
    for _ in tqdm(pool.imap_unordered(retrieve_exp_and_run, job_args, chunksize=1), total=n_queued_exps,
                  desc="Job Manager"):
        pass


if __name__ == '__main__':
    run_manager()