From ea0812ca35ce5a8becf2367b0a2ee89f9a53e0f2 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 13 Jun 2023 11:39:53 -0400 Subject: [PATCH 01/25] Stub work for deriving composite config data. Updating DataDeriveUtil.derive_datasets with conditional logic to branch execution and derive composite configuration datasets when appropriate (and fix the return for this function); also, adding stub function for actually implementing the deriving of aforementioned composite configuration datasets. --- .../dmod/dataservice/data_derive_util.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py index 96ac4b992..cd02b798d 100644 --- a/python/services/dataservice/dmod/dataservice/data_derive_util.py +++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py @@ -113,6 +113,24 @@ def _build_ngen_realization_config_from_request(self, request: AbstractNgenReque return NgenRealization(**params) + async def _derive_composite_job_config(self, requirement: DataRequirement, job: Job): + """ + Derive and link a ``DataFormat.NGEN_JOB_COMPOSITE_CONFIG`` dataset to fulfill the given job's given requirement. + + Derive a new composite config format dataset in order to fulfill the given requirement. Then, update the + requirement to note that it is fulfilled by the new dataset. + + Parameters + ---------- + requirement : DataRequirement + The requirement needing a dataset to be created in order to be fulfilled. + job : Job + The job "owning" the relevant requirement. + """ + # TODO: ********* implement ********* + msg = "{}._derive_composite_job_config still must be implemented".format(self.__class__.__name__) + raise NotImplementedError(msg) + def _derive_realization_config_from_formulations(self, requirement: DataRequirement, job: Job): """ Derive a new realization config dataset for this requirement from the formulations within the job. @@ -438,8 +456,12 @@ async def derive_datasets(self, job: Job) -> List[DataRequirement]: if req.category == DataCategory.CONFIG and req.domain.data_format == DataFormat.NGEN_REALIZATION_CONFIG: self._derive_realization_config_from_formulations(requirement=req, job=job) results.append(req) + # Derive composite dataset with all config details need for executing job + elif req.category == DataCategory.CONFIG and req.domain.data_format == DataFormat.NGEN_JOB_COMPOSITE_CONFIG: + await self._derive_composite_job_config(requirement=req, job=job) + results.append(req) # The above are the only supported derivations, so blow up here if there was something else else: msg_template = "Unsupported requirement dataset derivation for job {} (requirement: {})" raise DmodRuntimeError(msg_template.format(job.job_id, str(req))) - return results \ No newline at end of file + return results From ee413fc9f40f151dcc0392e15fd8f25bd0507748 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 12 Jul 2023 08:58:25 -0400 Subject: [PATCH 02/25] Add missing attributes to NGENRequestBody. Adding missing attributes for an optional PartialRealizationConfig (when supplied by the user) and t-route config dataset id (when requested job should include routing). --- .../maas_request/ngen/ngen_exec_request_body.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py index d2b73bedb..7ac05495d 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py @@ -1,5 +1,7 @@ from pydantic import Field, validator +from .partial_realization_config import PartialRealizationConfig + from dmod.core.meta_data import TimeRange from dmod.core.serializable import Serializable @@ -16,7 +18,10 @@ class NGENRequestBody(Serializable): bmi_config_data_id: str # NOTE: consider pydantic.conlist to constrain this type rather than using validators catchments: Optional[List[str]] + partial_realization_config: Optional[PartialRealizationConfig] = Field( + default=None, description="Partial realization config, when supplied by user.") partition_cfg_data_id: Optional[str] + t_route_config_data_id: Optional[str] = Field(None, description="Id of composite source of t-route config.") @validator("catchments") def validate_deduplicate_and_sort_catchments( @@ -35,7 +40,8 @@ class Config: def dict(self, **kwargs) -> dict: # if exclude is set, ignore this _get_exclude_fields() - only_if_set = ("catchments", "partition_cfg_data_id", "forcings_data_id") + only_if_set = ("catchments", "partition_cfg_data_id", "forcings_data_id", "partial_realization_config", + "t_route_config_data_id") if kwargs.get("exclude", False) is False: kwargs["exclude"] = {f for f in only_if_set if not self.__getattribute__(f)} return super().dict(**kwargs) From a8dabb9f2df96ddb6dbb730e3cb8a4bbeadcb8cc Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jul 2023 14:41:17 -0400 Subject: [PATCH 03/25] Add several concrete InitialDataAdder subtypes. Adding FromFilesInitialDataAdder, FromRawInitialDataAdder, FromPartialRealizationConfigAdder, and CompositeConfigDataAdder classes within dmod.dataservice package. --- .../dataservice/initial_data_adder_impl.py | 480 ++++++++++++++++++ 1 file changed, 480 insertions(+) create mode 100644 python/services/dataservice/dmod/dataservice/initial_data_adder_impl.py diff --git a/python/services/dataservice/dmod/dataservice/initial_data_adder_impl.py b/python/services/dataservice/dmod/dataservice/initial_data_adder_impl.py new file mode 100644 index 000000000..73a9b8168 --- /dev/null +++ b/python/services/dataservice/dmod/dataservice/initial_data_adder_impl.py @@ -0,0 +1,480 @@ +import json +from dmod.communication import AbstractNgenRequest, NgenCalibrationRequest +from dmod.communication.maas_request.ngen.partial_realization_config import PartialRealizationConfig +from dmod.core.dataset import Dataset, DatasetManager, DatasetType, InitialDataAdder +from dmod.core.exception import DmodRuntimeError +from dmod.core.meta_data import DataCategory, DataFormat, DataRequirement, StandardDatasetIndex +from dmod.scheduler.job import Job +from ngen.config.configurations import Forcing, Time +from ngen.config.realization import NgenRealization, Realization +from pathlib import Path +from typing import Dict, List, Optional, Union + + +class FromFilesInitialDataAdder(InitialDataAdder): + """ + Implementation that sources data from a file or directory path received during initialization. + + A simple implementation that adds data directly sourced from either a single file or all files directly within a + supplied directory. + """ + + def __init__(self, source_path: Union[str, Path], *args, **kwargs): + super().__init__(*args, **kwargs) + self._source_path: Path = source_path if isinstance(source_path, Path) else Path(source_path) + if not self._source_path.exists(): + msg = f"Can't initialize {self.__class__.__name__}: given source path '{str(source_path)}' doesn't exist" + raise DmodRuntimeError(msg) + + def add_initial_data(self): + """ + Assemble and add the initial data. + + If adding data from a file, the data item will be named after the basename of the file. + + If adding data from a directory, this top directory will be treated as an "add root." The directory will then + be traversed, adding all non-directory files to the dataset as data items, and recursively descending into + subdirectories. Each added data item will be named based on the path of its source file relative to the original + "add root." + + Raises + ------- + DmodRuntimeError + Raised when initial data could not be assembled and/or added successfully to the dataset. + """ + # TODO: later, look at options for threading + def _add(path: Path, dest: Optional[str] = None, add_root: Optional[Path] = None): + if path.is_dir(): + for file_name in path.iterdir(): + _add(path=file_name, add_root=path if add_root is None else add_root) + return + + if not dest and not add_root: + dest = path.name + elif not dest: + dest = path.relative_to(add_root) + + if not self._dataset_manager.add_data(dataset_name=self._dataset_name, dest=dest, data=path.read_bytes()): + msg = f"{self.__class__.__name__} failed to add item {dest} under {str(self._source_path)}" + raise DmodRuntimeError(msg) + + try: + _add(path=self._source_path) + except DmodRuntimeError as e: + raise e + except Exception as e: + raise DmodRuntimeError(f"{self.__class__.__name__} failed due to {e.__class__.__name__}: {str(e)}") + + +class FromRawInitialDataAdder(InitialDataAdder): + """ + Very simple implementation that receives raw data at initialization that it needs to add. + + A near trivial subtype that simply receives a dictionary object whose values are the raw ``bytes`` that should be + added to individual data items. The ``str`` keys are the names of these data items. + """ + + def __init__(self, data_items: Dict[str, bytes], *args, **kwargs): + super().__init__(*args, **kwargs) + self._data_items: Dict[str, bytes] = data_items + + def add_initial_data(self): + """ + Assemble and add the initial data. + + Initial data is provided in the form of a dictionary, keyed by item name, with values that are raw ``bytes`` to + add. Function simply iterates through the dictionary, adding items one by one, but stopping immediately and + raising a ::class:`DmodRuntimeError` if any item fails. + + Raises + ------- + DmodRuntimeError + Raised when initial data could not be assembled and/or added successfully to the dataset. + """ + try: + for item, data in self._data_items.items(): + if not self._dataset_manager.add_data(dataset_name=self._dataset_name, dest=item, data=data): + raise DmodRuntimeError(f"{self.__class__.__name__} failed to add initial data item {item}") + except DmodRuntimeError as e: + raise e + except Exception as e: + msg = f"{self.__class__.__name__} failed due to {e.__class__.__name__}: {str(e)}" + raise DmodRuntimeError(msg) + + +class FromPartialRealizationConfigAdder(InitialDataAdder): + """ + Subtype that adds a realization config derived from a ::class:`PartialRealizationConfig` to a newly created dataset. + """ + + # TODO: centrally define this somewhere else + _REAL_CONFIG_FILE_NAME = 'realization_config.json' + + def __init__(self, job: Job, all_dataset_managers: Dict[DatasetType, DatasetManager], *args, **kwargs): + """ + + Parameters + ---------- + job : Job + The job requiring a realization config dataset, which must be an ngen-related job. + all_dataset_managers : Dict[DatasetType, DatasetManager] + Dictionary of all dataset managers currently available to the data service, keyed by dataset type. + args + kwargs + """ + super().__init__(*args, **kwargs) + self._job: Job = job + request = self._job.model_request + if isinstance(request, AbstractNgenRequest): + self._job_request: AbstractNgenRequest = request + else: + raise ValueError("Can't do {} for job with {}".format(self.__class__.__name__, request.__class__.__name__)) + self._all_dataset_managers: Dict[DatasetType, DatasetManager] = all_dataset_managers + + def _build_forcing_config_for_realization(self) -> Forcing: + """ + Build a ::class:`Forcing` config object from to satisfy requirements of this request. + + Function builds a ::class:`Forcing` config object as a part of the steps to create a ngen realization config + for the given request. It is typically expected that the provided request will include a partial realization + config object that includes certain details. + + Returns + ------- + Forcing + Forcing config object to be used in building a ngen realization config to satisfy this request. + """ + forcing_cfg_params = dict() + + # Get the correct forcing dataset from associated requirement + # TODO: double check that this is being added when we do data checks + forcing_req = [r for r in self._job_request.data_requirements if r.category == DataCategory.FORCING][0] + forcing_dataset_name = forcing_req.fulfilled_by + manager = [m for _, m in self._all_dataset_managers.items() if forcing_dataset_name in m.datasets][0] + forcing_dataset = manager.datasets[forcing_dataset_name] + + # Figure out the correct provider type from the dataset format + # TODO: this may not be the right way to do this to instantiate the object directly (i.e., not through JSON) + if forcing_dataset.data_format == DataFormat.NETCDF_FORCING_CANONICAL: + forcing_cfg_params['provider'] = 'NetCDF' + elif forcing_dataset.data_format == DataFormat.AORC_CSV: + forcing_cfg_params['provider'] = 'CsvPerFeature' + + # TODO: (#needs_issue) introduce logic to examine forcing dataset and intelligently assess what the file + # name(s)/pattern(s) should be if they aren't explicitly provided + + if self.partial_realization_config is not None and self.partial_realization_config.forcing_file_pattern: + forcing_cfg_params['file_pattern'] = self.partial_realization_config.forcing_file_pattern + + # Finally, produce the right path + # TODO: these come from scheduler.py; may need to centralize somehow + forcing_cfg_params['path'] = '/dmod/datasets/' + if self.partial_realization_config is not None and self.partial_realization_config.is_env_workaround: + forcing_cfg_params['path'] += 'from_env' + else: + forcing_cfg_params['path'] += '{}/{}/'.format(DataCategory.FORCING.name.lower(), forcing_dataset_name) + + if self.partial_realization_config is not None and self.partial_realization_config.forcing_file_name: + forcing_cfg_params['path'] += self.partial_realization_config.forcing_file_name + + return Forcing(**forcing_cfg_params) + + def build_realization_config_from_partial(self) -> NgenRealization: + """ + Build a ngen realization config object from current service state and partial config within the job request. + + Returns + ------- + NgenRealization + The built realization config. + """ + params = dict() + + if self.partial_realization_config.global_formulations is not None: + params['global_config'] = Realization(formulations=self.partial_realization_config.global_formulations, + forcing=self._build_forcing_config_for_realization()) + + params['time'] = Time(start_time=self._job_request.time_range.begin, end_time=self._job_request.time_range.end) + + if self.partial_realization_config.routing_config is not None: + params['routing'] = self.partial_realization_config.routing_config + + if self.partial_realization_config.catchment_formulations is not None: + params['catchments'] = self.partial_realization_config.catchment_formulations + + return NgenRealization(**params) + + def add_initial_data(self): + """ + Assemble and add the initial data. + + Raises + ------- + DmodRuntimeError + Raised when initial data could not be assembled and/or added successfully to the dataset. + """ + if self.partial_realization_config is not None: + raise DmodRuntimeError(f"{self.__class__.__name__} can't have 'None' for partial realization property") + + try: + real_config: NgenRealization = self.build_realization_config_from_partial() + if not self._dataset_manager.add_data(dataset_name=self._dataset_name, dest=self._REAL_CONFIG_FILE_NAME, + data=json.dumps(real_config.json()).encode()): + raise DmodRuntimeError(f"{self.__class__.__name__} failed to add realization config item") + except DmodRuntimeError as e: + raise e + except Exception as e: + msg = f"{self.__class__.__name__} failed due to {e.__class__.__name__}: {str(e)}" + raise DmodRuntimeError(msg) + + @property + def partial_realization_config(self) -> Optional[PartialRealizationConfig]: + """ + The ::class:`PartialRealizationConfig` included with the original job request, if present. + + Returns + ------- + Optional[PartialRealizationConfig] + The ::class:`PartialRealizationConfig` included with the original job request, if present. + """ + return self._job_request.formulation_configs + + +class CompositeConfigDataAdder(FromPartialRealizationConfigAdder): + """ + An ::class:`InitialDataAdder` subtype for a dataset of the ``NGEN_JOB_COMPOSITE_CONFIG`` ::class:`DataFormat`. + + An instance expects the received ::class:`DataRequirement` passed during initialization to have a domain that + includes `data_id` values of any/all datasets from which initial data is obtained or derived, per the composite + config data format. However, data can be supplied via other means; e.g., a ::class:`PartialRealizationConfig` + embedded within the originating job request. + """ + + def __init__(self, requirement: DataRequirement, hydrofabric_id: str, *args, **kwargs): + """ + + Parameters + ---------- + requirement : DataRequirement + The requirement needing (i.e. to be fulfilled by) a composite config dataset to be created + hydrofabric_id : str + The hydrofabric id of the hydrofabric used by this job. + args + kwargs + """ + super().__init__(*args, **kwargs) + self._requirement: DataRequirement = requirement + self._hydrofabric_id: str = hydrofabric_id + + self._source_datasets = None + + def _add_bmi_init_config_data(self): + """ + Acquired and add initial BMI init config data to be added to dataset. + + Raises + ------- + DmodRuntimeError + Raised if not all BMI init config data items could be added successfully. + """ + # Determine if we have a BMI init config dataset already or must generate configs using tools and the hydrofabric + bmi_src_ds_list = [d for n, d in self.source_datasets.items() if d.data_format == DataFormat.BMI_CONFIG] + # If there were source BMI config datasets, copy things from them + if len(bmi_src_ds_list) > 0: + # TODO: add threading support + for ds in bmi_src_ds_list: + if not self.copy_items(item_names=ds.manager.list_files(dataset_name=ds.name), other_dataset=ds): + raise DmodRuntimeError(f"{self.__class__.__name__} could not copy BMI configs from {ds.name}") + + # TODO: implement a proper check, based on request, of whether any config generation is appropriate + should_generate_bmi_configs = len(bmi_src_ds_list) == 0 + + if should_generate_bmi_configs: + # TODO: support for this needs to be added later + raise NotImplementedError(f"{self.__class__.__name__} doesn't yet support BMI init config auto generation") + + def _add_ngen_cal_config_data(self): + """ + Acquired and add initial ngen-cal config data to be added to dataset. + + Raises + ------- + DmodRuntimeError + Raised if ngen-cal config data could not be added successfully. + """ + src_ds_list = [d for n, d in self.source_datasets.items() if d.data_format == DataFormat.NGEN_CAL_CONFIG] + if len(src_ds_list) == 1: + src_ds = src_ds_list[0] + if not self.copy_items(item_names=src_ds.manager.list_files(src_ds.name), other_dataset=src_ds): + raise DmodRuntimeError(f"{self.__class__.__name__} could not copy ngen-cal config from {src_ds.name}") + elif len(src_ds_list) > 1: + raise DmodRuntimeError(f"{self.__class__.__name__} can't copy initial ngen-cal data from multiple sources") + else: + # TODO: implement properly once we can actually generate t-route configs + raise NotImplementedError(f"{self.__class__.__name__} doesn't yet support ngen-cal config auto generation") + + def _add_realization_config_data(self): + """ + Acquire initial realization config data and add to the dataset. + + Function branches depending on whether the originating request for the involved job contained a partial config + containing base formulation configs. If that is not the case, then we expect ::attribute:`_requirement` to + have a domain that includes `data_id` values of all source datasets, per the composite config data format. + + Raises + ------- + DmodRuntimeError + Raised if realization config data could not be added successfully. + """ + # TODO: add optimizations (perhaps in subtype) + # TODO: add threading support + + # TODO: centrally define this, and probably somewhere else + real_cfg_file_name = 'realization_config.json' + + # Branch based on if we have realization config dataset already or are building from formulations + if self.partial_realization_config is not None: + # In this case, we need to derive a dataset from formulations + real_config: NgenRealization = self.build_realization_config_from_partial() + if not self._dataset_manager.add_data(dataset_name=self._dataset_name, dest=real_cfg_file_name, + data=json.dumps(real_config.json()).encode()): + raise DmodRuntimeError(f"{self.__class__.__name__} could not add built realization config") + else: + # In this case, we need to find the right existing realization config dataset and get data from it + names = [n for n, d in self.source_datasets.items() if d.data_format == DataFormat.NGEN_REALIZATION_CONFIG] + real_ds_name = names[0] if len(names) > 0 else None + # Sanity check that we found an actual name + if real_ds_name is None: + raise DmodRuntimeError("Couldn't find source realization config in {}".format(self.__class__.__name__)) + # If we did, copy the realization config file to the new dataset + if not self.copy_item(item_name=real_cfg_file_name, other_dataset=self.source_datasets[real_ds_name]): + raise DmodRuntimeError(f"{self.__class__.__name__} failed copy realization config from {real_ds_name}") + + def _add_troute_config_data(self): + """ + Acquired and add initial t-route routing config data to be added to dataset. + + Raises + ------- + DmodRuntimeError + Raised if t-route config data could not be added successfully. + """ + src_ds_list = [d for n, d in self.source_datasets.items() if d.data_format == DataFormat.T_ROUTE_CONFIG] + if len(src_ds_list) == 1: + src_ds = src_ds_list[0] + if not self.copy_items(item_names=src_ds.manager.list_files(src_ds.name), other_dataset=src_ds): + raise DmodRuntimeError(f"{self.__class__.__name__} could not copy t-route config from {src_ds.name}") + elif len(src_ds_list) > 1: + raise DmodRuntimeError(f"{self.__class__.__name__} can't copy initial t-route data from multiple sources") + else: + # TODO: implement properly once we can actually generate t-route configs + raise NotImplementedError(f"{self.__class__.__name__} doesn't yet support t-route config auto generation") + + def add_initial_data(self): + """ + Assemble and add the initial data. + + Raises + ------- + DmodRuntimeError + Raised when initial data could not be assembled and/or added successfully to the dataset. + """ + # TODO: add threading support + + # A composite config will always need these items, so we can immediately add them + self._add_realization_config_data() + self._add_bmi_init_config_data() + + # However, these two things may not be necessary, depending on the job + if self._job_request.request_body.t_route_config_data_id is not None: + self._add_troute_config_data() + + if isinstance(self._job_request, NgenCalibrationRequest): + self._add_ngen_cal_config_data() + + def copy_item(self, item_name: str, other_dataset: Dataset, dest_path: Optional[str] = None) -> bool: + """ + Copy a data item that already exists in some other dataset. + + Parameters + ---------- + item_name : str + The data item name. + other_dataset : Dataset + The source dataset containing the item already. + dest_path : Optional[str] + An optional item name when adding to the new dataset, if something other than `item_name`. + + Returns + ------- + bool + Whether the copy was successful. + + See Also + ------- + copy_items + """ + # TODO: when optimizations are added, might look at really keeping this with its own implementation + return self.copy_items(item_names=[item_name], other_dataset=other_dataset, + dest_names=None if dest_path is None else {item_name: dest_path}) + + def copy_items(self, item_names: List[str], other_dataset: Dataset, dest_names: Optional[Dict[str, str]] = None): + """ + Copy data items that already exists in some other dataset. + + Parameters + ---------- + item_names : List[str] + The data item names. + other_dataset : Dataset + The source dataset containing the items already. + dest_names : Optional[Dict[str, str]] + An optional mapping of exiting item name to new, destination item name within the new dataset, if not simply + the name of the existing item (even if not ``None``, not required to map all items from ``item_names``). + + Returns + ------- + bool + Whether the copy was successful. + """ + if dest_names is None: + dest_names = dict() + + # TODO: add optimizations later for this in cases when other_dataset uses the same data manager + for item_name in item_names: + # Immediately stop and return False if anything doesn't work properly + if not self._dataset_manager.add_data(dataset_name=self._dataset_name, + dest=dest_names.get(item_name, item_name), + data=other_dataset.manager.get_data(dataset_name=other_dataset.name, + item_name=item_name)): + return False + # If we complete the loop, everything must have been successful + return True + + @property + def source_datasets(self) -> Dict[str, Dataset]: + """ + Datasets that are sources of data that make up the initial data this instance will add, keyed by name. + + An instance expects the received ::class:`DataRequirement` passed during initialization to have a domain that + includes `data_id` values of any/all datasets from which initial data is obtained or derived, per the composite + config data format. The corresponding source datasets are made accessible via this property. + + Property lazily initializes and must communicate with **all** dataset managers to search for ::class:`Dataset` + objects for the collection. + + Returns + ------- + Dict[str, Dataset] + Datasets that are sources of data that make up the initial data this instance will add, keyed by name. + """ + if self._source_datasets is None: + self._source_datasets: Dict[str, Dataset] = {} + try: + for ds_id in self._requirement.domain.discrete_restrictions[StandardDatasetIndex.COMPOSITE_SOURCE_ID].values: + manager = [m for _, m in self._all_dataset_managers.items() if ds_id in m.datasets][0] + self._source_datasets[ds_id] = manager.datasets[ds_id] + except Exception as e: + msg = "Failed to find source datasets and managers initializing {} ({}: {})" + raise DmodRuntimeError(msg.format(self.__class__.__name__, e.__class__.__name__, str(e))) + return self._source_datasets From 11b36c5049acaca50d643741c21007f771023361 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jul 2023 14:50:52 -0400 Subject: [PATCH 04/25] Move data derive util to initial data adders. Add implementation of _derive_composite_job_config() using initial data adder class, and update _derive_realization_config_from_formulations function to also use an initial data adder class. --- .../dmod/dataservice/data_derive_util.py | 228 +++++++----------- 1 file changed, 89 insertions(+), 139 deletions(-) diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py index cd02b798d..6a25c077d 100644 --- a/python/services/dataservice/dmod/dataservice/data_derive_util.py +++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py @@ -1,6 +1,6 @@ -import json import logging +from .initial_data_adder_impl import CompositeConfigDataAdder, FromPartialRealizationConfigAdder from dmod.communication import AbstractNgenRequest, NGENRequest from dmod.communication.maas_request.ngen.partial_realization_config import PartialRealizationConfig from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DataRequirement, StandardDatasetIndex @@ -9,7 +9,7 @@ from dmod.scheduler.job import Job, JobExecStep from ngen.config.configurations import Forcing, Time from ngen.config.realization import NgenRealization, Realization -from typing import Callable, Dict, List, Optional, Tuple +from typing import Dict, List, Optional class DataDeriveUtil: @@ -28,6 +28,37 @@ class DataDeriveUtil: def __init__(self, data_mgrs_by_ds_type: Dict[DatasetType, DatasetManager]): self._all_data_managers: Dict[DatasetType, DatasetManager] = data_mgrs_by_ds_type + def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequirement, job: Job): + """ + Set ::attribute:`DataRequirement.fulfilled_access_at` and ::attribute:`DataRequirement.fulfilled_by`. + + Update the provided requirement's ::attribute:`DataRequirement.fulfilled_access_at` and + ::attribute:`DataRequirement.fulfilled_by` attributes to associate the requirement with the provided dataset. + The dataset is assume to have already been determined as satisfactory to fulfill the given requirement. + + Parameters + ---------- + dataset : Dataset + The dataset that fulfills the given requirement. + requirement : DataRequirement + The requirement fulfilled by the given dataset. + job : Job + The job owning the given requirement, which is needed for determining the appropriate value to use for + :attribute:`DataRequirement.fulfilled_access_at` + + See Also + ------- + _determine_access_location + """ + ################################################################################# + # It is important that `fulfilled_access_at` is set first (or at least that the # + # _determine_access_location function is called first) to ensure `fulfilled_by # + # isn't set if something with `fulfilled_access_at` goes wrong. # + ################################################################################# + requirement.fulfilled_access_at = self._determine_access_location(dataset, job) + ################################################################################# + requirement.fulfilled_by = dataset.name + def _build_forcing_config_for_realization(self, request: AbstractNgenRequest) -> Forcing: """ Build a ::class:`Forcing` config object from to satisfy requirements of this request. @@ -127,9 +158,37 @@ async def _derive_composite_job_config(self, requirement: DataRequirement, job: job : Job The job "owning" the relevant requirement. """ - # TODO: ********* implement ********* - msg = "{}._derive_composite_job_config still must be implemented".format(self.__class__.__name__) - raise NotImplementedError(msg) + # First, determine appropriate hydrofabric + restricts = [r for i, r in requirement.domain.discrete_restrictions if i == StandardDatasetIndex.HYDROFABRIC_ID] + if len(restricts) != 1: + msg = "Cannot derive composite config for job {} requirement that has no Hydrofabric id defined" + raise DmodRuntimeError(msg.format(job.job_id)) + # TODO: (later) consider if we need to account (as error or otherwise) for multiple hydrofabric ids here + hydrofabric_id = restricts[0].values[0] + + # Also construct a name for the dataset we are generating, based on the job + ds_name = "job-{}-composite-config".format(job.job_id) + + # Build a modified domain, based on the requirement, but with any name/data_id restriction removed + req_domain = requirement.domain + continuous_restricts = [r for idx, r in req_domain.continuous_restrictions.items()] + # Leave out dataset's name/data_id restriction, as it's unnecessary here, and just use None if nothing else + discrete_restricts = [r for idx, r in req_domain.discrete_restrictions if idx != StandardDatasetIndex.DATA_ID] + if len(discrete_restricts) == 0: + discrete_restricts = None + domain = DataDomain(data_format=req_domain.data_format, continuous_restrictions=continuous_restricts, + discrete_restrictions=discrete_restricts) + + # TODO: (later) more intelligently determine type + ds_type = DatasetType.OBJECT_STORE + manager = self._all_data_managers[ds_type] + data_adder = CompositeConfigDataAdder(requirement=requirement, job=job, hydrofabric_id=hydrofabric_id, + all_dataset_managers=self._all_data_managers, dataset_name=ds_name, + dataset_manager=manager) + dataset: Dataset = manager.create(name=ds_name, category=DataCategory.CONFIG, domain=domain, is_read_only=False, + initial_data=data_adder) + + self._apply_dataset_to_requirement(dataset=dataset, requirement=requirement, job=job) def _derive_realization_config_from_formulations(self, requirement: DataRequirement, job: Job): """ @@ -140,38 +199,31 @@ def _derive_realization_config_from_formulations(self, requirement: DataRequirem requirement job """ - request = job.model_request - if isinstance(request, AbstractNgenRequest): - # TODO: make sure that, once we are generating BMI init config datasets, the path details get provided as - # needed to this function when generating the realization config - # Get the necessary items to define our data_adder function (in this case, just a realization config) - real_config_obj = self._build_ngen_realization_config_from_request(request=request, job=job) - - def data_add_func(dataset_name: str, manager: DatasetManager) -> bool: - result = manager.add_data(dataset_name=dataset_name, dest='realization_config.json', - data=json.dumps(real_config_obj.json()).encode()) - return False if not result else True - - # TODO: (later) more intelligently determine type - ds_type = DatasetType.OBJECT_STORE - ds_name = requirement.domain.discrete_restrictions[StandardDatasetIndex.DATA_ID].values[0] - dataset: Dataset = self._exec_dataset_derive_for_requirement(dataset_type=ds_type, - data_category=DataCategory.CONFIG, - dataset_name=ds_name, - requirement=requirement, - data_adder=data_add_func) - # Update the requirement fulfilled_access_at and fulfilled_by to associate with the new dataset - ################################################################################# - # It is important that `fulfilled_access_at` is set first (or at least that the # - # _determine_access_location function is called first) to ensure `fulfilled_by # - # isn't set if something with `fulfilled_access_at` goes wrong. # - ################################################################################# - requirement.fulfilled_access_at = self._determine_access_location(dataset, job) - ################################################################################# - requirement.fulfilled_by = dataset.name - else: - msg = 'Bad job request type for {} when deriving realization config from formulations'.format(job.job_id) - raise DmodRuntimeError(msg) + # TODO: (later) more intelligently determine type + ds_type = DatasetType.OBJECT_STORE + ds_name = requirement.domain.discrete_restrictions[StandardDatasetIndex.DATA_ID].values[0] + ds_mgr = self._all_data_managers[ds_type] + + initial_data = FromPartialRealizationConfigAdder(job=job, all_dataset_managers=self._all_data_managers, + dataset_name=ds_name, dataset_manager=ds_mgr) + + # Build a modified domain, based on the requirement, but with any name/data_id restriction removed + req_domain = requirement.domain + continuous_restricts = [r for idx, r in req_domain.continuous_restrictions.items()] + # Leave out dataset's name/data_id restriction, as it's unnecessary here, and just use None if nothing else + discrete_restricts = [r for idx, r in req_domain.discrete_restrictions if idx != StandardDatasetIndex.DATA_ID] + if len(discrete_restricts) == 0: + discrete_restricts = None + domain = DataDomain(data_format=req_domain.data_format, continuous_restrictions=continuous_restricts, + discrete_restrictions=discrete_restricts) + + dataset: Dataset = self._all_data_managers[ds_type].create(name=ds_name, + category=DataCategory.CONFIG, + domain=domain, + is_read_only=False, + initial_data=initial_data) + + self._apply_dataset_to_requirement(dataset=dataset, requirement=requirement, job=job) def _determine_access_location(self, dataset: Dataset, job: Job) -> str: """ @@ -209,108 +261,6 @@ def _determine_access_location(self, dataset: Dataset, job: Job) -> str: msg = "Could not determine proper access location for new dataset of type {} by non-Docker job {}." raise DmodRuntimeError(msg.format(dataset.__class__.__name__, job.job_id)) - def _exec_dataset_derive(self, - dataset_type: DatasetType, - data_category: DataCategory, - dataset_name: str, - domain: DataDomain, - data_adder: Callable[[str, DatasetManager], bool]) -> Dataset: - """ - General (reusable) function to execute the derivation function to build a new dataset from existing data. - - Function works by first receiving parameters for all things necessary to create a new dataset, and using them to - do so. Function also receives as a param a callable. This callable should handle all the steps for deriving - data from other sources and adding this data to the newly created dataset. - - Note that if any exceptions occur after the empty dataset is created and before all data is successfully added - (or if that is not successful), then the new dataset will be deleted. - - Parameters - ---------- - dataset_type : DatasetType - The backend storage type to use for the new dataset, which implies the dataset manager to use. - data_category : DataCategory - The category of data to be stored within the new dataset. - dataset_name : str - The name to use for the new dataset. - domain : DataDomain - The data domain definition for the new dataset. - data_adder : Callable[[str, DatasetManager], bool] - A callable that performs the act of adding data to the new dataset, accepting the dataset name string and - dataset manager object as arguments, and returning whether data was added successfully. - - Returns - ------- - Dataset - The newly created dataset with derived data successfully written to it. - """ - # Then create the dataset - mgr = self._all_data_managers[dataset_type] - dataset: Dataset = mgr.create(name=dataset_name, is_read_only=False, category=data_category, domain=domain) - # Once dataset is created, everything else goes in a try block so we can remove the dataset if something fails - try: - # Exec our data-add callable func; if it returns False, raise exception so the empty dataset will be removed - if not data_adder(dataset_name, mgr): - msg_tmp = "Could not write data to new {} dataset {}" - raise DmodRuntimeError(msg_tmp.format(domain.data_format.name, dataset_name)) - # If an exception is thrown after dataset was created, then data was not added; i.e., dataset should be deleted - except Exception as e: - mgr.delete(dataset) - raise e - - # If we past the try/except, the data was added successfully, so return the dataset - return dataset - - def _exec_dataset_derive_for_requirement(self, - dataset_type: DatasetType, - data_category: DataCategory, - dataset_name: str, - requirement: DataRequirement, - data_adder: Callable[[str, DatasetManager], bool]) -> Dataset: - """ - Execute derivation function to build a new dataset from existing data that will satisfy a supplied requirement. - - Function works by first extracting a domain from the requirement (omitting any ``data_id`` restrictions). From - there, it returns the result of a nested call to ::method:`_exec_dataset_derive`, passing as args the - aforementioned domain and its applicable parameters. - - Parameters - ---------- - dataset_type : DatasetType - The backend storage type to use for the new dataset, which implies the dataset manager to use. - data_category : DataCategory - The category of data to be stored within the new dataset. - dataset_name : str - The name to use for the new dataset. - requirement : DataRequirement - The requirement to be fulfilled by the derived dataset, used to create the necessary ::class:`DataDomain`. - data_adder : Callable[[str], bool] - A callable that performs the act of adding data to the new dataset, accepting the dataset name string and - dataset manager object as arguments, and returning whether data was added successfully. - - Returns - ------- - Dataset - The newly created dataset with derived data successfully written to it. - - See Also - ------- - _exec_dataset_derive - - """ - # Build a modified domain, based on the requirement, but with any name/data_id restriction removed - req_domain = requirement.domain - continuous_restricts = [r for idx, r in req_domain.continuous_restrictions.items()] - # Leave out dataset's name/data_id restriction, as it's unnecessary here, and just use None if nothing else - discrete_restricts = [r for idx, r in req_domain.discrete_restrictions if idx != StandardDatasetIndex.DATA_ID] - if len(discrete_restricts) == 0: - discrete_restricts = None - domain = DataDomain(data_format=req_domain.data_format, continuous_restrictions=continuous_restricts, - discrete_restrictions=discrete_restricts) - # Then defer to this function - return self._exec_dataset_derive(dataset_type=dataset_type, data_category=data_category, - dataset_name=dataset_name, domain=domain, data_adder=data_adder) - def _get_known_datasets(self) -> Dict[str, Dataset]: """ Get real-time mapping of all datasets known to this instance via its managers, in a map keyed by dataset name. From 3b2b2b8a606462a8a5aaaa3a4d5026e66750aad8 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jul 2023 14:52:53 -0400 Subject: [PATCH 05/25] Removing deprecated functions of DataDeriveUtil. Removing _build_forcing_config_for_realization and _build_ngen_realization_config_from_request functions for util class, as this functionality is now encapsulated by initial data adder subtypes. --- .../dmod/dataservice/data_derive_util.py | 85 ------------------- 1 file changed, 85 deletions(-) diff --git a/python/services/dataservice/dmod/dataservice/data_derive_util.py b/python/services/dataservice/dmod/dataservice/data_derive_util.py index 6a25c077d..5835cdf1a 100644 --- a/python/services/dataservice/dmod/dataservice/data_derive_util.py +++ b/python/services/dataservice/dmod/dataservice/data_derive_util.py @@ -59,91 +59,6 @@ def _apply_dataset_to_requirement(self, dataset: Dataset, requirement: DataRequi ################################################################################# requirement.fulfilled_by = dataset.name - def _build_forcing_config_for_realization(self, request: AbstractNgenRequest) -> Forcing: - """ - Build a ::class:`Forcing` config object from to satisfy requirements of this request. - - Function builds a ::class:`Forcing` config object as a part of the steps to create a ngen realization config - for the given request. It is typically expected that the provided request will include a partial realization - config object that includes certain details. - - Parameters - ---------- - request: AbstractNgenRequest - An AbstractNgenRequest request that needs a realization config generated, and as part of that, a forcing - config. - - Returns - ------- - Forcing - Forcing config object to be used in building a ngen realization config to satisfy this request. - """ - forcing_cfg_params = dict() - - # Get the correct forcing dataset from associated requirement - # TODO: double check that this is being added when we do data checks - forcing_req = [r for r in request.data_requirements if r.category == DataCategory.FORCING][0] - forcing_dataset_name = forcing_req.fulfilled_by - forcing_dataset = self._get_known_datasets().get(forcing_dataset_name) - - # Figure out the correct provider type from the dataset format - # TODO: this may not be the right way to do this to instantiate the object directly (i.e., not through JSON) - if forcing_dataset.data_format == DataFormat.NETCDF_FORCING_CANONICAL: - forcing_cfg_params['provider'] = 'NetCDF' - elif forcing_dataset.data_format == DataFormat.AORC_CSV: - forcing_cfg_params['provider'] = 'CsvPerFeature' - - # TODO: (#needs_issue) introduce logic to examine forcing dataset and intelligently assess what the file - # name(s)/pattern(s) should be if they aren't explicitly provided - - if request.formulation_configs is not None and request.formulation_configs.forcing_file_pattern is not None: - forcing_cfg_params['file_pattern'] = request.formulation_configs.forcing_file_pattern - - # Finally, produce the right path - # TODO: these come from scheduler.py; may need to centralize somehow - forcing_cfg_params['path'] = '/dmod/datasets/' - if request.formulation_configs is not None and request.formulation_configs.is_env_workaround: - forcing_cfg_params['path'] += 'from_env' - else: - forcing_cfg_params['path'] += '{}/{}/'.format(DataCategory.FORCING.name.lower(), forcing_dataset_name) - - if request.formulation_configs is not None and request.formulation_configs.forcing_file_name is not None: - forcing_cfg_params['path'] += request.formulation_configs.forcing_file_name - - return Forcing(**forcing_cfg_params) - - def _build_ngen_realization_config_from_request(self, request: AbstractNgenRequest, job: Job) -> NgenRealization: - """ - Build a ngen realization config object from current service state and partial config within the job request. - - Parameters - ---------- - request: NGENRequest - The original request initiating the related NextGen workflow job. - job: Job - The NextGen job for which an explicit realization config needs to be built from implied details. - - Returns - ------- - NgenRealization - The built realization config. - """ - params = dict() - - if request.formulation_configs.global_formulations is not None: - params['global_config'] = Realization(formulations=request.formulation_configs.global_formulations, - forcing=self._build_forcing_config_for_realization(request)) - - params['time'] = Time(start_time=request.time_range.begin, end_time=request.time_range.end) - - if request.formulation_configs.routing_config is not None: - params['routing'] = request.formulation_configs.routing_config - - if request.formulation_configs.catchment_formulations is not None: - params['catchments'] = request.formulation_configs.catchment_formulations - - return NgenRealization(**params) - async def _derive_composite_job_config(self, requirement: DataRequirement, job: Job): """ Derive and link a ``DataFormat.NGEN_JOB_COMPOSITE_CONFIG`` dataset to fulfill the given job's given requirement. From e20f090e14ff86f6ccaa9a276405d70c46582460 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jul 2023 15:04:34 -0400 Subject: [PATCH 06/25] Fix ngen-config, add ngen-cal dataservice deps. Fixing dependency definition for ngen-config to use git URL (since not published in pypi), and adding similar definition for ngen-cal. --- python/services/dataservice/setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index 0d163eb95..fbda1a239 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -19,6 +19,7 @@ license='', install_requires=['dmod-core>=0.8.0', 'dmod-communication>=0.13.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0', 'redis', "pydantic>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]", - "ngen-config>=0.1.1"], + 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf', + 'ngen-cal@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_cal'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) ) From 6943014d2bf8ee97d731ad4da25d75a374fab56f Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jul 2023 15:08:47 -0400 Subject: [PATCH 07/25] Adjust dep defs using NOAA-OWP in git url. Updating dependency definitions to OWP packages not published to pypi, which therefore use git urls, to change usage of NOAA-OWP to noaa-owp in order to be consistent with usage of such urls within dependencies. --- python/lib/communication/setup.py | 2 +- python/lib/modeldata/setup.py | 2 +- requirements.txt | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/lib/communication/setup.py b/python/lib/communication/setup.py index 5ca3bf096..c5da0bf80 100644 --- a/python/lib/communication/setup.py +++ b/python/lib/communication/setup.py @@ -22,6 +22,6 @@ license='', include_package_data=True, install_requires=['dmod-core>=0.4.2', 'websockets>=8.1', 'jsonschema', 'redis', 'pydantic>=1.10.8,~=1.10', - 'ngen-config@git+https://github.com/NOAA-OWP/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf'], + 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) ) diff --git a/python/lib/modeldata/setup.py b/python/lib/modeldata/setup.py index 869c77080..ce355e185 100644 --- a/python/lib/modeldata/setup.py +++ b/python/lib/modeldata/setup.py @@ -30,7 +30,7 @@ "minio", "aiohttp<=3.7.4", "shapely>=2.0.0", - "hypy@git+https://github.com/NOAA-OWP/hypy@master#egg=hypy&subdirectory=python", + "hypy@git+https://github.com/noaa-owp/hypy@master#egg=hypy&subdirectory=python", "gitpython", "pydantic>=1.10.8,~=1.10", ], diff --git a/requirements.txt b/requirements.txt index 547144221..353a5af83 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,7 +19,7 @@ geopandas gitpython python-dotenv djangorestframework -git+https://github.com/NOAA-OWP/hypy@master#egg=hypy&subdirectory=python +git+https://github.com/noaa-owp/hypy@master#egg=hypy&subdirectory=python hydrotools.nwis-client numpy scikit-learn @@ -58,4 +58,4 @@ packaging pyparsing shapely>=2.0.0 pydantic>=1.10.8,~=1.10 -git+https://github.com/NOAA-OWP/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf +git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf From 6e64dfb2301fa802fe97c08ff6e5d723fd42ba5f Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 13 Jul 2023 15:09:15 -0400 Subject: [PATCH 08/25] Add ngen-cal dep to project requirements.txt. --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 353a5af83..45914a11a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -59,3 +59,4 @@ pyparsing shapely>=2.0.0 pydantic>=1.10.8,~=1.10 git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf +git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-cal&subdirectory=python/ngen_cal From f85e9fc7d2dd030c496ea4da78e712d0cfb01049 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 13:50:37 -0400 Subject: [PATCH 09/25] Add source dataset data_id as index of composite. Adding new standard index value COMPOSITE_SOURCE_ID and updating the NGEN_JOB_COMPOSITE_CONFIG DataFormat value to include the former as one of the latter's indices. --- python/lib/core/dmod/core/meta_data.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/lib/core/dmod/core/meta_data.py b/python/lib/core/dmod/core/meta_data.py index 428731040..012d8d74e 100644 --- a/python/lib/core/dmod/core/meta_data.py +++ b/python/lib/core/dmod/core/meta_data.py @@ -40,6 +40,8 @@ class StandardDatasetIndex(str, PydanticEnum): """ A specialized index for the unique data id of an associated realization config dataset. """ FILE_NAME = (8, str, "FILE_NAME") """ Index for the name of a data file within a dataset. """ + COMPOSITE_SOURCE_ID = (9, str, "COMPOSITE_SOURCE_ID") + """ Index for DATA_ID values of source dataset(s) when dataset is composite format and derives from others. """ def __new__(cls, index: int, ty: type, name: str): o = str.__new__(cls, name) @@ -160,6 +162,7 @@ class DataFormat(PydanticEnum): StandardDatasetIndex.TIME: None, StandardDatasetIndex.DATA_ID: None, StandardDatasetIndex.FILE_NAME: None, + StandardDatasetIndex.COMPOSITE_SOURCE_ID: None }, None, False From 662405810c34c0aeaf000a49b6c27ecc7a814a95 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:06:34 -0400 Subject: [PATCH 10/25] Update NGENRequestBody for composite datasets. Updating type with attribute for composite config dataset id, making formerly non-Optional data_id values for other config datasets Optional, and updating docstring and attribute descriptions. --- .../ngen/ngen_exec_request_body.py | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py index 7ac05495d..0e183a749 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py @@ -9,18 +9,36 @@ class NGENRequestBody(Serializable): + """ + Request body encapsulating data within outer request. - time_range: TimeRange - hydrofabric_uid: str - hydrofabric_data_id: str - realization_config_data_id: str = Field(description="Unique id of the realization config dataset for this request.") - forcings_data_id: Optional[str] = Field(None, description="Unique id of forcings dataset, if provided.") - bmi_config_data_id: str + Encapsulated data to define a requested ngen job. It includes details on the time range, hydrofabric, and + configurations need for executing ngen. It may also include a reference to what forcing data to use. + + An instance contains a reference to the ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG` dataset containing + configurations for the requested job. In cases when this dataset doesn't yet exist, an instance also contains the + necessary details for generating such a dataset. In particular, this includes: + + - a realization config dataset id **OR** a ::class:`PartialRealizationConfig` + - (optionally) a BMI init config dataset id + - (optionally) a t-route configuration dataset id + + When dataset ids are given, these are treated as sources for the new ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG`, + with the contents of the former copied into the latter as appropriate. + """ + + time_range: TimeRange = Field(description="The time range over which to run ngen simulation(s).") + hydrofabric_uid: str = Field(description="The (DMOD-generated) unique id of the hydrofabric to use.") + hydrofabric_data_id: str = Field(description="The dataset id of the hydrofabric dataset to use.") + composite_config_data_id: str = Field(None, description="Id of required ngen composite config dataset.") + realization_config_data_id: Optional[str] = Field(None, description="Id of composite source of realization config.") + forcings_data_id: Optional[str] = Field(None, description="Id of requested forcings dataset.") + bmi_config_data_id: Optional[str] = Field(None, description="Id of composite source of BMI init configs.") # NOTE: consider pydantic.conlist to constrain this type rather than using validators - catchments: Optional[List[str]] + catchments: Optional[List[str]] = Field(None, description="Subset of ids of catchments to include in job.") partial_realization_config: Optional[PartialRealizationConfig] = Field( default=None, description="Partial realization config, when supplied by user.") - partition_cfg_data_id: Optional[str] + partition_cfg_data_id: Optional[str] = Field(None, description="Partition config dataset, when multi-process job.") t_route_config_data_id: Optional[str] = Field(None, description="Id of composite source of t-route config.") @validator("catchments") From 51a08748fd7987eeae82f6b8fef870cb6e4a4355 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:09:11 -0400 Subject: [PATCH 11/25] Add NGENRequestBody.composite_config_source_ids. Adding method to encapsulate logic for gathering data_id values for any source dataset for a new, to-be-created composite configuration dataset, while also encapsulating the logic for keeping 'None' values out for any applicable data_id attributes that were not set. --- .../ngen/ngen_exec_request_body.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py index 0e183a749..644b60d67 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_exec_request_body.py @@ -63,3 +63,32 @@ def dict(self, **kwargs) -> dict: if kwargs.get("exclude", False) is False: kwargs["exclude"] = {f for f in only_if_set if not self.__getattribute__(f)} return super().dict(**kwargs) + + @property + def composite_config_source_ids(self) -> List[str]: + """ + A list of the data ids for any datasets that are sources of data for a generated composite config dataset. + + An instance may know dataset ids of existing datasets from which a new composite config dataset should be + derived. For the base type, this potentially includes datasets for a realization config, BMI init configs, and + t-route configs. Any such datasets are referenced in certain attributes for the instance; + e.g., ::attribute:`bmi_config_data_id`. This property encapsulates collecting those applicable attribute values + while filtering out any that are not set. + + Note that an empty list does not (by itself) imply the composite config dataset is expected to exist, as it is + possible for the dataset to be created from a ::class:`PartialRealizationConfig` and auto-generated BMI init + configs. + + Returns + ------- + List[str] + List of the data ids for any datasets that are sources of data for a generated composite config dataset. + """ + result = [] + if self.realization_config_data_id is not None: + result.append(self.realization_config_data_id) + if self.bmi_config_data_id is not None: + result.append(self.bmi_config_data_id) + if self.t_route_config_data_id is not None: + result.append(self.t_route_config_data_id) + return result From 3f34cb6ffd36e84ebe3811518a2011bbc388684f Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:13:43 -0400 Subject: [PATCH 12/25] Update AbstractNgenRequest for composite datasets. Removing realization and BMI init config data requirements and replacing with on for composite config data; also including t_route_config_data_id property when such a data_id is provided for part of the source data for a new composite config dataset. --- .../ngen/abstract_nextgen_request.py | 133 ++++++++---------- 1 file changed, 58 insertions(+), 75 deletions(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py index dab245d61..fc6143e53 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py @@ -26,32 +26,21 @@ class AbstractNgenRequest(DmodJobRequest, ABC): - execution time range - hydrofabric UID, dataset id, and ::class:`DataRequirement` - - primary config dataset id (i.e., the realization config) and ::class:`DataRequirement` - - BMI init configs dataset id and ::class:`DataRequirement` - - forcing ::class:`DataRequirement` + - config files dataset id (uses ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG`) and ::class:`DataRequirement` - list of each output dataset's ::class:`DataFormat` + - forcing ::class:`DataRequirement` and (optionally) a forcing dataset id - (Optional) partitioning config dataset id and ::class:`DataRequirement` - (Optional) list of catchments - This type provides the implementation for ::method:`factory_init_from_deserialized_json` for all subtypes. This - works by having each level of the class hierarchy be responsible for deserialization applicable to it, as described - below. - - Instead of implementing full deserialization, this type and subtypes include a function to deserialize from JSON the - type-specific keyword parameters passed to the individual type's ::method:`__init__`. This is the - ::method:`deserialize_for_init` class method. Subclass implementations should ensure they call superclass's version - and build on the returned dict of deserialized keyword params from ancestor levels. - - This abstract type also implements a version of ::method:`to_dict` for serializing all the state included at this - level. + It is possible for the ``NGEN_JOB_COMPOSITE_CONFIG`` dataset to not already exist, in which case an instance of this + type also implies a request to derive such a dataset. """ request_body: NGENRequestBody _hydrofabric_data_requirement = PrivateAttr(None) _forcing_data_requirement = PrivateAttr(None) - _realization_cfg_data_requirement = PrivateAttr(None) - _bmi_cfg_data_requirement = PrivateAttr(None) + _composite_config_data_requirement = PrivateAttr(None) _partition_cfg_data_requirement = PrivateAttr(None) class Config: @@ -103,43 +92,61 @@ def bmi_config_data_id(self) -> str: return self.request_body.bmi_config_data_id @property - def bmi_cfg_data_requirement(self) -> DataRequirement: + def catchments(self) -> Optional[List[str]]: """ - A requirement object defining of the BMI configuration data needed to execute this request. + An optional list of catchment ids for those catchments in the request ngen execution. + + No list implies "all" known catchments. Returns ------- - DataRequirement - A requirement object defining of the BMI configuration data needed to execute this request. + Optional[List[str]] + An optional list of catchment ids for those catchments in the request ngen execution. """ - if self._bmi_cfg_data_requirement is None: - bmi_config_restrict = [ - DiscreteRestriction( - variable="data_id", values=[self.bmi_config_data_id] - ) - ] - bmi_config_domain = DataDomain( - data_format=DataFormat.BMI_CONFIG, - discrete_restrictions=bmi_config_restrict, - ) - self._bmi_cfg_data_requirement = DataRequirement( - domain=bmi_config_domain, is_input=True, category=DataCategory.CONFIG - ) - return self._bmi_cfg_data_requirement + return self.request_body.catchments @property - def catchments(self) -> Optional[List[str]]: + def composite_config_data_id(self) -> str: """ - An optional list of catchment ids for those catchments in the request ngen execution. + Index value of ``data_id`` to uniquely identify the applicable composite config dataset for the requested job. - No list implies "all" known catchments. + Returns + ------- + str + Index value of ``data_id`` of the applicable composite config dataset for the requested job. + """ + return self.request_body.composite_config_data_id + + @property + def composite_config_data_requirement(self) -> DataRequirement: + """ + A requirement object defining of the composite configuration data needed to execute this request. Returns ------- - Optional[List[str]] - An optional list of catchment ids for those catchments in the request ngen execution. + DataRequirement + A requirement object defining of the composite configuration data needed to execute this request. """ - return self.request_body.catchments + if self._composite_config_data_requirement is None: + cont_restricts = [self.time_range] + + disc_restricts = [ + DiscreteRestriction(variable="HYDROFABRIC_ID", values=[self.request_body.hydrofabric_uid]), + self._gen_catchments_domain_restriction(), + DiscreteRestriction(variable="DATA_ID", values=[self.composite_config_data_id]) + ] + + # Add a restriction including source dataset ids as well, if there are any + src_ds_ids = self.request_body.composite_config_source_ids + if len(src_ds_ids) > 0: + disc_restricts.append(DiscreteRestriction(variable="COMPOSITE_SOURCE_ID", values=src_ds_ids)) + + composite_config_domain = DataDomain(data_format=DataFormat.NGEN_JOB_COMPOSITE_CONFIG, + continuous_restrictions=cont_restricts, + discrete_restrictions=disc_restricts) + self._composite_config_data_requirement = DataRequirement(domain=composite_config_domain, is_input=True, + category=DataCategory.CONFIG) + return self._composite_config_data_requirement @property def data_requirements(self) -> List[DataRequirement]: @@ -152,10 +159,9 @@ def data_requirements(self) -> List[DataRequirement]: List of all the explicit and implied data requirements for this request. """ requirements = [ - self.bmi_cfg_data_requirement, self.forcing_data_requirement, self.hydrofabric_data_requirement, - self.realization_cfg_data_requirement, + self.composite_config_data_requirement, ] if self.use_parallel_ngen: requirements.append(self.partition_cfg_data_requirement) @@ -366,31 +372,20 @@ def realization_config_data_id(self) -> str: return self.request_body.realization_config_data_id @property - def realization_cfg_data_requirement(self) -> DataRequirement: + def t_route_config_data_id(self) -> Optional[str]: """ - A requirement object defining of the realization configuration data needed to execute this request. + The index value of ``data_id`` to uniquely identify sets of t-route config data that are otherwise similar. + + For example, two t-route configs may apply to the same time and catchments, but be very different. The + nature of the differences is not necessarily even possible to define generally, and certainly not through + (pre-existing) indices. As such, the `data_id` index is added for such differentiating purposes. Returns ------- - DataRequirement - A requirement object defining of the realization configuration data needed to execute this request. + str + The index value of ``data_id`` to uniquely identify the required t-route config dataset. """ - if self._realization_cfg_data_requirement is None: - real_cfg_dis_restrict = [ - self._gen_catchments_domain_restriction(), - DiscreteRestriction( - variable="data_id", values=[self.realization_config_data_id] - ), - ] - real_cfg_domain = DataDomain( - data_format=DataFormat.NGEN_REALIZATION_CONFIG, - continuous_restrictions=[self.time_range], - discrete_restrictions=real_cfg_dis_restrict, - ) - self._realization_cfg_data_requirement = DataRequirement( - domain=real_cfg_domain, is_input=True, category=DataCategory.CONFIG - ) - return self._realization_cfg_data_requirement + return self.request_body.t_route_config_data_id @property def time_range(self) -> TimeRange: @@ -460,16 +455,4 @@ class ExternalAbstractNgenRequest(ExternalRequest, AbstractNgenRequest, ABC): def __eq__(self, other): return super().__eq__(other) and self.session_secret == other.session_secret - def __hash__(self): - hash_str = "{}-{}-{}-{}-{}-{}-{}-{}-{}".format( - self.time_range.to_json(), - self.hydrofabric_data_id, - self.hydrofabric_uid, - self.realization_config_data_id, - self.bmi_config_data_id, - self.session_secret, - self.cpu_count, - self.partition_cfg_data_id, - ",".join(self.catchments), - ) - return hash(hash_str) \ No newline at end of file + From 78af07b1388c58973080eb2110a3a070029fa079 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:21:09 -0400 Subject: [PATCH 13/25] Add specified forcings support in ngen request. Add support AbstractNgenRequest for getting a forcing dataset data_id from request_body and using that, if provided, as part of the built forcing_data_requirement property. --- .../ngen/abstract_nextgen_request.py | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py index fc6143e53..314993a69 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py @@ -188,7 +188,19 @@ def formulation_configs(self) -> Optional[PartialRealizationConfig]: """ return self.request_body.formulation_configs - # TODO: #needs_issue - Account for option when forcing dataset is explicitly provided + @property + def forcings_data_id(self) -> Optional[str]: + """ + Unique ``data_id`` of requested forcings dataset, if one was specified by the user. + + Returns + ------- + Optional[str] + ``data_id`` of the requested forcings dataset, or ``None`` if any forcings dataset may be used that + otherwise satisfies the requirements of the request. + """ + return self.request_body.forcings_data_id + @property def forcing_data_requirement(self) -> DataRequirement: """ @@ -200,11 +212,15 @@ def forcing_data_requirement(self) -> DataRequirement: A requirement object defining of the forcing data needed to execute this request. """ if self._forcing_data_requirement is None: + discrete_restricts = [self._gen_catchments_domain_restriction()] + if self.forcings_data_id is not None: + discrete_restricts.append(DiscreteRestriction(variable="DATA_ID", values=[self.forcings_data_id])) + # TODO: going to need to address the CSV usage later forcing_domain = DataDomain( data_format=DataFormat.AORC_CSV, continuous_restrictions=[self.time_range], - discrete_restrictions=[self._gen_catchments_domain_restriction()], + discrete_restrictions=discrete_restricts, ) self._forcing_data_requirement = DataRequirement( domain=forcing_domain, is_input=True, category=DataCategory.FORCING From 049aa2b749fe2d2b42d1b3a55d622af2106954d7 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:24:47 -0400 Subject: [PATCH 14/25] Fix AbstractNgenRequest property impl errors. Fix implementation of formulation_configs with correct nested property name, and updating type hint and docstring for partition config data requirement property to properly indication it is an Optional value. --- .../maas_request/ngen/abstract_nextgen_request.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py index 314993a69..88eca2e8f 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/abstract_nextgen_request.py @@ -186,7 +186,7 @@ def formulation_configs(self) -> Optional[PartialRealizationConfig]: Optional[PartialRealizationConfig] The user-supplied pieces of the ngen realization config to use for this request. """ - return self.request_body.formulation_configs + return self.request_body.partial_realization_config @property def forcings_data_id(self) -> Optional[str]: @@ -331,14 +331,14 @@ def partition_cfg_data_id(self) -> Optional[str]: return self.request_body.partition_cfg_data_id @property - def partition_cfg_data_requirement(self) -> DataRequirement: + def partition_cfg_data_requirement(self) -> Optional[DataRequirement]: """ - A requirement object defining of the partitioning configuration data needed to execute this request. + A requirement object defining the partitioning configuration data needed to execute this request. Returns ------- - DataRequirement - A requirement object defining of the partitioning configuration data needed to execute this request. + Optional[DataRequirement] + Optional requirement object defining the partitioning configuration data needed to execute this request. """ if self._partition_cfg_data_requirement is None and self.use_parallel_ngen: d_restricts = [] From c1bb497986b87a6bc86f78f4bddbc72bbeb43d1c Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:29:10 -0400 Subject: [PATCH 15/25] Update calibration request body for composite cfg. Updating NgenCalRequestEstimationConfig to align with super type to implement support for use of composite configuration datasets. --- .../ngen_cal_request_estimation_config.py | 101 ++++++++++++++++-- 1 file changed, 94 insertions(+), 7 deletions(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_cal_request_estimation_config.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_cal_request_estimation_config.py index 2acfc4bab..334fa42eb 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_cal_request_estimation_config.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_cal_request_estimation_config.py @@ -1,19 +1,106 @@ -from pydantic import Field -from typing import Any, Dict, List +from pydantic import Field, validator +from typing import Any, Dict, List, Optional from .ngen_exec_request_body import NGENRequestBody +# TODO: (later) consider adding different type for when receiving data_id of existing ngen_cal config dataset + class NgenCalRequestEstimationConfig(NGENRequestBody): + """ + Request body encapsulating data within outer calibration request. + + Encapsulated data to define a requested ngen-cal calibration job. It includes details on the time range, + hydrofabric, ngen configurations, and calibration configurations need for performing an ngen-cal calibration. It may + also include a reference to what forcing data to use. + + An instance contains a reference to the ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG` dataset containing + configurations for the requested job. In cases when this dataset doesn't yet exist, an instance also contains the + necessary details for generating such a dataset. In particular, this includes: + + - a realization config dataset id **OR** a ::class:`PartialRealizationConfig` + - a calibration config dataset **OR** calibration config parameters directly + - (optionally) a BMI init config dataset id + - a t-route configuration dataset id + + When dataset ids are given, these are treated as sources for the new ::class:`DataFormat.NGEN_JOB_COMPOSITE_CONFIG`, + with the contents of the former copied into the latter as appropriate. + """ + ngen_cal_config_data_id: Optional[str] = Field(None, description="Id of existing source ngen_cal config dataset.") parallel_proc: int = Field(1, gt=0, description="Number of parallel ngen processes for ngen-cal to use.") # TODO: #needs_issue - Add validator for supported values algorithm: str = Field("dds", description="The calibration optimization algorithm.") - objective_function: str = Field(description="The calibration objective function.") - iterations: int = Field(gt=0, description="The number of ngen iterations to run through during calibration.") + objective_function: str = Field(None, description="The calibration objective function.") + iterations: int = Field(None, gt=0, description="The number of ngen iterations to run through during calibration.") # TODO: #needs_issue - Add validator to ensure this isn't larger than the total number of iterations start_iteration: int = Field(1, gt=0, description="The starting iteration, which is greater than 1 for restarts.") # TODO: #needs_issue - Add validator for supported values (independent, uniform, explicit) # TODO: #needs_issue - Add validator for adjusting case when needed - model_strategy: str = Field(description="The particular ngen calibration strategy to use.") - model_params: Dict[str, List[Dict[str, Any]]] - ngen_cal_config_data_id: str + model_strategy: str = Field(None, description="The particular ngen calibration strategy to use.") + model_params: Dict[str, List[Dict[str, Any]]] = None + + @validator("ngen_cal_config_data_id", pre=True) + def _validate_ngen_cal_config_data_id(cls, value, field): + """ + Validate that, if ``ngen_cal_config_data_id`` is provided and not ``None``, it is a non-empty string. + + Parameters + ---------- + value + field + + Returns + ------- + Optional[str] + The value, if valid. + """ + if value is not None and len(value.strip()) == 0: + raise ValueError(f"{field.name} must either be None or non-empty string") + return value + + @validator("objective_function", "iterations", "model_strategy", pre=True, always=True) + def _validate_objective_function(cls, value, values, field): + """ + Validate that ``objective_function`` is set correctly unless ``ngen_cal_config_data_id`` is provided. + + Parameters + ---------- + value + field + + Returns + ------- + Optional[str] + The value, if valid. + """ + if values.get("ngen_cal_config_data_id") is None and value is None: + raise ValueError(f"{field.name} must be set unless ngen_cal config dataset id is provided.") + return value + + @property + def composite_config_source_ids(self) -> List[str]: + """ + A list of the data ids for any datasets that are sources of data for a generated composite config dataset. + + An instance may know dataset ids of existing datasets from which a new composite config dataset should be + derived. For the type, this potentially includes datasets for a realization config, BMI init configs, and + t-route configs. For this subtype, that is extended further to also include ngen-cal config datasets. + + Any such datasets are referenced in certain attributes for the instance; e.g., ::attribute:`bmi_config_data_id`. + This property encapsulates collecting the applicable attribute values while filtering out any that are not set. + + Note that an empty list does not (by itself) imply the composite config dataset is expected to exist, as it is + possible for the dataset to be created from a ::class:`PartialRealizationConfig`, auto-generated BMI init + configs, and the instance's explicit calibration configuration attributes. + + Returns + ------- + List[str] + List of the data ids for any datasets that are sources of data for a generated composite config dataset. + """ + result = super().composite_config_source_ids + + if self.ngen_cal_config_data_id is not None: + result.append(self.ngen_cal_config_data_id) + + return result From 4992b6d6650b6395fd6305a384e0e3bd8dd6a733 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:31:56 -0400 Subject: [PATCH 16/25] Update ngen-cal request for composite cfg support. --- .../ngen/ngen_calibration_request.py | 49 +------------------ 1 file changed, 1 insertion(+), 48 deletions(-) diff --git a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py index 9337d04c3..07c982eab 100644 --- a/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py +++ b/python/lib/communication/dmod/communication/maas_request/ngen/ngen_calibration_request.py @@ -1,6 +1,4 @@ -from pydantic import PrivateAttr -from dmod.core.meta_data import DataCategory, DataDomain, DataFormat, DataRequirement, DiscreteRestriction -from typing import ClassVar, List, Literal +from typing import ClassVar, Literal from datetime import datetime from ...message import MessageEventType from ...maas_request import ModelExecRequestResponse @@ -20,10 +18,6 @@ class NgenCalibrationRequest(ExternalAbstractNgenRequest): job_type: Literal["ngen-cal"] = "ngen-cal" """ The name for the job type being requested. """ - job_name: str - - _calibration_config_data_requirement = PrivateAttr(None) - @classmethod def factory_init_correct_response_subtype(cls, json_obj: dict) -> 'NgenCalibrationResponse': """ @@ -40,47 +34,6 @@ def factory_init_correct_response_subtype(cls, json_obj: dict) -> 'NgenCalibrati """ return NgenCalibrationResponse.factory_init_from_deserialized_json(json_obj=json_obj) - @property - def calibration_config_data_requirement(self) -> DataRequirement: - """ - A requirement object defining of the ngen-cal configuration needed to execute this request. - - Returns - ------- - DataRequirement - A requirement object defining of the ngen-cal configuration needed to execute this request. - """ - # TODO: #needs_issue - Should all these individual requirement properties (here and elsewhere) be done away with? - if self._calibration_config_data_requirement is None: - if self.request_body.ngen_cal_config_data_id is None: - self.request_body.ngen_cal_config_data_id = "{}-{}".format(self.job_type, self.job_name) - ngen_cal_restrict = [ - DiscreteRestriction( - variable="data_id", values=[self.request_body.ngen_cal_config_data_id] - ) - ] - ngen_cal_config_domain = DataDomain( - data_format=DataFormat.NGEN_CAL_CONFIG, - discrete_restrictions=ngen_cal_restrict, - ) - self._calibration_config_data_requirement = DataRequirement( - domain=ngen_cal_config_domain, is_input=True, category=DataCategory.CONFIG - ) - return self._calibration_config_data_requirement - - @property - def data_requirements(self) -> List[DataRequirement]: - """ - List of all the explicit and implied data requirements for this request, as needed for creating a job object. - - Returns - ------- - List[DataRequirement] - List of all the explicit and implied data requirements for this request. - """ - data_requirements = super().data_requirements - return [self.calibration_config_data_requirement, *data_requirements] - @property def evaluation_start(self) -> datetime: return self.request_body.time_range.begin From efca42520db66be853272ab634eb166560990c26 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:33:23 -0400 Subject: [PATCH 17/25] Bump version of dmod.core to 0.10.0. --- python/lib/core/dmod/core/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/core/dmod/core/_version.py b/python/lib/core/dmod/core/_version.py index 1658609d0..fcca93be3 100644 --- a/python/lib/core/dmod/core/_version.py +++ b/python/lib/core/dmod/core/_version.py @@ -1 +1 @@ -__version__ = '0.9.0' \ No newline at end of file +__version__ = '0.10.0' \ No newline at end of file From f7998aeb4ac32543d0be4c0e737365eb45bb81d6 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:33:55 -0400 Subject: [PATCH 18/25] Bump dmod.communication dep on dmod.core. --- python/lib/communication/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/setup.py b/python/lib/communication/setup.py index c5da0bf80..ba844b543 100644 --- a/python/lib/communication/setup.py +++ b/python/lib/communication/setup.py @@ -21,7 +21,7 @@ url='', license='', include_package_data=True, - install_requires=['dmod-core>=0.4.2', 'websockets>=8.1', 'jsonschema', 'redis', 'pydantic>=1.10.8,~=1.10', + install_requires=['dmod-core>=0.10.0', 'websockets>=8.1', 'jsonschema', 'redis', 'pydantic>=1.10.8,~=1.10', 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) ) From fc6f5b974736c13304f134f5a732168dfa1b22fd Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:35:28 -0400 Subject: [PATCH 19/25] Add unit tests for NgenCalRequestEstimationConfig. Add new unit test class for this type and some initial tests. --- ...test_ngen_cal_request_estimation_config.py | 200 ++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 python/lib/communication/dmod/test/test_ngen_cal_request_estimation_config.py diff --git a/python/lib/communication/dmod/test/test_ngen_cal_request_estimation_config.py b/python/lib/communication/dmod/test/test_ngen_cal_request_estimation_config.py new file mode 100644 index 000000000..fe1b02a08 --- /dev/null +++ b/python/lib/communication/dmod/test/test_ngen_cal_request_estimation_config.py @@ -0,0 +1,200 @@ +import unittest +from ..communication.maas_request.ngen.ngen_cal_request_estimation_config import NgenCalRequestEstimationConfig +from dmod.core.meta_data import TimeRange + + +class TestNgenCalRequestEstimationConfig(unittest.TestCase): + + def setUp(self) -> None: + self.config_strings = dict() + self.config_jsons = dict() + self.config_objs = dict() + self.time_ranges = dict() + self.calibration_config_ds = dict() + + # Example 0 + ex_idx = 0 + + time_range = TimeRange.parse_from_string("2022-01-01 00:00:00 to 2022-03-01 00:00:00") + iterations = 100 + obj_func = 'nnse' + model_strategy = 'estimation' + cpu_count = 4 + + self.time_ranges[ex_idx] = time_range + self.config_strings[ex_idx] = ( + '{' + ' "bmi_config_data_id": "02468", ' + ' "hydrofabric_data_id": "9876543210", ' + ' "hydrofabric_uid": "0123456789", ' + ' "iterations":' + str(iterations) + ', ' + ' "model_params": {}, ' + ' "model_strategy":' + model_strategy + ',' + ' "objective_function":' + obj_func + ', ' + ' "partition_config_data_id": "part1234", ' + ' "realization_config_data_id": "02468", ' + ' "time_range": ' + time_range.to_json() + + '}' + ) + self.config_jsons[ex_idx] = { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'objective_function': obj_func, + 'iterations': iterations, + 'model_strategy': model_strategy, + 'model_params': {} + } + self.config_objs[ex_idx] = NgenCalRequestEstimationConfig( + time_range=time_range, + hydrofabric_data_id='9876543210', + hydrofabric_uid="0123456789", + bmi_config_data_id='02468', + partition_cfg_data_id='part1234', + realization_config_data_id='02468', + objective_function=obj_func, + iterations=iterations, + model_strategy=model_strategy, + model_params={} + ) + + # Example 1 - example without explicit calibration params + ex_idx = 1 + + config_ds_id = "config_ds_1" + self.calibration_config_ds[1] = config_ds_id + self.time_ranges[ex_idx] = time_range + self.config_strings[ex_idx] = ( + '{' + ' "bmi_config_data_id": "02468", ' + ' "hydrofabric_data_id": "9876543210", ' + ' "hydrofabric_uid": "0123456789", ' + ' "ngen_cal_config_data_id":' + config_ds_id + ',' + ' "partition_config_data_id": "part1234", ' + ' "realization_config_data_id": "02468", ' + ' "time_range": ' + time_range.to_json() + + '}' + ) + self.config_jsons[ex_idx] = { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'ngen_cal_config_data_id': config_ds_id + } + self.config_objs[ex_idx] = NgenCalRequestEstimationConfig( + time_range=time_range, + hydrofabric_data_id='9876543210', + hydrofabric_uid="0123456789", + bmi_config_data_id='02468', + partition_cfg_data_id='part1234', + realization_config_data_id='02468', + ngen_cal_config_data_id=config_ds_id + ) + + # Example 2 (JSON only) - like Example 0, but leaving out objective_function so that init should fail + ex_idx = 2 + + self.config_jsons[ex_idx] = { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + #'objective_function': obj_func, + 'iterations': iterations, + 'model_strategy': model_strategy, + 'model_params': {} + } + + # Example 3 (JSON only) - like Example 2, but leaving out iterations instead + ex_idx = 3 + + self.config_jsons[ex_idx] = { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'objective_function': obj_func, + #'iterations': iterations, + 'model_strategy': model_strategy, + 'model_params': {} + } + + # Example 4 (JSON only) - like Example 2, but leaving out model_strategy instead + ex_idx = 4 + + self.config_jsons[ex_idx] = { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'objective_function': obj_func, + 'iterations': iterations, + #'model_strategy': model_strategy, + 'model_params': {} + } + + def test_composite_config_source_ids_1_a(self): + """ + Test that example 1 has the calibration dataset id included in the source ids. + """ + ex_idx = 1 + + config_obj = self.config_objs[ex_idx] + expected_ds_id = self.calibration_config_ds[ex_idx] + source_ids = config_obj.composite_config_source_ids + + self.assertTrue(expected_ds_id in source_ids) + + def test_composite_config_source_ids_1_b(self): + """ + Test that example 1 looks like example 0, except for having the calibration config dataset id. + """ + ex_idx = 1 + + config_obj = self.config_objs[ex_idx] + expected_ds_id = self.calibration_config_ds[ex_idx] + source_ids = config_obj.composite_config_source_ids + + other_source_ids = self.config_objs[0].composite_config_source_ids + other_source_ids.sort() + + source_ids.remove(expected_ds_id) + source_ids.sort() + + self.assertEqual(source_ids, other_source_ids) + + def test_init_2_a(self): + """ + Test that example 2 fails to init due to no ``objective_function`` param. + """ + ex_idx = 2 + json_val = self.config_jsons[ex_idx] + self.assertRaises(ValueError, NgenCalRequestEstimationConfig, **json_val) + + def test_init_3_a(self): + """ + Test that example 2 fails to init due to no ``iterations`` param. + """ + ex_idx = 3 + json_val = self.config_jsons[ex_idx] + self.assertRaises(ValueError, NgenCalRequestEstimationConfig, **json_val) + + def test_init_4_a(self): + """ + Test that example 2 fails to init due to no ``model_strategy`` param. + """ + ex_idx = 4 + json_val = self.config_jsons[ex_idx] + self.assertRaises(ValueError, NgenCalRequestEstimationConfig, **json_val) From d8666820ac9134d2e0eafb42d692992a4e67c02b Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:35:42 -0400 Subject: [PATCH 20/25] Add unit tests for NgenCalibrationRequest. Add new unit test class for this type and some initial tests. --- .../test/test_ngen_calibration_request.py | 229 ++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 python/lib/communication/dmod/test/test_ngen_calibration_request.py diff --git a/python/lib/communication/dmod/test/test_ngen_calibration_request.py b/python/lib/communication/dmod/test/test_ngen_calibration_request.py new file mode 100644 index 000000000..fa30583f7 --- /dev/null +++ b/python/lib/communication/dmod/test/test_ngen_calibration_request.py @@ -0,0 +1,229 @@ +import unittest +from ..communication.maas_request import NgenCalibrationRequest +from dmod.core.meta_data import TimeRange + + +class TestNgenCalibrationRequest(unittest.TestCase): + + def setUp(self) -> None: + self.request_cpu_counts = dict() + self.request_strings = dict() + self.request_jsons = dict() + self.request_objs = dict() + self.time_ranges = dict() + self.calibration_config_ds = dict() + + # Example 0 + ex_idx = 0 + + time_range = TimeRange.parse_from_string("2022-01-01 00:00:00 to 2022-03-01 00:00:00") + iterations = 100 + obj_func = 'nnse' + model_strategy = 'estimation' + cpu_count = 4 + + self.request_cpu_counts[ex_idx] = cpu_count + self.time_ranges[ex_idx] = time_range + self.request_strings[ex_idx] = ( + '{' + ' "allocation_paradigm": "SINGLE_NODE", ' + ' "cpu_count": ' + str(cpu_count) + ', ' + ' "job_type": "ngen-cal", ' + ' "request_body": {' + ' "bmi_config_data_id": "02468", ' + ' "hydrofabric_data_id": "9876543210", ' + ' "hydrofabric_uid": "0123456789", ' + ' "iterations":' + str(iterations) + ', ' + ' "model_params": {}, ' + ' "model_strategy":' + model_strategy + ',' + ' "objective_function":' + obj_func + ', ' + ' "partition_config_data_id": "part1234", ' + ' "realization_config_data_id": "02468", ' + ' "time_range": ' + time_range.to_json() + + ' }, ' + ' "session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"' + '}') + + self.request_jsons[ex_idx] = { + 'allocation_paradigm': 'SINGLE_NODE', + 'cpu_count': cpu_count, + 'job_type': 'ngen-cal', + 'request_body': { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'objective_function': obj_func, + 'iterations': iterations, + 'model_strategy': model_strategy, + 'model_params': {} + }, + 'session_secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + } + self.request_objs[ex_idx] = NgenCalibrationRequest( + request_body={ + 'time_range': time_range, + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': "0123456789", + 'bmi_config_data_id': '02468', + 'partition_cfg_data_id': 'part1234', + 'realization_config_data_id': '02468', + 'objective_function': obj_func, + 'iterations': iterations, + 'model_strategy': model_strategy, + 'model_params': {} + }, + session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c', + cpu_count=cpu_count, + allocation_paradigm='SINGLE_NODE') + + # Example 1 - example without explicit calibration params + ex_idx = 1 + + config_ds_id = "config_ds_1" + self.calibration_config_ds[1] = config_ds_id + self.request_cpu_counts[ex_idx] = cpu_count + self.time_ranges[ex_idx] = time_range + self.request_strings[ex_idx] = ( + '{' + ' "allocation_paradigm": "SINGLE_NODE", ' + ' "cpu_count": ' + str(cpu_count) + ', ' + ' "job_type": "ngen-cal", ' + ' "request_body": {' + ' "bmi_config_data_id": "02468", ' + ' "hydrofabric_data_id": "9876543210", ' + ' "hydrofabric_uid": "0123456789", ' + ' "ngen_cal_config_data_id":' + config_ds_id + ',' + ' "partition_config_data_id": "part1234", ' + ' "realization_config_data_id": "02468", ' + ' "time_range": ' + time_range.to_json() + + ' }, ' + ' "session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"' + '}') + + self.request_jsons[ex_idx] = { + 'allocation_paradigm': 'SINGLE_NODE', + 'cpu_count': cpu_count, + 'job_type': 'ngen-cal', + 'request_body': { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'ngen_cal_config_data_id': config_ds_id + }, + 'session_secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + } + self.request_objs[ex_idx] = NgenCalibrationRequest( + request_body={ + 'time_range': time_range, + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': "0123456789", + 'bmi_config_data_id': '02468', + 'partition_cfg_data_id': 'part1234', + 'realization_config_data_id': '02468', + 'ngen_cal_config_data_id': config_ds_id + }, + session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c', + cpu_count=cpu_count, + allocation_paradigm='SINGLE_NODE') + + # Example 2 (only JSON) - example that should fail to init due no objective function param being supplied + ex_idx = 2 + + self.request_cpu_counts[ex_idx] = cpu_count + self.time_ranges[ex_idx] = time_range + + self.request_jsons[ex_idx] = { + 'allocation_paradigm': 'SINGLE_NODE', + 'cpu_count': cpu_count, + 'job_type': 'ngen-cal', + 'request_body': { + 'time_range': time_range.to_dict(), + 'hydrofabric_data_id': '9876543210', + 'hydrofabric_uid': '0123456789', + 'bmi_config_data_id': '02468', + 'realization_config_data_id': '02468', + 'partition_config_data_id': 'part1234', + 'iterations': iterations, + 'model_strategy': model_strategy, + 'model_params': {} + }, + 'session_secret': 'f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c' + } + + def test_factory_init_from_deserialized_json_0_a(self): + """ + Test that example 0 can be initialized using factory method. + """ + ex_idx = 0 + + json_val = self.request_jsons[ex_idx] + ex_obj = self.request_objs[ex_idx] + + init_obj = NgenCalibrationRequest.factory_init_from_deserialized_json(json_val) + self.assertEqual(ex_obj, init_obj) + + def test_factory_init_from_deserialized_json_1_a(self): + """ + Test that example 1 can be initialized using factory method. + """ + ex_idx = 1 + + json_val = self.request_jsons[ex_idx] + ex_obj = self.request_objs[ex_idx] + + init_obj = NgenCalibrationRequest.factory_init_from_deserialized_json(json_val) + self.assertEqual(ex_obj, init_obj) + + def test_init_0_a(self): + ex_idx = 0 + request_obj = self.request_objs[ex_idx] + self.assertIsNotNone(request_obj) + + def test_init_0_b(self): + """ + Test that example 0 can be initialized from JSON. + """ + ex_idx = 0 + + json_val = self.request_jsons[ex_idx] + ex_obj = self.request_objs[ex_idx] + + init_obj = NgenCalibrationRequest(**json_val) + self.assertEqual(ex_obj, init_obj) + + def test_init_1_a(self): + ex_idx = 1 + request_obj = self.request_objs[ex_idx] + self.assertIsNotNone(request_obj) + + def test_init_1_b(self): + """ + Test that example 1 can be initialized from JSON. + """ + ex_idx = 1 + + json_val = self.request_jsons[ex_idx] + ex_obj = self.request_objs[ex_idx] + + init_obj = NgenCalibrationRequest(**json_val) + self.assertEqual(ex_obj, init_obj) + + def test_init_2_a(self): + """ + Test that example 2 fails to init due to no ``objective_function`` param. + """ + ex_idx = 2 + + json_val = self.request_jsons[ex_idx] + try: + obj = NgenCalibrationRequest(**json_val) + except Exception as e: + r = 1 + + self.assertRaises(ValueError, NgenCalibrationRequest, **json_val) From df873c4dffc8226ca65c7092eb10b54d2fd85c17 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:37:36 -0400 Subject: [PATCH 21/25] Update test_ngen_request.py for changes to type. Updating examples for new composite_config_data_id attribute of NGENRequest. --- .../dmod/test/test_ngen_request.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/python/lib/communication/dmod/test/test_ngen_request.py b/python/lib/communication/dmod/test/test_ngen_request.py index a8c20562a..77c6c05a7 100644 --- a/python/lib/communication/dmod/test/test_ngen_request.py +++ b/python/lib/communication/dmod/test/test_ngen_request.py @@ -32,9 +32,9 @@ def create_time_range(begin, end, var=None) -> TimeRange: self.request_strings.append( '{"allocation_paradigm": "SINGLE_NODE", "cpu_count": ' + str(cpu_count_ex_0) + ', "job_type": "ngen", ' '"request_body": ' - '{"bmi_config_data_id": "02468", "hydrofabric_data_id": "9876543210", "hydrofabric_uid": "0123456789", ' - '"partition_config_data_id": "part1234", "realization_config_data_id": "02468", ' - '"time_range": ' + time_range.to_json() + '}, ' + '{"bmi_config_data_id": "02468", "composite_config_data_id": "composite02468", "hydrofabric_data_id": ' + '"9876543210", "hydrofabric_uid": "0123456789", "partition_config_data_id": "part1234", ' + '"realization_config_data_id": "02468", "time_range": ' + time_range.to_json() + '}, ' '"session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}') self.request_jsons.append({ 'allocation_paradigm': 'SINGLE_NODE', @@ -45,6 +45,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': '0123456789', 'bmi_config_data_id': '02468', + 'composite_config_data_id': 'composite02468', 'realization_config_data_id': '02468', 'partition_config_data_id': 'part1234' }, @@ -56,6 +57,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': "0123456789", 'bmi_config_data_id': '02468', + 'composite_config_data_id': 'composite02468', 'partition_cfg_data_id': 'part1234', 'realization_config_data_id': '02468'}, session_secret='f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c', @@ -74,7 +76,8 @@ def create_time_range(begin, end, var=None) -> TimeRange: self.request_strings.append( '{"allocation_paradigm": "ROUND_ROBIN", "cpu_count": ' + str(cpu_count_ex_1) + ', "job_type": "ngen", ' '"request_body": ' - '{"bmi_config_data_id": "02468", "catchments": ' + cat_ids_str + ', "hydrofabric_data_id": "9876543210", ' + '{"bmi_config_data_id": "02468", "catchments": ' + cat_ids_str + ', ' + '"composite_config_data_id": "composite02468", "hydrofabric_data_id": "9876543210", ' '"hydrofabric_uid": "0123456789", "partition_config_data_id": "part1234", ' '"realization_config_data_id": "02468", "time_range": ' + time_range.to_json() + '}, ' '"session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}') @@ -86,6 +89,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'time_range': time_range.to_dict(), 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': '0123456789', + 'composite_config_data_id': 'composite02468', 'realization_config_data_id': '02468', 'bmi_config_data_id': '02468', 'catchments': cat_ids_list, @@ -104,6 +108,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'hydrofabric_data_id': '9876543210', 'realization_config_data_id': '02468', 'bmi_config_data_id': '02468', + 'composite_config_data_id': 'composite02468', 'catchments': cat_ids_list, 'partition_cfg_data_id': 'part1234'})) @@ -114,7 +119,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: self.time_ranges.append(time_range) self.request_strings.append( '{"allocation_paradigm": "SINGLE_NODE", "cpu_count": ' + str(cpu_count_ex_2) + ', "job_type": "ngen", ' - '"request_body": {"bmi_config_data_id": "02468", ' + '"request_body": {"bmi_config_data_id": "02468", "composite_config_data_id": "composite02468",' '"hydrofabric_data_id": "9876543210", ' '"hydrofabric_uid": "0123456789", "realization_config_data_id": "02468", "time_range": ' + time_range.to_json() + '}, ' '"session_secret": "f21f27ac3d443c0948aab924bddefc64891c455a756ca77a4d86ec2f697cd13c"}' @@ -127,6 +132,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'time_range': time_range.to_dict(), 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': '0123456789', + 'composite_config_data_id': 'composite02468', 'bmi_config_data_id': '02468', 'realization_config_data_id': '02468' }, @@ -141,6 +147,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: 'time_range': time_range, 'hydrofabric_uid': "0123456789", 'hydrofabric_data_id': '9876543210', + 'composite_config_data_id': 'composite02468', 'bmi_config_data_id': '02468', 'realization_config_data_id': '02468'})) From ef30d0e03cfcb1bec7bf27016f0b8c400378284d Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:56:14 -0400 Subject: [PATCH 22/25] Update dmod.scheduler tests that depend on comms. --- python/lib/scheduler/dmod/test/test_job.py | 1 + python/lib/scheduler/dmod/test/test_scheduler.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/python/lib/scheduler/dmod/test/test_job.py b/python/lib/scheduler/dmod/test/test_job.py index a731a641c..698f01507 100644 --- a/python/lib/scheduler/dmod/test/test_job.py +++ b/python/lib/scheduler/dmod/test/test_job.py @@ -87,6 +87,7 @@ def create_time_range(begin, end, var=None) -> TimeRange: "job_type": "ngen", 'request_body': { 'bmi_config_data_id': '02468', + 'composite_config_data_id': 'composite02468', 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': '0123456789', 'realization_config_data_id': '02468', diff --git a/python/lib/scheduler/dmod/test/test_scheduler.py b/python/lib/scheduler/dmod/test/test_scheduler.py index ffb7f2737..28dbbfbbb 100644 --- a/python/lib/scheduler/dmod/test/test_scheduler.py +++ b/python/lib/scheduler/dmod/test/test_scheduler.py @@ -42,6 +42,7 @@ def setUp(self) -> None: "job_type": "ngen", 'request_body': { 'bmi_config_data_id': '02468', + 'composite_config_data_id': 'composite02468', 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': '0123456789', 'realization_config_data_id': '02468', @@ -77,6 +78,7 @@ def setUp(self) -> None: "job_type": "ngen", 'request_body': { 'bmi_config_data_id': '02468', + 'composite_config_data_id': 'composite02468', 'hydrofabric_data_id': '9876543210', 'hydrofabric_uid': '0123456789', 'realization_config_data_id': '02468', From 400dc918786954deaf31f1cff729c12257f2f3d0 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:56:54 -0400 Subject: [PATCH 23/25] Bump dmod.communication version to 0.14.0. --- python/lib/communication/dmod/communication/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/_version.py b/python/lib/communication/dmod/communication/_version.py index 2d7893e3d..ef9199407 100644 --- a/python/lib/communication/dmod/communication/_version.py +++ b/python/lib/communication/dmod/communication/_version.py @@ -1 +1 @@ -__version__ = '0.13.0' +__version__ = '0.14.0' From 11e811431cfcf01b69d412af9e2eb9434f1783e5 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:57:43 -0400 Subject: [PATCH 24/25] Update core and comms deps for dataservice. Updating to latest versions of dmod.core and dmod.communication. --- python/services/dataservice/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index fbda1a239..8f0daa4b0 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.8.0', 'dmod-communication>=0.13.0', 'dmod-scheduler>=0.10.0', + install_requires=['dmod-core>=0.10.0', 'dmod-communication>=0.14.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0', 'redis', "pydantic>=1.10.8,~=1.10", "fastapi", "uvicorn[standard]", 'ngen-config@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_conf', 'ngen-cal@git+https://github.com/noaa-owp/ngen-cal@master#egg=ngen-config&subdirectory=python/ngen_cal'], From 92a127afc26f1c33a5263319ec3b55d068e90087 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Mon, 17 Jul 2023 14:58:27 -0400 Subject: [PATCH 25/25] Bump dmod.dataservice version to 0.8.0. --- python/services/dataservice/dmod/dataservice/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/dataservice/dmod/dataservice/_version.py b/python/services/dataservice/dmod/dataservice/_version.py index 19442947c..ccf9e6286 100644 --- a/python/services/dataservice/dmod/dataservice/_version.py +++ b/python/services/dataservice/dmod/dataservice/_version.py @@ -1 +1 @@ -__version__ = '0.7.0' \ No newline at end of file +__version__ = '0.8.0' \ No newline at end of file