
Commit

Extract HDF5 observation metadata loading into a separate function (#699)

* Extract HDF5 observation metadata loading into a separate function

* Run format_source
tskisner authored Sep 13, 2023
1 parent cde33e3 commit 485736a
Showing 3 changed files with 127 additions and 99 deletions.
2 changes: 1 addition & 1 deletion src/toast/io/__init__.py
@@ -6,5 +6,5 @@
 
 from .compression import compress_detdata, decompress_detdata
 from .hdf_utils import H5File, have_hdf5_parallel, hdf5_config, hdf5_open
-from .observation_hdf_load import load_hdf5
+from .observation_hdf_load import load_hdf5, load_hdf5_obs_meta
 from .observation_hdf_save import save_hdf5
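
With load_hdf5_obs_meta now exported from toast.io, callers can construct an
Observation from just the on-disk metadata, without reading any bulk detector
data. A minimal sketch, not code from this repository: it assumes a default
toast.Comm over the world communicator, an observation file at a hypothetical
path "obs.h5", and that hdf5_open returns an h5py.File handle.

    import toast
    from toast.io import hdf5_config, hdf5_open, load_hdf5_obs_meta

    comm = toast.Comm()  # one process group spanning the world communicator

    # Decide whether parallel HDF5 is usable for this communicator.
    parallel, _, _ = hdf5_config(comm=comm.comm_group)

    # Open the file and pass its root group to the new metadata loader.
    hf = hdf5_open("obs.h5", "r", comm=comm.comm_group)
    obs = load_hdf5_obs_meta(comm, hf, parallel=parallel)
    hf.close()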
220 changes: 125 additions & 95 deletions src/toast/io/observation_hdf_load.py
@@ -443,111 +443,21 @@ def load_hdf5_intervals(obs, hgrp, times, fields, log_prefix, parallel)
 
 
 @function_timer
-def load_hdf5(
-    path,
+def load_hdf5_obs_meta(
     comm,
-    process_rows=None,
+    hgroup,
+    parallel=False,
+    log_prefix="",
     meta=None,
-    detdata=None,
-    shared=None,
-    intervals=None,
     detectors=None,
-    force_serial=False,
+    process_rows=None,
 ):
"""Load an HDF5 observation.
By default, all detdata, shared, intervals, and noise models are loaded into
memory. A subset of objects may be specified with a list of names
passed to the corresponding function arguments.
Args:
path (str): The path to the file on disk.
comm (toast.Comm): The toast communicator to use.
process_rows (int): (Optional) The size of the rectangular process grid
in the detector direction. This number must evenly divide into the size of
comm. If not specified, defaults to the size of the communicator.
meta (list): Only load this list of metadata objects.
detdata (list): Only load this list of detdata objects.
shared (list): Only load this list of shared objects.
intervals (list): Only load this list of intervals objects.
detectors (list): Only load this list of detectors from all detector data
objects.
force_serial (bool): If True, do not use HDF5 parallel support,
even if it is available.
Returns:
(Observation): The constructed observation.
"""
log = Logger.get()
env = Environment.get()

rank = comm.group_rank
nproc = comm.group_size
if nproc == 1:
# Force serial usage in this case, to avoid any MPI overhead
force_serial = True

timer = Timer()
timer.start()
log_prefix = f"HDF5 load {os.path.basename(path)}: "

# Open the file and get the root group.
hf = None
hfgroup = None

parallel, _, _ = hdf5_config(comm=comm.comm_group, force_serial=force_serial)
if (
(not parallel)
and (process_rows is not None)
and (process_rows != comm.group_size)
):
msg = f"When loading observations with serial HDF5, process_rows must equal "
msg += "the group size"
log.error(msg)
raise RuntimeError(msg)

hf = hdf5_open(path, "r", comm=comm.comm_group, force_serial=force_serial)
hgroup = hf

log.debug_rank(
f"{log_prefix} Opened file {path} in",
comm=comm.comm_group,
timer=timer,
)

# The rank zero process gets the file format version and communicates to all
# processes in the group, regardless of whether they are participating in
# the load.
file_version = None
if rank == 0:
# Data format version check
file_version = int(hgroup.attrs["toast_format_version"])
if comm.comm_group is not None:
file_version = comm.comm_group.bcast(file_version, root=0)

# As the file format evolves, we might close the file at this point and call
# an earlier version of the loader. However, v0 and v1 only differ in the
# detector data loading, so we can just branch at that point.
#
# Example for future:
# if file_version == 12345:
# # Close file and call older version
# del hgroup
# if hf is not None:
# hf.close()
# del hf
# return load_hdf5_v12345(...)
#
if file_version == 0 and detectors is not None:
msg = f"HDF5 file '{path}' uses format v0 which does not support loading"
msg = " a subset of detectors"
log.error(msg)
raise RuntimeError(msg)
if file_version > 1:
msg = f"HDF5 file '{path}' using unsupported data format {file_version}"
log.error(msg)
raise RuntimeError(msg)

telescope = None
obs_samples = None
@@ -810,6 +720,126 @@ def load_hdf5(
         comm=comm.comm_group,
         timer=timer,
     )
+    return obs
+
+
+@function_timer
+def load_hdf5(
+    path,
+    comm,
+    process_rows=None,
+    meta=None,
+    detdata=None,
+    shared=None,
+    intervals=None,
+    detectors=None,
+    force_serial=False,
+):
+    """Load an HDF5 observation.
+    By default, all detdata, shared, intervals, and noise models are loaded into
+    memory. A subset of objects may be specified with a list of names
+    passed to the corresponding function arguments.
+    Args:
+        path (str): The path to the file on disk.
+        comm (toast.Comm): The toast communicator to use.
+        process_rows (int): (Optional) The size of the rectangular process grid
+            in the detector direction. This number must evenly divide into the size of
+            comm. If not specified, defaults to the size of the communicator.
+        meta (list): Only load this list of metadata objects.
+        detdata (list): Only load this list of detdata objects.
+        shared (list): Only load this list of shared objects.
+        intervals (list): Only load this list of intervals objects.
+        detectors (list): Only load this list of detectors from all detector data
+            objects.
+        force_serial (bool): If True, do not use HDF5 parallel support,
+            even if it is available.
+    Returns:
+        (Observation): The constructed observation.
+    """
+    log = Logger.get()
+    env = Environment.get()
+
+    rank = comm.group_rank
+    nproc = comm.group_size
+    if nproc == 1:
+        # Force serial usage in this case, to avoid any MPI overhead
+        force_serial = True
+
+    timer = Timer()
+    timer.start()
+    log_prefix = f"HDF5 load {os.path.basename(path)}: "
+
+    # Open the file and get the root group.
+    hf = None
+    hfgroup = None
+
+    parallel, _, _ = hdf5_config(comm=comm.comm_group, force_serial=force_serial)
+    if (
+        (not parallel)
+        and (process_rows is not None)
+        and (process_rows != comm.group_size)
+    ):
+        msg = f"When loading observations with serial HDF5, process_rows must equal "
+        msg += "the group size"
+        log.error(msg)
+        raise RuntimeError(msg)
+
+    hf = hdf5_open(path, "r", comm=comm.comm_group, force_serial=force_serial)
+    hgroup = hf
+
+    log.debug_rank(
+        f"{log_prefix} Opened file {path} in",
+        comm=comm.comm_group,
+        timer=timer,
+    )
+
+    # The rank zero process gets the file format version and communicates to all
+    # processes in the group, regardless of whether they are participating in
+    # the load.
+    file_version = None
+    if rank == 0:
+        # Data format version check
+        file_version = int(hgroup.attrs["toast_format_version"])
+    if comm.comm_group is not None:
+        file_version = comm.comm_group.bcast(file_version, root=0)
+
+    # As the file format evolves, we might close the file at this point and call
+    # an earlier version of the loader. However, v0 and v1 only differ in the
+    # detector data loading, so we can just branch at that point.
+    #
+    # Example for future:
+    # if file_version == 12345:
+    #     # Close file and call older version
+    #     del hgroup
+    #     if hf is not None:
+    #         hf.close()
+    #     del hf
+    #     return load_hdf5_v12345(...)
+    #
+    if file_version == 0 and detectors is not None:
+        msg = f"HDF5 file '{path}' uses format v0 which does not support loading"
+        msg += " a subset of detectors"
+        log.error(msg)
+        raise RuntimeError(msg)
+    if file_version > 1:
+        msg = f"HDF5 file '{path}' using unsupported data format {file_version}"
+        log.error(msg)
+        raise RuntimeError(msg)
+
+    # Load all metadata into an empty Observation
+    obs = load_hdf5_obs_meta(
+        comm,
+        hgroup,
+        parallel=parallel,
+        log_prefix="",
+        meta=meta,
+        detectors=detectors,
+        process_rows=process_rows,
+    )
+
     # Load shared data
 
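
The docstring above spells out the selective loading contract: every category
loads in full by default, and each list argument restricts one category by
name. A hedged usage sketch; the file path, field names, and detector names
below are hypothetical placeholders, not values from this repository.

    import toast
    from toast.io import load_hdf5

    comm = toast.Comm()

    # Load only two shared fields, one detdata field, and two detectors;
    # metadata and intervals still load in full. Note the format check in
    # load_hdf5: passing a detectors subset for a v0 file raises RuntimeError.
    obs = load_hdf5(
        "obs.h5",
        comm,
        shared=["times", "boresight_radec"],
        detdata=["signal"],
        detectors=["D00A", "D00B"],
    )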
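The version handshake in load_hdf5 above is a standard MPI idiom: only rank
zero reads the file attribute, then broadcasts the value so every process in
the group runs the same branch logic. A self-contained sketch of the same
idiom using mpi4py directly (not code from this repository):

    from mpi4py import MPI

    comm = MPI.COMM_WORLD

    file_version = None
    if comm.rank == 0:
        # Only rank zero touches the file; a constant stands in here.
        file_version = 1

    # Every rank receives the same value and can branch consistently.
    file_version = comm.bcast(file_version, root=0)
    if file_version > 1:
        raise RuntimeError(f"unsupported data format {file_version}")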
4 changes: 1 addition & 3 deletions src/toast/spt3g/spt3g_import.py
@@ -270,9 +270,7 @@ def __call__(self, frames)
                             from_g3_time(frm["site_weather_time"]),
                             tz=datetime.timezone.utc,
                         )
-                        weather_uid = from_g3_scalar_type(
-                            frm["site_weather_uid"]
-                        )
+                        weather_uid = from_g3_scalar_type(frm["site_weather_uid"])
                         weather_median = from_g3_scalar_type(
                             frm["site_weather_use_median"]
                         )
