-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
150 lines (132 loc) · 5.85 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import os
import subprocess
import healpy as hp
def setup_output_dir(inp, env):
'''
Sets up directory for output files
ARGUMENTS
---------
inp: Info() object containing input specifications
env: environment object
RETURNS
-------
None
'''
if not os.path.isdir(inp.output_dir):
subprocess.call(f'mkdir {inp.output_dir}', shell=True, env=env)
if not os.path.isdir(f'{inp.output_dir}/maps'):
subprocess.call(f'mkdir {inp.output_dir}/maps', shell=True, env=env)
if not os.path.isdir(f'{inp.output_dir}/pyilc_yaml_files'):
subprocess.call(f'mkdir {inp.output_dir}/pyilc_yaml_files', shell=True, env=env)
if not os.path.isdir(f'{inp.output_dir}/pyilc_outputs'):
subprocess.call(f'mkdir {inp.output_dir}/pyilc_outputs', shell=True, env=env)
if not os.path.isdir(f'{inp.output_dir}/n_point_funcs'):
subprocess.call(f'mkdir {inp.output_dir}/n_point_funcs', shell=True, env=env)
if not os.path.isdir(f'{inp.output_dir}/data_vecs'):
subprocess.call(f'mkdir {inp.output_dir}/data_vecs', shell=True, env=env)
return
def tsz_spectral_response(freqs):
'''
ARGUMENTS
---------
freqs: 1D numpy array, contains frequencies (GHz) for which to calculate tSZ spectral response
RETURNS
---------
1D array containing tSZ spectral response to each frequency
'''
T_cmb = 2.726
h = 6.62607004*10**(-34)
kb = 1.38064852*10**(-23)
response = []
for freq in freqs:
x = h*(freq*10**9)/(kb*T_cmb) #x is v/56.9 GHz
response.append(T_cmb*(x*1/np.tanh(x/2)-4))
return np.array(response)
def GaussianNeedlets(inp, taper_width=0):
'''
Function from pyilc (https://github.com/jcolinhill/pyilc, https://arxiv.org/pdf/2307.01043.pdf)
ARGUMENTS
---------
inp: Info object containing input parameter specifications
taper_width: int, ell-space width of high ell taper. If 0, no taper is applied
RETURNS
--------
ell: array of ell values
filters: (N_scales, ellmax+1) numpy array containing filters at each scale
'''
FWHM_arcmin = np.array(inp.GN_FWHM_arcmin)
if ( any( i <= j for i, j in zip(FWHM_arcmin, FWHM_arcmin[1:]))):
raise AssertionError
ell = np.arange(inp.ell_sum_max+1)
N_scales = len(FWHM_arcmin) + 1
filters = np.zeros((N_scales, inp.ell_sum_max+1))
FWHM = FWHM_arcmin * np.pi/(180.*60.)
# define gaussians
Gaussians = np.zeros((N_scales-1, inp.ell_sum_max+1))
for i in range(N_scales-1):
Gaussians[i] = hp.gauss_beam(FWHM[i], lmax=inp.ell_sum_max)
# define needlet filters in harmonic space
filters[0] = Gaussians[0]
for i in range(1,N_scales-1):
filters[i] = np.sqrt(Gaussians[i]**2. - Gaussians[i-1]**2.)
filters[N_scales-1] = np.sqrt(1. - Gaussians[N_scales-2]**2.)
# simple check to ensure that sum of squared transmission is unity as needed for NILC algorithm
assert (np.absolute( np.sum( filters**2., axis=0 ) - np.ones(inp.ell_sum_max+1,dtype=float)) < 1.e-3).all(), "wavelet filter transmission check failed"
if taper_width:
taper_func = (1.0 - 0.5*(np.tanh(0.025*(ell - (inp.ell_sum_max - taper_width))) + 1.0)) #smooth taper to zero from ELLMAX-taper_width to ELLMAX
# taper_func *= 0.5*(np.tanh(0.5*(ell-10)))+0.5 #smooth taper to zero for low ell
else:
taper_func = np.ones_like(ell)
for i, filt in enumerate(filters):
filters[i] = filters[i]*taper_func
return ell, filters
def build_NILC_maps(inp, h, CMB_wt_maps, tSZ_wt_maps, freq_maps=None):
'''
Note that pyilc checks which frequencies to use for every filter scale
We include all freqs for all filter scales here
ARGUMENTS
---------
inp: Info object containing input parameter specifications
h: (N_scales, ellmax+1) ndarray containing needlet filters at each scale
CMB_wt_maps: (Nscales, Nfreqs=2, npix (variable for each scale and freq)) nested list,
contains NILC weight maps for preserved CMB
tSZ_wt_maps: (Nscales, Nfreqs=2, npix (variable for each scale and freq)) nested list,
contains NILC weight maps for preserved tSZ
freq_maps: (Nfreqs=2, 12*nside**2) ndarray containing simulated map at
each frequency to use in NILC map construction
RETURNS
-------
NILC_maps: (2 for CMB or tSZ preserved NILC map, 12*nside**2) ndarray
'''
if freq_maps is None:
freq_map1 = hp.read_map(f'{inp.output_dir}/maps/freq1.fits')
freq_map2 = hp.read_map(f'{inp.output_dir}/maps/freq2.fits')
freq_maps = [freq_map1, freq_map2]
NILC_maps = []
for p in range(2):
if p==0:
wt_maps = CMB_wt_maps
else:
wt_maps = tSZ_wt_maps
all_maps = np.zeros((inp.Nscales, 12*inp.nside**2)) #index as all_maps[scale][pixel]
for i in range(len(inp.freqs)):
map_ = freq_maps[i]
map_ = hp.ud_grade(map_, inp.nside)
alm_orig = hp.map2alm(map_)
for n in range(inp.Nscales):
alm = hp.almxfl(alm_orig, h[n]) #initial needlet filtering
map_ = hp.alm2map(alm, inp.nside)
NILC_weights = hp.ud_grade(wt_maps[n][i],inp.nside)
map_ = map_*NILC_weights #application of weight map
all_maps[n] = np.add(all_maps[n], map_) #add maps at all frequencies for each scale
T_ILC_n = None
for n in range(inp.Nscales):
T_ILC_alm = hp.map2alm(all_maps[n])
tmp = hp.almxfl(T_ILC_alm, h[n]) #final needlet filtering
if T_ILC_n is None:
T_ILC_n = np.zeros((inp.Nscales,len(tmp)),dtype=np.complex128)
T_ILC_n[n]=tmp
T_ILC = np.sum(np.array([hp.alm2map(T_ILC_n[n], inp.nside) for n in range(len(T_ILC_n))]), axis=0) #adding maps from all scales
NILC_maps.append(T_ILC)
return NILC_maps