Skip to content

Commit

Permalink
Merge pull request #2598 from cta-observatory/input_reference_meta
Browse files Browse the repository at this point in the history
Input reference meta
  • Loading branch information
kosack authored Aug 7, 2024
2 parents 6758c1b + 819dc87 commit 4fd7ab7
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 109 deletions.
2 changes: 2 additions & 0 deletions docs/changes/2598.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The provenance system now records the reference metadata
of input and output files, if available.
48 changes: 40 additions & 8 deletions src/ctapipe/core/provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import platform
import sys
import uuid
import warnings
from collections import UserList
from contextlib import contextmanager
from importlib import import_module
Expand Down Expand Up @@ -57,6 +58,10 @@ def get_module_version(name):
return "not installed"


class MissingReferenceMetadata(UserWarning):
"""Warning raised if reference metadata could not be read from input file."""


class Provenance(metaclass=Singleton):
"""
Manage the provenance info for a stack of *activities*
Expand Down Expand Up @@ -91,7 +96,7 @@ def _get_current_or_start_activity(self):

return self.current_activity

def add_input_file(self, filename, role=None):
def add_input_file(self, filename, role=None, add_meta=True):
"""register an input to the current activity
Parameters
Expand All @@ -102,14 +107,14 @@ def add_input_file(self, filename, role=None):
role this input file satisfies (optional)
"""
activity = self._get_current_or_start_activity()
activity.register_input(abspath(filename), role=role)
activity.register_input(abspath(filename), role=role, add_meta=add_meta)
log.debug(
"added input entity '%s' to activity: '%s'",
filename,
activity.name,
)

def add_output_file(self, filename, role=None):
def add_output_file(self, filename, role=None, add_meta=True):
"""
register an output to the current activity
Expand All @@ -122,7 +127,7 @@ def add_output_file(self, filename, role=None):
"""
activity = self._get_current_or_start_activity()
activity.register_output(abspath(filename), role=role)
activity.register_output(abspath(filename), role=role, add_meta=add_meta)
log.debug(
"added output entity '%s' to activity: '%s'",
filename,
Expand Down Expand Up @@ -239,7 +244,7 @@ def start(self):
self._prov["start"].update(_sample_cpu_and_memory())
self._prov["system"].update(_get_system_provenance())

def register_input(self, url, role=None):
def register_input(self, url, role=None, add_meta=True):
"""
Add a URL of a file to the list of inputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Expand All @@ -250,22 +255,37 @@ def register_input(self, url, role=None):
filename or url of input file
role: str
role name that this input satisfies
add_meta: bool
If true, try to load reference metadata from input file
and add to provenance.
"""
self._prov["input"].append(dict(url=url, role=role))
reference_meta = self._get_reference_meta(url=url) if add_meta else None
self._prov["input"].append(
dict(url=url, role=role, reference_meta=reference_meta)
)

def register_output(self, url, role=None):
def register_output(self, url, role=None, add_meta=True):
"""
Add a URL of a file to the list of outputs (can be a filename or full
url, if no URL specifier is given, assume 'file://')
Should only be called once the file is finalized, so that reference metadata
can be read.
Parameters
----------
url: str
filename or url of output file
role: str
role name that this output satisfies
add_meta: bool
If true, try to load reference metadata from input file
and add to provenance.
"""
self._prov["output"].append(dict(url=url, role=role))
reference_meta = self._get_reference_meta(url=url) if add_meta else None
self._prov["output"].append(
dict(url=url, role=role, reference_meta=reference_meta)
)

def register_config(self, config):
"""add a dictionary of configuration parameters to this activity"""
Expand Down Expand Up @@ -302,6 +322,18 @@ def sample_cpu_and_memory(self):
def provenance(self):
return self._prov

def _get_reference_meta(self, url):
# here to prevent circular imports / top-level cross-dependencies
from ..io.metadata import read_reference_metadata

try:
return read_reference_metadata(url).to_dict()
except Exception:
warnings.warn(
f"Could not read reference metadata for input file: {url}",
MissingReferenceMetadata,
)


def _get_python_packages():
def _sortkey(dist):
Expand Down
34 changes: 25 additions & 9 deletions src/ctapipe/core/tests/test_provenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from ctapipe.core import Provenance
from ctapipe.core.provenance import _ActivityProvenance
from ctapipe.io.metadata import Reference


@pytest.fixture
Expand All @@ -15,19 +16,18 @@ def provenance(monkeypatch):
prov = Provenance()
monkeypatch.setattr(prov, "_activities", [])
monkeypatch.setattr(prov, "_finished_activities", [])

prov.start_activity("test1")
prov.add_input_file("input.txt")
prov.add_output_file("output.txt")
prov.start_activity("test2")
prov.add_input_file("input_a.txt")
prov.add_input_file("input_b.txt")
prov.finish_activity("test2")
prov.finish_activity("test1")
return prov


def test_provenance_activity_names(provenance):
provenance.start_activity("test1")
provenance.add_input_file("input.txt")
provenance.add_output_file("output.txt")
provenance.start_activity("test2")
provenance.add_input_file("input_a.txt")
provenance.add_input_file("input_b.txt")
provenance.finish_activity("test2")
provenance.finish_activity("test1")
assert set(provenance.finished_activity_names) == {"test2", "test1"}


Expand All @@ -52,6 +52,8 @@ def test_provenence_contextmanager():


def test_provenance_json(provenance: Provenance):
provenance.start_activity("test1")
provenance.finish_activity("test1")
data = json.loads(provenance.as_json())

activity = data[0]
Expand All @@ -60,3 +62,17 @@ def test_provenance_json(provenance: Provenance):
packages = activity["system"]["python"].get("packages")
assert isinstance(packages, list)
assert any(p["name"] == "numpy" for p in packages)


def test_provenance_input_reference_meta(provenance: Provenance, dl1_file):
provenance.start_activity("test1")
provenance.add_input_file(dl1_file, "events")
provenance.finish_activity("test1")
data = json.loads(provenance.as_json())

inputs = data[0]["input"]
assert len(inputs) == 1
input_meta = inputs[0]
assert "reference_meta" in input_meta
assert "CTA PRODUCT ID" in input_meta["reference_meta"]
Reference.from_dict(input_meta["reference_meta"])
2 changes: 1 addition & 1 deletion src/ctapipe/core/tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def load_config_file(self, path: str | pathlib.Path) -> None:
# fall back to traitlets.config.Application's implementation
super().load_config_file(str(path))

Provenance().add_input_file(path, role="Tool Configuration")
Provenance().add_input_file(path, role="Tool Configuration", add_meta=False)

def update_logging_config(self):
"""Update the configuration of loggers."""
Expand Down
2 changes: 1 addition & 1 deletion src/ctapipe/io/datawriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def finish(self):

self._write_context_metadata_headers()
self._writer.close()
PROV.add_output_file(str(self.output_path), role="DL1/Event")

@property
def datalevels(self):
Expand Down Expand Up @@ -432,7 +433,6 @@ def _setup_output_path(self):
", use the `overwrite` option or choose another `output_path` "
)
self.log.debug("output path: %s", self.output_path)
PROV.add_output_file(str(self.output_path), role="DL1/Event")

# check that options make sense
writable_things = [
Expand Down
4 changes: 2 additions & 2 deletions src/ctapipe/io/hdf5merger.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ def __init__(self, output_path=None, **kwargs):
mode="a" if appending else "w",
filters=DEFAULT_FILTERS,
)
Provenance().add_output_file(str(self.output_path))

self.required_nodes = None
self.data_model_version = None
Expand Down Expand Up @@ -247,7 +246,7 @@ def _update_meta(self):

def _read_meta(self, h5file):
try:
return metadata.Reference.from_dict(metadata.read_metadata(h5file))
return metadata._read_reference_metadata_hdf5(h5file)
except Exception:
raise CannotMerge(
f"CTA Reference meta not found in input file: {h5file.filename}"
Expand Down Expand Up @@ -384,6 +383,7 @@ def __exit__(self, exc_type, exc_value, traceback):
def close(self):
if hasattr(self, "h5file"):
self.h5file.close()
Provenance().add_output_file(str(self.output_path))

def _append_subarray(self, other):
# focal length choice doesn't matter here, set to equivalent so we don't get
Expand Down
83 changes: 69 additions & 14 deletions src/ctapipe/io/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
some_astropy_table.write("output.ecsv")
"""
import gzip
import os
import uuid
import warnings
from collections import OrderedDict, defaultdict
from contextlib import ExitStack

