-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcache_gt.py
executable file
·119 lines (96 loc) · 3.51 KB
/
cache_gt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
#
# Copyright (C) 2021 Chi-kwan Chan
# Copyright (C) 2021 Steward Observatory
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
from itertools import product
from importlib import import_module
import numpy as np
import h5py
from tqdm import tqdm
from yaml import safe_load
from common import hallmark as hm
from common import mockservation as mock
def cache_gt(src_fmt, dst_fmt, img_fmt='ipole',
             params=None, order=('snapshot',), cutoff=15e9, **kwargs):
    """Load groups of images, process them, and cache each group as HDF5.

    Input files are discovered with ``hm.ParaFrame(src_fmt, **kwargs)``.
    The resulting table is split into groups, one per combination of
    the unique values of the columns named in `params`.  Each group is
    loaded with ``load_mov`` from the module ``common.io_<img_fmt>``,
    passed through ``mock.compress`` and ``mock.crop``, and written to
    the file named by ``dst_fmt.format(**criteria)``.

    Args:
        src_fmt: First argument of ``hm.ParaFrame``; describes where
            the input files are.
        dst_fmt: ``str.format`` template for the output path; it is
            filled in with the parameter values of each group.
        img_fmt: Suffix selecting the loader module
            ``common.io_<img_fmt>``.
        params: Iterable of column names to group by.  If ``None``,
            every column of the table except ``'path'`` and the
            columns in `order` is used.
        order: Column name(s) used to sort each group before loading.
            A single name may be given as a plain string.
        cutoff: Forwarded to ``mock.compress`` after conversion with
            ``float()``, so string values are accepted.
        **kwargs: Forwarded to ``hm.ParaFrame``.

    Raises:
        SystemExit: With status 1 if no input file is found at all.
    """
    io = import_module('common.io_' + img_fmt)

    dlen = 0  # for pretty format in `tqdm`

    # Treat a bare string as one sort key instead of iterating over
    # its characters.
    if isinstance(order, str):
        order = [order]

    # Find input models using hallmark `ParaFrame`
    pf = hm.ParaFrame(src_fmt, **kwargs)
    if len(pf) == 0:
        print('No input found; please try different options')
        # `exit()` is only injected by the `site` module; raising
        # `SystemExit` has the same effect and is always available.
        raise SystemExit(1)

    # Automatically determine parameters if needed, turn `params` into
    # a dict of parameters and their unique values
    if params is None:
        params = list(pf.keys())
        params.remove('path')
        for k in order:
            params.remove(k)
    params = {p: np.unique(pf[p]) for p in params}

    # Main loop for generating multiple image caches
    for values in product(*params.values()):
        criteria = dict(zip(params.keys(), values))

        # Check output file
        dst = Path(dst_fmt.format(**criteria))
        if dst.is_file():
            print(f'  "{dst}" exists; SKIP')
            continue

        # Select models according to `criteria`, one parameter at a time
        sel = pf
        for p, v in criteria.items():
            sel = sel(**{p: v})
        if len(sel) == 0:
            print(f'  No input found for {criteria}; SKIP')
            continue

        # Pretty format in `tqdm`: pad the description to the widest
        # one seen so far so that successive progress bars line up
        desc = f'* "{dst}"'
        desc = f'{desc:<{dlen}}'
        dlen = len(desc)

        # Make sure that the summary table is sorted correctly
        # NOTE(review): keys are applied one after another, so the last
        # key in `order` ends up as the primary one -- confirm this is
        # intended when more than one key is given.
        for k in order:
            sel = sel.sort_values(k)

        # Actually load the images
        mov = io.load_mov(tqdm(sel.path, desc=desc), pol=False)
        m = mov.meta
        N = max(mov.shape[-2:])
        mov = mock.crop(
            mock.compress(mov, N=2*N, cutoff=float(cutoff)),
            m.width, m.height)

        # Only touch file system if everything works.  Write to a
        # temporary sibling first and rename it into place, so that a
        # failure half way through never leaves a partial `dst` behind
        # that later runs would skip as an existing cache.
        dst.parent.mkdir(parents=True, exist_ok=True)
        tmp = dst.with_name(dst.name + '.tmp')
        try:
            with h5py.File(tmp, 'w') as f:
                f['data'] = mov
                for k, v in mov.meta.dict().items():
                    if v is not None:
                        f['meta/'+k] = v
            tmp.replace(dst)
        finally:
            # Still present only if writing or renaming failed
            if tmp.exists():
                tmp.unlink()
#==============================================================================
# Make cache_gt() callable as a script
import click
@click.command()
@click.argument('args', nargs=-1)
def cmd(args):
    """Run cache_gt() once for every YAML configuration file in ARGS.

    Arguments of the form KEY=VALUE are passed to cache_gt() as extra
    keyword arguments (values stay strings) and take precedence over
    the same key in a configuration file.  Every other argument is the
    path of a YAML file whose top-level mapping supplies the keyword
    arguments of cache_gt().
    """
    confs = []
    params = {}
    for arg in args:
        # Split on the first '=' only, so values may contain '='
        key, sep, value = arg.partition('=')
        if sep:
            params[key] = value
        else:
            confs.append(arg)

    for c in confs:
        with open(c) as f:
            conf = safe_load(f)
        # Merge instead of passing two `**` mappings: a key given both
        # in the file and on the command line would otherwise raise
        # TypeError for a duplicated keyword argument.
        cache_gt(**{**conf, **params})
# Start the command-line interface only when this file is executed as
# a script, not when it is imported as a module
if __name__ == '__main__':
    cmd()