-
Notifications
You must be signed in to change notification settings - Fork 8
/
fba_patch_202110
408 lines (364 loc) · 14.6 KB
/
fba_patch_202110
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
import os
import fitsio
import numpy as np
from glob import glob
from collections import Counter
from astropy.table import Table
import sys
sys.path.append('desicode')
from desimodel.footprint import radec2pix
# Module-level cache holding the most recently read secondary-targets file.
_sec_cache = {}

def read_secondary(fn):
    '''Read the given secondary targets file (FITS), with caching.

    Returns a tuple ``(scnd, sidmap)`` where *scnd* is the raw table read
    from *fn* and *sidmap* maps each positive TARGETID to its row index
    in *scnd*.  Only the single most recently read file is kept cached,
    to bound memory use.
    '''
    global _sec_cache
    if fn in _sec_cache:
        return _sec_cache[fn]
    scnd = fitsio.read(fn)
    sidmap = {tid: row for row, tid in enumerate(scnd['TARGETID']) if tid > 0}
    result = (scnd, sidmap)
    # Deliberately replace (not extend) the cache: keep one entry only.
    _sec_cache = {fn: result}
    return result
def patch(infn = 'desi/target/fiberassign/tiles/trunk/001/fiberassign-001000.fits.gz',
          desiroot = '/global/cfs/cdirs/desi',
          outfn = None,
          ):
    '''Repair the data corruption reported in https://github.com/desihub/fiberassign/pull/375
    for a given single fiberassign file.

    The patching proceeds by reading the FIBERASSIGN hdu, while checking the headers
    for the locations of targeting files used.

    Based on TARGETID, we match objects in the FIBERASSIGN file and
    the targeting files.  For a subset of columns, we check for cases
    where the targeting files contain different values, and patch them
    in to the FIBERASSIGN table.

    First we read the primary target files themselves, then secondary
    targets, then ToO targets.

    If any rows are changed, then an updated file is written out, preserving all other HDUs.

    Parameters
    ----------
    infn : str
        Input fiberassign FITS file (may be gzipped).
    desiroot : str
        Local filesystem prefix substituted for the "DESIROOT" placeholder
        found in header values.
    outfn : str or None
        Output filename; must differ from *infn*.  Only used if patching
        turns out to be required.

    Returns
    -------
    stats : dict
        Summary statistics (counts of matched and patched rows, file paths
        used, etc).
    '''
    # Buffer log messages and print them in one go at the end, so output
    # from multiprocessing workers does not interleave.
    log_to_string = True
    logstr = ''
    def log(*a):
        nonlocal logstr
        if log_to_string:
            logstr += ' '.join([str(s) for s in a]) + '\n'
        else:
            print(*a)
    stats = {}
    patched_rows = set()
    patched_neg_tids = False
    log('Reading', infn)
    F = fitsio.FITS(infn)
    primhdr = F[0].read_header()
    #tilera = primhdr['TILERA']
    #tiledec = primhdr['TILEDEC']
    tileid = primhdr['TILEID']
    fa_surv = primhdr['FA_SURV']
    stats.update(infn=infn, tileid=tileid, fa_surv=fa_surv)
    # Find targeting files / directories in the headers.
    #mtldir = primhdr['MTL']
    targdirs = [primhdr['TARG']]
    # Additional targeting directories appear as TARG2, TARG3, ...
    for i in range(2, 100):
        key = 'TARG%i' % i
        if not key in primhdr:
            break
        targdirs.append(primhdr[key])
    scndfn = primhdr.get('SCND', '').strip()
    toofn = primhdr.get('TOO', '')
    # Swap in the local file locations. (Header values may start with the string "DESIROOT")
    #mtldir = mtldir.replace('DESIROOT', desiroot)
    targdirs = [targdir.replace('DESIROOT', desiroot)
                for targdir in targdirs]
    scndfn = scndfn.replace('DESIROOT', desiroot)
    toofn = toofn.replace('DESIROOT', desiroot)
    # Tile 80611 has /data as the prefix.
    pre = '/data/'
    targdirs = [targdir.replace(pre, desiroot+'/')
                if targdir.startswith(pre)
                else targdir
                for targdir in targdirs]
    if scndfn.startswith(pre):
        scndfn = scndfn.replace(pre, desiroot+'/')
    if toofn.startswith(pre):
        toofn = toofn.replace(pre, desiroot+'/')
    # Tile 80687 has SCND a directory, not a file.
    if os.path.isdir(scndfn):
        # Tile 80687 has SCND a directory that contains two files, with the same number of rows
        # but one includes DR9 photometry columns.
        fn = os.path.join(scndfn, 'sv1targets-dark-secondary-dr9photometry.fits')
        if os.path.exists(fn):
            log('Special-case updated SCND from', scndfn, 'to', fn)
            scndfn = fn
        # Similar with 80717.
        fn = os.path.join(scndfn, 'sv1targets-bright-secondary-dr9photometry.fits')
        if os.path.exists(fn):
            log('Special-case updated SCND from', scndfn, 'to', fn)
            scndfn = fn
    # Special-case a missing SCND file from sv2 (tile 81096+)
    if scndfn == '/global/cfs/cdirs/desi/target/catalogs/dr9/0.53.0/targets/sv2/secondary/dark/sv2targets-dark-secondary.fits':
        log('Ignoring SCND file', scndfn)
        scndfn = ''
    oldfn = '/global/cfs/cdirs/desi/target/catalogs/dr9/0.58.0/targets/main/secondary/dark/maintargets-dark-secondary.fits'
    if scndfn == oldfn:
        newfn = '/global/cfs/cdirs/desi/target/catalogs/dr9/0.58.0/targets/main/secondary/dark/targets-dark-secondary.fits'
        log('Replacing', oldfn, 'with', newfn)
        scndfn = newfn
    stats.update(scndfn=scndfn, toofn=toofn)
    for i,t in enumerate(targdirs):
        stats['targdir%i' % i] = t
    # Read the original FIBERASSIGN table.
    hduname = 'FIBERASSIGN'
    ff = F[hduname]
    tab = ff.read()
    # We're going to match based on (positive) TARGETID.
    targetid = tab['TARGETID']
    ra = tab['TARGET_RA']
    dec = tab['TARGET_DEC']
    # Per https://github.com/desihub/fiberassign/issues/385,
    # Swap in unique values for negative TARGETIDs.
    I = np.flatnonzero(targetid < 0)
    stats.update(neg_targetids=len(I), pos_targetids=len(np.flatnonzero(targetid>0)))
    if len(I):
        # LOCATION is unique within a tile, so tile*10000+location is unique.
        tab['TARGETID'][I] = -(tileid * 10000 + tab['LOCATION'][I])
        log('Updated', len(I), 'negative TARGETID values')
        patched_neg_tids = True
    # Start with primary target files. These are split into healpixes, so look up which healpixes
    # contain targets used in this tile.
    # Some objects have NaN TARGET_RA; skip these when looking up the healpix
    Iok = np.flatnonzero(np.isfinite(ra))
    nside = 8
    hps = radec2pix(nside, ra[Iok], dec[Iok])
    # Read relevant healpixes.
    alltargets = []
    tidset = set(targetid[targetid > 0])
    for hp in np.unique(hps):
        foundhp = False
        for targdir in targdirs:
            pat = os.path.join(targdir, '*-hp-%i.fits' % hp)
            #log(pat)
            fns = glob(pat)
            #log(fns)
            if len(fns) != 1:
                log('Searching for pattern', pat, 'found files', fns, '; expected 1 file.')
                assert(len(fns) <= 1)
            if len(fns) == 0:
                continue
            foundhp = True
            fn = fns[0]
            T = fitsio.read(fn)
            tid = T['TARGETID']
            # Keep only rows whose TARGETID appears in this tile.
            I = np.flatnonzero([t in tidset for t in tid])
            log('Read targets', fn, '->', len(T), 'targets,', len(I), 'matching (positive) TARGETIDs')
            if len(I) == 0:
                continue
            alltargets.append(T[I])
        # Every healpix must be found in at least one targeting directory.
        assert(foundhp)
    # Merge targets and match on TARGETID.
    targets = np.hstack(alltargets)
    tidmap = dict([(tid,i) for i,tid in enumerate(targets['TARGETID']) if tid>0])
    # I: row in targets (or -1); J: rows in tab with a match; then trim I to the matches.
    I = np.array([tidmap.get(t, -1) for t in targetid])
    J = np.flatnonzero(I >= 0)
    I = I[J]
    log('Checking', len(I), 'matched TARGETIDs')
    stats.update(matched_targets=len(I))
    # We use this function to test for equality between arrays -- both
    # being non-finite (eg NaN) counts as equal.
    def equal(a, b):
        bothnan = False
        try:
            bothnan = np.logical_not(np.isfinite(a)) * np.logical_not(np.isfinite(b))
        except Exception:
            # np.isfinite raises TypeError on non-numeric (eg string) columns;
            # for those, only plain equality applies.
            pass
        return np.logical_or(a == b, bothnan)
    # This is the subset of columns we patch, based on the bug report.
    for col in ['BRICK_OBJID', 'BRICKID', 'RELEASE', 'FLUX_G', 'FLUX_R', 'FLUX_Z',
                'FLUX_IVAR_G', 'FLUX_IVAR_R', 'FLUX_IVAR_Z',
                'REF_CAT', 'GAIA_PHOT_G_MEAN_MAG', 'GAIA_PHOT_BP_MEAN_MAG', 'GAIA_PHOT_RP_MEAN_MAG',
                'MASKBITS', 'REF_ID', 'MORPHTYPE']:
        old = tab[col][J]
        new = targets[col][I]
        eq = equal(old, new)
        #log('Target catalogs:', col, 'equal:', Counter(eq))
        if not np.all(eq):
            diff = np.flatnonzero(np.logical_not(eq))
            log('Target catalogs: col', col, 'patching', len(diff), 'rows')
            log('  rows', J[diff][:5])
            log('  old vals', tab[col][J[diff]][:5])
            log('  new vals', new[diff][:5])
            tab[col][J[diff]] = new[diff]
            patched_rows.update(J[diff])
    stats.update(patched_rows_targets=len(patched_rows))
    # Were secondary targets used for this tile?
    if scndfn == '-':
        # eg fiberassign-081000.fits.gz
        scndfn = ''
    if len(scndfn):
        log('Reading secondary targets file:', scndfn)
        scnd,sidmap = read_secondary(scndfn)
        log('Read', len(scnd), 'secondaries from', scndfn)
        # Match on targetid
        I = np.array([sidmap.get(t, -1) for t in targetid])
        J = np.flatnonzero(I >= 0)
        I = I[J]
        log('Matched', len(I), 'secondary targets on TARGETID')
        stats.update(matched_scnd=len(I))
        for col in ['FLUX_G', 'FLUX_R', 'FLUX_Z',
                    'GAIA_PHOT_G_MEAN_MAG', 'GAIA_PHOT_BP_MEAN_MAG', 'GAIA_PHOT_RP_MEAN_MAG',]:
            # if not col in tab.dtype.fields:
            #     log('No column', col, 'in original FIBERASSIGN table')
            #     continue
            if not col in scnd.dtype.fields:
                log('No column', col, 'in secondary table')
                continue
            old = tab[col][J]
            new = scnd[col][I]
            eq = equal(old, new)
            #log(col, Counter(eq))
            if not np.all(eq):
                diff = np.flatnonzero(np.logical_not(eq))
                log('Secondary targets: col', col, 'patching', len(diff), 'rows')
                log('  rows', J[diff][:5])
                log('  old vals', tab[col][J[diff]][:5])
                log('  new vals', new[diff][:5])
                tab[col][J[diff]] = new[diff]
                patched_rows.update(J[diff])
    stats.update(patched_rows_scnd=len(patched_rows))
    # ToO files -- we're going to ignore the header values and look
    # up the file location based on FA_SURV.
    toomap = {
        'sv3': '/global/cfs/cdirs/desi/survey/ops/surveyops/trunk/mtl/sv3/ToO/ToO.ecsv',
        'main': '/global/cfs/cdirs/desi/survey/ops/surveyops/trunk/mtl/main/ToO/ToO.ecsv',
        'cmx': '',
        'sv2': '',
        'sv1': '',
    }
    if not fa_surv in toomap:
        log('Failed to find ToO file for FA_SURV', fa_surv)
    # Bug fix: previously this indexed toomap[fa_surv] unconditionally,
    # raising KeyError for an unknown FA_SURV right after logging the
    # failure.  Fall back to '' (no ToO file) instead.
    toofn = toomap.get(fa_surv, '')
    # Were ToO targets used in this tile?
    if len(toofn):
        log('Reading ToO file', toofn)
        too = Table.read(toofn)
        log('Read', len(too), 'ToO from', toofn)
        toomap = dict([(tid,i) for i,tid in enumerate(too['TARGETID']) if tid > 0])
        I = np.array([toomap.get(t, -1) for t in targetid])
        J = np.flatnonzero(I >= 0)
        I = I[J]
        log('Matched', len(I), 'ToO targets on TARGETID')
        stats.update(matched_too=len(I))
        for col in ['FLUX_G', 'FLUX_R', 'FLUX_Z',
                    'GAIA_PHOT_G_MEAN_MAG', 'GAIA_PHOT_BP_MEAN_MAG', 'GAIA_PHOT_RP_MEAN_MAG',]:
            old = tab[col][J]
            new = too[col][I]
            eq = equal(old, new)
            if not np.all(eq):
                diff = np.flatnonzero(np.logical_not(eq))
                log('ToO targets: col', col, 'patching', len(diff), 'rows')
                tab[col][J[diff]] = new[diff]
                patched_rows.update(J[diff])
    stats.update(patched_rows_too=len(patched_rows))
    if len(patched_rows) == 0 and not patched_neg_tids:
        log('No need to patch', infn)
        stats.update(patched=0)
    else:
        log('Patched', len(patched_rows), 'data rows')
        stats.update(patched=1)
        # Make sure output directory exists.  exist_ok avoids a race when
        # several multiprocessing workers write under the same directory;
        # the emptiness check handles outfn with no directory component.
        outdir = os.path.dirname(outfn)
        if outdir:
            os.makedirs(outdir, exist_ok=True)
        assert(outfn != infn)
        # Write out output file, leaving other HDUs unchanged.
        Fout = fitsio.FITS(outfn, 'rw', clobber=True)
        for ext in F:
            #log(ext.get_extname())
            extname = ext.get_extname()
            hdr = ext.read_header()
            data = ext.read()
            if extname == 'PRIMARY':
                # fitsio will add its own headers about the FITS format, so trim out all COMMENT cards.
                newhdr = fitsio.FITSHDR()
                for r in hdr.records():
                    if r['name'] == 'COMMENT':
                        #log('Skipping comment:', r['name'], r['value'])
                        continue
                    newhdr.add_record(r)
                hdr = newhdr
            if extname == hduname:
                # Swap in our updated FIBERASSIGN table!
                data = tab
            Fout.write(data, header=hdr, extname=extname)
        Fout.close()
        log('Wrote', outfn)
    if log_to_string:
        print(logstr)
    return stats