import tables
from astropy.io import fits
from astropy.table import Table
from astropy.time import Time
from tables import NaturalNameWarning
from traitlets import Enum, HasTraits, Instance, List, Unicode, UseEnum, default
Expand All @@ -45,7 +48,8 @@
"Activity",
"Instrument",
"write_to_hdf5",
"read_metadata",
"read_hdf5_metadata",
"read_reference_metadata",
]


Expand Down Expand Up @@ -295,6 +299,12 @@ def from_dict(cls, metadata):
instrument=Instrument(**kwargs["instrument"]),
)

@classmethod
def from_fits(cls, header):
# for now, just use from_dict, but we might need special handling
# of some keys
return cls.from_dict(header)

def __repr__(self):
return str(self.to_dict())

Expand All @@ -320,27 +330,72 @@ def write_to_hdf5(metadata, h5file, path="/"):
node._v_attrs[key] = value # pylint: disable=protected-access


def read_metadata(h5file, path="/"):
def read_hdf5_metadata(h5file, path="/"):
"""
Read hdf5 attributes into a dict
"""
with ExitStack() as stack:
if not isinstance(h5file, tables.File):
h5file = stack.enter_context(tables.open_file(h5file))

node = h5file.get_node(path)
return {key: node._v_attrs[key] for key in node._v_attrs._f_list()}


