diff --git a/python/ngen_cal/src/ngen/cal/_hookspec.py b/python/ngen_cal/src/ngen/cal/_hookspec.py index c046dbc6..ef1e5bf6 100644 --- a/python/ngen_cal/src/ngen/cal/_hookspec.py +++ b/python/ngen_cal/src/ngen/cal/_hookspec.py @@ -7,9 +7,13 @@ from ngen.cal import PROJECT_SLUG if TYPE_CHECKING: + from datetime import datetime + + import pandas as pd + from hypy.nexus import Nexus + from ngen.cal.configuration import General from ngen.cal.meta import JobMeta - from pandas import Series hookspec = pluggy.HookspecMarker(PROJECT_SLUG) @@ -45,20 +49,41 @@ def ngen_cal_finish(exception: Exception | None) -> None: `exception` will be non-none if an exception was raised during calibration. """ + class ModelHooks: @hookspec(firstresult=True) - def ngen_cal_model_output(self, id: str | None) -> Series: + def ngen_cal_model_observations( + self, + nexus: Nexus, + start_time: datetime, + end_time: datetime, + simulation_interval: pd.Timedelta, + ) -> pd.Series: """ - Called during each calibration iteration to provide the model output in the form - of a pandas Series, indexed by time. - Output series should be in units of cubic meters per second. + Called during each calibration iteration to provide truth / observation + values in the form of a pandas Series, indexed by time with a record + every `simulation_interval`. + The returned pandas Series should be in units of cubic meters per + second. + + `nexus`: HY_Features Nexus + `start_time`, `end_time`: inclusive simulation time range + `simulation_interval`: time (distance) between simulation values + """ + + @hookspec(firstresult=True) + def ngen_cal_model_output(self, id: str | None) -> pd.Series: + """ + Called during each calibration iteration to provide the model output in + the form of a pandas Series, indexed by time. + Output series should be in units of cubic meters per second. """ @hookspec def ngen_cal_model_iteration_finish(self, iteration: int, info: JobMeta) -> None: """ - Called after each model iteration is completed and evaluated. - And before the next iteration is configured and started. - Currently called at the end of an Adjustable's check_point function - which writes out calibration/parameter state data each iteration. - """ \ No newline at end of file + Called after each model iteration is completed and evaluated. + And before the next iteration is configured and started. + Currently called at the end of an Adjustable's check_point function + which writes out calibration/parameter state data each iteration. + """ diff --git a/python/ngen_cal/src/ngen/cal/calibration_set.py b/python/ngen_cal/src/ngen/cal/calibration_set.py index 13a0a082..e690e86c 100644 --- a/python/ngen_cal/src/ngen/cal/calibration_set.py +++ b/python/ngen_cal/src/ngen/cal/calibration_set.py @@ -1,5 +1,6 @@ from __future__ import annotations +import pandas as pd from pandas import DataFrame# type: ignore from typing import TYPE_CHECKING, Sequence if TYPE_CHECKING: @@ -20,23 +21,24 @@ class CalibrationSet(Evaluatable): A HY_Features based catchment with additional calibration information/functionality """ - def __init__(self, adjustables: Sequence[Adjustable], eval_nexus: Nexus, hooks: ModelHooks, start_time: str, end_time: str, eval_params: EvaluationOptions): - """ - - """ + def __init__(self, adjustables: Sequence[Adjustable], eval_nexus: Nexus, hooks: ModelHooks, start_time: datetime, end_time: datetime, eval_params: EvaluationOptions): super().__init__(eval_params) self._eval_nexus = eval_nexus self._adjustables = adjustables # record the hooks needed for output and checkpointing self._hooks = hooks - #use the nwis location to get observation data - obs =self._eval_nexus._hydro_location.get_data(start_time, end_time) - #make sure data is hourly - self._observed = obs.set_index('value_time')['value'].resample('1h').nearest() - self._observed.rename('obs_flow', inplace=True) - #observations in ft^3/s convert to m^3/s - self._observed = self._observed * 0.028316847 + # TODO: derive this from realization config + simulation_interval: pd.Timedelta = pd.Timedelta(3600, unit="s") + obs = self._hooks.ngen_cal_model_observations( + nexus=self._eval_nexus, + start_time=start_time, + end_time=end_time, + simulation_interval=simulation_interval, + ) + obs.rename("obs_flow", inplace=True) + self._observed = obs + self._output = None self._eval_range = self.eval_params._eval_range diff --git a/python/ngen_cal/src/ngen/cal/ngen.py b/python/ngen_cal/src/ngen/cal/ngen.py index 661d92c9..27c7db13 100644 --- a/python/ngen_cal/src/ngen/cal/ngen.py +++ b/python/ngen_cal/src/ngen/cal/ngen.py @@ -24,7 +24,6 @@ from .parameter import Parameter, Parameters from .calibration_cathment import CalibrationCatchment, AdjustableCatchment from .calibration_set import CalibrationSet, UniformCalibrationSet -from .ngen_hooks.ngen_output import TrouteOutput #HyFeatures components from hypy.hydrolocation import NWISLocation from hypy.nexus import Nexus @@ -67,6 +66,7 @@ def _map_params_to_realization(params: Mapping[str, Parameters], realization: Re else: return _params_as_df(params, module.model_name) + class NgenBase(ModelExec): """ Data class specific for Ngen @@ -113,10 +113,10 @@ def __init__(self, **kwargs): #Let pydantic work its magic super().__init__(**kwargs) #now we work ours - # Register the default ngen output hook - self._plugin_manager.register(TrouteOutput(self.routing_output)) #Make a copy of the config file, just in case shutil.copy(self.realization, str(self.realization)+'_original') + + self._register_default_ngen_plugins() # Read the catchment hydrofabric data if self.hydrofabric is not None: @@ -132,6 +132,15 @@ def __init__(self, **kwargs): data = json.load(fp) self.ngen_realization = NgenRealization(**data) + def _register_default_ngen_plugins(self): + from .ngen_hooks.ngen_output import TrouteOutput + from .ngen_hooks.observations import UsgsObservations + + # t-route outputs + self._plugin_manager.register(TrouteOutput(self.routing_output)) + # observations + self._plugin_manager.register(UsgsObservations()) + @staticmethod def _is_legacy_gpkg_hydrofabric(hydrofabric: Path) -> bool: """Return True if legacy (<=v2.1) gpkg hydrofabric.""" diff --git a/python/ngen_cal/src/ngen/cal/ngen_hooks/observations.py b/python/ngen_cal/src/ngen/cal/ngen_hooks/observations.py new file mode 100644 index 00000000..a375f62c --- /dev/null +++ b/python/ngen_cal/src/ngen/cal/ngen_hooks/observations.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import typing + +import pandas as pd +from hypy.hydrolocation.nwis_location import NWISLocation + +from ngen.cal import hookimpl + +if typing.TYPE_CHECKING: + from datetime import datetime + + from hypy.nexus import Nexus + + +class UsgsObservations: + CFS_TO_CSM = 0.028316847 + """ft**3/s to m**3/s""" + + @hookimpl(trylast=True) + def ngen_cal_model_observations( + self, + nexus: Nexus, + start_time: datetime, + end_time: datetime, + simulation_interval: pd.Timedelta, + ) -> pd.Series: + # use the nwis location to get observation data + location = nexus._hydro_location + assert isinstance(location, NWISLocation), f"expected hypy.hydrolocation.NWISLocation instance, got {type(location)}. cannot retrieve observations" + + try: + df = location.get_data(start=start_time, end=end_time) + except BaseException as e: + raise RuntimeError(f"failed to retrieve observations for usgs gage: {location.station_id}") from e + + df.set_index("value_time", inplace=True) + ds = df["value"].resample(simulation_interval).nearest() + ds.rename("obs_flow", inplace=True) + + # convert from CFS to CMS observations + ds = ds * UsgsObservations.CFS_TO_CSM + return ds diff --git a/python/ngen_cal/tests/test_plugin_system.py b/python/ngen_cal/tests/test_plugin_system.py index f0383670..b76e9baf 100644 --- a/python/ngen_cal/tests/test_plugin_system.py +++ b/python/ngen_cal/tests/test_plugin_system.py @@ -1,16 +1,20 @@ from __future__ import annotations -from typing import TYPE_CHECKING from types import ModuleType +from typing import TYPE_CHECKING from ngen.cal import hookimpl from ngen.cal._plugin_system import setup_plugin_manager if TYPE_CHECKING: + from datetime import datetime + from pathlib import Path from typing import Callable + import pandas as pd + from hypy.nexus import Nexus + from ngen.cal.configuration import General - from pathlib import Path def test_setup_plugin_manager(): @@ -76,4 +80,14 @@ def ngen_cal_model_output(self) -> None: @hookimpl def ngen_cal_model_post_iteration(self, path: Path, iteration: int) -> None: - """Test model post iteration""" \ No newline at end of file + """Test model post iteration""" + + @hookimpl + def ngen_cal_model_observations( + self, + nexus: Nexus, + start_time: datetime, + end_time: datetime, + simulation_interval: pd.Timedelta, + ) -> pd.Series: + """Test observation plugin"""