From 80b9c2fcc4540bec85bffc1595f90c37b8e001c9 Mon Sep 17 00:00:00 2001 From: keskitalo Date: Tue, 23 Jan 2024 17:18:43 -0800 Subject: [PATCH 1/4] Fix parallelization problems in noise estimation --- src/libtoast/include/toast/fod_psd.hpp | 7 +- src/libtoast/src/toast_fod_psd.cpp | 45 ++-- src/toast/_libtoast/fod_psd.cpp | 25 ++- src/toast/observation.py | 30 ++- src/toast/observation_dist.py | 2 +- src/toast/ops/noise_estimation.py | 225 ++++++++++---------- src/toast/ops/noise_estimation_utils.py | 264 +++++++++++++++--------- 7 files changed, 347 insertions(+), 251 deletions(-) diff --git a/src/libtoast/include/toast/fod_psd.hpp b/src/libtoast/include/toast/fod_psd.hpp index b7d1755fe..d929c23ed 100644 --- a/src/libtoast/include/toast/fod_psd.hpp +++ b/src/libtoast/include/toast/fod_psd.hpp @@ -1,5 +1,5 @@ -// Copyright (c) 2015-2020 by the parties listed in the AUTHORS file. +// Copyright (c) 2015-2024 by the parties listed in the AUTHORS file. // All rights reserved. Use of this source code is governed by // a BSD-style license that can be found in the LICENSE file. @@ -11,11 +11,12 @@ namespace toast { void fod_autosums(int64_t n, const double * x, const uint8_t * good, - int64_t lagmax, double * sums, int64_t * hits); + int64_t lagmax, double * sums, int64_t * hits, + int64_t all_sums); void fod_crosssums(int64_t n, const double * x, const double * y, const uint8_t * good, int64_t lagmax, double * sums, - int64_t * hits); + int64_t * hits, int64_t all_sums, int64_t symmetric); } #endif // ifndef TOAST_FOD_PSD_HPP diff --git a/src/libtoast/src/toast_fod_psd.cpp b/src/libtoast/src/toast_fod_psd.cpp index 82b8b6fef..bd37d9c4b 100644 --- a/src/libtoast/src/toast_fod_psd.cpp +++ b/src/libtoast/src/toast_fod_psd.cpp @@ -1,5 +1,5 @@ -// Copyright (c) 2015-2020 by the parties listed in the AUTHORS file. +// Copyright (c) 2015-2024 by the parties listed in the AUTHORS file. // All rights reserved. Use of this source code is governed by // a BSD-style license that can be found in the LICENSE file. @@ -10,7 +10,8 @@ void toast::fod_autosums(int64_t n, double const * x, uint8_t const * good, - int64_t lagmax, double * sums, int64_t * hits) { + int64_t lagmax, double * sums, int64_t * hits, + int64_t all_sums) { toast::AlignedVector xgood(n); toast::AlignedVector gd(n); @@ -24,19 +25,22 @@ void toast::fod_autosums(int64_t n, double const * x, uint8_t const * good, } } - #pragma \ - omp parallel for default(none) shared(n, gd, lagmax, xgood, sums, hits) schedule(static, 100) +#pragma \ + omp parallel for default(none) \ + shared(n, gd, lagmax, xgood, sums, hits, all_sums) \ + schedule(static, 100) for (int64_t lag = 0; lag < lagmax; ++lag) { int64_t j = lag; double lagsum = 0.0; int64_t hitsum = 0; - for (int64_t i = 0; i < (n - lag); ++i) { + int64_t imax = all_sums ? n - lag : n - lagmax; + for (int64_t i = 0; i < imax; ++i) { lagsum += xgood[i] * xgood[j]; hitsum += gd[i] * gd[j]; j++; } - sums[lag] = lagsum; - hits[lag] = hitsum; + sums[lag] += lagsum; + hits[lag] += hitsum; } return; @@ -44,7 +48,7 @@ void toast::fod_autosums(int64_t n, double const * x, uint8_t const * good, void toast::fod_crosssums(int64_t n, double const * x, double const * y, uint8_t const * good, int64_t lagmax, double * sums, - int64_t * hits) { + int64_t * hits, int64_t all_sums, int64_t symmetric) { toast::AlignedVector xgood(n); toast::AlignedVector ygood(n); toast::AlignedVector gd(n); @@ -61,23 +65,28 @@ void toast::fod_crosssums(int64_t n, double const * x, double const * y, } } - #pragma \ - omp parallel for default(none) shared(n, gd, lagmax, xgood, ygood, sums, hits) schedule(static, 100) +#pragma \ + omp parallel for default(none) \ + shared(n, gd, lagmax, xgood, ygood, sums, hits, all_sums, symmetric) \ + schedule(static, 100) for (int64_t lag = 0; lag < lagmax; ++lag) { int64_t i, j; double lagsum = 0.0; int64_t hitsum = 0; - for (i = 0, j = lag; i < (n - lag); ++i, ++j) { + int64_t imax = all_sums ? n - lag : n - lagmax; + for (i = 0, j = lag; i < imax; ++i, ++j) { lagsum += xgood[i] * ygood[j]; hitsum += gd[i] * gd[j]; } - - // Use symmetry to double the statistics - for (i = 0, j = lag; i < (n - lag); ++i, ++j) { - lagsum += xgood[j] * ygood[i]; - } - sums[lag] = lagsum; - hits[lag] = 2 * hitsum; + if (symmetric && lag != 0) { + // Use symmetry to double the statistics + for (i = 0, j = lag; i < imax; ++i, ++j) { + lagsum += xgood[j] * ygood[i]; + } + hitsum *= 2; + } + sums[lag] += lagsum; + hits[lag] += hitsum; } return; diff --git a/src/toast/_libtoast/fod_psd.cpp b/src/toast/_libtoast/fod_psd.cpp index 54375c219..81d69513f 100644 --- a/src/toast/_libtoast/fod_psd.cpp +++ b/src/toast/_libtoast/fod_psd.cpp @@ -1,5 +1,5 @@ -// Copyright (c) 2015-2020 by the parties listed in the AUTHORS file. +// Copyright (c) 2015-2024 by the parties listed in the AUTHORS file. // All rights reserved. Use of this source code is governed by // a BSD-style license that can be found in the LICENSE file. @@ -8,7 +8,8 @@ void init_fod_psd(py::module & m) { m.def("fod_crosssums", [](py::buffer x, py::buffer y, py::buffer good, - int64_t lagmax, py::buffer sums, py::buffer hits) { + int64_t lagmax, py::buffer sums, py::buffer hits, + int64_t all_sums, int64_t symmetric) { pybuffer_check_1D (x); pybuffer_check_1D (y); pybuffer_check_1D (good); @@ -42,11 +43,12 @@ void init_fod_psd(py::module & m) { uint8_t * rawgood = reinterpret_cast (info_good.ptr); double * rawsums = reinterpret_cast (info_sums.ptr); int64_t * rawhits = reinterpret_cast (info_hits.ptr); - toast::fod_crosssums(n, rawx, rawy, rawgood, lagmax, rawsums, rawhits); + toast::fod_crosssums(n, rawx, rawy, rawgood, lagmax, + rawsums, rawhits, all_sums, symmetric); return; }, py::arg("x"), py::arg("y"), py::arg("good"), py::arg("lagmax"), - py::arg("sums"), py::arg( - "hits"), R"( + py::arg("sums"), py::arg("hits"), py::arg("all_sums"), + py::arg("symmetric"), R"( Accumulate the time domain covariance between two vectors. Args: @@ -56,6 +58,8 @@ void init_fod_psd(py::module & m) { lagmax (int): The maximum sample distance to consider. sums (array like, float64): The vector of sums^2 to accumulate for each lag. hits (array_like, int64): The vector of hits to accumulate for each lag. + all_sums (int): If non-zero, evaluate lags < `lagmax` even in the last + `lagmax` samples of `x` and `y`. Returns: None. @@ -63,7 +67,8 @@ void init_fod_psd(py::module & m) { )"); m.def("fod_autosums", [](py::buffer x, py::buffer good, int64_t lagmax, - py::buffer sums, py::buffer hits) { + py::buffer sums, py::buffer hits, + int64_t all_sums) { pybuffer_check_1D (x); pybuffer_check_1D (good); pybuffer_check_1D (sums); @@ -93,11 +98,11 @@ void init_fod_psd(py::module & m) { uint8_t * rawgood = reinterpret_cast (info_good.ptr); double * rawsums = reinterpret_cast (info_sums.ptr); int64_t * rawhits = reinterpret_cast (info_hits.ptr); - toast::fod_autosums(n, rawx, rawgood, lagmax, rawsums, rawhits); + toast::fod_autosums(n, rawx, rawgood, lagmax, + rawsums, rawhits, all_sums); return; }, py::arg("x"), py::arg("good"), py::arg("lagmax"), - py::arg("sums"), py::arg( - "hits"), R"( + py::arg("sums"), py::arg("hits"), py::arg("all_sums"), R"( Accumulate the time domain covariance. Args: @@ -106,6 +111,8 @@ void init_fod_psd(py::module & m) { lagmax (int): The maximum sample distance to consider. sums (array like, float64): The vector of sums^2 to accumulate for each lag. hits (array_like, int64): The vector of hits to accumulate for each lag. + all_sums (int): If non-zero, evaluate lags < `lagmax` even in the last + `lagmax` samples of `x`. Returns: None. diff --git a/src/toast/observation.py b/src/toast/observation.py index d67bdf0cd..aaadf120c 100644 --- a/src/toast/observation.py +++ b/src/toast/observation.py @@ -720,6 +720,7 @@ def redistribute( times=None, override_sample_sets=False, override_detector_sets=False, + return_global_intervals=False, ): """Take the currently allocated observation and redistribute in place. @@ -738,9 +739,11 @@ def redistribute( existing sample set boundaries in the redistributed data. override_detector_sets (False, None or list): If not False, override existing detector set boundaries in the redistributed data. + return_global_intervals (bool): Return a list of global intervals for + reference Returns: - None + None or global_intervals """ log = Logger.get() @@ -781,15 +784,16 @@ def redistribute( ) # Do the actual redistribution - new_shr_manager, new_det_manager, new_intervals_manager = redistribute_data( - self.dist, - new_dist, - self.shared, - self.detdata, - self.intervals, - times=times, - dbg=self.name, - ) + new_shr_manager, new_det_manager, new_intervals_manager, global_intervals = \ + redistribute_data( + self.dist, + new_dist, + self.shared, + self.detdata, + self.intervals, + times=times, + dbg=self.name, + ) # Redistribute any metadata objects that support it. for k, v in self._internal.items(): @@ -817,6 +821,12 @@ def redistribute( {x: all_det_flags[x] for x in self.local_detectors} ) + if return_global_intervals: + global_intervals = self.dist.comm.comm_group.bcast(global_intervals) + return global_intervals + else: + return + # Accelerator use def accel_create(self, names): diff --git a/src/toast/observation_dist.py b/src/toast/observation_dist.py index 44ec91eff..72c2eabe4 100644 --- a/src/toast/observation_dist.py +++ b/src/toast/observation_dist.py @@ -917,4 +917,4 @@ def redistribute_data( glb = global_intervals[field] new_intervals_manager.create(field, glb, new_shared_manager[times], fromrank=0) - return new_shared_manager, new_detdata_manager, new_intervals_manager + return new_shared_manager, new_detdata_manager, new_intervals_manager, global_intervals diff --git a/src/toast/ops/noise_estimation.py b/src/toast/ops/noise_estimation.py index cbde91d33..04da1492e 100644 --- a/src/toast/ops/noise_estimation.py +++ b/src/toast/ops/noise_estimation.py @@ -19,7 +19,13 @@ from .arithmetic import Combine from .copy import Copy from .delete import Delete -from .noise_estimation_utils import autocov_psd, crosscov_psd, flagged_running_average +from .noise_estimation_utils import ( + autocov_psd, + crosscov_psd, + flagged_running_average, + communicate_overlap, + highpass_flagged_signal, +) from .operator import Operator from .polyfilter import CommonModeFilter from .scan_healpix import ScanHealpixMap, ScanHealpixMask @@ -131,9 +137,19 @@ class NoiseEstim(Operator): save_cov = Bool(False, help="Save also the sample covariance") + symmetric = Bool( + False, + help="If True, treat positive and negative lags as equivalent " + "in the cross correlator", + ) + nbin_psd = Int(1000, allow_none=True, help="Bin the resulting PSD") - lagmax = Int(10000, help="Maximum lag to consider for the covariance function") + lagmax = Int( + 10000, + help="Maximum lag to consider for the covariance function. " + "Will be truncated the length of the longest view.", + ) stationary_period = Quantity( 86400 * u.s, @@ -244,7 +260,9 @@ def _redistribute(self, obs): timer=timer, ) # Redistribute this temporary observation to be distributed by samples - temp_obs.redistribute(1, times=self.times, override_sample_sets=None) + global_intervals = temp_obs.redistribute( + 1, times=self.times, override_sample_sets=None, return_global_intervals=True) + global_intervals = global_intervals[self.view] log.debug_rank( f"{obs.comm.group:4} : Redistributed observation in", comm=temp_obs.comm.comm_group, @@ -254,8 +272,11 @@ def _redistribute(self, obs): else: self.redistribute = False temp_obs = obs + global_intervals = [] + for ival in obs.intervals[self.view]: + global_intervals.append((ival.start, ival.stop)) - return temp_obs + return temp_obs, global_intervals @function_timer def _re_redistribute(self, obs, temp_obs): @@ -346,9 +367,9 @@ def _exec(self, data, detectors=None, **kwargs): scan_mask.apply(data, detectors=detectors) for orig_obs in data.obs: - obs = self._redistribute(orig_obs) + obs, global_intervals = self._redistribute(orig_obs) - # Get the set of all detector we are considering for this obs + # Get the set of all detectors we are considering for this obs local_dets = obs.select_local_detectors(detectors, flagmask=self.det_mask) if obs.comm.comm_group is not None: pdets = obs.comm.comm_group.gather(local_dets, root=0) @@ -401,6 +422,13 @@ def _exec(self, data, detectors=None, **kwargs): continue pairs.append([det1, det2]) + if self.symmetric: + # Remove duplicate entries in pair list + unordered_pairs = set() + for pair in pairs: + unordered_pairs.add(tuple(sorted(pair))) + pairs = list(unordered_pairs) + times = np.array(obs.shared[self.times]) nsample = times.size @@ -414,31 +442,6 @@ def _exec(self, data, detectors=None, **kwargs): fileroot = f"{self.name}_{obs.name}" - intervals = obs.intervals[self.view].data - - # self.highpass_signal(obs, comm, intervals) - - # Extend the gap between intervals to prevent sample pairs - # that cross the gap. - - gap_min = self.lagmax + 1 - # Downsampled data requires longer gaps - gap_min_nsum = self.lagmax * self.nsum + 1 - - gapflags = np.zeros_like(shared_flags) - gapflags_nsum = np.zeros_like(shared_flags) - for ival1, ival2 in zip(intervals[:-1], intervals[1:]): - gap_start = ival1.last + 1 - gap_stop = max(gap_start + gap_min, ival2.first) - if gap_stop >= ival2.last: - msg = f"Gap from samples {ival1.last+1} to {ival2.first}" - msg += " extended through next good data interval. Use " - msg += "different / no intervals or shorter lagmax." - log.warning(msg) - gap_stop_nsum = max(gap_start + gap_min_nsum, ival2.first) - gapflags[gap_start:gap_stop] = True - gapflags_nsum[gap_start:gap_stop_nsum] = True - # Re-use this flag array flags = np.zeros(times.size, dtype=bool) @@ -491,17 +494,16 @@ def _exec(self, data, detectors=None, **kwargs): nse_freqs, nse_psd = self.process_noise_estimate( obs, + global_intervals, signal1, signal2, flags, - gapflags, - gapflags_nsum, times, fsample, fileroot, det1_name, det2_name, - intervals, + self.lagmax, ) if obs.comm.group_rank == 0: det_units = obs.detdata[self.det_data].units @@ -539,58 +541,9 @@ def _exec(self, data, detectors=None, **kwargs): del obs @function_timer - def highpass_signal(self, obs, intervals): - """Suppress the sub-harmonic modes in the TOD by high-pass - filtering. - """ - log = Logger.get() - timer = Timer() - timer.start() - log.debug_rank("High-pass-filtering signal", comm=obs.comm.comm_group) - for det in obs.local_detectors: - signal = obs.detdata[self.det_data][det] - flags = obs.detdata[self.det_flags][det] & self.det_flag_mask - for ival in intervals: - ind = slice(ival.first, ival.last + 1) - sig = signal[ind] - flg = flags[ind] - trend = flagged_running_average( - sig, flg, self.lagmax, return_flags=False - ) - sig -= trend - log.debug_rank("TOD high pass", comm=obs.comm.comm_group, timer=timer) - return - - @function_timer - def decimate(self, x, flg, gapflg, intervals): - # Low-pass filter with running average, then downsample - xx = x.copy() - flags = flg.copy() - for ival in intervals: - ind = slice(ival.first, ival.last + 1) - xx[ind], flags[ind] = flagged_running_average( - x[ind], flg[ind], self.naverage, return_flags=True - ) - return xx[:: self.nsum].copy(), (flags + gapflg)[:: self.nsum].copy() - - # def highpass(self, x, flg): - # # Flagged real-space high pass filter - # xx = x.copy() - # - # j = 0 - # while j < x.size and flg[j]: j += 1 - # - # alpha = .999 - # - # for i in range(j+1, x.size): - # if flg[i]: - # xx[i] = x[j] - # else: - # xx[i] = alpha*(xx[j] + x[i] - x[j]) - # j = i - # - # xx /= alpha - # return xx + def decimate(self, signal, flags): + """Downsample previously highpass-filtered signal""" + return signal[:: self.nsum].copy(), flags[:: self.nsum].copy() @function_timer def log_bin(self, freq, nbin=100, fmin=None, fmax=None): @@ -806,11 +759,10 @@ def process_downsampled_noise_estimate( signal1, signal2, flags, - gapflags_nsum, - local_intervals, my_psds1, my_cov1, comm, + lagmax, ): # Get another PSD for a down-sampled TOD to measure the # low frequency power @@ -818,16 +770,12 @@ def process_downsampled_noise_estimate( timestamps_decim = timestamps[:: self.nsum] # decimate() will smooth and downsample the signal in # each valid interval separately - signal1_decim, flags_decim = self.decimate( - signal1, flags, gapflags_nsum, local_intervals - ) + signal1_decim, flags_decim = self.decimate(signal1, flags) if signal2 is not None: - signal2_decim, flags_decim = self.decimate( - signal2, flags, gapflags_nsum, local_intervals - ) + signal2_decim, flags_decim = self.decimate(signal2, flags) stationary_period = self.stationary_period.to_value(u.s) - lagmax = min(self.lagmax, timestamps_decim.size) + lagmax = min(lagmax, timestamps_decim.size) if signal2 is None: result = autocov_psd( timestamps_decim, @@ -904,22 +852,56 @@ def process_downsampled_noise_estimate( def process_noise_estimate( self, obs, + global_intervals, signal1, signal2, flags, - gapflags, - gapflags_nsum, timestamps, fsample, fileroot, det1, det2, - local_intervals, + lagmax, ): log = Logger.get() + + # We apply a prewhitening filter to the signal. To accommodate the + # quality flags, the filter is a moving average that only accounts + # for the unflagged samples + naverage = lagmax + + # Extend the local arrays to remove boundary effects from filtering + comm = obs.comm_row + extended_times, extended_flags, extended_signal1, extended_signal2 = \ + communicate_overlap( + timestamps, signal1, signal2, flags, lagmax, naverage, comm + ) # High pass filter the signal to avoid aliasing - # self.highpass(signal1, noise_flags) - # self.highpass(signal2, noise_flags) + extended_signal1 = highpass_flagged_signal( + extended_signal1, + extended_flags==0, + naverage, + ) + if signal2 is not None: + extended_signal2 = highpass_flagged_signal( + extended_signal2, + extended_flags==0, + naverage, + ) + # Crop the filtering margin but keep up to lagmax samples + half_average = naverage // 2 + 1 + if comm is not None and comm.rank > 0: + extended_times = extended_times[half_average:] + extended_flags = extended_flags[half_average:] + extended_signal1 = extended_signal1[half_average:] + if extended_signal2 is not None: + extended_signal2 = extended_signal2[half_average:] + if comm is not None and comm.rank < comm.size - 1: + extended_times = extended_times[:-half_average] + extended_flags = extended_flags[:-half_average] + extended_signal1 = extended_signal1[:-half_average] + if extended_signal2 is not None: + extended_signal2 = extended_signal2[:-half_average] # Compute the autocovariance function and the matching # PSD for each stationary interval @@ -930,25 +912,32 @@ def process_noise_estimate( if signal2 is None: result = autocov_psd( timestamps, - signal1, - flags + gapflags, - self.lagmax, + extended_times, + global_intervals, + extended_signal1, + extended_flags, + lagmax, + naverage, stationary_period, fsample, - comm=obs.comm_row, + comm=comm, return_cov=self.save_cov, ) else: result = crosscov_psd( timestamps, - signal1, - signal2, - flags + gapflags, - self.lagmax, + extended_times, + global_intervals, + extended_signal1, + extended_signal2, + extended_flags, + lagmax, + naverage, stationary_period, fsample, - comm=obs.comm_row, + comm=comm, return_cov=self.save_cov, + symmetric=self.symmetric, ) if self.save_cov: my_psds1, my_cov1 = result @@ -962,16 +951,15 @@ def process_noise_estimate( my_psds2, my_cov2, ) = self.process_downsampled_noise_estimate( - timestamps, + extended_times, fsample, - signal1, - signal2, - flags, - gapflags_nsum, - local_intervals, + extended_signal1, + extended_signal2, + extended_flags, my_psds1, my_cov1, - obs.comm_row, + comm, + lagmax, ) log.debug_rank( @@ -999,14 +987,15 @@ def process_noise_estimate( # concatenate + if self.save_cov: + my_cov = my_cov1 # Only store the fully sampled covariance + if binfreq10 is None: my_times = [] my_binned_psds = [] binfreq0 = None else: my_times = my_times1 - if self.save_cov: - my_cov = my_cov1 # Only store the fully sampled covariance if self.nsum > 1: # frequencies that are usable in the down-sampled PSD fcut = fsample / 2 / self.naverage / 100 diff --git a/src/toast/ops/noise_estimation_utils.py b/src/toast/ops/noise_estimation_utils.py index c5b284020..611f626bf 100644 --- a/src/toast/ops/noise_estimation_utils.py +++ b/src/toast/ops/noise_estimation_utils.py @@ -79,30 +79,124 @@ def highpass_flagged_signal(sig, good, naverage): (array): The processed array. """ - # First fit and remove a linear trend. Loss of power from this - # filter is assumed negligible in the frequency bins of interest ngood = np.sum(good) if ngood == 0: raise RuntimeError("No valid samples") + """ + De-trending disabled as unnecessary. It also changes results at + different concurrencies. + # First fit and remove a linear trend. Loss of power from this + # filter is assumed negligible in the frequency bins of interest templates = np.vstack([np.ones(ngood), np.arange(good.size)[good]]) invcov = np.dot(templates, templates.T) cov = np.linalg.inv(invcov) proj = np.dot(templates, sig[good]) coeff = np.dot(cov, proj) sig[good] -= coeff[0] + coeff[1] * templates[1] + """ # Then prewhiten the data. This filter will be corrected in the # PSD estimates. trend = flagged_running_average(sig, good == 0, naverage) - sig[good] -= trend[good] - return sig + return sig - trend + + +@function_timer +def communicate_overlap(times, signal1, signal2, flags, lagmax, naverage, comm): + """Send and receive TOD to have margin for filtering and lag """ + + if comm is None: + rank = 0 + ntask = 1 + else: + rank = comm.rank + ntask = comm.size + + # Communicate naverage + lagmax samples between processes so that + # running averages do not change with distributed data. + + nsamp = signal1.size + + if lagmax > nsamp and comm is not None and comm.size > 1: + msg = ( + f"communicate_overlap: lagmax = {lagmax} and nsample = {nsamp}. " + f"Communicating TOD beyond nearest neighbors is not " + f"implemented. Reduce lagmax or the size of the MPI communicator." + ) + raise RuntimeError(msg) + + half_average = naverage // 2 + 1 + nextend_backward = half_average + nextend_forward = half_average + lagmax + if rank == 0: + nextend_backward = 0 + if rank == ntask - 1: + nextend_forward = 0 + nextend = nextend_backward + nextend_forward + + extended_signal1 = np.zeros(nsamp + nextend, dtype=np.float64) + if signal2 is None: + extended_signal2 = None + else: + extended_signal2 = np.zeros(nsamp + nextend, dtype=np.float64) + extended_flags = np.zeros(nsamp + nextend, dtype=bool) + extended_times = np.zeros(nsamp + nextend, dtype=times.dtype) + + ind = slice(nextend_backward, nextend_backward + nsamp) + extended_signal1[ind] = signal1 + if signal2 is not None: + extended_signal2[ind] = signal2 + extended_flags[ind] = flags + extended_times[ind] = times + + if comm is not None: + for evenodd in range(2): + if rank % 2 == evenodd % 2: + # Send to rank - 1 + if rank != 0: + nsend = lagmax + half_average + comm.send(signal1[:nsend], dest=rank - 1, tag=1) + if signal2 is not None: + comm.send(signal2[:nsend], dest=rank - 1, tag=2) + comm.send(flags[:nsend], dest=rank - 1, tag=3) + comm.send(times[:nsend], dest=rank - 1, tag=4) + # Send to rank + 1 + if rank != ntask - 1: + nsend = half_average + comm.send(signal1[-nsend:], dest=rank + 1, tag=1) + if signal2 is not None: + comm.send(signal2[-nsend:], dest=rank + 1, tag=2) + comm.send(flags[-nsend:], dest=rank + 1, tag=3) + comm.send(times[-nsend:], dest=rank + 1, tag=4) + else: + # Receive from rank + 1 + if rank != ntask - 1: + nrecv = lagmax + half_average + extended_signal1[-nrecv:] = comm.recv(source=rank + 1, tag=1) + if signal2 is not None: + extended_signal2[-nrecv:] = comm.recv(source=rank + 1, tag=2) + extended_flags[-nrecv:] = comm.recv(source=rank + 1, tag=3) + extended_times[-nrecv:] = comm.recv(source=rank + 1, tag=4) + # Receive from rank - 1 + if rank != 0: + nrecv = half_average + extended_signal1[:nrecv] = comm.recv(source=rank - 1, tag=1) + if signal2 is not None: + extended_signal2[:nrecv] = comm.recv(source=rank - 1, tag=2) + extended_flags[:nrecv] = comm.recv(source=rank - 1, tag=3) + extended_times[:nrecv] = comm.recv(source=rank - 1, tag=4) + + return extended_times, extended_flags, extended_signal1, extended_signal2 @function_timer def autocov_psd( times, - signal, - flags, + extended_times, + global_intervals, + extended_signal, + extended_flags, lagmax, + naverage, stationary_period, fsample, comm=None, @@ -117,9 +211,12 @@ def autocov_psd( Args: times (float): Signal time stamps. + global_intervals (list): Time stamp ranges over which to + perform independent analysis signal (float): Regularly sampled signal vector. flags (float): Signal quality flags. lagmax (int): Largest sample separation to evaluate. + naverage (int): Length of running average in highpass filter stationary_period (float): Length of a stationary interval in units of the times vector. fsample (float): The sampling frequency in Hz @@ -132,21 +229,36 @@ def autocov_psd( """ return crosscov_psd( - times, signal, None, flags, lagmax, stationary_period, fsample, comm, return_cov + times, + extended_times, + global_intervals, + extended_signal, + None, + extended_flags, + lagmax, + naverage, + stationary_period, + fsample, + comm, + return_cov, ) @function_timer def crosscov_psd( times, - signal1, - signal2, - flags, + extended_times, + global_intervals, + extended_signal1, + extended_signal2, + extended_flags, lagmax, + naverage, stationary_period, fsample, comm=None, return_cov=False, + symmetric=False, ): """Compute the sample (cross)covariance. @@ -157,120 +269,88 @@ def crosscov_psd( Args: times (float): Signal time stamps. + global_intervals (list): Time stamp ranges over which to + perform independent analysis signal1 (float): Regularly sampled signal vector. signal2 (float): Regularly sampled signal vector or None. flags (float): Signal quality flags. lagmax (int): Largest sample separation to evaluate. + naverage (int): Length of running average in highpass filter stationary_period (float): Length of a stationary interval in units of the times vector. fsample (float): The sampling frequency in Hz comm (MPI.Comm): The MPI communicator or None. - return_cov (bool): Return also the covariance function + return_cov (bool): Return also the covariance function + symmetric (bool): Treat positive and negative lags as equivalent Returns: (list): List of local tuples of (start_time, stop_time, bin_frequency, bin_value) """ - rank = 0 - ntask = 1 - time_start = times[0] - time_stop = times[-1] - if comm is not None: + if comm is None: + rank = 0 + ntask = 1 + time_start = extended_times[0] + time_stop = extended_times[-1] + else: rank = comm.rank ntask = comm.size - time_start = comm.bcast(times[0], root=0) - time_stop = comm.bcast(times[-1], root=ntask - 1) - - # We apply a prewhitening filter to the signal. To accommodate the - # quality flags, the filter is a moving average that only accounts - # for the unflagged samples - naverage = lagmax + time_start = comm.bcast(extended_times[0], root=0) + time_stop = comm.bcast(extended_times[-1], root=ntask - 1) nreal = int(np.ceil((time_stop - time_start) / stationary_period)) - - # Communicate lagmax samples from the beginning of the array - # backwards in the MPI communicator - - nsamp = signal1.size - - if lagmax > nsamp and comm is not None and comm.size > 1: - msg = ( - f"crosscov_psd: lagmax = {lagmax} and nsample = {nsamp}. " - f"Communicating TOD beyond nearest neighbors is not " - f"implemented. Reduce lagmax or the size of the MPI communicator." - ) - raise RuntimeError(msg) - - if rank != ntask - 1: - nextend = lagmax - else: - nextend = 0 - - extended_signal1 = np.zeros(nsamp + nextend, dtype=np.float64) - if signal2 is not None: - extended_signal2 = np.zeros(nsamp + nextend, dtype=np.float64) - extended_flags = np.zeros(nsamp + nextend, dtype=bool) - extended_times = np.zeros(nsamp + nextend, dtype=times.dtype) - - extended_signal1[:nsamp] = signal1 - if signal2 is not None: - extended_signal2[:nsamp] = signal2 - extended_flags[:nsamp] = flags - extended_times[:nsamp] = times - - if comm is not None: - for evenodd in range(2): - if rank % 2 == evenodd % 2: - # Send - if rank == 0: - continue - comm.send(signal1[:lagmax], dest=rank - 1, tag=0) - if signal2 is not None: - comm.send(signal1[:lagmax], dest=rank - 1, tag=3) - comm.send(flags[:lagmax], dest=rank - 1, tag=1) - comm.send(times[:lagmax], dest=rank - 1, tag=2) - else: - # Receive - if rank == ntask - 1: - continue - extended_signal1[-lagmax:] = comm.recv(source=rank + 1, tag=0) - if signal2 is not None: - extended_signal1[-lagmax:] = comm.recv(source=rank + 1, tag=3) - extended_flags[-lagmax:] = comm.recv(source=rank + 1, tag=1) - extended_times[-lagmax:] = comm.recv(source=rank + 1, tag=2) - realization = ((extended_times - time_start) / stationary_period).astype(np.int64) # Set flagged elements to zero extended_signal1[extended_flags != 0] = 0 - if signal2 is not None: + if extended_signal2 is not None: extended_signal2[extended_flags != 0] = 0 covs = {} + # Loop over local realizations for ireal in range(realization[0], realization[-1] + 1): # Evaluate the covariance realflg = realization == ireal - good = extended_flags[realflg] == 0 - ngood = np.sum(good) - if ngood == 0: - continue - sig1 = extended_signal1[realflg].copy() - sig1 = highpass_flagged_signal(sig1, good, naverage) - # High pass filter does not work at the ends - ind = slice(naverage // 2, -naverage // 2) + realtimes = extended_times[realflg] + realgood = extended_flags[realflg] == 0 + realsig1 = extended_signal1[realflg].copy() + if extended_signal2 is not None: + realsig2 = extended_signal2[realflg].copy() cov_hits = np.zeros(lagmax, dtype=np.int64) cov = np.zeros(lagmax, dtype=np.float64) - if signal2 is None: - fod_autosums(sig1[ind], good[ind].astype(np.uint8), lagmax, cov, cov_hits) - else: - sig2 = extended_signal2[realflg].copy() - sig2 = highpass_flagged_signal(sig2, good, lagmax) - fod_crosssums( - sig1[ind], sig2[ind], good[ind].astype(np.uint8), lagmax, cov, cov_hits - ) + # Process all global intervals that overlap this realization + for start_time, stop_time in global_intervals: + if start_time > times[-1]: + # This interval is owned by another process + continue + if stop_time < realtimes[0] or start_time > realtimes[-1]: + # no overlap + continue + # Avoid double-counting sample pairs + all_sums = stop_time < realtimes[-1] + # Find the correct range of samples + istart, istop = np.searchsorted(realtimes, [start_time, stop_time]) + ind = slice(istart, istop + 1) + good = realgood[ind].astype(np.uint8) + ngood = np.sum(good) + if ngood == 0: + continue + if extended_signal2 is None: + fod_autosums(realsig1[ind], good, lagmax, cov, cov_hits, all_sums) + else: + fod_crosssums( + realsig1[ind], + realsig2[ind], + good, + lagmax, + cov, + cov_hits, + all_sums, + symmetric, + ) covs[ireal] = (cov_hits, cov) # Collect the estimated covariance functions From 1f90d1dd649ad91a11e17b31d79f58d5bb9b8ee6 Mon Sep 17 00:00:00 2001 From: keskitalo Date: Wed, 24 Jan 2024 13:03:50 -0800 Subject: [PATCH 2/4] Small fixes for self.view=None --- src/toast/ops/noise_estimation.py | 86 ++++++++++++++++++++++--- src/toast/ops/noise_estimation_utils.py | 17 +++-- 2 files changed, 88 insertions(+), 15 deletions(-) diff --git a/src/toast/ops/noise_estimation.py b/src/toast/ops/noise_estimation.py index 04da1492e..b21b50cf3 100644 --- a/src/toast/ops/noise_estimation.py +++ b/src/toast/ops/noise_estimation.py @@ -262,19 +262,22 @@ def _redistribute(self, obs): # Redistribute this temporary observation to be distributed by samples global_intervals = temp_obs.redistribute( 1, times=self.times, override_sample_sets=None, return_global_intervals=True) - global_intervals = global_intervals[self.view] + if self.view is not None: + global_intervals = global_intervals[self.view] log.debug_rank( f"{obs.comm.group:4} : Redistributed observation in", comm=temp_obs.comm.comm_group, timer=timer, ) - comm = None else: self.redistribute = False temp_obs = obs global_intervals = [] - for ival in obs.intervals[self.view]: - global_intervals.append((ival.start, ival.stop)) + if self.view is not None: + for ival in obs.intervals[self.view]: + global_intervals.append((ival.start, ival.stop)) + if self.view is None: + global_intervals = [(None, None)] return temp_obs, global_intervals @@ -505,6 +508,7 @@ def _exec(self, data, detectors=None, **kwargs): det2_name, self.lagmax, ) + if obs.comm.group_rank == 0: det_units = obs.detdata[self.det_data].units if det_units == u.dimensionless_unscaled: @@ -754,6 +758,8 @@ def save_psds( @function_timer def process_downsampled_noise_estimate( self, + obs, + global_intervals, timestamps, fsample, signal1, @@ -771,17 +777,67 @@ def process_downsampled_noise_estimate( # decimate() will smooth and downsample the signal in # each valid interval separately signal1_decim, flags_decim = self.decimate(signal1, flags) - if signal2 is not None: + if signal2 is None: + signal2_decim = None + else: signal2_decim, flags_decim = self.decimate(signal2, flags) stationary_period = self.stationary_period.to_value(u.s) lagmax = min(lagmax, timestamps_decim.size) - if signal2 is None: - result = autocov_psd( + + # We apply a prewhitening filter to the signal. To accommodate the + # quality flags, the filter is a moving average that only accounts + # for the unflagged samples + naverage = lagmax + + # Extend the local arrays to remove boundary effects from filtering + comm = obs.comm_row + extended_times, extended_flags, extended_signal1, extended_signal2 = \ + communicate_overlap( timestamps_decim, signal1_decim, + signal2_decim, flags_decim, lagmax, + naverage, + comm + ) + # High pass filter the signal to avoid aliasing + extended_signal1 = highpass_flagged_signal( + extended_signal1, + extended_flags==0, + naverage, + ) + if signal2 is not None: + extended_signal2 = highpass_flagged_signal( + extended_signal2, + extended_flags==0, + naverage, + ) + # Crop the filtering margin but keep up to lagmax samples + half_average = naverage // 2 + 1 + if comm is not None and comm.rank > 0: + extended_times = extended_times[half_average:] + extended_flags = extended_flags[half_average:] + extended_signal1 = extended_signal1[half_average:] + if extended_signal2 is not None: + extended_signal2 = extended_signal2[half_average:] + if comm is not None and comm.rank < comm.size - 1: + extended_times = extended_times[:-half_average] + extended_flags = extended_flags[:-half_average] + extended_signal1 = extended_signal1[:-half_average] + if extended_signal2 is not None: + extended_signal2 = extended_signal2[:-half_average] + + if signal2 is None: + result = autocov_psd( + timestamps_decim, + extended_times, + global_intervals, + extended_signal1, + extended_flags, + lagmax, + naverage, stationary_period, fsample / self.nsum, comm=comm, @@ -790,14 +846,18 @@ def process_downsampled_noise_estimate( else: result = crosscov_psd( timestamps_decim, - signal1_decim, - signal2_decim, - flags_decim, + extended_times, + global_intervals, + extended_signal1, + extended_signal2, + extended_flags, lagmax, + naverage, stationary_period, fsample / self.nsum, comm=comm, return_cov=self.save_cov, + symmetric=self.symmetric, ) if self.save_cov: my_psds2, my_cov2 = result @@ -863,6 +923,10 @@ def process_noise_estimate( det2, lagmax, ): + """Measure the sample (cross) covariance in the signal-subtracted + TOD and Fourier-transform it for noise PSD. + """ + log = Logger.get() # We apply a prewhitening filter to the signal. To accommodate the @@ -951,6 +1015,8 @@ def process_noise_estimate( my_psds2, my_cov2, ) = self.process_downsampled_noise_estimate( + obs, + global_intervals, extended_times, fsample, extended_signal1, diff --git a/src/toast/ops/noise_estimation_utils.py b/src/toast/ops/noise_estimation_utils.py index 611f626bf..ff07d2eef 100644 --- a/src/toast/ops/noise_estimation_utils.py +++ b/src/toast/ops/noise_estimation_utils.py @@ -323,17 +323,24 @@ def crosscov_psd( cov = np.zeros(lagmax, dtype=np.float64) # Process all global intervals that overlap this realization for start_time, stop_time in global_intervals: - if start_time > times[-1]: + if start_time is not None \ + and (start_time > times[-1] or start_time > realtimes[-1]): # This interval is owned by another process continue - if stop_time < realtimes[0] or start_time > realtimes[-1]: + if stop_time is not None and stop_time < realtimes[0]: # no overlap continue # Avoid double-counting sample pairs - all_sums = stop_time < realtimes[-1] + if stop_time is None: + all_sums = rank == ntask - 1 + else: + all_sums = stop_time > realtimes[-1] # Find the correct range of samples - istart, istop = np.searchsorted(realtimes, [start_time, stop_time]) - ind = slice(istart, istop + 1) + if start_time is None or stop_time is None: + ind = slice(realsig1.size) + else: + istart, istop = np.searchsorted(realtimes, [start_time, stop_time]) + ind = slice(istart, istop ) good = realgood[ind].astype(np.uint8) ngood = np.sum(good) if ngood == 0: From 112d38802780951a28fd60a8303bfc8d1c98326b Mon Sep 17 00:00:00 2001 From: Theodore Kisner Date: Thu, 25 Jan 2024 14:47:20 -0800 Subject: [PATCH 3/4] Some small fixes, discovered with unit tests and batch jobs (#730) with 8 or 16 processes per group: * When duplicating an observation, also duplicate per-detector flags. * When redistributing an observation, reset per-detector flags before setting. * Make the common mode removal an option (default True) prior to noise estimation. This is useful if the data has already had the common mode removed prior to this operator. * When high-pass filtering data that is distributed by time slices there may be some chunks that are fully flagged. This should not be an error. Instead, the flagged timestream is set to zero. * When communicating overlaps, ensure that the span used (lagmax + half_average) is less than the number of samples. --- src/toast/observation.py | 2 + src/toast/observation_dist.py | 7 +- src/toast/ops/noise_estimation.py | 107 +++++++++++++----------- src/toast/ops/noise_estimation_utils.py | 84 ++++++++++++------- src/toast/tests/ops_noise_estim.py | 6 +- 5 files changed, 121 insertions(+), 85 deletions(-) diff --git a/src/toast/observation.py b/src/toast/observation.py index aaadf120c..3b329048c 100644 --- a/src/toast/observation.py +++ b/src/toast/observation.py @@ -656,6 +656,7 @@ def duplicate( sample_sets=self.all_sample_sets, process_rows=self.dist.process_rows, ) + new_obs.set_local_detector_flags(self.local_detector_flags) for k, v in self._internal.items(): if meta is None or k in meta: new_obs[k] = copy.deepcopy(v) @@ -817,6 +818,7 @@ def redistribute( self.intervals = new_intervals_manager # Restore detector flags for our new local detectors + self._detflags = {x: int(0) for x in self.dist.dets[self.dist.comm.group_rank]} self.set_local_detector_flags( {x: all_det_flags[x] for x in self.local_detectors} ) diff --git a/src/toast/observation_dist.py b/src/toast/observation_dist.py index 72c2eabe4..5fb4b19a2 100644 --- a/src/toast/observation_dist.py +++ b/src/toast/observation_dist.py @@ -917,4 +917,9 @@ def redistribute_data( glb = global_intervals[field] new_intervals_manager.create(field, glb, new_shared_manager[times], fromrank=0) - return new_shared_manager, new_detdata_manager, new_intervals_manager, global_intervals + return ( + new_shared_manager, + new_detdata_manager, + new_intervals_manager, + global_intervals, + ) diff --git a/src/toast/ops/noise_estimation.py b/src/toast/ops/noise_estimation.py index b21b50cf3..45f1abf31 100644 --- a/src/toast/ops/noise_estimation.py +++ b/src/toast/ops/noise_estimation.py @@ -179,6 +179,8 @@ class NoiseEstim(Operator): None, allow_none=True, help="When set, PSDs are measured over averaged TODs" ) + remove_common_mode = Bool(True, help="Remove common mode signal before estimation") + @traitlets.validate("detector_pointing") def _check_detector_pointing(self, proposal): detpointing = proposal["value"] @@ -261,7 +263,11 @@ def _redistribute(self, obs): ) # Redistribute this temporary observation to be distributed by samples global_intervals = temp_obs.redistribute( - 1, times=self.times, override_sample_sets=None, return_global_intervals=True) + 1, + times=self.times, + override_sample_sets=None, + return_global_intervals=True, + ) if self.view is not None: global_intervals = global_intervals[self.view] log.debug_rank( @@ -318,29 +324,27 @@ def _exec(self, data, detectors=None, **kwargs): log = Logger.get() - # FIXME: The common mode filter would be more efficient if we moved - # this block of code to working on a data view with a single observation - # that is already re-distributed below. if self.focalplane_key is not None: if len(self.pairs) > 0: msg = "focalplane_key is not compatible with pairs" raise RuntimeError(msg) - # Measure the averages of the signal across the focalplane. - Copy(detdata=[(self.det_data, "temp_signal")]).apply(data) - CommonModeFilter( - det_data="temp_signal", - det_mask=self.det_mask, - det_flags=self.det_flags, - det_flag_mask=self.det_flag_mask, - focalplane_key=self.focalplane_key, - ).apply(data) - Combine( - op="subtract", - first=self.det_data, - second="temp_signal", - output=self.det_data, - ).apply(data) - Delete(detdata="temp_signal") + if self.remove_common_mode: + # Measure and subtract the common mode signal across the focalplane. + Copy(detdata=[(self.det_data, "temp_signal")]).apply(data) + CommonModeFilter( + det_data="temp_signal", + det_mask=self.det_mask, + det_flags=self.det_flags, + det_flag_mask=self.det_flag_mask, + focalplane_key=self.focalplane_key, + ).apply(data) + Combine( + op="subtract", + first=self.det_data, + second="temp_signal", + output=self.det_data, + ).apply(data) + Delete(detdata="temp_signal") if self.mapfile is not None: if self.pol: @@ -372,19 +376,11 @@ def _exec(self, data, detectors=None, **kwargs): for orig_obs in data.obs: obs, global_intervals = self._redistribute(orig_obs) - # Get the set of all detectors we are considering for this obs + # Get the set of all detectors we are considering for this obs. Since + # we have already redistributed the data, every process has a time slice + # and the same set of local detectors. local_dets = obs.select_local_detectors(detectors, flagmask=self.det_mask) - if obs.comm.comm_group is not None: - pdets = obs.comm.comm_group.gather(local_dets, root=0) - good_dets = None - if obs.comm.group_rank == 0: - good_dets = set() - for plocal in pdets: - for d in plocal: - good_dets.add(d) - good_dets = obs.comm.comm_group.bcast(good_dets, root=0) - else: - good_dets = set(local_dets) + good_dets = set(local_dets) if self.focalplane_key is not None: # Pick just one detector to represent each key value @@ -392,7 +388,7 @@ def _exec(self, data, detectors=None, **kwargs): det_names = [] key2det = {} det2key = {} - for det in obs.all_detectors: + for det in local_dets: key = fp[det][self.focalplane_key] if key not in key2det: det_names.append(det) @@ -792,26 +788,31 @@ def process_downsampled_noise_estimate( # Extend the local arrays to remove boundary effects from filtering comm = obs.comm_row - extended_times, extended_flags, extended_signal1, extended_signal2 = \ - communicate_overlap( - timestamps_decim, - signal1_decim, - signal2_decim, - flags_decim, - lagmax, - naverage, - comm - ) + ( + extended_times, + extended_flags, + extended_signal1, + extended_signal2, + ) = communicate_overlap( + timestamps_decim, + signal1_decim, + signal2_decim, + flags_decim, + lagmax, + naverage, + comm, + obs.comm.group, + ) # High pass filter the signal to avoid aliasing extended_signal1 = highpass_flagged_signal( extended_signal1, - extended_flags==0, + extended_flags == 0, naverage, ) if signal2 is not None: extended_signal2 = highpass_flagged_signal( extended_signal2, - extended_flags==0, + extended_flags == 0, naverage, ) # Crop the filtering margin but keep up to lagmax samples @@ -936,20 +937,24 @@ def process_noise_estimate( # Extend the local arrays to remove boundary effects from filtering comm = obs.comm_row - extended_times, extended_flags, extended_signal1, extended_signal2 = \ - communicate_overlap( - timestamps, signal1, signal2, flags, lagmax, naverage, comm - ) + ( + extended_times, + extended_flags, + extended_signal1, + extended_signal2, + ) = communicate_overlap( + timestamps, signal1, signal2, flags, lagmax, naverage, comm, obs.comm.group + ) # High pass filter the signal to avoid aliasing extended_signal1 = highpass_flagged_signal( extended_signal1, - extended_flags==0, + extended_flags == 0, naverage, ) if signal2 is not None: extended_signal2 = highpass_flagged_signal( extended_signal2, - extended_flags==0, + extended_flags == 0, naverage, ) # Crop the filtering margin but keep up to lagmax samples diff --git a/src/toast/ops/noise_estimation_utils.py b/src/toast/ops/noise_estimation_utils.py index ff07d2eef..fc9cb4c84 100644 --- a/src/toast/ops/noise_estimation_utils.py +++ b/src/toast/ops/noise_estimation_utils.py @@ -81,7 +81,8 @@ def highpass_flagged_signal(sig, good, naverage): """ ngood = np.sum(good) if ngood == 0: - raise RuntimeError("No valid samples") + # No valid samples + return np.zeros_like(sig) """ De-trending disabled as unnecessary. It also changes results at different concurrencies. @@ -101,8 +102,8 @@ def highpass_flagged_signal(sig, good, naverage): @function_timer -def communicate_overlap(times, signal1, signal2, flags, lagmax, naverage, comm): - """Send and receive TOD to have margin for filtering and lag """ +def communicate_overlap(times, signal1, signal2, flags, lagmax, naverage, comm, group): + """Send and receive TOD to have margin for filtering and lag""" if comm is None: rank = 0 @@ -116,14 +117,6 @@ def communicate_overlap(times, signal1, signal2, flags, lagmax, naverage, comm): nsamp = signal1.size - if lagmax > nsamp and comm is not None and comm.size > 1: - msg = ( - f"communicate_overlap: lagmax = {lagmax} and nsample = {nsamp}. " - f"Communicating TOD beyond nearest neighbors is not " - f"implemented. Reduce lagmax or the size of the MPI communicator." - ) - raise RuntimeError(msg) - half_average = naverage // 2 + 1 nextend_backward = half_average nextend_forward = half_average + lagmax @@ -133,6 +126,15 @@ def communicate_overlap(times, signal1, signal2, flags, lagmax, naverage, comm): nextend_forward = 0 nextend = nextend_backward + nextend_forward + if lagmax + half_average > nsamp and comm is not None and comm.size > 1: + msg = ( + f"communicate_overlap: lagmax + half_average = {lagmax+half_average} and " + f"nsample = {nsamp}. Communicating TOD beyond nearest neighbors is not " + f"implemented. Reduce lagmax, the size of the averaging window, or the " + f"size of the MPI communicator." + ) + raise RuntimeError(msg) + extended_signal1 = np.zeros(nsamp + nextend, dtype=np.float64) if signal2 is None: extended_signal2 = None @@ -151,39 +153,60 @@ def communicate_overlap(times, signal1, signal2, flags, lagmax, naverage, comm): if comm is not None: for evenodd in range(2): if rank % 2 == evenodd % 2: + # Tag offset is based on the sending rank + tag = 8 * (comm.rank + group * comm.size) # Send to rank - 1 if rank != 0: nsend = lagmax + half_average - comm.send(signal1[:nsend], dest=rank - 1, tag=1) + comm.send(signal1[:nsend], dest=rank - 1, tag=tag + 0) if signal2 is not None: - comm.send(signal2[:nsend], dest=rank - 1, tag=2) - comm.send(flags[:nsend], dest=rank - 1, tag=3) - comm.send(times[:nsend], dest=rank - 1, tag=4) + comm.send(signal2[:nsend], dest=rank - 1, tag=tag + 1) + comm.send(flags[:nsend], dest=rank - 1, tag=tag + 2) + comm.send(times[:nsend], dest=rank - 1, tag=tag + 3) # Send to rank + 1 if rank != ntask - 1: nsend = half_average - comm.send(signal1[-nsend:], dest=rank + 1, tag=1) + comm.send(signal1[-nsend:], dest=rank + 1, tag=tag + 4) if signal2 is not None: - comm.send(signal2[-nsend:], dest=rank + 1, tag=2) - comm.send(flags[-nsend:], dest=rank + 1, tag=3) - comm.send(times[-nsend:], dest=rank + 1, tag=4) + comm.send(signal2[-nsend:], dest=rank + 1, tag=tag + 5) + comm.send(flags[-nsend:], dest=rank + 1, tag=tag + 6) + comm.send(times[-nsend:], dest=rank + 1, tag=tag + 7) else: # Receive from rank + 1 if rank != ntask - 1: + tag = 8 * ((comm.rank + 1) + group * comm.size) nrecv = lagmax + half_average - extended_signal1[-nrecv:] = comm.recv(source=rank + 1, tag=1) + extended_signal1[-nrecv:] = comm.recv( + source=rank + 1, tag=tag + 0 + ) if signal2 is not None: - extended_signal2[-nrecv:] = comm.recv(source=rank + 1, tag=2) - extended_flags[-nrecv:] = comm.recv(source=rank + 1, tag=3) - extended_times[-nrecv:] = comm.recv(source=rank + 1, tag=4) + extended_signal2[-nrecv:] = comm.recv( + source=rank + 1, tag=tag + 1 + ) + extended_flags[-nrecv:] = comm.recv( + source=rank + 1, tag=tag + 2 + ) + extended_times[-nrecv:] = comm.recv( + source=rank + 1, tag=tag + 3 + ) # Receive from rank - 1 if rank != 0: + tag = 8 * ((comm.rank - 1) + group * comm.size) nrecv = half_average - extended_signal1[:nrecv] = comm.recv(source=rank - 1, tag=1) + extended_signal1[:nrecv] = comm.recv( + source=rank - 1, tag=tag + 4 + ) if signal2 is not None: - extended_signal2[:nrecv] = comm.recv(source=rank - 1, tag=2) - extended_flags[:nrecv] = comm.recv(source=rank - 1, tag=3) - extended_times[:nrecv] = comm.recv(source=rank - 1, tag=4) + extended_signal2[:nrecv] = comm.recv( + source=rank - 1, tag=tag + 5 + ) + extended_flags[:nrecv] = comm.recv( + source=rank - 1, tag=tag + 6 + ) + extended_times[:nrecv] = comm.recv( + source=rank - 1, tag=tag + 7 + ) + comm.barrier() return extended_times, extended_flags, extended_signal1, extended_signal2 @@ -323,8 +346,9 @@ def crosscov_psd( cov = np.zeros(lagmax, dtype=np.float64) # Process all global intervals that overlap this realization for start_time, stop_time in global_intervals: - if start_time is not None \ - and (start_time > times[-1] or start_time > realtimes[-1]): + if start_time is not None and ( + start_time > times[-1] or start_time > realtimes[-1] + ): # This interval is owned by another process continue if stop_time is not None and stop_time < realtimes[0]: @@ -340,7 +364,7 @@ def crosscov_psd( ind = slice(realsig1.size) else: istart, istop = np.searchsorted(realtimes, [start_time, stop_time]) - ind = slice(istart, istop ) + ind = slice(istart, istop) good = realgood[ind].astype(np.uint8) ngood = np.sum(good) if ngood == 0: diff --git a/src/toast/tests/ops_noise_estim.py b/src/toast/tests/ops_noise_estim.py index cb4954fdc..722600380 100644 --- a/src/toast/tests/ops_noise_estim.py +++ b/src/toast/tests/ops_noise_estim.py @@ -402,7 +402,7 @@ def test_noise_estim(self): shared_flag_mask=1, # view="scanning", output_dir=self.outdir, - lagmax=1000, + lagmax=200, nbin_psd=300, ) estim.apply(data) @@ -417,7 +417,7 @@ def test_noise_estim(self): stokes_weights=weights, # view="scanning", output_dir=self.outdir, - lagmax=1000, + lagmax=200, nbin_psd=300, ) estim.apply(data) @@ -490,7 +490,7 @@ def test_noise_estim_model(self): name="estimate_model", output_dir=self.outdir, out_model="noise_estimate", - lagmax=1000, + lagmax=200, nbin_psd=128, nsum=4, ) From 4c65a5a6672040d2c4c099c4d899c270c30253e9 Mon Sep 17 00:00:00 2001 From: Ted Kisner Date: Thu, 25 Jan 2024 19:59:14 -0800 Subject: [PATCH 4/4] Ignore flagged detectors in poly2d fit --- src/toast/ops/polyfilter/polyfilter.py | 46 +++++++++++--------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/src/toast/ops/polyfilter/polyfilter.py b/src/toast/ops/polyfilter/polyfilter.py index 8f6a72618..2aa89cbcf 100644 --- a/src/toast/ops/polyfilter/polyfilter.py +++ b/src/toast/ops/polyfilter/polyfilter.py @@ -226,16 +226,12 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): # Enumerate detectors to process - # Mapping from detector name to index + # Mapping from good detector name to index detector_index = {y: x for x, y in enumerate(detectors)} - # The integer group ID for a given detector index - group_det = np.array([group_index[x] for x in detectors]) - # Measure offset for each group, translate and scale # detector positions to [-1, 1] - group_offset = {} all_positions = [] for group, detectors_group in groups.items(): ndet_group = len(detectors_group) @@ -269,11 +265,7 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): yorders = yorders.ravel() detector_templates = np.zeros([ndet, nmode]) - for det in temp_ob.local_detectors: - if det not in detector_index: - # Skip detectors that were cut by per-detector flags or - # pattern match. - continue + for det in detectors: idet = detector_index[det] theta, phi = detector_position[det] detector_templates[idet] = theta**xorders * phi**yorders @@ -322,16 +314,14 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): else: shared_mask = np.ones(nsample, dtype=bool) - for idet, det in enumerate(temp_ob.local_detectors): - if det not in detector_index: - continue + for det in detectors: ind_det = detector_index[det] ind_group = group_index[det] det_groups[ind_det] = ind_group - signal = temp_ob.detdata[self.det_data][idet, vslice] + signal = temp_ob.detdata[self.det_data][det, vslice] if self.det_flags is not None: - det_flags = temp_ob.detdata[self.det_flags][idet, vslice] + det_flags = temp_ob.detdata[self.det_flags][det, vslice] det_mask = (det_flags & self.det_flag_mask) == 0 mask = np.logical_and(shared_mask, det_mask) else: @@ -358,16 +348,15 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): gt.start("Poly2D: Update detector flags") for igroup in range(ngroup): - local_dets = temp_ob.local_detectors - dets_in_group = np.zeros(len(local_dets), dtype=bool) - for idet, det in enumerate(local_dets): + dets_in_group = np.zeros(ndet, dtype=bool) + for idet, det in enumerate(detectors): if group_index[det] == igroup: dets_in_group[idet] = True if not np.any(dets_in_group): continue if self.det_flags is not None: sample_flags = np.ones( - len(local_dets), + ndet, dtype=temp_ob.detdata[self.det_flags].dtype, ) sample_flags *= self.poly_flag_mask @@ -375,7 +364,7 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): for isample in range(nsample): if np.all(coeff[isample, igroup] == 0): temp_ob.detdata[self.det_flags][ - :, view.first + isample + detectors, view.first + isample ] |= sample_flags gt.stop("Poly2D: Update detector flags") @@ -386,12 +375,10 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): np.array(coeff), [1, 0, 2] ) # ngroup x nsample x nmode trmasks = np.array(masks).T # ndet x nsample - for idet, det in enumerate(temp_ob.local_detectors): - if det not in detector_index: - continue + for idet, det in enumerate(detectors): igroup = group_index[det] ind = detector_index[det] - signal = temp_ob.detdata[self.det_data][idet, vslice] + signal = temp_ob.detdata[self.det_data][det, vslice] mask = trmasks[idet] signal -= np.sum(trcoeff[igroup] * templates[ind], 1) * mask @@ -406,9 +393,14 @@ def _exec(self, data, detectors=None, use_accel=None, **kwargs): # Copy data to original observation gt.start("Poly2D: Copy output") - obs.detdata[self.det_data][:] = temp_ob.detdata[self.det_data][:] - if self.det_flags is not None: - obs.detdata[self.det_flags][:] = temp_ob.detdata[self.det_flags][:] + for det in obs.select_local_detectors( + selection=None, flagmask=self.det_mask + ): + obs.detdata[self.det_data][det] = temp_ob.detdata[self.det_data][det] + if self.det_flags is not None: + obs.detdata[self.det_flags][det] = temp_ob.detdata[self.det_flags][ + det + ] gt.stop("Poly2D: Copy output") # Free data copy