def read_reference_metadata(path):
"""Read CTAO data product metadata from path
File is first opened to determine file format, then the metadata
is read. Supported are currently FITS and HDF5.
"""
Read metadata from an hdf5 file
header_bytes = 8
with open(path, "rb") as f:
first_bytes = f.read(header_bytes)

if first_bytes.startswith(b"\x1f\x8b"):
with gzip.open(path, "rb") as f:
first_bytes = f.read(header_bytes)

if first_bytes.startswith(b"\x89HDF"):
return _read_reference_metadata_hdf5(path)

if first_bytes.startswith(b"SIMPLE"):
return _read_reference_metadata_fits(path)

if first_bytes.startswith(b"# %ECSV"):
return Reference.from_dict(Table.read(path).meta)

raise ValueError(
f"'{path}' is not one of the supported file formats: fits, hdf5, ecsv"
)


def _read_reference_metadata_hdf5(h5file, path="/"):
meta = read_hdf5_metadata(h5file, path)
return Reference.from_dict(meta)


def _read_reference_metadata_ecsv(path):
return Reference.from_dict(Table.read(path).meta)


def _read_reference_metadata_fits(fitsfile, hdu: int | str = 0):
"""
Read reference metadata from a fits file
Parameters
----------
h5filename: string, Path, or `tables.file.File`
fitsfile: string, Path, or `tables.file.File`
hdf5 file
path: string
default: '/' is the path to ctapipe global metadata
hdu: int or str
HDU index or name.
Returns
-------
metadata: dictionary
reference_metadata: Reference
"""
with ExitStack() as stack:
if not isinstance(h5file, tables.File):
h5file = stack.enter_context(tables.open_file(h5file))

node = h5file.get_node(path)
metadata = {key: node._v_attrs[key] for key in node._v_attrs._f_list()}
return metadata
if not isinstance(fitsfile, fits.HDUList):
fitsfile = stack.enter_context(fits.open(fitsfile))

raise OSError("Could not read metadata")
return Reference.from_fits(fitsfile[hdu].header)
6 changes: 5 additions & 1 deletion src/ctapipe/io/simteleventsource.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,12 @@ def read_atmosphere_profile_from_simtel(

if isinstance(simtelfile, str | Path):
context_manager = SimTelFile(simtelfile)
# FIXME: simtel files currently do not have CTAO reference
# metadata, should be set to True once we store metadata
Provenance().add_input_file(
filename=simtelfile, role="ctapipe.atmosphere.AtmosphereDensityProfile"
filename=simtelfile,
role="ctapipe.atmosphere.AtmosphereDensityProfile",
add_meta=False,
)

else:
Expand Down
Loading

0 comments on commit 4fd7ab7

Please sign in to comment.