
Commit

Flatten out delay filtered z-score with linear algebra tricks, change defaults after parameter exploration
jsdillon committed Oct 11, 2024
1 parent b757bcb commit 446c369
Showing 1 changed file with 69 additions and 75 deletions.
144 changes: 69 additions & 75 deletions notebooks/delay_filtered_average_zscore.ipynb
@@ -7,7 +7,7 @@
"source": [
"# Single File Delay Filtered Average Z-Score\n",
"\n",
"**by Josh Dillon**, last updated May 12, 2024\n",
"**by Josh Dillon**, last updated October 10, 2024\n",
"\n",
"This notebook is designed to calculate a metric used for finding low-level RFI in redundantly-averaged cross-correlations, which are then incoherently averaged across well-sampled baselines.\n",
"\n",
@@ -46,7 +46,7 @@
"import numpy as np\n",
"import copy\n",
"import glob\n",
"from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean\n",
"from hera_cal import io, utils, redcal, apply_cal, datacontainer, vis_clean, noise\n",
"from hera_filters import dspec\n",
"from pyuvdata import UVFlag, UVData\n",
"from scipy import constants\n",
@@ -65,7 +65,7 @@
"source": [
"# get input data file names\n",
"SUM_FILE = os.environ.get(\"SUM_FILE\", None)\n",
"# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.59008.sum.uvh5'\n",
"# SUM_FILE = '/lustre/aoc/projects/hera/h6c-analysis/IDR2/2459861/zen.2459861.35453.sum.uvh5'\n",
"SUM_SUFFIX = os.environ.get(\"SUM_SUFFIX\", 'sum.uvh5')\n",
"\n",
"# get input calibration files and flags\n",
@@ -79,8 +79,8 @@
"# get delay filtering parameters\n",
"FM_LOW_FREQ = float(os.environ.get(\"FM_LOW_FREQ\", 87.5)) # in MHz\n",
"FM_HIGH_FREQ = float(os.environ.get(\"FM_HIGH_FREQ\", 108.0)) # in MHz\n",
"MIN_SAMP_FRAC = float(os.environ.get(\"MIN_SAMP_FRAC\", .05))\n",
"FILTER_DELAY = float(os.environ.get(\"FILTER_DELAY\", 500)) # in ns\n",
"MIN_SAMP_FRAC = float(os.environ.get(\"MIN_SAMP_FRAC\", .15))\n",
"FILTER_DELAY = float(os.environ.get(\"FILTER_DELAY\", 750)) # in ns\n",
"EIGENVAL_CUTOFF = float(os.environ.get(\"EIGENVAL_CUTOFF\", 1e-12))\n",
"\n",
"for setting in ['SUM_FILE', 'SMOOTH_CAL_FILE', 'ZSCORE_OUTFILE', 'FM_LOW_FREQ', 'FM_HIGH_FREQ', \n",
@@ -211,50 +211,72 @@
"id": "1c3ec0d8",
"metadata": {},
"source": [
"## Delay filter redundantly-averaged data"
"## Delay filter redundantly-averaged SNRs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f4d700b9",
"id": "f6c582d3-78c4-45e7-8ff5-b6bf35c11f8d",
"metadata": {},
"outputs": [],
"source": [
"if not ALL_FLAGGED:\n",
" t = time.time()\n",
" # compute weights based on RFI flags and autocorrelations, assumes that nsamples is constant across a spectrum\n",
" wgts = {}\n",
"\n",
" # insert auto_bls into red_avg_data for the purpose of calculating noise\n",
" all_ants_in_keys = set([ant for bl in red_avg_data.keys() for ant in bl[0:2]])\n",
" for pol in ['ee', 'nn']:\n",
" rfi_flags = np.all([red_avg_flags[bl] for bl in red_avg_flags if bl[2] == pol], axis=0)\n",
" auto_bl = [bl for bl in red_avg_data if bl[0] == bl[1] and bl[2] == pol][0]\n",
" wgts[pol] = np.where(rfi_flags, 0, np.abs(red_avg_data[auto_bl])**-2)\n",
" wgts[pol] /= np.nanmean(np.where(rfi_flags, np.nan, wgts[pol])) # avoid dynamic range issues\n",
" wgts[pol][~np.isfinite(wgts[pol])] = 0\n",
" \n",
" for ant in all_ants_in_keys:\n",
" if (ant, ant, pol) not in red_avg_data:\n",
" red_avg_data[(ant, ant, pol)] = red_avg_data[auto_bl]\n",
"\n",
" # predict noise to compute red_avg_SNRs\n",
" red_avg_SNRs = copy.deepcopy(red_avg_data)\n",
" dt = np.median(np.diff(hd.times)) * 24 * 3600\n",
" df = np.median(np.diff(hd.freqs)) \n",
" for bl in red_avg_SNRs:\n",
" if bl[0] != bl[1]:\n",
" noise_var = noise.predict_noise_variance_from_autos(bl, red_avg_data, dt=dt, df=df, nsamples=red_avg_nsamples)\n",
" red_avg_SNRs[bl] /= noise_var**.5\n",
"\n",
" # pick out baselines with enough median nsamples and light-travel times shorter than the filter delay\n",
" min_nsamples = np.max([np.max(red_avg_nsamples[bl]) for bl in red_avg_nsamples]) * MIN_SAMP_FRAC\n",
" bls_to_filter = [bl for bl in red_avg_data if (np.median(red_avg_nsamples[bl]) >= min_nsamples)]\n",
" max_nsamples_by_pol = {pol: np.max([np.max(red_avg_nsamples[bl]) for bl in red_avg_nsamples if bl[2] == pol]) for pol in ['ee', 'nn']}\n",
" bls_to_filter = [bl for bl in red_avg_data if (np.median(red_avg_nsamples[bl]) >= (max_nsamples_by_pol[bl[2]] * MIN_SAMP_FRAC))]\n",
" bls_to_filter = [bl for bl in bls_to_filter if np.linalg.norm(hd.antpos[bl[0]] - hd.antpos[bl[1]]) / constants.c * 1e9 < FILTER_DELAY]\n",
"\n",
" bls_to_filter = [bl for bl in bls_to_filter if bl[0] != bl[1]]\n",
" \n",
" # perform delay filter\n",
" wgts = (~np.all(list(red_avg_flags.values()), axis=0)).astype(float)\n",
" cache = {}\n",
" dly_filt_red_avg_data = copy.deepcopy(red_avg_data)\n",
" dly_filt_SNRs = copy.deepcopy(red_avg_SNRs)\n",
" for bl in bls_to_filter:\n",
" d_mdl = np.zeros_like(dly_filt_red_avg_data[bl])\n",
" d_mdl = np.zeros_like(dly_filt_SNRs[bl])\n",
" for band in [low_band, high_band]:\n",
" d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], dly_filt_red_avg_data[bl][:, band], \n",
" wgts=wgts[bl[2]][:, band], filter_centers=[0], \n",
" d_mdl[:, band], _, info = dspec.fourier_filter(hd.freqs[band], \n",
" dly_filt_SNRs[bl][:, band], \n",
" wgts=wgts[:, band], filter_centers=[0], \n",
" filter_half_widths=[FILTER_DELAY / 1e9], mode='dpss_solve', \n",
" eigenval_cutoff=[EIGENVAL_CUTOFF], suppression_factors=[EIGENVAL_CUTOFF], \n",
" max_contiguous_edge_flags=len(hd.freqs), cache=cache)\n",
" dly_filt_red_avg_data[bl] = np.where(red_avg_flags[bl], 0, red_avg_data[bl] - d_mdl)\n",
" print(f'Finished delay-filtering in {(time.time() - t) / 60:.2f} minutes.')"
" dly_filt_SNRs[bl] = np.where(red_avg_flags[bl], 0, red_avg_SNRs[bl] - d_mdl)\n",
"\n",
" # calculate and apply correction factor based on the leverage to flatten out the SNR\n",
" correction_factors = np.full_like(wgts, np.nan)\n",
" for band in [low_band, high_band]:\n",
" X = dspec.dpss_operator(hd.freqs[band], [0], filter_half_widths=[FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]\n",
" for tind in range(wgts.shape[0]):\n",
" W = wgts[tind, band]\n",
" leverage = np.diag(X @ np.linalg.pinv(np.dot(X.T * W, X)) @ (X.T * W))\n",
" correction_factors[tind, band] = np.where(leverage > 0, np.sqrt(np.pi)/2 * (1 - leverage)**.5, np.nan)\n",
" for bl in dly_filt_SNRs:\n",
" dly_filt_SNRs[bl] /= correction_factors"
]
},
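An editorial sketch of the leverage trick in the cell above (my own toy setup, not code from the commit): the SNRs are cross-correlations divided by the noise level predicted from the autocorrelations (roughly σ² ∝ |V_auto,1 V_auto,2| / (Δν Δt N_samples); see `hera_cal.noise.predict_noise_variance_from_autos` for the exact convention), and with binary weights the DPSS delay filter acts as a least-squares projection onto the unflagged channels. The residual of unit-variance complex Gaussian noise at channel i therefore has expected magnitude (√π/2)·√(1 − h_ii), where the leverage h_ii is the diagonal of the hat matrix H = X(XᵀWX)⁺XᵀW. Dividing by that factor is what flattens the noise-only |SNR| back to unit mean, including near flag boundaries where the leverage is large. The frequency axis, flag gap, and trial count below are invented for illustration.

```python
# Standalone sketch, not the notebook's code: Monte Carlo check that the residual of a
# binary-weighted DPSS least-squares fit to unit-variance complex noise has per-channel
# mean magnitude (sqrt(pi)/2) * sqrt(1 - leverage).
import numpy as np
from hera_filters import dspec

rng = np.random.default_rng(0)
freqs = np.linspace(108e6, 180e6, 600)  # toy frequency axis in Hz

# DPSS design matrix for a 750 ns delay filter, mirroring the notebook's dpss_operator call
X = np.asarray(dspec.dpss_operator(freqs, [0], [750e-9], eigenval_cutoff=[1e-12])[0])

wgts = np.ones_like(freqs)
wgts[250:270] = 0  # a toy flagged RFI gap

# Hat matrix and its diagonal (the leverage), as in the notebook's correction_factors
H = X @ np.linalg.pinv(np.dot(X.T * wgts, X)) @ (X.T * wgts)
leverage = np.diag(H)

# Simulate unit-variance complex Gaussian "SNRs", delay filter them, and keep the residual
ntrials = 2000
noise = (rng.standard_normal((ntrials, freqs.size))
         + 1j * rng.standard_normal((ntrials, freqs.size))) / np.sqrt(2)
resid = noise - noise @ H.T

measured = np.mean(np.abs(resid), axis=0)
predicted = np.sqrt(np.pi) / 2 * np.sqrt(1 - leverage)
unflagged = wgts > 0
print(f'max |measured - predicted| on unflagged channels: '
      f'{np.max(np.abs(measured - predicted)[unflagged]):.3f}')  # ~Monte Carlo noise level
```

The notebook applies the same algebra per integration and separately for the bands below and above FM; the sketch uses a single weight vector for brevity.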
{
"cell_type": "markdown",
"id": "8035cece",
"id": "4b1d3a3a-67d6-45bd-aa36-e0135b4b401d",
"metadata": {},
"source": [
"## Calculate z-scores"
@@ -263,57 +263,19 @@
{
"cell_type": "code",
"execution_count": null,
"id": "9bc8c61e",
"metadata": {
"scrolled": true
},
"id": "fca3a96e-6413-4ef4-a37b-dc34e73e7aab",
"metadata": {},
"outputs": [],
"source": [
"if not ALL_FLAGGED:\n",
" t = time.time()\n",
" # estimate how much signal loss we should expect for white noise\n",
" filters_low = dspec.dpss_operator(hd.freqs[low_band], [0], [FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]\n",
" filter_frac_low = filters_low.shape[1] / filters_low.shape[0]\n",
" filters_high = dspec.dpss_operator(hd.freqs[high_band], [0], [FILTER_DELAY / 1e9], eigenval_cutoff=[EIGENVAL_CUTOFF])[0]\n",
" filter_frac_high = filters_high.shape[1] / filters_high.shape[0]\n",
"zscore = {}\n",
"for pol in ['ee', 'nn']:\n",
" abs_SNRs_this_pol = [np.abs(dly_filt_SNRs[bl]) for bl in bls_to_filter if (bl[2] == pol) and (bl[0] != bl[1])]\n",
" \n",
" # calculate z-scores\n",
" zscore = {}\n",
" for pol in ['ee', 'nn']:\n",
" to_avg = []\n",
" weights = []\n",
" predicted_mean = 1.0\n",
" sigma = predicted_mean * np.sqrt(2 / np.pi)\n",
" variance_expected = (4 - np.pi) / 2 * sigma**2 / len(abs_SNRs_this_pol)\n",
"\n",
" for bl in bls_to_filter:\n",
" auto_bl = auto_bl = [k for k in red_avg_data if k[0] == k[1] and k[2] == bl[2]][0]\n",
" if (bl[2] == pol) and (bl != auto_bl): \n",
" # calcualte predicted variance\n",
" dt = np.median(np.diff(hd.times)) * 24 * 3600\n",
" df = np.median(np.diff(hd.freqs)) \n",
" predicted_variance = (np.abs(red_avg_data[auto_bl]))**2 / red_avg_nsamples[bl] / dt / df \n",
" predicted_variance[:, low_band] *= (1 - filter_frac_low)\n",
" predicted_variance[:, high_band] *= (1 - filter_frac_high)\n",
" predicted_variance[red_avg_flags[bl]] = np.nan\n",
"\n",
" # prep for inverse variance weighting\n",
" if np.any(np.isfinite(predicted_variance)):\n",
" weights.append(np.where(np.isfinite(predicted_variance) & np.isfinite(dly_filt_red_avg_data[bl]), predicted_variance**-1, 0))\n",
" to_avg.append(np.where(np.isfinite(predicted_variance) & np.isfinite(dly_filt_red_avg_data[bl]), np.abs(dly_filt_red_avg_data[bl]), 0))\n",
"\n",
" # Check if one polarization is effectively entirely flagged, and if so, flag the whole file\n",
" if len(weights) == 0:\n",
" print(f'No unique baselines have enough {pol}-polarized samples to merit filtering. Flagging the entire file.')\n",
" ALL_FLAGGED = True\n",
" continue\n",
" \n",
" # perform inverse variance weighred average\n",
" Wsum = np.sum(weights, axis=0)**-1\n",
" estimator = np.einsum(\"mij,mij->ij\", to_avg, weights) * Wsum\n",
"\n",
" # turn estimator into z-score assuming Rayleigh-distributed data (appropriate for averaging magnitudes of visibilities incoherently)\n",
" predicted_mean = np.sum(np.array(weights)**.5, axis=0) * Wsum * (np.pi / 4)**.5\n",
" predicted_var = (4 - np.pi) / 4 * Wsum\n",
" zscore[pol] = (estimator - predicted_mean) / predicted_var**.5\n",
" print(f'Finished computing z-scores in {(time.time() - t) / 60:.2f} minutes.')"
" zscore[pol] = (np.nanmean(abs_SNRs_this_pol, axis=0) - predicted_mean) / variance_expected**.5"
]
},
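A quick numerical check of the Rayleigh constants used in this cell (an editorial sketch with invented baseline and sample counts, not from the commit): once the noise-only |SNR| values have been flattened to unit mean, each has variance (4 − π)/π, so the incoherent average over N_bl baselines has variance (4 − π)/2 · (2/π) / N_bl, which is the `variance_expected` above.

```python
# Hedged sketch, not from the commit: verify the predicted mean and variance of an
# incoherent average of unit-mean Rayleigh-distributed |SNR| values.
import numpy as np

rng = np.random.default_rng(1)
n_bls, n_samples = 50, 100_000  # invented numbers for illustration

# |unit-variance complex Gaussian| is Rayleigh with mean sqrt(pi)/2; rescale to unit mean,
# as the leverage correction does upstream.
snr_mag = np.abs(rng.standard_normal((n_bls, n_samples))
                 + 1j * rng.standard_normal((n_bls, n_samples))) / np.sqrt(2)
snr_mag /= np.sqrt(np.pi) / 2

avg = np.mean(snr_mag, axis=0)                 # incoherent average across "baselines"
print(np.mean(avg))                            # ~1.0
print(np.var(avg))                             # ~0.00546
print((4 - np.pi) / 2 * (2 / np.pi) / n_bls)   # the notebook's variance_expected formula
```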
{
@@ -345,7 +329,7 @@
" ax.set_ylabel(f'{pol}-polarized z-score')\n",
" axes[0].legend() \n",
" axes[1].set_xlabel('Frequency (MHz)')\n",
" plt.tight_layout() "
" plt.tight_layout()"
]
},
{
@@ -361,7 +345,9 @@
" return \n",
" \n",
" plt.figure(figsize=(12, 4))\n",
" bins = np.arange(-np.nanmax(np.abs(list(zscore.values()))) - 1, np.nanmax(np.abs(list(zscore.values()))) + 1, .1)\n",
" all_abs_z = np.abs(list(zscore.values()))\n",
" all_abs_z = all_abs_z[np.isfinite(all_abs_z)]\n",
" bins = np.arange(-np.max(all_abs_z) - 1, np.max(all_abs_z) + 1, .1)\n",
" hist_ee = plt.hist(np.ravel(zscore['ee']), bins=bins, density=True, label='ee-polarized z-scores', alpha=.5)\n",
" hist_nn = plt.hist(np.ravel(zscore['nn']), bins=bins, density=True, label='nn-polarized z-scores', alpha=.5)\n",
" plt.plot(bins, (2*np.pi)**-.5 * np.exp(-bins**2 / 2), 'k--', label='Gaussian approximate\\nnoise-only distribution')\n",
@@ -380,7 +366,7 @@
"metadata": {},
"source": [
"# *Figure 1: z-Score Spectra for All Integrations in the File*\n",
"This plot shows the z-score spectrum for each integration and for both polarizations. This is what we'll use in full_day_rfi_round_2.ipynb to further refine the flagging waterfall. Negative-going excursions near prior flag boundaries are expected: the filter can overfit the noise when it is unconstrained on one side. "
"This plot shows the z-score spectrum for each integration and for both polarizations. This is what we'll use in full_day_rfi_round_2.ipynb to further refine the flagging waterfall."
]
},
{
@@ -473,6 +459,14 @@
"source": [
"print(f'Finished execution in {(time.time() - tstart) / 60:.2f} minutes.')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1e54a619-bd83-4001-a008-eed4180e2df6",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@@ -491,7 +485,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
"version": "3.12.0"
},
"toc": {
"base_numbering": 1,
