Skip to content

Commit

Permalink
Improvements to AzimuthIntervals (#800)
Browse files Browse the repository at this point in the history
* Improvements to AzimuthIntervals

Attempt to better handle azimuth data that is flagged
and data when the telescope is not scanning.

* Improve robustness of numerical derivative in presence of noise and flags.  Add unit test.

* Tweak polynomial order in turnaround finding

* Fix bug when skipping zero-length objects from numpy.split

* Do not purge original data in demodulation unit tests, since we are making comparison plots of the original.

* Fix typo

* Pull common gap filling code out of multiple operators and place it in utils.

* Small tweak to support saving / loading jump information

* Fix a typo and use a better set of parameters for one case of gap filling.

* Add small operator to wrap the gap filling utility.  Change deglitch defaults to not wipe exisiting detector flags.

* Fix typo

* Debug test failure

* Add unit test for gap filling.  Also fix a flag interpretation issue.

* Add missing file

* Tweak noise levels in azimuth data in unit tests
  • Loading branch information
tskisner authored Nov 20, 2024
1 parent fb57958 commit 92b4eed
Show file tree
Hide file tree
Showing 16 changed files with 1,061 additions and 282 deletions.
21 changes: 12 additions & 9 deletions src/toast/intervals.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,18 @@ def __init__(self, timestamps, intervals=None, timespans=None, samplespans=None)
raise RuntimeError(
"If constructing from intervals, other spans should be None"
)
timespans = [(x.start, x.stop) for x in intervals]
indices = self._find_indices(timespans)
self.data = np.array(
[
(self.timestamps[x[0]], self.timestamps[x[1]], x[0], x[1])
for x in indices
],
dtype=interval_dtype,
).view(np.recarray)
if len(intervals) == 0:
self.data = np.zeros(0, dtype=interval_dtype).view(np.recarray)
else:
timespans = [(x.start, x.stop) for x in intervals]
indices = self._find_indices(timespans)
self.data = np.array(
[
(self.timestamps[x[0]], self.timestamps[x[1]], x[0], x[1])
for x in indices
],
dtype=interval_dtype,
).view(np.recarray)
elif timespans is not None:
if samplespans is not None:
raise RuntimeError("Cannot construct from both time and sample spans")
Expand Down
1 change: 1 addition & 0 deletions src/toast/ops/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ install(FILES
delete.py
copy.py
reset.py
fill_gaps.py
arithmetic.py
memory_counter.py
simple_deglitch.py
Expand Down
1 change: 1 addition & 0 deletions src/toast/ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from .delete import Delete
from .demodulation import Demodulate, StokesWeightsDemod
from .elevation_noise import ElevationNoise
from .fill_gaps import FillGaps
from .filterbin import FilterBin, combine_observation_matrix
from .flag_intervals import FlagIntervals
from .flag_sso import FlagSSO
Expand Down
538 changes: 351 additions & 187 deletions src/toast/ops/azimuth_intervals.py

Large diffs are not rendered by default.

11 changes: 3 additions & 8 deletions src/toast/ops/demodulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,18 +309,13 @@ def _exec(self, data, detectors=None, **kwargs):
self.demod_data.obs.append(demod_obs)

if self.purge:
if self.shared_flags is not None:
del obs.shared[self.shared_flags]
for det_data in self.det_data.split(";"):
del obs.detdata[det_data]
if self.det_flags is not None:
del obs.detdata[self.det_flags]
if self.noise_model is not None:
del obs[self.noise_model]
obs.clear()

log.debug_rank(
"Demodulated observation in", comm=data.comm.comm_group, timer=timer
)
if self.purge:
data.clear()

return

Expand Down
183 changes: 183 additions & 0 deletions src/toast/ops/fill_gaps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Copyright (c) 2024-2024 by the parties listed in the AUTHORS file.
# All rights reserved. Use of this source code is governed by
# a BSD-style license that can be found in the LICENSE file.
import os

import numpy as np
import traitlets
from astropy import units as u

from ..observation import default_values as defaults
from ..timing import Timer, function_timer
from ..traits import Int, Quantity, Unicode, trait_docs
from ..utils import Environment, Logger, flagged_noise_fill
from .operator import Operator


@trait_docs
class FillGaps(Operator):
"""Operator that fills flagged samples with noise.
Currently this operator just fills flagged samples with a simple polynomial
plus white noise. It is mostly used for visualization. No attempt is made
yet to fill the gaps with a constrained noise realization.
"""

# Class traits

API = Int(0, help="Internal interface version for this operator")

times = Unicode(defaults.times, help="Observation shared key for timestamps")

det_data = Unicode(
defaults.det_data,
help="Observation detdata key",
)

det_mask = Int(
defaults.det_mask_invalid,
help="Bit mask value for per-detector flagging",
)

shared_flags = Unicode(
defaults.shared_flags,
allow_none=True,
help="Observation shared key for telescope flags to use",
)

shared_flag_mask = Int(
defaults.shared_mask_invalid,
help="Bit mask value for optional shared flagging",
)

det_flags = Unicode(
defaults.det_flags,
allow_none=True,
help="Observation detdata key for flags to use",
)

det_flag_mask = Int(
defaults.det_mask_invalid,
help="Bit mask value for detector sample flagging",
)

buffer = Quantity(
1.0 * u.s,
help="Buffer of time on either side of each gap",
)

poly_order = Int(
1,
help="Order of the polynomial to fit across each gap",
)

@traitlets.validate("poly_order")
def _check_poly_order(self, proposal):
check = proposal["value"]
if check <= 0:
raise traitlets.TraitError("poly_order should be >= 1")
return check

@traitlets.validate("det_mask")
def _check_det_mask(self, proposal):
check = proposal["value"]
if check < 0:
raise traitlets.TraitError("Det mask should be a positive integer")
return check

@traitlets.validate("det_flag_mask")
def _check_det_flag_mask(self, proposal):
check = proposal["value"]
if check < 0:
raise traitlets.TraitError("Flag mask should be a positive integer")
return check

@traitlets.validate("shared_flag_mask")
def _check_shared_flag_mask(self, proposal):
check = proposal["value"]
if check < 0:
raise traitlets.TraitError("Flag mask should be a positive integer")
return check

def __init__(self, **kwargs):
super().__init__(**kwargs)

@function_timer
def _exec(self, data, detectors=None, **kwargs):
env = Environment.get()
log = Logger.get()

for ob in data.obs:
timer = Timer()
timer.start()

# Sample rate for this observation
rate = ob.telescope.focalplane.sample_rate.to_value(u.Hz)

# The buffer size in samples
buf_samp = int(self.buffer.to_value(u.second) * rate)

# Check that parameters make sense
if self.poly_order > buf_samp + 1:
msg = f"Cannot fit an order {self.poly_order} polynomial "
msg += f"to {buf_samp} samples"
raise RuntimeError(msg)

if buf_samp > ob.n_local_samples // 4:
msg = f"Using {buf_samp} samples of buffer around gaps is"
msg += f" not reasonable for an observation with {ob.n_local_samples}"
msg += " local samples"
raise RuntimeError(msg)

# Local detectors we are considering
local_dets = ob.select_local_detectors(flagmask=self.det_mask)
n_dets = len(local_dets)

# The shared flags
if self.shared_flags is None:
shared_flags = np.zeros(ob.n_local_samples, dtype=bool)
else:
shared_flags = (
ob.shared[self.shared_flags].data & self.shared_flag_mask
) != 0

for idet, det in enumerate(local_dets):
if self.det_flags is None:
flags = shared_flags
else:
flags = np.logical_or(
shared_flags,
(ob.detdata[self.det_flags][det, :] & self.det_flag_mask) != 0,
)
flagged_noise_fill(
ob.detdata[self.det_data][det],
flags,
buf_samp,
poly_order=self.poly_order,
)
msg = f"FillGaps {ob.name}: completed in"
log.debug_rank(msg, comm=data.comm.comm_group, timer=timer)

def _finalize(self, data, **kwargs):
return

def _requires(self):
# Note that the hwp_angle is not strictly required- this
# is just a no-op.
req = {
"shared": [self.times],
"detdata": [self.det_data],
}
if self.shared_flags is not None:
req["shared"].append(self.shared_flags)
if self.det_flags is not None:
req["detdata"].append(self.det_flags)
return req

def _provides(self):
prov = {
"meta": [],
"detdata": [self.det_data],
}
return prov
13 changes: 9 additions & 4 deletions src/toast/ops/flag_intervals.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,15 @@ def _exec(self, data, detectors=None, **kwargs):
if ob.comm_col_rank == 0:
new_flags = np.array(ob.shared[self.shared_flags])
for vname, vmask in self.view_mask:
views = ob.view[vname]
for vw in views:
# Note that a View acts like a slice
new_flags[vw] |= vmask
if vname in ob.intervals:
views = ob.view[vname]
for vw in views:
# Note that a View acts like a slice
new_flags[vw] |= vmask
else:
msg = f"Intervals '{vname}' does not exist in {ob.name}"
msg += " skipping flagging"
log.warning(msg)
ob.shared[self.shared_flags].set(new_flags, offset=(0,), fromrank=0)

def _finalize(self, data, **kwargs):
Expand Down
41 changes: 11 additions & 30 deletions src/toast/ops/hwpss_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ..observation import default_values as defaults
from ..timing import Timer, function_timer
from ..traits import Bool, Int, Quantity, Unicode, Float, trait_docs
from ..utils import Environment, Logger
from ..utils import Environment, Logger, flagged_noise_fill
from .operator import Operator


Expand Down Expand Up @@ -126,7 +126,7 @@ class HWPSynchronousModel(Operator):

time_drift = Bool(False, help="If True, include time drift terms in the model")

fill_gaps = Bool(False, help="If True, fit a simple line across gaps")
fill_gaps = Bool(False, help="If True, fill gaps with a simple noise model")

debug = Unicode(
None,
Expand Down Expand Up @@ -334,7 +334,15 @@ def _exec(self, data, detectors=None, **kwargs):
dc = np.mean(ob.detdata[self.det_data][det][good])
ob.detdata[self.det_data][det][good] -= dc
if self.fill_gaps:
self._fill_gaps(ob, det, det_flags[det])
rate = ob.telescope.focalplane.sample_rate.to_value(u.Hz)
# 1 second buffer
buffer = int(rate)
flagged_noise_fill(
ob.detdata[self.det_data][det],
det_flags[det],
buffer,
poly_order=1,
)
if self.relcal_continuous is not None:
ob.detdata[self.relcal_continuous][det, :] = cal_center / det_mag

Expand Down Expand Up @@ -925,33 +933,6 @@ def _stopped_flags(self, obs):
stopped = np.array(unstable, dtype=np.uint8)
return stopped

def _fill_gaps(self, obs, det, flags):
# Fill gaps with a line, just to kill large artifacts in flagged
# regions after removal of the HWPSS. This is mostly just for visualization.
# Downstream codes should ignore these flagged samples anyway.
sig = obs.detdata[self.det_data][det]
flag_indx = np.arange(len(flags), dtype=np.int64)[np.nonzero(flags)]
flag_groups = np.split(flag_indx, np.where(np.diff(flag_indx) != 1)[0] + 1)
for grp in flag_groups:
if len(grp) == 0:
continue
bad_first = grp[0]
bad_last = grp[-1]
if bad_first == 0:
# Starting bad samples
sig[: bad_last + 1] = sig[bad_last + 1]
elif bad_last == len(flags) - 1:
# Ending bad samples
sig[bad_first:] = sig[bad_first - 1]
else:
int_first = bad_first - 1
int_last = bad_last + 1
sig[bad_first : bad_last + 1] = np.interp(
np.arange(bad_first, bad_last + 1, 1, dtype=np.int32),
[int_first, int_last],
[sig[int_first], sig[int_last]],
)

def _finalize(self, data, **kwargs):
return

Expand Down
24 changes: 11 additions & 13 deletions src/toast/ops/simple_deglitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from ..observation import default_values as defaults
from ..timing import Timer, function_timer
from ..traits import Bool, Float, Instance, Int, Quantity, Unicode, trait_docs
from ..utils import Environment, Logger, name_UID
from ..utils import Environment, Logger, name_UID, flagged_noise_fill
from .operator import Operator


Expand Down Expand Up @@ -48,7 +48,7 @@ class SimpleDeglitch(Operator):
)

reset_det_flags = Bool(
True,
False,
help="Replace existing detector flags",
)

Expand Down Expand Up @@ -204,17 +204,15 @@ def _exec(self, data, detectors=None, **kwargs):
continue
bad_view = np.isnan(sig_view)
det_flags[ind][bad_view] |= self.glitch_mask
if self.fill_gaps:
nbad = np.sum(bad_view)
corrected_signal = sig[ind].copy()
corrected_signal[bad_view] = trend[bad_view]
corrected_signal[bad_view] += np.random.randn(nbad) * rms
# DEBUG begin
# import pdb
# import matplotlib.pyplot as plt
# pdb.set_trace()
# DEBUG end
sig[ind] = corrected_signal
if self.fill_gaps:
# 1 second buffer
buffer = int(focalplane.sample_rate.to_value(u.Hz))
flagged_noise_fill(
sig,
det_flags,
buffer,
poly_order=1,
)

return

Expand Down
Loading

0 comments on commit 92b4eed

Please sign in to comment.