-
Notifications
You must be signed in to change notification settings - Fork 7
/
submit_multipass
executable file
·322 lines (304 loc) · 14.7 KB
/
submit_multipass
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#!/usr/bin/env python
# Script for quickly submitting a hierarchy of multi-pass tod2map runs
# and postprocessing, including retrying failed jobs, etc.
# Format: submit_multipass [options] sel odir
from __future__ import division, print_function
import argparse, os, subprocess, re, sys, getpass
# Command-line interface. Every entry below is (names, keyword settings) and is
# registered with the parser in the order listed. Options that are not declared
# here are not rejected: parse_known_args collects them in `unknown`.
parser = argparse.ArgumentParser()
_cli_spec = (
    (("sel",), {}),
    (("odir",), {}),
    (("-n", "--nsplit"), dict(type=int, default=4)),
    (("-p", "--patch"), dict(type=str, default=None)),
    (("--want",), dict(type=str, default=None, help="Shortcut for nodes, ntasks-per-node and cpus-per-task. Example --want 2:10:4 would ask for 2 nodes, 10 tasks per node and 4 cpus per task")),
    (("-N", "--nnode"), dict(type=int, default=4, help="Number of nodes to use. --want overries")),
    (("--npernode",), dict(type=int, default=10, help="Tasks per node, --want overrides")),
    (("--nomp",), dict(type=int, default=4, help="OMP threads per task. --want overriedes")),
    (("-t", "--ntry"), dict(type=int, default=1)),
    (("-P", "--npass"), dict(type=int, default=None)),
    (("-T", "--tblock"), dict(type=float, default=4)),
    (("--time",), dict(type=str, default="24:00:00")),
    (("--dry-run",), dict(action="store_true")),
    (("--print-scripts",), dict(action="store_true")),
    (("--sky",), dict(type=str, default="sky")),
    (("--sys",), dict(type=str, default=None)),
    (("--split-mode",), dict(type=str, default="jon")),
    (("--no-prune",), dict(action="store_true")),
    (("--test",), dict(type=str, default=None)),
    (("--slice",), dict(type=str, default=None)),
    (("--prefix",), dict(type=str, default=None)),
    (("--suffix",), dict(type=str, default=None)),
    (("--daz",), dict(type=float, default=None)),
    (("--addsim",), dict(type=str, default=None)),
    (("--filter",), dict(type=str, default="none")),
    (("--order",), dict(type=int, default=None)),
    (("-c", "--cont"), dict(action="store_true")),
    (("--queue",), dict(type=int, default=1)),
    (("--user",), dict(type=str, default=None)),
    (("--srcsub",), dict(action="store_true")),
    (("--minpass",), dict(type=int, default=1)),
    (("--downpat",), dict(type=str, default=None)),
    (("--cgpat",), dict(type=str, default="200")),
    (("--distributed",), dict(type=int, default=-1)),
    (("--account",), dict(type=str, default=None)),
)
for _names, _settings in _cli_spec:
    parser.add_argument(*_names, **_settings)
args, unknown = parser.parse_known_args()
from enlib import utils, config, enmap, colors

# Where the tenki scripts live, and the data root from the enlib configuration
tenkidir = os.environ["HOME"] + "/local/tenki"
config.init()
root = config.get("root")
cdir = os.getcwd()
# Stand-in job id counter, only advanced during dry runs
jobid = 0
# With --test, generated shell commands are prefixed with "echo "
if args.test:
    echo = "echo "
else:
    echo = ""
# Optional ",sys=..." suffix appended to map specifications
map_sys = ",sys=%s" % args.sys if args.sys else ""

# Resource request. The individual options apply unless --want
# (nodes:tasks-per-node:cpus-per-task) is given, which replaces all three.
nnode, npernode, nomp = args.nnode, args.npernode, args.nomp
if args.want:
    nnode, npernode, nomp = [int(field) for field in args.want.split(":")]

# Output layout: batch scripts go in <odir>/runs, mapmaker output in <odir>/work
utils.mkdir(args.odir)
rundir = args.odir + "/runs"
workdir = args.odir + "/work"
utils.mkdir(rundir)

