Move to using composite config dataset in requested jobs #394

Merged
merged 25 commits into from
Aug 7, 2023
25 commits
ea0812c
Stub work for deriving composite config data.
robertbartel Jun 13, 2023
ee413fc
Add missing attributes to NGENRequestBody.
robertbartel Jul 12, 2023
a8dabb9
Add several concrete InitialDataAdder subtypes.
robertbartel Jul 13, 2023
11b36c5
Move data derive util to initial data adders.
robertbartel Jul 13, 2023
3b2b2b8
Removing deprecated functions of DataDeriveUtil.
robertbartel Jul 13, 2023
e20f090
Fix ngen-config, add ngen-cal dataservice deps.
robertbartel Jul 13, 2023
6943014
Adjust dep defs using NOAA-OWP in git url.
robertbartel Jul 13, 2023
6e64dfb
Add ngen-cal dep to project requirements.txt.
robertbartel Jul 13, 2023
f85e9fc
Add source dataset data_id as index of composite.
robertbartel Jul 17, 2023
6624058
Update NGENRequestBody for composite datasets.
robertbartel Jul 17, 2023
51a0874
Add NGENRequestBody.composite_config_source_ids.
robertbartel Jul 17, 2023
3f34cb6
Update AbstractNgenRequest for composite datasets.
robertbartel Jul 17, 2023
78af07b
Add specified forcings support in ngen request.
robertbartel Jul 17, 2023
049aa2b
Fix AbstractNgenRequest property impl errors.
robertbartel Jul 17, 2023
c1bb497
Update calibration request body for composite cfg.
robertbartel Jul 17, 2023
4992b6d
Update ngen-cal request for composite cfg support.
robertbartel Jul 17, 2023
efca425
Bump version of dmod.core to 0.10.0.
robertbartel Jul 17, 2023
f7998ae
Bump dmod.communication dep on dmod.core.
robertbartel Jul 17, 2023
fc6f5b9
Add unit tests for NgenCalRequestEstimationConfig.
robertbartel Jul 17, 2023
d866682
Add unit tests for NgenCalibrationRequest.
robertbartel Jul 17, 2023
df873c4
Update test_ngen_request.py for changes to type.
robertbartel Jul 17, 2023
ef30d0e
Update dmod.scheduler tests that depend on comms.
robertbartel Jul 17, 2023
400dc91
Bump dmod.communication version to 0.14.0.
robertbartel Jul 17, 2023
11e8114
Update core and comms deps for dataservice.
robertbartel Jul 17, 2023
92a127a
Bump dmod.dataservice version to 0.8.0.
robertbartel Jul 17, 2023
2 changes: 1 addition & 1 deletion python/lib/communication/dmod/communication/_version.py
@@ -1 +1 @@
-__version__ = '0.13.0'
+__version__ = '0.14.0'
Original file line number Diff line number Diff line change
@@ -26,32 +26,21 @@ class AbstractNgenRequest(DmodJobRequest, ABC):

- execution time range
- hydrofabric UID, dataset id, and ::class:`DataRequirement`
- primary config dataset id (i.e., the realization config) and ::class:`DataRequirement`
- BMI init configs dataset id and ::class:`DataRequirement`
- forcing ::class:`DataRequirement`
- config files dataset id (uses ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG`) and ::class:`DataRequirement`
- list of each output dataset's ::class:`DataFormat`
- forcing ::class:`DataRequirement` and (optionally) a forcing dataset id
- (Optional) partitioning config dataset id and ::class:`DataRequirement`
- (Optional) list of catchments

This type provides the implementation for ::method:`factory_init_from_deserialized_json` for all subtypes. This
works by having each level of the class hierarchy be responsible for deserialization applicable to it, as described
below.

Instead of implementing full deserialization, this type and subtypes include a function to deserialize from JSON the
type-specific keyword parameters passed to the individual type's ::method:`__init__`. This is the
::method:`deserialize_for_init` class method. Subclass implementations should ensure they call superclass's version
and build on the returned dict of deserialized keyword params from ancestor levels.

This abstract type also implements a version of ::method:`to_dict` for serializing all the state included at this
level.
It is possible for the ``NGEN_JOB_COMPOSITE_CONFIG`` dataset to not already exist, in which case an instance of this
type also implies a request to derive such a dataset.
"""

request_body: NGENRequestBody

_hydrofabric_data_requirement = PrivateAttr(None)
_forcing_data_requirement = PrivateAttr(None)
_realization_cfg_data_requirement = PrivateAttr(None)
_bmi_cfg_data_requirement = PrivateAttr(None)
_composite_config_data_requirement = PrivateAttr(None)
_partition_cfg_data_requirement = PrivateAttr(None)

class Config:
@@ -103,43 +92,61 @@ def bmi_config_data_id(self) -> str:
return self.request_body.bmi_config_data_id

@property
def bmi_cfg_data_requirement(self) -> DataRequirement:
def catchments(self) -> Optional[List[str]]:
"""
A requirement object defining of the BMI configuration data needed to execute this request.
An optional list of catchment ids for those catchments in the request ngen execution.

No list implies "all" known catchments.

Returns
-------
DataRequirement
A requirement object defining of the BMI configuration data needed to execute this request.
Optional[List[str]]
An optional list of catchment ids for those catchments in the request ngen execution.
"""
if self._bmi_cfg_data_requirement is None:
bmi_config_restrict = [
DiscreteRestriction(
variable="data_id", values=[self.bmi_config_data_id]
)
]
bmi_config_domain = DataDomain(
data_format=DataFormat.BMI_CONFIG,
discrete_restrictions=bmi_config_restrict,
)
self._bmi_cfg_data_requirement = DataRequirement(
domain=bmi_config_domain, is_input=True, category=DataCategory.CONFIG
)
return self._bmi_cfg_data_requirement
return self.request_body.catchments

@property
def catchments(self) -> Optional[List[str]]:
def composite_config_data_id(self) -> str:
"""
An optional list of catchment ids for those catchments in the request ngen execution.
Index value of ``data_id`` to uniquely identify the applicable composite config dataset for the requested job.

No list implies "all" known catchments.
Returns
-------
str
Index value of ``data_id`` of the applicable composite config dataset for the requested job.
"""
return self.request_body.composite_config_data_id

@property
def composite_config_data_requirement(self) -> DataRequirement:
"""
A requirement object defining the composite configuration data needed to execute this request.

Returns
-------
Optional[List[str]]
An optional list of catchment ids for those catchments in the request ngen execution.
DataRequirement
A requirement object defining the composite configuration data needed to execute this request.
"""
return self.request_body.catchments
if self._composite_config_data_requirement is None:
cont_restricts = [self.time_range]

disc_restricts = [
DiscreteRestriction(variable="HYDROFABRIC_ID", values=[self.request_body.hydrofabric_uid]),
self._gen_catchments_domain_restriction(),
DiscreteRestriction(variable="DATA_ID", values=[self.composite_config_data_id])
]

# Add a restriction including source dataset ids as well, if there are any
src_ds_ids = self.request_body.composite_config_source_ids
if len(src_ds_ids) > 0:
disc_restricts.append(DiscreteRestriction(variable="COMPOSITE_SOURCE_ID", values=src_ds_ids))

composite_config_domain = DataDomain(data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG,
continuous_restrictions=cont_restricts,
discrete_restrictions=disc_restricts)
self._composite_config_data_requirement = DataRequirement(domain=composite_config_domain, is_input=True,
category=DataCategory.CONFIG)
return self._composite_config_data_requirement

@property
def data_requirements(self) -> List[DataRequirement]:
@@ -152,10 +159,9 @@ def data_requirements(self) -> List[DataRequirement]:
List of all the explicit and implied data requirements for this request.
"""
requirements = [
self.bmi_cfg_data_requirement,
self.forcing_data_requirement,
self.hydrofabric_data_requirement,
self.realization_cfg_data_requirement,
self.composite_config_data_requirement,
]
if self.use_parallel_ngen:
requirements.append(self.partition_cfg_data_requirement)
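
Reviewer-style note on the new `composite_config_data_requirement` property above: the sketch below restates how its discrete restrictions appear to be assembled, using only the standard library. `Restriction` and `composite_config_restrictions` are hypothetical stand-ins, not dmod types, and the `CATCHMENT_ID` variable name is an assumption, since the body of `_gen_catchments_domain_restriction()` is not part of this diff.

```python
from dataclasses import dataclass
from typing import List, Optional, Sequence


@dataclass
class Restriction:
    """Hypothetical stand-in for DiscreteRestriction; not the dmod class."""
    variable: str
    values: List[str]


def composite_config_restrictions(
    hydrofabric_uid: str,
    composite_config_data_id: str,
    catchments: Optional[Sequence[str]],
    source_ids: Sequence[str],
) -> List[Restriction]:
    """Collect discrete restrictions in the same order as the property in the diff."""
    restrictions = [
        Restriction("HYDROFABRIC_ID", [hydrofabric_uid]),
        # Assumption: an empty value list stands for "all catchments"; the real
        # _gen_catchments_domain_restriction() is not shown in this diff.
        Restriction("CATCHMENT_ID", list(catchments or [])),
        Restriction("DATA_ID", [composite_config_data_id]),
    ]
    # Source dataset ids are only added when at least one was supplied.
    if len(source_ids) > 0:
        restrictions.append(Restriction("COMPOSITE_SOURCE_ID", list(source_ids)))
    return restrictions


no_sources = composite_config_restrictions("hf-1", "composite-1", None, [])
with_sources = composite_config_restrictions(
    "hf-1", "composite-1", ["cat-1"], ["real-1", "bmi-1"]
)
print([r.variable for r in no_sources])
print([r.variable for r in with_sources])
```

The `COMPOSITE_SOURCE_ID` restriction is present only in the second case, which mirrors the `len(src_ds_ids) > 0` check in the property.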
@@ -180,9 +186,21 @@ def formulation_configs(self) -> Optional[PartialRealizationConfig]:
Optional[PartialRealizationConfig]
The user-supplied pieces of the ngen realization config to use for this request.
"""
return self.request_body.formulation_configs
return self.request_body.partial_realization_config

@property
def forcings_data_id(self) -> Optional[str]:
"""
Unique ``data_id`` of requested forcings dataset, if one was specified by the user.

Returns
-------
Optional[str]
``data_id`` of the requested forcings dataset, or ``None`` if any forcings dataset may be used that
otherwise satisfies the requirements of the request.
"""
return self.request_body.forcings_data_id

# TODO: #needs_issue - Account for option when forcing dataset is explicitly provided
@property
def forcing_data_requirement(self) -> DataRequirement:
"""
@@ -194,11 +212,15 @@ def forcing_data_requirement(self) -> DataRequirement:
A requirement object defining of the forcing data needed to execute this request.
"""
if self._forcing_data_requirement is None:
discrete_restricts = [self._gen_catchments_domain_restriction()]
if self.forcings_data_id is not None:
discrete_restricts.append(DiscreteRestriction(variable="DATA_ID", values=[self.forcings_data_id]))

# TODO: going to need to address the CSV usage later
forcing_domain = DataDomain(
data_format=DataFormat.AORC_CSV,
continuous_restrictions=[self.time_range],
discrete_restrictions=[self._gen_catchments_domain_restriction()],
discrete_restrictions=discrete_restricts,
)
self._forcing_data_requirement = DataRequirement(
domain=forcing_domain, is_input=True, category=DataCategory.FORCING
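
Reviewer-style note on the hunk above: the forcing requirement is built lazily, cached in a private attribute, and only pinned to a specific dataset when `forcings_data_id` is set. A minimal sketch of that pattern follows, assuming plain tuples in place of `DiscreteRestriction`. `ForcingRequirementSketch` and the `CATCHMENT_ID` name are hypothetical and do not come from dmod.

```python
from typing import List, Optional, Tuple

# Hypothetical (variable, values) pair standing in for DiscreteRestriction.
Restriction = Tuple[str, List[str]]


class ForcingRequirementSketch:
    """Illustrates lazy construction plus an optional DATA_ID restriction."""

    def __init__(self, catchments: List[str], forcings_data_id: Optional[str] = None):
        self.catchments = catchments
        self.forcings_data_id = forcings_data_id
        self._restrictions: Optional[List[Restriction]] = None  # built on first access

    @property
    def restrictions(self) -> List[Restriction]:
        if self._restrictions is None:
            built: List[Restriction] = [("CATCHMENT_ID", list(self.catchments))]
            # Pin to one dataset only when the user named it explicitly.
            if self.forcings_data_id is not None:
                built.append(("DATA_ID", [self.forcings_data_id]))
            self._restrictions = built
        return self._restrictions


any_dataset = ForcingRequirementSketch(["cat-1"])
pinned = ForcingRequirementSketch(["cat-1"], forcings_data_id="forcing-42")
print(any_dataset.restrictions)
print(pinned.restrictions)
```

Without an id, any forcing dataset that satisfies the remaining restrictions is acceptable, which matches the `forcings_data_id` docstring.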
@@ -309,14 +331,14 @@ def partition_cfg_data_id(self) -> Optional[str]:
return self.request_body.partition_cfg_data_id

@property
def partition_cfg_data_requirement(self) -> DataRequirement:
def partition_cfg_data_requirement(self) -> Optional[DataRequirement]:
"""
A requirement object defining of the partitioning configuration data needed to execute this request.
A requirement object defining the partitioning configuration data needed to execute this request.

Returns
-------
DataRequirement
A requirement object defining of the partitioning configuration data needed to execute this request.
Optional[DataRequirement]
Optional requirement object defining the partitioning configuration data needed to execute this request.
"""
if self._partition_cfg_data_requirement is None and self.use_parallel_ngen:
d_restricts = []
@@ -366,31 +388,20 @@ def realization_config_data_id(self) -> str:
return self.request_body.realization_config_data_id

@property
def realization_cfg_data_requirement(self) -> DataRequirement:
def t_route_config_data_id(self) -> Optional[str]:
"""
A requirement object defining of the realization configuration data needed to execute this request.
The index value of ``data_id`` to uniquely identify sets of t-route config data that are otherwise similar.

For example, two t-route configs may apply to the same time and catchments, but be very different. The
nature of the differences is not necessarily even possible to define generally, and certainly not through
(pre-existing) indices. As such, the `data_id` index is added for such differentiating purposes.

Returns
-------
DataRequirement
A requirement object defining of the realization configuration data needed to execute this request.
str
The index value of ``data_id`` to uniquely identify the required t-route config dataset.
"""
if self._realization_cfg_data_requirement is None:
real_cfg_dis_restrict = [
self._gen_catchments_domain_restriction(),
DiscreteRestriction(
variable="data_id", values=[self.realization_config_data_id]
),
]
real_cfg_domain = DataDomain(
data_format=DataFormat.NGEN_REALIZATION_CONFIG,
continuous_restrictions=[self.time_range],
discrete_restrictions=real_cfg_dis_restrict,
)
self._realization_cfg_data_requirement = DataRequirement(
domain=real_cfg_domain, is_input=True, category=DataCategory.CONFIG
)
return self._realization_cfg_data_requirement
return self.request_body.t_route_config_data_id

@property
def time_range(self) -> TimeRange:
@@ -460,16 +471,4 @@ class ExternalAbstractNgenRequest(ExternalRequest, AbstractNgenRequest, ABC):
def __eq__(self, other):
return super().__eq__(other) and self.session_secret == other.session_secret

def __hash__(self):
hash_str = "{}-{}-{}-{}-{}-{}-{}-{}-{}".format(
self.time_range.to_json(),
self.hydrofabric_data_id,
self.hydrofabric_uid,
self.realization_config_data_id,
self.bmi_config_data_id,
self.session_secret,
self.cpu_count,
self.partition_cfg_data_id,
",".join(self.catchments),
)
return hash(hash_str)

Original file line number Diff line number Diff line change
@@ -1,19 +1,106 @@
from pydantic import Field
from typing import Any, Dict, List
from pydantic import Field, validator
from typing import Any, Dict, List, Optional
from .ngen_exec_request_body import NGENRequestBody


# TODO: (later) consider adding different type for when receiving data_id of existing ngen_cal config dataset

class NgenCalRequestEstimationConfig(NGENRequestBody):
"""
Request body encapsulating data within outer calibration request.

Encapsulated data to define a requested ngen-cal calibration job. It includes details on the time range,
hydrofabric, ngen configurations, and calibration configurations needed for performing an ngen-cal calibration. It may
also include a reference to what forcing data to use.

An instance contains a reference to the ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG` dataset containing
configurations for the requested job. In cases when this dataset doesn't yet exist, an instance also contains the
necessary details for generating such a dataset. In particular, this includes:

- a realization config dataset id **OR** a ::class:`PartialRealizationConfig`
- a calibration config dataset **OR** calibration config parameters directly
- (optionally) a BMI init config dataset id
- a t-route configuration dataset id

When dataset ids are given, these are treated as sources for the new ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG`,
with the contents of the former copied into the latter as appropriate.
"""

ngen_cal_config_data_id: Optional[str] = Field(None, description="Id of existing source ngen_cal config dataset.")
parallel_proc: int = Field(1, gt=0, description="Number of parallel ngen processes for ngen-cal to use.")
# TODO: #needs_issue - Add validator for supported values
algorithm: str = Field("dds", description="The calibration optimization algorithm.")
objective_function: str = Field(description="The calibration objective function.")
iterations: int = Field(gt=0, description="The number of ngen iterations to run through during calibration.")
objective_function: str = Field(None, description="The calibration objective function.")
iterations: int = Field(None, gt=0, description="The number of ngen iterations to run through during calibration.")
# TODO: #needs_issue - Add validator to ensure this isn't larger than the total number of iterations
start_iteration: int = Field(1, gt=0, description="The starting iteration, which is greater than 1 for restarts.")
# TODO: #needs_issue - Add validator for supported values (independent, uniform, explicit)
# TODO: #needs_issue - Add validator for adjusting case when needed
model_strategy: str = Field(description="The particular ngen calibration strategy to use.")
model_params: Dict[str, List[Dict[str, Any]]]
ngen_cal_config_data_id: str
model_strategy: str = Field(None, description="The particular ngen calibration strategy to use.")
model_params: Dict[str, List[Dict[str, Any]]] = None

Should model_strategy exist on this class if this class is only for estimation?

  strategy: 
      # Type of strategey, currently supported is estimation
      type: estimation

https://github.com/NOAA-OWP/ngen-cal/blob/8d46c053b4a6d553d0b6e2c4ac539b5c092a2a63/python/example_config.yaml#L12

Disregard. I mistook this field for another field named strategy.


There are a few fields on the class that I think can be improved. I think that is outside of the scope of this PR though.


I created #402 to track improving this class.


@validator("ngen_cal_config_data_id", pre=True)
def _validate_ngen_cal_config_data_id(cls, value, field):
"""
Validate that, if ``ngen_cal_config_data_id`` is provided and not ``None``, it is a non-empty string.

Parameters
----------
value
field

Returns
-------
Optional[str]
The value, if valid.
"""
if value is not None and len(value.strip()) == 0:
raise ValueError(f"{field.name} must either be None or non-empty string")
return value

@validator("objective_function", "iterations", "model_strategy", pre=True, always=True)
def _validate_objective_function(cls, value, values, field):
"""
Validate that ``objective_function`` is set correctly unless ``ngen_cal_config_data_id`` is provided.

Parameters
----------
value
field

Returns
-------
Optional[str]
The value, if valid.
"""
if values.get("ngen_cal_config_data_id") is None and value is None:
raise ValueError(f"{field.name} must be set unless ngen_cal config dataset id is provided.")
return value
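
Reviewer-style note on the two validators above: together they enforce that the dataset id, when given, is a non-empty string, and that `objective_function`, `iterations`, and `model_strategy` are required only when no dataset id is given. The sketch below shows the same rule with a standard-library dataclass instead of pydantic, so it runs without dependencies. `CalibrationSettingsSketch` is a hypothetical, reduced stand-in for `NgenCalRequestEstimationConfig`, and the `"nnse"` value is only a placeholder.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CalibrationSettingsSketch:
    """Hypothetical, reduced stand-in for NgenCalRequestEstimationConfig."""
    ngen_cal_config_data_id: Optional[str] = None
    objective_function: Optional[str] = None
    iterations: Optional[int] = None
    model_strategy: Optional[str] = None

    def __post_init__(self) -> None:
        data_id = self.ngen_cal_config_data_id
        # Rule 1: the dataset id is either absent or a non-empty string.
        if data_id is not None and len(data_id.strip()) == 0:
            raise ValueError("ngen_cal_config_data_id must either be None or non-empty string")
        # Rule 2: explicit calibration settings are mandatory only without a dataset id.
        if data_id is None:
            for name in ("objective_function", "iterations", "model_strategy"):
                if getattr(self, name) is None:
                    raise ValueError(
                        f"{name} must be set unless ngen_cal config dataset id is provided."
                    )


from_dataset = CalibrationSettingsSketch(ngen_cal_config_data_id="cal-cfg-1")
explicit = CalibrationSettingsSketch(
    objective_function="nnse", iterations=10, model_strategy="uniform"
)
print(from_dataset)
print(explicit)
```

In the pydantic version the second rule depends on `ngen_cal_config_data_id` already being present in `values`, so that field has to be declared before the three dependent fields, as it is in this diff.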

@property
def composite_config_source_ids(self) -> List[str]:
"""
A list of the data ids for any datasets that are sources of data for a generated composite config dataset.

An instance may know dataset ids of existing datasets from which a new composite config dataset should be
derived. For the type, this potentially includes datasets for a realization config, BMI init configs, and
t-route configs. For this subtype, that is extended further to also include ngen-cal config datasets.

Any such datasets are referenced in certain attributes for the instance; e.g., ::attribute:`bmi_config_data_id`.
This property encapsulates collecting the applicable attribute values while filtering out any that are not set.

Note that an empty list does not (by itself) imply the composite config dataset is expected to exist, as it is
possible for the dataset to be created from a ::class:`PartialRealizationConfig`, auto-generated BMI init
configs, and the instance's explicit calibration configuration attributes.

Returns
-------
List[str]
List of the data ids for any datasets that are sources of data for a generated composite config dataset.
"""
result = super().composite_config_source_ids

if self.ngen_cal_config_data_id is not None:
result.append(self.ngen_cal_config_data_id)

return result
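
Reviewer-style note on `composite_config_source_ids`: the override extends the list returned by the superclass, so it is only safe if the base property builds a fresh list on each access. The base implementation is not shown in this diff, so the sketch below assumes that behavior. `BaseBodySketch` and `CalibrationBodySketch` are hypothetical stand-ins for `NGENRequestBody` and this subtype.

```python
from typing import List, Optional


class BaseBodySketch:
    """Hypothetical stand-in for NGENRequestBody; only id attributes are modeled."""

    def __init__(
        self,
        realization_config_data_id: Optional[str] = None,
        bmi_config_data_id: Optional[str] = None,
        t_route_config_data_id: Optional[str] = None,
    ):
        self.realization_config_data_id = realization_config_data_id
        self.bmi_config_data_id = bmi_config_data_id
        self.t_route_config_data_id = t_route_config_data_id

    @property
    def composite_config_source_ids(self) -> List[str]:
        # Assumption: a new list is built per access, with unset ids filtered out.
        candidates = [
            self.realization_config_data_id,
            self.bmi_config_data_id,
            self.t_route_config_data_id,
        ]
        return [c for c in candidates if c is not None]


class CalibrationBodySketch(BaseBodySketch):
    def __init__(self, ngen_cal_config_data_id: Optional[str] = None, **kwargs):
        super().__init__(**kwargs)
        self.ngen_cal_config_data_id = ngen_cal_config_data_id

    @property
    def composite_config_source_ids(self) -> List[str]:
        result = super().composite_config_source_ids
        if self.ngen_cal_config_data_id is not None:
            result.append(self.ngen_cal_config_data_id)
        return result


body = CalibrationBodySketch(
    ngen_cal_config_data_id="cal-1", realization_config_data_id="real-1"
)
print(body.composite_config_source_ids)
```

If the base property ever returned a cached list, the `append` here would grow it on every access, so building the list per call (or copying it in the override) keeps repeated reads stable.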