def main():
    '''Command-line driver: patch a list of fiberassign files and write a
    FITS table collating the per-file patching statistics.'''
    from argparse import ArgumentParser
    parser = ArgumentParser()
    parser.add_argument('files', type=str, nargs='*', help='Fiberassign files to patch')
    parser.add_argument('--in-base', default='/global/cfs/cdirs/desi/target/fiberassign/tiles/trunk',
                        help='Input base path')
    parser.add_argument('--outdir', default='patched-fa')
    parser.add_argument('--stats', default='patch-fa-stats.fits',
                        help='Output filename for patching statistics.')
    parser.add_argument('--threads', type=int, help='Multiprocessing threads')
    opt = parser.parse_args()
    fa_base = opt.in_base
    # Build one job (infn/outfn pair) per input file, mapping each path
    # under --in-base into --outdir.
    jobs = []
    for fn in opt.files:
        #fn = os.path.join(fa_base, '001/fiberassign-001000.fits.gz')
        if not fn.startswith(fa_base):
            print('All input filenames must start with --in-base = ', fa_base)
            return -1
        jobs.append(dict(infn=fn, outfn=fn.replace(fa_base, opt.outdir + '/')))
    if opt.threads:
        from astrometry.util.multiproc import multiproc
        mp = multiproc(opt.threads)
        allstats = mp.map(_bounce_patch, jobs)
    else:
        allstats = []
        for job in jobs:
            allstats.append(patch(**job))
            print()
    # Collate and write out stats: the sorted union of all stat keys.
    allcols = sorted(set().union(*(s.keys() for s in allstats)))
    print('allcols:', allcols)
    # Fill values for stats a given file did not produce (numeric stats
    # get -1, anything else an empty string).
    blankvals = dict(
        tileid=-1,
        neg_targetids=-1,
        pos_targetids=-1,
        matched_targets=-1,
        patched_rows_targets=-1,
        matched_scnd=-1,
        patched_rows_scnd=-1,
        matched_too=-1,
        patched_rows_too=-1,
        patched=-1,
    )
    data = {c: np.array([s.get(c, blankvals.get(c, '')) for s in allstats])
            for c in allcols}
    fitsio.write(opt.stats, data, names=allcols, clobber=True)
def _bounce_patch(kwargs):
    '''Multiprocessing helper: unpack a kwargs dict and call patch().'''
    return patch(**kwargs)
# Script entry point.
if __name__ == '__main__':
    main()