# Frequencies available on each array, used when pruning invalid combinations
array_freqs = {
    "pa1": ["f150"],
    "pa2": ["f150"],
    "pa3": ["f090", "f150"],
    "pa4": ["f150", "f220"],
    "pa5": ["f090", "f150"],
    "pa6": ["f090", "f150"],
    "pa7": ["f030", "f040"],
}
# Arrays present in each season, used when pruning invalid combinations
season_arrays = {
    "s13": ["pa1"],
    "s14": ["pa1", "pa2"],
    "s15": ["pa1", "pa2", "pa3"],
    "s16": ["pa2", "pa3", "pa4"],
    "s17": ["pa4", "pa5", "pa6"],
    "s18": ["pa4", "pa5", "pa6"],
    "s19": ["pa4", "pa5", "pa6"],
    "s20": ["pa4", "pa5", "pa7"],
    "s21": ["pa4", "pa5", "pa7"],
}
# Short aliases for slurm accounts; any other --account value is used verbatim
accounts = {"rbond": "rrg-rbond-ac", "sievers": "rrg-sievers"}
account = accounts.get(args.account, args.account)
# Selector characters and the text that replaces them in output names
tag_map = {"+": "", "~": "no", ">": "_gt_", "<": "_lt_"}
def remap(toks, tag_map):
    """Return a new list of the tokens in toks with every key of tag_map
    replaced by its value. The replacements are applied one after another,
    in the iteration order of tag_map. toks itself is not modified."""
    def substitute(text):
        for old, new in tag_map.items():
            text = text.replace(old, new)
        return text
    return [substitute(tok) for tok in toks]
# The selector uses the normal comma-separated format, except that each part
# may hold several ":"-separated alternatives, which will be iterated over
toks = utils.split_outside(args.sel, ",")
alts = []
for part in toks:
    alts.append(utils.split_outside(part, ":"))
def get_patch_file(toks, override=None):
    """Return the patch geometry file to use for the given selector tokens.

    If override is not None it is returned unchanged. Otherwise the first
    token for which the file <root>/area/<token>.fits exists decides the
    result. Raises ValueError if no token names an existing file."""
    if override is not None:
        return override
    for tok in toks:
        fname = root + "/area/%s.fits" % tok
        if os.path.isfile(fname):
            return fname
    # The old message read "Can infer patch file", the opposite of what happened
    raise ValueError("Can't infer patch file from tokens %s" % str(list(toks)))
def invalid_combination(toks):
    """Return True if the tokens name a season/array/frequency combination
    that is known not to exist according to season_arrays and array_freqs.

    Only the simple case of at most one season, exactly one array and exactly
    one frequency is judged; anything else returns False (i.e. is kept)."""
    # Find seasons, arrays and freqs
    seasons, arrays, freqs = [], [], []
    for tok in toks:
        # Accept an optional leading "g" so that generalized seasons such as
        # "gs19" are recognized too. Previously the pattern only matched plain
        # "sNN" at the start of the token, so generalized seasons were ignored.
        m = re.match(r"(g?s\d\d)", tok)
        if m: seasons.append(m.group(1))
        m = re.match(r"(pa\d)", tok)
        if m: arrays.append(m.group(1))
        m = re.match(r"\+?(f\d\d\d)", tok)
        if m: freqs.append(m.group(1))
    # We only handle the simple case of one array and one freq
    if len(seasons) > 1 or len(arrays) != 1 or len(freqs) != 1:
        return False
    array, freq = arrays[0], freqs[0]
    if seasons:
        # The same selection criteria apply to generalized and plain seasons,
        # so chop off any leading g before the lookup
        season = seasons[0][-3:]
        if season in season_arrays and array not in season_arrays[season]:
            return True
    if array not in array_freqs:
        return False
    return freq not in array_freqs[array]
def get_queue(user):
    """Return the slurm queue of the given user, as reported by squeue.

    The result is a list with one [job_id, name, running, dependency] entry
    per job. job_id is an int, running is True when the compact state code
    contains "R", and dependency is the id (as a string) captured from the
    first afterok:/afterany: dependency, or None if the job has none.
    Raises ValueError for dependency strings of any other form."""
    # -h suppresses the header line. The job name (%j) is requested last and
    # the line is split at most three times, so a job name containing
    # whitespace can no longer break the unpacking of the other fields.
    output = subprocess.check_output(
        ["squeue", "-h", "-u", user, "-o", "%A %E %t %j"]).decode()
    res = []
    for line in output.split("\n"):
        fields = line.strip().split(None, 3)
        if not fields:
            continue
        if len(fields) < 3:
            raise ValueError("Unrecognized squeue line '%s'" % line)
        job_id, dep, status = fields[:3]
        name = fields[3] if len(fields) > 3 else ""
        if dep == "(null)":
            dep = None
        else:
            m = re.match(r"afterok:(\w+)", dep) or re.match(r"afterany:(\w+)", dep)
            if not m:
                raise ValueError("Unrecognized dependency '%s'" % str(dep))
            dep = m.group(1)
        res.append([int(job_id), name, "R" in status, dep])
    return res
def find_queue(queue, id=None, name=None):
    """Look up a job in queue by id (entry column 0) or, if no id is given,
    by name (entry column 1). Returns (index, entry) for the first match,
    or (-1, None) if nothing matches or neither id nor name was given."""
    if id is None and name is None:
        return -1, None
    if id is not None:
        column, wanted = 0, id
    else:
        column, wanted = 1, name
    matches = ((pos, entry) for pos, entry in enumerate(queue) if entry[column] == wanted)
    return next(matches, (-1, None))
def clean_unrunnable(queue, id=None, name=None):
    """Check if the job with the given id (or name) is deadlocked, returning
    True if it is, and canceling it and everything it was found to depend on.
    Canceled jobs are removed from queue. With --dry-run nothing is canceled,
    only reported. Returns False if the job is not in queue.

    NOTE(review): a job counts as deadlocked only if its dependency does, and
    a job that is missing from the queue or has no dependency yields False,
    so as written no chain can ever produce True. Confirm the intended base
    case (e.g. a dependency that has left the queue) before relying on this."""
    i, e = find_queue(queue, id=id, name=name)
    if not e: return False
    # Queue entries hold job ids as ints but dependency ids as strings, so
    # convert before the lookup or the dependency can never be found
    dep = e[3]
    if dep is not None and str(dep).isdigit():
        dep = int(dep)
    if not clean_unrunnable(queue, id=dep):
        return False
    if args.dry_run:
        print("Would cancel unrunnable %d: %s" % (e[0],e[1]))
    else:
        # subprocess arguments must be strings; e[0] is an int
        subprocess.check_output(["scancel", str(e[0])])
        print("Cancelled unrunnable %d: %s" % (e[0],e[1]))
    # The recursive call may have deleted entries, which would make the index
    # i stale, so remove the entry itself instead of deleting by position
    queue.remove(e)
    return True
# Snapshot of the user's slurm queue, taken once up front. Only done when
# --queue is nonzero; the user defaults to whoever runs the script.
if args.queue:
    user = args.user if args.user else getpass.getuser()
    queue = get_queue(user)

# Optional pieces of the output tag and of the map specification
tag_prefix = args.prefix + "_" if args.prefix is not None else ""
tag_suffix = "_" + args.suffix if args.suffix is not None else ""
order_str = ",order=%s" % str(args.order) if args.order is not None else ""

# Per-pass downsampling factors (optional) and per-pass CG step counts
if args.downpat is None:
    downpat = None
else:
    downpat = [int(field) for field in args.downpat.split(",")]
cgpat = [int(field) for field in args.cgpat.split(",")]

# Set up number of passes. It is the longest of the length of downpat and cgpat,
# unless npass is directly passed, in which case that overrides it
pattern_lengths = [len(cgpat)]
if downpat is not None:
    pattern_lengths.append(len(downpat))
npass = max(pattern_lengths) if args.npass is None else args.npass
# Main submission loop. For every dataset, split and mapmaking pass this builds
# a slurm batch script, writes it to rundir and submits it (or just prints it
# with --dry-run), chaining each pass to the previous one with afterok.
# 1. First loop over our datasets (e.g. seasons, patches, arrays, frequencies etc.)
for toks in utils.list_combination_iter(alts):
    # Skip invalid array-frequency combinations
    if not args.no_prune and invalid_combination(toks):
        continue
    # Tokens with selector characters replaced according to tag_map. These are
    # used for output names and for positional {} expansion of file names and
    # of the unknown arguments.
    otoks = remap(toks, tag_map)
    # 2. Loop over the individual splits
    for i in range(args.nsplit):
        prev_jobid = None

        def get_otag(ipass):
            """Output tag for pass number ipass (0-based) of the current split."""
            return tag_prefix + "_".join(otoks) + tag_suffix + "_%dpass" % (ipass+1) + "_%dway_%d" % (args.nsplit, i)

        # 3. Loop over our multipass mapmaking passes
        for ipass in range(args.minpass-1, npass):
            # Passes beyond the end of cgpat reuse its last entry
            nstep = cgpat[min(ipass, len(cgpat)-1)]
            if args.split_mode == "jon":
                osel = ",".join(toks) + ",int32(jon/%f)%%%d==%d" % (args.tblock,args.nsplit,i)
            elif args.split_mode == "baz":
                osel = ",".join(toks) + ",int32(((baz+180)%%360-180+200)/400.*%d)==%d" % (args.nsplit,i)
            elif args.split_mode.startswith("file:"):
                fname = args.split_mode[5:]
                fname = fname.format(*otoks, i=i)
                osel = ",".join(toks) + ",@" + fname
            else:
                raise ValueError(args.split_mode)
            if args.slice:
                osel += ":[" + args.slice + "]"
            otag = get_otag(ipass)
            prefix = "%s/%s_%s" % (workdir, otag, args.sky)
            prev_prefix = "%s/%s_%s" % (workdir, get_otag(ipass-1), args.sky)
            # We can't run a pass if the previous pass is neither present on disk nor queued up
            if ipass > 0 and prev_jobid is None and not os.path.exists(prev_prefix + "_map_full.fits"):
                print("Skipping deps misisng " + prefix)
                continue
            # Allow us to skip already done work if requested.
            # NOTE(review): the queue check is assumed to belong under --cont
            # (as the alternative to "already on disk") - confirm.
            if args.cont:
                if os.path.exists(prefix + "_map_full.fits"):
                    print("Skipping done " + prefix)
                    prev_jobid = None
                    continue
                elif args.queue:
                    # Are we already running or queued up?
                    qind, e = find_queue(queue, name=otag)
                    if e:
                        queued_id, queued_name, running, dep = e
                        if running:
                            # Already running
                            print("Skipping running %s with pid %d" % (otag, queued_id))
                            prev_jobid = queued_id
                            continue
                        # Queued up. But can it ever run? clean_unrunnable needs
                        # the queue and the id of the job in question; the old
                        # call passed only the dependency, in place of the queue.
                        if not clean_unrunnable(queue, id=queued_id):
                            # Yes, can run. So don't submit
                            print("Skipping already queued %s with pid %d" % (otag, queued_id))
                            prev_jobid = queued_id
                            continue
                        # Was queued, but was unrunnable and was cleaned up.
                        # So should submit after all
            patch_file = get_patch_file(otoks, args.patch)
            shape, wcs = enmap.read_map_geometry(patch_file)
            npix = shape[-2]*shape[-1]
            # Determine the map type: forced by --distributed, otherwise by patch size
            if args.distributed == 0:
                map_type = "map"
            elif args.distributed >= 1:
                map_type = "dmap"
            else:
                map_type = "map" if npix < 4e7 else "dmap"
            # Allow variable replacement in the unknown arguments
            unknown_expanded = [a.format(*otoks, i=i) for a in unknown]
            # 4. Ok, we can finally set up the actual job. This differs whether we are
            # in the first or subsequent passes of multipass mapmaking. The first one
            # can just be run as-is, but the later ones must add a sub:2 filter and
            # debuddy referring to the output of the previous step. For these we need
            # both source-free (for signal subtraction) and source-full (for buddy subtraction)
            # maps. This script is supposed to be easy to use, so it will take care of all
            # standard filters itself. That also means that we know that the main maps we
            # get out will be source-free, and that we must add srcs to get the src-full map
            map_command = """%(echo)smyrun -npernode %(npernode)d -nomp %(nomp)d python %(tenki)s/tod2map2.py --dmap_format=tiles -S %(sky)s:%(patch)s,type=%(map_type)s%(order_str)s%(sys)s "%(osel)s" "%(odir)s" "%(otag)s" %(extra_args)s --map_cg_nmax=%(nstep)s""" % {
                "sky": args.sky, "map_type": map_type, "osel": osel, "odir": workdir,
                "otag": otag, "extra_args": " ".join(unknown_expanded), "nstep": nstep,
                "tenki": tenkidir, "patch": patch_file, "order_str": order_str,
                "echo": echo, "npernode": npernode, "nomp": nomp, "sys": map_sys}
            # Set up filters
            if args.srcsub:
                map_command += " -F src"
            if ipass > 0:
                map_command += " -F buddy:map=%s_map_full.fits%s%s" % (prev_prefix,order_str,map_sys)
                if args.srcsub:
                    map_command += " -F sub:2,map=%s_map_srcfree.fits%s%s" % (prev_prefix,order_str, map_sys)
                else:
                    map_command += " -F sub:2,map=%s_map_full.fits%s%s" % (prev_prefix,order_str, map_sys)
            if args.addsim:
                map_command += " -F add:map=%s%s%s -F buddy:map=%s,mul=-1%s" % (args.addsim, order_str, map_sys, args.addsim, map_sys)
            if args.filter in ["az","post"]:
                val = 2 if args.filter == "post" else 1
                if args.daz is None:
                    map_command += " -F scan:%d" % val
                else:
                    map_command += " -F scan:%d,daz=%.6f" % (val,args.daz)
            if downpat:
                # Clamp like cgpat above. Indexing with ipass directly raised
                # IndexError when downpat was shorter than the number of passes.
                map_command += " --downsample=%d" % downpat[min(ipass, len(downpat)-1)]
            # Set up tidying to run after the mapmaker has finished. This overlaps slightly
            # with the postprocessing, but here we only do the bare minimum needed to make
            # multipass mapmaking work
            if args.srcsub:
                post_command = """%srm -f "%s_map_srcfree.fits" """ % (echo, prefix)
                post_command += """ && %sln -s "%s_map%04d.fits" "%s_map_srcfree.fits" """ % (echo, otag+"_"+args.sky, nstep, prefix)
                post_command += """ && %ssrun python %s/mapadd.py "%s_map_srcfree.fits" "%s_srcs.fits" "%s_map_full.fits" """ % (echo, tenkidir, prefix, prefix, prefix)
            else:
                post_command = """%sln -s "%s_map%04d.fits" "%s_map_full.fits" """ % (echo, otag+"_"+args.sky, nstep, prefix)
            # Set up our slurm parameters
            slurm_command = "#SBATCH --nodes %d --ntasks-per-node=%d --cpus-per-task=%d --time=%s\n" % (nnode, npernode, nomp, args.time)
            slurm_command += "#SBATCH --job-name %s\n" % otag
            if account and account != "default":
                slurm_command += "#SBATCH --account %s\n" % account
            # and the dependency list: the previous pass, if one was submitted or is queued
            deps = []
            if prev_jobid is not None:
                deps.append("afterok:%d" % prev_jobid)
            if deps:
                slurm_command += "#SBATCH --dependency=" + ",".join(deps) + "\n"
            # Construct the full batch script
            batch = "#!/bin/bash\n%s\ncd \"%s\"\n%s && %s\n" % (slurm_command, cdir, map_command, post_command)
            if args.test:
                batch += "echo %s\n%s\n" % (args.test, args.test)
            runfile = rundir + "/%s.txt" % (otag + "_try%d" % 1)
            if not args.dry_run:
                with open(runfile, "w") as f:
                    f.write(batch + "\n")
                # --parsable prints "jobid" or "jobid;cluster", so keep only the id
                sbatch_out = subprocess.check_output(["sbatch","--parsable",runfile]).decode()
                jobid = int(sbatch_out.strip().split(";")[0])
                print("%sSubmitted %6d %s%s" % (colors.lgreen, jobid, otag, colors.reset))
                prev_jobid = jobid
            else:
                print("%sWould have submitted %6d %s%s: %s" % (colors.lgreen, jobid, otag, colors.reset, batch.replace("\n",";")))
                # Chain the next pass to the id that was just printed, then advance
                # the stand-in counter. Taking prev_jobid after the increment made
                # every dry-run dependency point one id too far.
                prev_jobid = jobid
                jobid += 1
            if args.print_scripts:
                print(batch)
# Record the command line arguments in the run directory. The first unused
# name among submit0.txt .. submit99.txt is taken, so that records of earlier
# invocations are not clobbered. Nothing is written during a dry run.
if not args.dry_run:
    for i in range(100):
        ofile = rundir + "/submit%d.txt" % i
        if not os.path.isfile(ofile):
            record = "submit_multipass " + " ".join(sys.argv[1:]) + "\n"
            with open(ofile, "w") as f:
                f.write(record)
            break