diff --git a/src/troute-network/troute/AbstractNetwork.py b/src/troute-network/troute/AbstractNetwork.py index 5bbdffa4b..830be578e 100644 --- a/src/troute-network/troute/AbstractNetwork.py +++ b/src/troute-network/troute/AbstractNetwork.py @@ -27,6 +27,7 @@ class AbstractNetwork(ABC): __slots__ = ["_dataframe", "_waterbody_connections", "_gages", "_terminal_codes", "_connections", "_waterbody_df", "_waterbody_types_df", "_waterbody_type_specified", "_link_gage_df", + "_canadian_gage_link_df", "_independent_networks", "_reaches_by_tw", "_flowpath_dict", "_reverse_network", "_q0", "_t0", "_link_lake_crosswalk", "_usgs_lake_gage_crosswalk", "_usace_lake_gage_crosswalk", "_rfc_lake_gage_crosswalk", @@ -34,7 +35,8 @@ class AbstractNetwork(ABC): "supernetwork_parameters", "waterbody_parameters","data_assimilation_parameters", "restart_parameters", "compute_parameters", "forcing_parameters", "hybrid_parameters", "preprocessing_parameters", "output_parameters", - "verbose", "showtiming", "break_points", "_routing", "_nexus_dict", "_poi_nex_dict"] + "verbose", "showtiming", "break_points", "_routing", "_gl_climatology_df", "_nexus_dict", "_poi_nex_dict"] + def __init__(self, from_files=True, value_dict={}): @@ -410,6 +412,14 @@ def refactored_reaches(self): @property def unrefactored_topobathy_df(self): return self._routing.unrefactored_topobathy_df + + @property + def great_lakes_climatology_df(self): + return self._gl_climatology_df + + @property + def canadian_gage_df(self): + return self._canadian_gage_link_df def set_synthetic_wb_segments(self, synthetic_wb_segments, synthetic_wb_id_offset): diff --git a/src/troute-network/troute/DataAssimilation.py b/src/troute-network/troute/DataAssimilation.py index 7dd89a8bd..0c7971616 100644 --- a/src/troute-network/troute/DataAssimilation.py +++ b/src/troute-network/troute/DataAssimilation.py @@ -35,7 +35,7 @@ class AbstractDA(ABC): "_reservoir_usgs_df", "_reservoir_usgs_param_df", "_reservoir_usace_df", 
"_reservoir_usace_param_df", "_reservoir_rfc_df", "_reservoir_rfc_synthetic", - "_reservoir_rfc_param_df", + "_reservoir_rfc_param_df", "_great_lakes_df", "_great_lakes_param_df", "_dateNull", "_datesSecondsArray_usgs", "_nDates_usgs", "_stationArray_usgs", "_stationStringLengthArray_usgs", "_nStations_usgs", @@ -213,7 +213,7 @@ def __init__(self, network, from_files, value_dict, da_run=[]): self._last_obs_df = _reindex_link_to_lake_id(self._last_obs_df, network.link_lake_crosswalk) self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) - if 'canada_timeslice_files' in da_run: + if ('canada_timeslice_files' in da_run) & (not network.canadian_gage_df.empty): self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) self._canada_is_created = True LOG.debug("NudgingDA class is completed in %s seconds." % (time.time() - main_start_time)) @@ -272,7 +272,7 @@ def update_for_next_loop(self, network, da_run,): if streamflow_da_parameters.get('streamflow_nudging', False): self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) - if 'canada_timeslice_files' in da_run: + if ('canada_timeslice_files' in da_run) & (not network.canadian_gage_df.empty): self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) else: self._canada_df = pd.DataFrame() @@ -543,6 +543,7 @@ def __init__(self, network, from_files, value_dict, da_run=[]): if not self._usgs_df.empty: self._usgs_df = self._usgs_df.loc[:,network.t0:] LOG.debug("PersistenceDA class is completed in %s seconds." % (time.time() - PersistenceDA_start_time)) + def update_after_compute(self, run_results,): ''' Function to update data assimilation object after running routing module. 
@@ -592,12 +593,12 @@ def update_for_next_loop(self, network, da_run,): run_parameters = self._run_parameters # update usgs_df if it is not empty - streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', None) - reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', None) + streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', {}) + reservoir_da_parameters = data_assimilation_parameters.get('reservoir_da', {}) - if not self._usgs_df.empty: - - if reservoir_da_parameters.get('reservoir_persistence_da').get('reservoir_persistence_usgs', False): + if not self.usgs_df.empty: + + if reservoir_da_parameters.get('reservoir_persistence_da',{}).get('reservoir_persistence_usgs', False): gage_lake_df = ( network.usgs_lake_gage_crosswalk. @@ -630,17 +631,16 @@ def update_for_next_loop(self, network, da_run,): # subset and re-index `usgs_df`, using the segID <> lakeID crosswalk self._reservoir_usgs_df = ( - usgs_df_15min.join(link_lake_df, how = 'inner'). - reset_index(). - set_index('usgs_lake_id'). - drop(['link'], axis = 1) - ) - + usgs_df_15min.join(link_lake_df, how = 'inner'). + reset_index(drop=True). + set_index('usgs_lake_id') + ) + # replace link ids with lake ids, for gages at waterbody outlets, # otherwise, gage data will not be assimilated at waterbody outlet # segments. if network.link_lake_crosswalk: - self._usgs_df = _reindex_link_to_lake_id(self._usgs_df, network.link_lake_crosswalk) + self._usgs_df = _reindex_link_to_lake_id(self.usgs_df, network.link_lake_crosswalk) elif reservoir_da_parameters.get('reservoir_persistence_usgs', False): ( @@ -655,6 +655,12 @@ def update_for_next_loop(self, network, da_run,): da_run, lake_gage_crosswalk = network.usgs_lake_gage_crosswalk, res_source = 'usgs') + + # replace link ids with lake ids, for gages at waterbody outlets, + # otherwise, gage data will not be assimilated at waterbody outlet + # segments. 
+ if network.link_lake_crosswalk: + usgs_df = _reindex_link_to_lake_id(usgs_df, network.link_lake_crosswalk) # USACE if reservoir_da_parameters.get('reservoir_persistence_da').get('reservoir_persistence_usace', False): @@ -717,62 +723,114 @@ def __init__(self, network, from_files, value_dict, da_run): run_parameters = self._run_parameters reservoir_persistence_da = data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_da', {}) + self._great_lakes_df = pd.DataFrame() + self._great_lakes_param_df = pd.DataFrame() + if reservoir_persistence_da: greatLake = reservoir_persistence_da.get('reservoir_persistence_greatLake', False) if greatLake: - streamflow_da_parameters = data_assimilation_parameters.get('streamflow_da', {}) + GL_crosswalk_df = pd.DataFrame( + { + 'link': [4800002,4800004,4800006], + 'gages': ['04127885','04159130','02HA013'] + } + ).set_index('link') - if not self._canada_is_created and ('canada_timeslice_files' in da_run): - self._canada_df = _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) - self._canada_is_created = True - - if 'LakeOntario_outflow' in da_run: - self._lake_ontario_df = _create_LakeOntario_df(run_parameters, network, da_run) - else: - self._lake_ontario_df = pd.DataFrame() - - lake_ontario_df = self._lake_ontario_df - canada_df = self._canada_df - - ids_to_check = {'4800002': '04127885', '4800004': '13196034'} - # the segment corresponding to 04127885 gages isn't exist as of now. 
Should be replaced in future - # Initialize an empty DataFrame with the same columns as the usgs DataFrame - if self._usgs_df.empty: - self._usgs_df = _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run) - - usgs_df_GL = pd.DataFrame(columns=self._usgs_df.columns) - - # Check if any ids are present in the index - for key, value in ids_to_check.items(): - if value in self._usgs_df.index: - temp_df = (self._usgs_df.loc[[value]] - .transpose() - .resample('15min') - .asfreq() - .transpose()) - temp_df.index = [key] - else: - temp_df = pd.DataFrame(index=[key], columns=self._usgs_df.columns) - - usgs_df_GL = pd.concat([usgs_df_GL, temp_df], axis=0) + self._great_lakes_df, self._great_lakes_param_df = _create_GL_dfs( + GL_crosswalk_df, + data_assimilation_parameters, + run_parameters, + da_run, + network.t0, + ) - if not lake_ontario_df.empty: - lake_ontario_df = lake_ontario_df.T.reset_index().drop('index', axis = 1) - else: - lake_ontario_df = pd.DataFrame(columns=self._usgs_df.columns) - lake_ontario_df['link'] = 4800007 - lake_ontario_df.set_index('link', inplace=True) + LOG.debug("great_lake class is completed in %s seconds." % (time.time() - great_lake_start_time)) + + def update_after_compute(self, run_results, time_increment): + ''' + Function to update data assimilation object after running routing module. + + Arguments: + ---------- + - run_results (list): output from the compute kernel sequence, + organized (because that is how it comes + out of the kernel) by network. + For each item in the result, there are + 10 elements, the 9th of which are lists of + four elements containing: + 1) a list of the segments ids where data + assimilation was performed (if any) in that network; + 2) a list of the previously persisted outflow; + 3) a list of the previously assimilated observation times; + 4) a list of the update time. 
+ + Returns: + -------- + - data_assimilation (Object): Object containing all data assimilation information + - _great_lakes_param_df (DataFrame): Great Lakes reservoir DA parameters + ''' + # get reservoir DA initial parameters for next loop iteration + great_lakes_param_df = pd.DataFrame() + tmp_list = [] + for r in run_results: - if canada_df.empty: - canada_df = pd.DataFrame(columns=self._usgs_df.columns, index=pd.Index([4800006], name='link')) + if len(r[9][0]) > 0: + tmp_df = pd.DataFrame(data = r[9][0], columns = ['lake_id']) + tmp_df['previous_assimilated_outflows'] = r[9][1] + tmp_df['previous_assimilated_time'] = r[9][2] + tmp_df['update_time'] = r[9][3] + tmp_list.append(tmp_df) + + if tmp_list: + great_lakes_param_df = pd.concat(tmp_list) + great_lakes_param_df['previous_assimilated_time'] = great_lakes_param_df['previous_assimilated_time'] - time_increment + great_lakes_param_df['update_time'] = great_lakes_param_df['update_time'] - time_increment + + self._great_lakes_param_df = great_lakes_param_df + + def update_for_next_loop(self, network, da_run,): + ''' + Function to update data assimilation object for the next loop iteration. 
+ + Arguments: + ---------- + - network (Object): network object created from abstract class + - da_run (list): list of data assimilation files separated + by for loop chunks + + Returns: + -------- + - data_assimilation (Object): Object containing all data assimilation information + - reservoir_usgs_df (DataFrame): USGS reservoir observations + - reservoir_usace_df (DataFrame): USACE reservoir observations + ''' + greatLake = False + data_assimilation_parameters = self._data_assimilation_parameters + run_parameters = self._run_parameters + reservoir_persistence_da = data_assimilation_parameters.get('reservoir_da', {}).get('reservoir_persistence_da', {}) - # List of DataFrames - dfs = [lake_ontario_df, canada_df, usgs_df_GL] + if reservoir_persistence_da: + greatLake = reservoir_persistence_da.get('reservoir_persistence_greatLake', False) + + if greatLake: + + GL_crosswalk_df = pd.DataFrame( + { + 'link': [4800002,4800004,4800006], + 'gages': ['04127885','04159130','02HA013'] + } + ).set_index('link') + + self._great_lakes_df, _ = _create_GL_dfs( + GL_crosswalk_df, + data_assimilation_parameters, + run_parameters, + da_run, + network.t0, + ) - self.great_lake_all = pd.concat(dfs, axis=0, join='outer', ignore_index=False) - LOG.debug("great_lake class is completed in %s seconds." % (time.time() - great_lake_start_time)) class RFCDA(AbstractDA): """ @@ -877,6 +935,7 @@ def __init__(self, network, from_files, value_dict): self._reservoir_rfc_df = pd.DataFrame() self._reservoir_rfc_param_df = pd.DataFrame() LOG.debug("RFCDA class is completed in %s seconds." % (time.time() - RFCDA_start_time)) + def update_after_compute(self, run_results): ''' Function to update data assimilation object after running routing module. 
@@ -939,6 +998,7 @@ def update_after_compute(self, run_results, time_increment): NudgingDA.update_after_compute(self, run_results, time_increment) PersistenceDA.update_after_compute(self, run_results) RFCDA.update_after_compute(self, run_results) + great_lake.update_after_compute(self, run_results, time_increment) def update_for_next_loop(self, network, da_run,): ''' @@ -947,6 +1007,7 @@ def update_for_next_loop(self, network, da_run,): NudgingDA.update_for_next_loop(self, network, da_run) PersistenceDA.update_for_next_loop(self, network, da_run) RFCDA.update_for_next_loop(self) + great_lake.update_for_next_loop(self, network, da_run) @property @@ -984,6 +1045,14 @@ def reservoir_rfc_df(self): @property def reservoir_rfc_param_df(self): return self._reservoir_rfc_param_df + + @property + def great_lakes_df(self): + return self._great_lakes_df + + @property + def great_lakes_param_df(self): + return self._great_lakes_param_df # -------------------------------------------------------------- @@ -1083,19 +1152,13 @@ def _create_usgs_df(data_assimilation_parameters, streamflow_da_parameters, run_ LOG.debug("Reading and preprocessing usgs timeslice files is completed in %s seconds." 
% (time.time() - usgs_df_start_time)) return usgs_df -def _create_LakeOntario_df(run_parameters, network, da_run): +def _create_LakeOntario_df(run_parameters, t0, da_run): LOG.info("Creating Lake Ontario dataframe is started.") LakeOntario_df_start_time = time.time() - t0 = network.t0 + start_time = t0 - pd.Timedelta(weeks = 10) nts = run_parameters.get('nts') dt = run_parameters.get('dt') - end_time = pd.to_datetime(t0) + pd.Timedelta(hours = nts/(3600/dt)) - time_total = [] - t = t0 - while t < end_time: - time_total.append(t) - t += pd.Timedelta(minutes=15) - + end_time = t0 + pd.Timedelta(hours = nts/(3600/dt)) lake_ontario_df = pd.read_csv(da_run.get('LakeOntario_outflow')) @@ -1113,17 +1176,22 @@ def _create_LakeOntario_df(run_parameters, network, da_run): lake_ontario_df = lake_ontario_df.drop_duplicates() lake_ontario_df = lake_ontario_df.drop(['Date', 'Hour'], axis=1) - # Filter DataFrame based on extracted times - time_total_df = pd.DataFrame(time_total, columns=['Datetime']) - time_total_df['Outflow(m3/s)'] = None # Initialize with None or NaN - time_total_df = time_total_df.set_index('Datetime') + # Rename outflow column to discharge + lake_ontario_df = lake_ontario_df.rename(columns={'Outflow(m3/s)': 'Discharge'}) + # Filter for needed time stamps + lake_ontario_df = lake_ontario_df[(lake_ontario_df.index>=start_time) & (lake_ontario_df.index<=end_time)] + + # Add 'link' column with waterbody ID + lake_ontario_df['link'] = 4800007 + + # Reset index and convert Datetimes to strings + lake_ontario_df.reset_index(inplace=True) + lake_ontario_df['Datetime'] = lake_ontario_df['Datetime'].dt.strftime('%Y-%m-%d_%H:%M:%S') - filtered_df = lake_ontario_df.loc[(lake_ontario_df.index >= t0) & (lake_ontario_df.index < end_time)] - total_df = pd.merge(time_total_df, filtered_df, left_index=True, right_index=True, how='left') - total_df = total_df.rename(columns={'Outflow(m3/s)_y': 'Outflow(m3/s)'}).drop(columns='Outflow(m3/s)_x') LOG.debug("Creating Lake Ontario 
dataframe is completed in %s seconds." % (time.time() - LakeOntario_df_start_time)) - return total_df + + return lake_ontario_df def _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, run_parameters, network, da_run): ''' @@ -1141,7 +1209,7 @@ def _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, ru Returns: -------- - - usgs_df (DataFrame): dataframe of USGS gage observations + - canada_df (DataFrame): dataframe of Canadian gage observations ''' canada_timeslices_folder = data_assimilation_parameters.get("canada_timeslices_folder", None) #lastobs_file = streamflow_da_parameters.get("wrf_hydro_lastobs_file", None) @@ -1159,11 +1227,10 @@ def _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, ru canada_df_start_time = time.time() canada_files = [canada_timeslices_folder.joinpath(f) for f in da_run['canada_timeslice_files']] - if canada_files: canada_df = ( nhd_io.get_obs_from_timeslices( - network.link_gage_df, + network.canadian_gage_df, crosswalk_gage_field, crosswalk_segID_field, canada_files, @@ -1173,7 +1240,7 @@ def _create_canada_df(data_assimilation_parameters, streamflow_da_parameters, ru network.t0, run_parameters.get("cpu_pool", None) ). - loc[network.link_gage_df.index] + loc[network.canadian_gage_df.index] ) else: @@ -1982,3 +2049,74 @@ def _read_lastobs_file( LOG.debug(f"Reading last observation file is completed in %s seconds." 
% (time.time() - read_lastobs_start_time)) return lastobs_df +def _create_GL_dfs(GL_crosswalk_df, data_assimilation_parameters, run_parameters, + da_run, t0): + + # USGS gages: + usgs_timeslices_folder = data_assimilation_parameters.get("usgs_timeslices_folder", None) + + # TODO: join timeslice folder and files into complete path upstream + usgs_timeslices_folder = pathlib.Path(usgs_timeslices_folder) + usgs_files = [usgs_timeslices_folder.joinpath(f) for f in + da_run['usgs_timeslice_files']] + + usgs_GL_crosswalk_df = GL_crosswalk_df[GL_crosswalk_df.index.isin([4800002,4800004])] + if usgs_files: + usgs_GL_df = ( + nhd_io.get_GL_obs_from_timeslices( + usgs_GL_crosswalk_df, + usgs_files, + cpu_pool=run_parameters.get("cpu_pool", 1), + ) + ) + usgs_GL_df = pd.melt(usgs_GL_df, + var_name='Datetime', + value_name='Discharge', + ignore_index=False).dropna().reset_index() + + else: + usgs_GL_df = pd.DataFrame() + + # Canadian gages: + canadian_timeslices_folder = data_assimilation_parameters.get("canada_timeslices_folder", None) + canadian_files = [canadian_timeslices_folder.joinpath(f) for f in + da_run['canada_timeslice_files']] + + canadian_GL_crosswalk_df = GL_crosswalk_df.loc[[4800006]] + + if canadian_files: + canadian_GL_df = ( + nhd_io.get_GL_obs_from_timeslices( + canadian_GL_crosswalk_df, + canadian_files, + cpu_pool=run_parameters.get("cpu_pool", 1), + ) + ) + canadian_GL_df = pd.melt(canadian_GL_df, + var_name='Datetime', + value_name='Discharge', + ignore_index=False).dropna().reset_index() + + else: + canadian_GL_df = pd.DataFrame() + + # Lake Ontario data: + if 'LakeOntario_outflow' in da_run: + lake_ontario_df = _create_LakeOntario_df(run_parameters, t0, da_run) + else: + lake_ontario_df = pd.DataFrame() + + great_lakes_df = pd.concat( + [usgs_GL_df, canadian_GL_df, lake_ontario_df] + ).rename(columns={'link': 'lake_id'}).sort_values(by=['lake_id','Datetime']) + + great_lakes_df['time'] = pd.to_datetime(great_lakes_df['Datetime'], 
format='%Y-%m-%d_%H:%M:%S') - t0 + great_lakes_df['time'] = great_lakes_df.time.dt.total_seconds().astype(int) + great_lakes_df.drop('Datetime', axis=1, inplace=True) + + great_lakes_param_df = pd.DataFrame(great_lakes_df.lake_id.unique(), columns=['lake_id']).sort_values('lake_id') + great_lakes_param_df['previous_assimilated_outflows'] = np.nan + great_lakes_param_df['previous_assimilated_time'] = 0 + great_lakes_param_df['update_time'] = 0 + + return great_lakes_df, great_lakes_param_df \ No newline at end of file diff --git a/src/troute-network/troute/HYFeaturesNetwork.py b/src/troute-network/troute/HYFeaturesNetwork.py index f5eb91bc7..85644f3d1 100644 --- a/src/troute-network/troute/HYFeaturesNetwork.py +++ b/src/troute-network/troute/HYFeaturesNetwork.py @@ -216,7 +216,7 @@ class HYFeaturesNetwork(AbstractNetwork): """ """ - __slots__ = ["_upstream_terminal", "_nexus_latlon", "_duplicate_ids_df"] + __slots__ = ["_upstream_terminal", "_nexus_latlon", "_duplicate_ids_df",] def __init__(self, supernetwork_parameters, @@ -340,6 +340,7 @@ def gages(self): def waterbody_null(self): return np.nan #pd.NA + def preprocess_network(self, flowpaths, nexus): self._dataframe = flowpaths @@ -541,9 +542,13 @@ def preprocess_waterbodies(self, lakes, nexus): self.waterbody_dataframe, gl_wbody_df ] - ) + ).sort_index() + + self._gl_climatology_df = get_great_lakes_climatology() + else: gl_dict = {} + self._gl_climatology_df = pd.DataFrame() self._waterbody_types_df = pd.DataFrame( data = 1, @@ -569,6 +574,7 @@ def preprocess_waterbodies(self, lakes, nexus): self._waterbody_type_specified = False self._link_lake_crosswalk = None self._duplicate_ids_df = pd.DataFrame() + self._gl_climatology_df = pd.DataFrame() self._dataframe = self.dataframe.drop('waterbody', axis=1).drop_duplicates() @@ -605,6 +611,10 @@ def preprocess_data_assimilation(self, network): .rename_axis(None, axis=0).to_dict() ) + #FIXME: temporary solution, add canadian gage crosswalk dataframe. 
This should come from + # the hydrofabric. + self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link') + # Find furthest downstream gage and create our lake_gage_df to make crosswalk dataframes. lake_gage_hydroseq_df = gages_df[~gages_df['lake_id'].isnull()][['lake_id', 'value', 'hydroseq']].rename(columns={'value': 'gages'}) lake_gage_hydroseq_df['lake_id'] = lake_gage_hydroseq_df['lake_id'].astype(int) diff --git a/src/troute-network/troute/NHDNetwork.py b/src/troute-network/troute/NHDNetwork.py index ee4afcf52..189393200 100644 --- a/src/troute-network/troute/NHDNetwork.py +++ b/src/troute-network/troute/NHDNetwork.py @@ -64,6 +64,8 @@ def __init__( print("... in %s seconds." % (time.time() - start_time)) self._flowpath_dict = {} + self._gl_climatology_df = pd.DataFrame() + self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link') super().__init__() diff --git a/src/troute-network/troute/nhd_io.py b/src/troute-network/troute/nhd_io.py index de50b0847..539e661aa 100644 --- a/src/troute-network/troute/nhd_io.py +++ b/src/troute-network/troute/nhd_io.py @@ -1249,6 +1249,96 @@ def get_obs_from_timeslices( return observation_df_new +def get_GL_obs_from_timeslices( + crosswalk_df, + timeslice_files, + crosswalk_gage_field='gages', + crosswalk_dest_field='link', + qc_threshold=1, + cpu_pool=1, +): + """ + Read observations from TimeSlice files and organize into a Pandas DataFrame. + This is specific to observations needed for Great Lakes data assimilation, + so no interpolation happens. + + Aguments + -------- + - crosswalk_gage_field (str): fieldname of gage ID data in crosswalk dataframe + - crosswalk_dest_field (str): fieldname of destination data in link_gage_df. + For streamflow DA, this is the field + containing segment IDs. For reservoir DA, + this is the field containing waterbody IDs. 
+ - timeslice_files (list of PosixPath): Full paths to existing TimeSlice files + - qc_threshold (int): Numerical observation quality + threshold. Observations with quality + flags less than this value will be + removed and replaced witn nan. + - cpu_pool (int): Number of CPUs used for parallel + TimeSlice reading and interolation + + Returns + ------- + - observation_df_new (Pandas DataFrame): + + Notes + ----- + """ + # open TimeSlce files, organize data into dataframes + with Parallel(n_jobs=cpu_pool) as parallel: + jobs = [] + for f in timeslice_files: + jobs.append(delayed(_read_timeslice_file)(f)) + timeslice_dataframes = parallel(jobs) + + all_empty = all(df.empty for tuple in timeslice_dataframes for df in tuple) + if all_empty: + LOG.debug(f'{crosswalk_gage_field} DataFrames is empty, check timeslice files.') + return pd.DataFrame() + + # create lists of observations and obs quality dataframes returned + # from _read_timeslice_file + timeslice_obs_frames = [] + timeslice_qual_frames = [] + for d in timeslice_dataframes: + timeslice_obs_frames.append(d[0]) # TimeSlice gage observation + timeslice_qual_frames.append(d[1]) # TimeSlice observation qual + + # concatenate dataframes + timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1) + timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1) + + # Link <> gage crosswalk data + df = crosswalk_df.reset_index() + df = df.set_index(crosswalk_gage_field) + df.index = df.index.str.strip() + # join crosswalk data with timeslice data, indexed on crosswalk destination field + observation_df = (df.join(timeslice_obs_df). + reset_index(). + set_index(crosswalk_dest_field). + select_dtypes(include='number')) + + observation_qual_df = (df.join(timeslice_qual_df). + reset_index(). + set_index(crosswalk_dest_field). + select_dtypes(include='number')) + + # ---- Laugh testing ------ + # screen-out erroneous qc flags + observation_qual_df = (observation_qual_df. + mask(observation_qual_df < 0, np.nan). 
+ mask(observation_qual_df > 1, np.nan) + ) + + # screen-out poor quality flow observations + observation_df = (observation_df. + mask(observation_qual_df < qc_threshold, np.nan). + mask(observation_df <= 0, np.nan) + ) + + return observation_df + + def get_param_str(target_file, param): # TODO: remove this duplicate function return get_attribute(target_file, param) diff --git a/src/troute-network/troute/rfc_lake_gage_crosswalk.py b/src/troute-network/troute/rfc_lake_gage_crosswalk.py index 082b03d3a..7d4b002ef 100644 --- a/src/troute-network/troute/rfc_lake_gage_crosswalk.py +++ b/src/troute-network/troute/rfc_lake_gage_crosswalk.py @@ -1,4 +1,5 @@ import pandas as pd +import numpy as np def get_rfc_lake_gage_crosswalk(): """ @@ -77,14 +78,14 @@ def get_rfc_lake_gage_crosswalk(): def get_great_lakes_climatology(): - df = pd.DataFrame( - { - 'Month': ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"], - 'SMR': [1946, 1907, 1879, 1929, 2105, 2202, 2306, 2378, 2343, 2282, 2235, 2046], - 'SCR': [4623, 4518, 4946, 5217, 5410, 5494, 5546, 5555, 5519, 5475, 5424, 5253], - 'NIR': [5630, 5523, 5673, 5921, 6179, 6172, 6089, 5977, 5839, 5751, 5757, 5771], - 'SLR': [6380, 6561, 6875, 7159, 7418, 7547, 7500, 7360, 7161, 6954, 6852, 6725] - } + outflow_data = np.array([[1946, 1907, 1879, 1929, 2105, 2202, 2306, 2378, 2343, 2282, 2235, 2046], + [4623, 4518, 4946, 5217, 5410, 5494, 5546, 5555, 5519, 5475, 5424, 5253], + [5630, 5523, 5673, 5921, 6179, 6172, 6089, 5977, 5839, 5751, 5757, 5771], + [6380, 6561, 6875, 7159, 7418, 7547, 7500, 7360, 7161, 6954, 6852, 6725]]) + df = ( + pd.DataFrame(data=outflow_data, index=[4800002,4800004,4800006,4800007], + columns=["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"]) + .astype('float32') ) return df \ No newline at end of file diff --git a/src/troute-nwm/src/nwm_routing/__main__.py b/src/troute-nwm/src/nwm_routing/__main__.py index 01dbd4bad..3cc6bba73 100644 --- 
a/src/troute-nwm/src/nwm_routing/__main__.py +++ b/src/troute-nwm/src/nwm_routing/__main__.py @@ -230,6 +230,9 @@ def main_v04(argv): data_assimilation.reservoir_usace_param_df, data_assimilation.reservoir_rfc_df, data_assimilation.reservoir_rfc_param_df, + data_assimilation.great_lakes_df, + data_assimilation.great_lakes_param_df, + network.great_lakes_climatology_df, data_assimilation.assimilation_parameters, assume_short_ts, return_courant, @@ -1141,6 +1144,9 @@ def nwm_route( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, da_parameter_dict, assume_short_ts, return_courant, @@ -1231,6 +1237,9 @@ def nwm_route( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, da_parameter_dict, assume_short_ts, return_courant, @@ -1675,6 +1684,9 @@ def main_v03(argv): reservoir_usace_param_df, pd.DataFrame(), #empty dataframe for RFC data...not needed unless running via BMI pd.DataFrame(), #empty dataframe for RFC param data...not needed unless running via BMI + pd.DataFrame(), #empty dataframe for great lakes data... + pd.DataFrame(), #empty dataframe for great lakes param data... + pd.DataFrame(), #empty dataframe for great lakes climatology data... 
da_parameter_dict, assume_short_ts, return_courant, diff --git a/src/troute-routing/troute/routing/compute.py b/src/troute-routing/troute/routing/compute.py index e96aedf3b..55f529596 100644 --- a/src/troute-routing/troute/routing/compute.py +++ b/src/troute-routing/troute/routing/compute.py @@ -145,6 +145,9 @@ def _prep_reservoir_da_dataframes(reservoir_usgs_df, reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, waterbody_types_df_sub, t0, from_files, @@ -258,11 +261,36 @@ def _prep_reservoir_da_dataframes(reservoir_usgs_df, if not from_files: if not waterbody_types_df_sub.empty: waterbody_types_df_sub.loc[waterbody_types_df_sub['reservoir_type'] == 4] = 1 + + # Great Lakes + if not great_lakes_df.empty: + gl_wbodies_sub = waterbody_types_df_sub[ + waterbody_types_df_sub['reservoir_type']==6 + ].index + if exclude_segments: + gl_wbodies_sub = list(set(gl_wbodies_sub).difference(set(exclude_segments))) + gl_df_sub = great_lakes_df[great_lakes_df['lake_id'].isin(gl_wbodies_sub)] + gl_climatology_df_sub = great_lakes_climatology_df.loc[gl_wbodies_sub] + gl_param_df_sub = great_lakes_param_df[great_lakes_param_df['lake_id'].isin(gl_wbodies_sub)] + gl_parm_lake_id_sub = gl_param_df_sub.lake_id.to_numpy() + gl_param_flows_sub = gl_param_df_sub.previous_assimilated_outflows.to_numpy() + gl_param_time_sub = gl_param_df_sub.previous_assimilated_time.to_numpy() + gl_param_update_time_sub = gl_param_df_sub.update_time.to_numpy() + else: + gl_df_sub = pd.DataFrame(columns=['lake_id','time','Discharge']) + gl_climatology_df_sub = pd.DataFrame() + gl_parm_lake_id_sub = pd.DataFrame().to_numpy().reshape(0,) + gl_param_flows_sub = pd.DataFrame().to_numpy().reshape(0,) + gl_param_time_sub = pd.DataFrame().to_numpy().reshape(0,) + gl_param_update_time_sub = pd.DataFrame().to_numpy().reshape(0,) + if not waterbody_types_df_sub.empty: + 
waterbody_types_df_sub.loc[waterbody_types_df_sub['reservoir_type'] == 6] = 1 return ( reservoir_usgs_df_sub, reservoir_usgs_df_time, reservoir_usgs_update_time, reservoir_usgs_prev_persisted_flow, reservoir_usgs_persistence_update_time, reservoir_usgs_persistence_index, reservoir_usace_df_sub, reservoir_usace_df_time, reservoir_usace_update_time, reservoir_usace_prev_persisted_flow, reservoir_usace_persistence_update_time, reservoir_usace_persistence_index, reservoir_rfc_df_sub, reservoir_rfc_totalCounts, reservoir_rfc_file, reservoir_rfc_use_forecast, reservoir_rfc_timeseries_idx, reservoir_rfc_update_time, reservoir_rfc_da_timestep, reservoir_rfc_persist_days, + gl_df_sub, gl_parm_lake_id_sub, gl_param_flows_sub, gl_param_time_sub, gl_param_update_time_sub, gl_climatology_df_sub, waterbody_types_df_sub ) @@ -501,6 +529,9 @@ def compute_nhd_routing_v02( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, da_parameter_dict, assume_short_ts, return_courant, @@ -744,6 +775,12 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time, reservoir_rfc_da_timestep, reservoir_rfc_persist_days, + gl_df_sub, + gl_parm_lake_id_sub, + gl_param_flows_sub, + gl_param_time_sub, + gl_param_update_time_sub, + gl_climatology_df_sub, waterbody_types_df_sub, ) = _prep_reservoir_da_dataframes( reservoir_usgs_df, @@ -752,6 +789,9 @@ def compute_nhd_routing_v02( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, waterbody_types_df_sub, t0, from_files, @@ -818,6 +858,15 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time.astype("float32"), reservoir_rfc_da_timestep.astype("int32"), reservoir_rfc_persist_days.astype("int32"), + # Great Lakes DA data + gl_df_sub.lake_id.values.astype("int32"), + gl_df_sub.time.values.astype("int32"), + gl_df_sub.Discharge.values.astype("float32"), + 
gl_parm_lake_id_sub.astype("int32"), + gl_param_flows_sub.astype("float32"), + gl_param_time_sub.astype("int32"), + gl_param_update_time_sub.astype("int32"), + gl_climatology_df_sub.values.astype("float32"), { us: fvd for us, fvd in flowveldepth_interorder.items() @@ -1032,6 +1081,12 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time, reservoir_rfc_da_timestep, reservoir_rfc_persist_days, + gl_df_sub, + gl_parm_lake_id_sub, + gl_param_flows_sub, + gl_param_time_sub, + gl_param_update_time_sub, + gl_climatology_df_sub, waterbody_types_df_sub, ) = _prep_reservoir_da_dataframes( reservoir_usgs_df, @@ -1040,6 +1095,9 @@ def compute_nhd_routing_v02( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, waterbody_types_df_sub, t0, from_files, @@ -1104,6 +1162,15 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time.astype("float32"), reservoir_rfc_da_timestep.astype("int32"), reservoir_rfc_persist_days.astype("int32"), + # Great Lakes DA data + gl_df_sub.lake_id.values.astype("int32"), + gl_df_sub.time.values.astype("int32"), + gl_df_sub.Discharge.values.astype("float32"), + gl_parm_lake_id_sub.astype("int32"), + gl_param_flows_sub.astype("float32"), + gl_param_time_sub.astype("int32"), + gl_param_update_time_sub.astype("int32"), + gl_climatology_df_sub.values.astype("float32"), { us: fvd for us, fvd in flowveldepth_interorder.items() @@ -1235,6 +1302,12 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time, reservoir_rfc_da_timestep, reservoir_rfc_persist_days, + gl_df_sub, + gl_parm_lake_id_sub, + gl_param_flows_sub, + gl_param_time_sub, + gl_param_update_time_sub, + gl_climatology_df_sub, waterbody_types_df_sub, ) = _prep_reservoir_da_dataframes( reservoir_usgs_df, @@ -1243,6 +1316,9 @@ def compute_nhd_routing_v02( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, 
waterbody_types_df_sub, t0, from_files, @@ -1300,6 +1376,15 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time.astype("float32"), reservoir_rfc_da_timestep.astype("int32"), reservoir_rfc_persist_days.astype("int32"), + # Great Lakes DA data + gl_df_sub.lake_id.values.astype("int32"), + gl_df_sub.time.values.astype("int32"), + gl_df_sub.Discharge.values.astype("float32"), + gl_parm_lake_id_sub.astype("int32"), + gl_param_flows_sub.astype("float32"), + gl_param_time_sub.astype("int32"), + gl_param_update_time_sub.astype("int32"), + gl_climatology_df_sub.values.astype("float32"), {}, assume_short_ts, return_courant, @@ -1402,6 +1487,12 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time, reservoir_rfc_da_timestep, reservoir_rfc_persist_days, + gl_df_sub, + gl_parm_lake_id_sub, + gl_param_flows_sub, + gl_param_time_sub, + gl_param_update_time_sub, + gl_climatology_df_sub, waterbody_types_df_sub, ) = _prep_reservoir_da_dataframes( reservoir_usgs_df, @@ -1410,6 +1501,9 @@ def compute_nhd_routing_v02( reservoir_usace_param_df, reservoir_rfc_df, reservoir_rfc_param_df, + great_lakes_df, + great_lakes_param_df, + great_lakes_climatology_df, waterbody_types_df_sub, t0, from_files, @@ -1466,6 +1560,15 @@ def compute_nhd_routing_v02( reservoir_rfc_update_time.astype("float32"), reservoir_rfc_da_timestep.astype("int32"), reservoir_rfc_persist_days.astype("int32"), + # Great Lakes DA data + gl_df_sub.lake_id.values.astype("int32"), + gl_df_sub.time.values.astype("int32"), + gl_df_sub.Discharge.values.astype("float32"), + gl_parm_lake_id_sub.astype("int32"), + gl_param_flows_sub.astype("float32"), + gl_param_time_sub.astype("int32"), + gl_param_update_time_sub.astype("int32"), + gl_climatology_df_sub.values.astype("float32"), {}, assume_short_ts, return_courant, @@ -1764,7 +1867,7 @@ def compute_diffusive_routing( rch_list[~x], dat_all[~x,3:], 0, # place-holder for streamflow DA parameters (np.asarray([]), np.asarray([]), np.asarray([])), - # place-holder for 
reservoir DA paramters + # place-holder for reservoir DA parameters (np.asarray([]), np.asarray([]), np.asarray([]), np.asarray([]), np.asarray([])), (np.asarray([]), np.asarray([]), np.asarray([]), np.asarray([]), np.asarray([])), # place holder for reservoir inflows @@ -1773,6 +1876,8 @@ def compute_diffusive_routing( (np.asarray([]), np.asarray([]), np.asarray([])), # place-holder for nudge values (np.empty(shape=(0, nts + 1), dtype='float32')), + # place-holder for great lakes DA values/parameters + (np.asarray([]), np.asarray([]), np.asarray([]), np.asarray([])), ) ) diff --git a/src/troute-routing/troute/routing/fast_reach/mc_reach.pyx b/src/troute-routing/troute/routing/fast_reach/mc_reach.pyx index 5d37e3f70..0d205d98e 100644 --- a/src/troute-routing/troute/routing/fast_reach/mc_reach.pyx +++ b/src/troute-routing/troute/routing/fast_reach/mc_reach.pyx @@ -19,6 +19,7 @@ from troute.network.reservoirs.levelpool.levelpool cimport MC_Levelpool, run_lp_ from troute.network.reservoirs.rfc.rfc cimport MC_RFC, run_rfc_c from troute.routing.fast_reach.reservoir_hybrid_da import reservoir_hybrid_da from troute.routing.fast_reach.reservoir_RFC_da import reservoir_RFC_da +from troute.routing.fast_reach.reservoir_GL_da import great_lakes_da from cython.parallel import prange #import cProfile @@ -207,6 +208,14 @@ cpdef object compute_network_structured( const float[:] reservoir_rfc_update_time, const int[:] reservoir_rfc_da_timestep, const int[:] reservoir_rfc_persist_days, + const int[:] great_lakes_idx, + const int[:] great_lakes_times, + const float[:] great_lakes_discharge, + const int[:] great_lakes_param_idx, + const float[:] great_lakes_param_prev_assim_flow, + const int[:] great_lakes_param_prev_assim_times, + const int[:] great_lakes_param_update_times, + const float[:,:] great_lakes_climatology, dict upstream_results={}, bint assume_short_ts=False, bint return_courant=False, @@ -425,6 +434,17 @@ cpdef object compute_network_structured( cdef np.ndarray[float, 
ndim=1] usace_prev_persistence_index = np.asarray(reservoir_usace_persistence_index) cdef np.ndarray[int, ndim=1] rfc_timeseries_idx = np.asarray(reservoir_rfc_timeseries_idx) + # great lakes arrays + cdef np.ndarray[int, ndim=1] gl_idx = np.asarray(great_lakes_idx) + cdef np.ndarray[float, ndim=1] gl_obs = np.asarray(great_lakes_discharge) + cdef np.ndarray[int, ndim=1] gl_times = np.asarray(great_lakes_times) + cdef np.ndarray[int, ndim=1] gl_param_idx = np.asarray(great_lakes_param_idx) + cdef np.ndarray[int, ndim=1] gl_update_time = np.asarray(great_lakes_param_update_times) + cdef np.ndarray[float, ndim=1] gl_prev_assim_ouflow = np.asarray(great_lakes_param_prev_assim_flow) + cdef np.ndarray[int, ndim=1] gl_prev_assim_timestamp = np.asarray(great_lakes_param_prev_assim_times) + cdef np.ndarray[float, ndim=2] gl_climatology = np.asarray(great_lakes_climatology) + + #--------------------------------------------------------------------------------------------- #--------------------------------------------------------------------------------------------- @@ -486,168 +506,208 @@ cpdef object compute_network_structured( if r.type == compute_type.RESERVOIR_LP: - # water elevation before levelpool calculation - initial_water_elevation = r.reach.lp.water_elevation - - # levelpool reservoir storage/outflow calculation - run_lp_c(r, upstream_flows, 0.0, routing_period, &reservoir_outflow, &reservoir_water_elevation) - - # USGS reservoir hybrid DA inputs - if r.reach.lp.wbody_type_code == 2: - # find index location of waterbody in reservoir_usgs_obs - # and reservoir_usgs_time - res_idx = np.where(usgs_idx == r.reach.lp.lake_number) - wbody_gage_obs = reservoir_usgs_obs[res_idx[0][0],:] - wbody_gage_time = reservoir_usgs_time - prev_persisted_outflow = usgs_prev_persisted_ouflow[res_idx[0][0]] - persistence_update_time = usgs_persistence_update_time[res_idx[0][0]] - persistence_index = usgs_prev_persistence_index[res_idx[0][0]] - update_time = 
usgs_update_time[res_idx[0][0]] - - # USACE reservoir hybrid DA inputs - if r.reach.lp.wbody_type_code == 3: - # find index location of waterbody in reservoir_usgs_obs - # and reservoir_usgs_time - res_idx = np.where(usace_idx == r.reach.lp.lake_number) - wbody_gage_obs = reservoir_usace_obs[res_idx[0][0],:] - wbody_gage_time = reservoir_usace_time - prev_persisted_outflow = usace_prev_persisted_ouflow[res_idx[0][0]] - persistence_update_time = usace_persistence_update_time[res_idx[0][0]] - persistence_index = usace_prev_persistence_index[res_idx[0][0]] - update_time = usace_update_time[res_idx[0][0]] - - # Execute reservoir DA - both USGS(2) and USACE(3) types - if r.reach.lp.wbody_type_code == 2 or r.reach.lp.wbody_type_code == 3: - - #print('***********************************************************') - #print('calling reservoir DA code for lake_id:', r.reach.lp.lake_number) - #print('before DA, simulated outflow = ', reservoir_outflow) - #print('before DA, simulated water elevation = ', r.reach.lp.water_elevation) - + # Great Lake waterbody: doesn't actually route anything, default outflows + # are from climatology. 
+ if r.reach.lp.wbody_type_code == 6: + # find index location of waterbody in great_lakes_df + # and great_lakes_param_df + res_idx = np.where(gl_idx == r.reach.lp.lake_number) + wbody_gage_obs = gl_obs[res_idx[0]] + wbody_gage_time = gl_times[res_idx[0]] + res_param_idx = np.where(gl_param_idx == r.reach.lp.lake_number) + param_prev_assim_flow = gl_prev_assim_ouflow[res_param_idx[0][0]] + param_prev_assim_timestamp = gl_prev_assim_timestamp[res_param_idx[0][0]] + param_update_time = gl_update_time[res_param_idx[0][0]] + climatology = gl_climatology[res_param_idx[0][0],:] + (new_outflow, - new_persisted_outflow, - new_water_elevation, - new_update_time, - new_persistence_index, - new_persistence_update_time - ) = reservoir_hybrid_da( - r.reach.lp.lake_number, # lake identification number - wbody_gage_obs, # gage observation values (cms) - wbody_gage_time, # gage observation times (sec) - dt * timestep, # model time (sec) - prev_persisted_outflow, # previously persisted outflow (cms) - persistence_update_time, - persistence_index, # number of sequentially persisted update cycles - reservoir_outflow, # levelpool simulated outflow (cms) - upstream_flows, # waterbody inflow (cms) - dt, # model timestep (sec) - r.reach.lp.area, # waterbody surface area (km2) - r.reach.lp.max_depth, # max waterbody depth (m) - r.reach.lp.orifice_elevation, # orifice elevation (m) - initial_water_elevation, # water surface el., previous timestep (m) - 48.0, # gage lookback hours (hrs) - update_time # waterbody update time (sec) + new_assimilated_outflow, + new_assimilated_timestamp, + new_update_time + ) = great_lakes_da( + wbody_gage_obs, # gage observations (cms) + wbody_gage_time, # timestamps of gage observations (datetime str) + param_prev_assim_flow, # last used observation (cms) + param_prev_assim_timestamp, # timestamp of last used observation (datetime str) + param_update_time, # timestamp to determine whether to look for new observation (datetime str) + model_start_time, # model 
initialization time (datetime str) + dt * timestep, # model time (sec) + climatology, # climatology outflows (cms) ) + + gl_update_time[res_param_idx[0][0]] = new_update_time + gl_prev_assim_ouflow[res_param_idx[0][0]] = new_assimilated_outflow + gl_prev_assim_timestamp[res_param_idx[0][0]] = new_assimilated_timestamp + + # populate flowveldepth array with levelpool or hybrid DA results + flowveldepth[r.id, timestep, 0] = new_outflow + flowveldepth[r.id, timestep, 1] = 0.0 + flowveldepth[r.id, timestep, 2] = 0.0 + upstream_array[r.id, timestep, 0] = upstream_flows + + else: + # water elevation before levelpool calculation + initial_water_elevation = r.reach.lp.water_elevation - #print('After DA, outflow = ', new_outflow) - #print('After DA, water elevation =', new_water_elevation) - - # update levelpool water elevation state - update_lp_c(r, new_water_elevation, &reservoir_water_elevation) - - # change reservoir_outflow - reservoir_outflow = new_outflow - - #print('confirming DA elevation replacement:', reservoir_water_elevation) - #print('===========================================================') - - # update USGS DA reservoir state arrays - if r.reach.lp.wbody_type_code == 2: - usgs_update_time[res_idx[0][0]] = new_update_time - usgs_prev_persisted_ouflow[res_idx[0][0]] = new_persisted_outflow - usgs_prev_persistence_index[res_idx[0][0]] = new_persistence_index - usgs_persistence_update_time[res_idx[0][0]] = new_persistence_update_time - - # update USACE DA reservoir state arrays - if r.reach.lp.wbody_type_code == 3: - usace_update_time[res_idx[0][0]] = new_update_time - usace_prev_persisted_ouflow[res_idx[0][0]] = new_persisted_outflow - usace_prev_persistence_index[res_idx[0][0]] = new_persistence_index - usace_persistence_update_time[res_idx[0][0]] = new_persistence_update_time - - - # RFC reservoir hybrid DA inputs - if r.reach.lp.wbody_type_code == 4: - # find index location of waterbody in reservoir_rfc_obs - # and reservoir_rfc_time - res_idx = 
np.where(rfc_idx == r.reach.lp.lake_number) - wbody_gage_obs = reservoir_rfc_obs[res_idx[0][0],:] - totalCounts = reservoir_rfc_totalCounts[res_idx[0][0]] - rfc_file = reservoir_rfc_file[res_idx[0][0]] - use_RFC = reservoir_rfc_use_forecast[res_idx[0][0]] - current_timeseries_idx = rfc_timeseries_idx[res_idx[0][0]] - update_time = rfc_update_time[res_idx[0][0]] - rfc_timestep = reservoir_rfc_da_timestep[res_idx[0][0]] - rfc_persist_days = reservoir_rfc_persist_days[res_idx[0][0]] - - # Execute RFC reservoir DA - both RFC(4) and Glacially Dammed Lake(5) types - if r.reach.lp.wbody_type_code == 4 or r.reach.lp.wbody_type_code == 5: + # levelpool reservoir storage/outflow calculation + run_lp_c(r, upstream_flows, 0.0, routing_period, &reservoir_outflow, &reservoir_water_elevation) - #print('***********************************************************') - #print('calling reservoir DA code for lake_id:', r.reach.lp.lake_number) - #print('before DA, simulated outflow = ', reservoir_outflow) - #print('before DA, simulated water elevation = ', r.reach.lp.water_elevation) + # USGS reservoir hybrid DA inputs + if r.reach.lp.wbody_type_code == 2: + # find index location of waterbody in reservoir_usgs_obs + # and reservoir_usgs_time + res_idx = np.where(usgs_idx == r.reach.lp.lake_number) + wbody_gage_obs = reservoir_usgs_obs[res_idx[0][0],:] + wbody_gage_time = reservoir_usgs_time + prev_persisted_outflow = usgs_prev_persisted_ouflow[res_idx[0][0]] + persistence_update_time = usgs_persistence_update_time[res_idx[0][0]] + persistence_index = usgs_prev_persistence_index[res_idx[0][0]] + update_time = usgs_update_time[res_idx[0][0]] - ( - new_outflow, + # USACE reservoir hybrid DA inputs + if r.reach.lp.wbody_type_code == 3: + # find index location of waterbody in reservoir_usgs_obs + # and reservoir_usgs_time + res_idx = np.where(usace_idx == r.reach.lp.lake_number) + wbody_gage_obs = reservoir_usace_obs[res_idx[0][0],:] + wbody_gage_time = reservoir_usace_time + 
prev_persisted_outflow = usace_prev_persisted_ouflow[res_idx[0][0]] + persistence_update_time = usace_persistence_update_time[res_idx[0][0]] + persistence_index = usace_prev_persistence_index[res_idx[0][0]] + update_time = usace_update_time[res_idx[0][0]] + + # Execute reservoir DA - both USGS(2) and USACE(3) types + if r.reach.lp.wbody_type_code == 2 or r.reach.lp.wbody_type_code == 3: + + #print('***********************************************************') + #print('calling reservoir DA code for lake_id:', r.reach.lp.lake_number) + #print('before DA, simulated outflow = ', reservoir_outflow) + #print('before DA, simulated water elevation = ', r.reach.lp.water_elevation) + + (new_outflow, + new_persisted_outflow, new_water_elevation, - new_update_time, - new_timeseries_idx, - dynamic_reservoir_type, - assimilated_value, - assimilated_source_file, - ) = reservoir_RFC_da( - use_RFC, # boolean whether to use RFC values or not - wbody_gage_obs, # gage observation values (cms) - current_timeseries_idx, # index of for current time series observation - totalCounts, # total number of observations in RFC timeseries - routing_period, # routing period (sec) - dt * timestep, # model time (sec) - update_time, # time to advance to next time series index - rfc_timestep, # frequency of DA observations (sec) - rfc_persist_days*24*60*60, # max seconds RFC forecasts will be used/persisted (days -> seconds) - r.reach.lp.wbody_type_code, # reservoir type - upstream_flows, # waterbody inflow (cms) - initial_water_elevation, # water surface el., previous timestep (m) - reservoir_outflow, # levelpool simulated outflow (cms) - reservoir_water_elevation, # levelpool simulated water elevation (m) - r.reach.lp.area*1.0e6, # waterbody surface area (km2 -> m2) - r.reach.lp.max_depth, # max waterbody depth (m) - rfc_file, # RFC file name - ) + new_update_time, + new_persistence_index, + new_persistence_update_time + ) = reservoir_hybrid_da( + r.reach.lp.lake_number, # lake identification 
number + wbody_gage_obs, # gage observation values (cms) + wbody_gage_time, # gage observation times (sec) + dt * timestep, # model time (sec) + prev_persisted_outflow, # previously persisted outflow (cms) + persistence_update_time, + persistence_index, # number of sequentially persisted update cycles + reservoir_outflow, # levelpool simulated outflow (cms) + upstream_flows, # waterbody inflow (cms) + dt, # model timestep (sec) + r.reach.lp.area, # waterbody surface area (km2) + r.reach.lp.max_depth, # max waterbody depth (m) + r.reach.lp.orifice_elevation, # orifice elevation (m) + initial_water_elevation, # water surface el., previous timestep (m) + 48.0, # gage lookback hours (hrs) + update_time # waterbody update time (sec) + ) + + #print('After DA, outflow = ', new_outflow) + #print('After DA, water elevation =', new_water_elevation) + + # update levelpool water elevation state + update_lp_c(r, new_water_elevation, &reservoir_water_elevation) + + # change reservoir_outflow + reservoir_outflow = new_outflow + + #print('confirming DA elevation replacement:', reservoir_water_elevation) + #print('===========================================================') + + # update USGS DA reservoir state arrays + if r.reach.lp.wbody_type_code == 2: + usgs_update_time[res_idx[0][0]] = new_update_time + usgs_prev_persisted_ouflow[res_idx[0][0]] = new_persisted_outflow + usgs_prev_persistence_index[res_idx[0][0]] = new_persistence_index + usgs_persistence_update_time[res_idx[0][0]] = new_persistence_update_time + + # update USACE DA reservoir state arrays + if r.reach.lp.wbody_type_code == 3: + usace_update_time[res_idx[0][0]] = new_update_time + usace_prev_persisted_ouflow[res_idx[0][0]] = new_persisted_outflow + usace_prev_persistence_index[res_idx[0][0]] = new_persistence_index + usace_persistence_update_time[res_idx[0][0]] = new_persistence_update_time + + + # RFC reservoir hybrid DA inputs + if r.reach.lp.wbody_type_code == 4: + # find index location of waterbody in 
reservoir_rfc_obs + # and reservoir_rfc_time + res_idx = np.where(rfc_idx == r.reach.lp.lake_number) + wbody_gage_obs = reservoir_rfc_obs[res_idx[0][0],:] + totalCounts = reservoir_rfc_totalCounts[res_idx[0][0]] + rfc_file = reservoir_rfc_file[res_idx[0][0]] + use_RFC = reservoir_rfc_use_forecast[res_idx[0][0]] + current_timeseries_idx = rfc_timeseries_idx[res_idx[0][0]] + update_time = rfc_update_time[res_idx[0][0]] + rfc_timestep = reservoir_rfc_da_timestep[res_idx[0][0]] + rfc_persist_days = reservoir_rfc_persist_days[res_idx[0][0]] + + # Execute RFC reservoir DA - both RFC(4) and Glacially Dammed Lake(5) types + if r.reach.lp.wbody_type_code == 4 or r.reach.lp.wbody_type_code == 5: + + #print('***********************************************************') + #print('calling reservoir DA code for lake_id:', r.reach.lp.lake_number) + #print('before DA, simulated outflow = ', reservoir_outflow) + #print('before DA, simulated water elevation = ', r.reach.lp.water_elevation) + + ( + new_outflow, + new_water_elevation, + new_update_time, + new_timeseries_idx, + dynamic_reservoir_type, + assimilated_value, + assimilated_source_file, + ) = reservoir_RFC_da( + use_RFC, # boolean whether to use RFC values or not + wbody_gage_obs, # gage observation values (cms) + current_timeseries_idx, # index of for current time series observation + totalCounts, # total number of observations in RFC timeseries + routing_period, # routing period (sec) + dt * timestep, # model time (sec) + update_time, # time to advance to next time series index + rfc_timestep, # frequency of DA observations (sec) + rfc_persist_days*24*60*60, # max seconds RFC forecasts will be used/persisted (days -> seconds) + r.reach.lp.wbody_type_code, # reservoir type + upstream_flows, # waterbody inflow (cms) + initial_water_elevation, # water surface el., previous timestep (m) + reservoir_outflow, # levelpool simulated outflow (cms) + reservoir_water_elevation, # levelpool simulated water elevation (m) + 
r.reach.lp.area*1.0e6, # waterbody surface area (km2 -> m2) + r.reach.lp.max_depth, # max waterbody depth (m) + rfc_file, # RFC file name + ) - #print('After DA, outflow = ', new_outflow) - #print('After DA, water elevation =', new_water_elevation) - - # update levelpool water elevation state - update_lp_c(r, new_water_elevation, &reservoir_water_elevation) - - # change reservoir_outflow - reservoir_outflow = new_outflow - - #print('confirming DA elevation replacement:', reservoir_water_elevation) - #print('===========================================================') - - # update RFC DA reservoir state arrays - rfc_update_time[res_idx[0][0]] = new_update_time - rfc_timeseries_idx[res_idx[0][0]] = new_timeseries_idx + #print('After DA, outflow = ', new_outflow) + #print('After DA, water elevation =', new_water_elevation) + + # update levelpool water elevation state + update_lp_c(r, new_water_elevation, &reservoir_water_elevation) + + # change reservoir_outflow + reservoir_outflow = new_outflow + + #print('confirming DA elevation replacement:', reservoir_water_elevation) + #print('===========================================================') + + # update RFC DA reservoir state arrays + rfc_update_time[res_idx[0][0]] = new_update_time + rfc_timeseries_idx[res_idx[0][0]] = new_timeseries_idx + - - # populate flowveldepth array with levelpool or hybrid DA results - flowveldepth[r.id, timestep, 0] = reservoir_outflow - flowveldepth[r.id, timestep, 1] = 0.0 - flowveldepth[r.id, timestep, 2] = reservoir_water_elevation - upstream_array[r.id, timestep, 0] = upstream_flows + # populate flowveldepth array with levelpool or hybrid DA results + flowveldepth[r.id, timestep, 0] = reservoir_outflow + flowveldepth[r.id, timestep, 1] = 0.0 + flowveldepth[r.id, timestep, 2] = reservoir_water_elevation + upstream_array[r.id, timestep, 0] = upstream_flows elif r.type == compute_type.RESERVOIR_RFC: run_rfc_c(r, upstream_flows, 0.0, routing_period, &reservoir_outflow, 
&reservoir_water_elevation) @@ -775,5 +835,11 @@ cpdef object compute_network_structured( rfc_idx, rfc_update_time-((timestep-1)*dt), rfc_timeseries_idx ), - np.asarray(nudge) + np.asarray(nudge), + ( + gl_param_idx, + gl_prev_assim_ouflow, + gl_prev_assim_timestamp, + gl_update_time + ) ) \ No newline at end of file diff --git a/src/troute-routing/troute/routing/fast_reach/reservoir_GL_da.py b/src/troute-routing/troute/routing/fast_reach/reservoir_GL_da.py new file mode 100644 index 000000000..6683e9ee3 --- /dev/null +++ b/src/troute-routing/troute/routing/fast_reach/reservoir_GL_da.py @@ -0,0 +1,131 @@ +import numpy as np +import logging +from datetime import datetime, timedelta +LOG = logging.getLogger('') + +def great_lakes_da( + gage_obs, + gage_time, + previous_assimilated_outflow, + previous_assimilated_time, + update_time, + t0, + now, + climatology_outflows, + update_time_interval = 3600, + persistence_limit = 11, +): + + """ + Perform persistence reservoir data assimilation for the Great Lakes. + + Arguments + --------- + - lake_number (int): unique lake identification + number + - gage_obs (memoryview slice): array of gage observations for + at the gage associated with this + waterbody + - gage_time (memoryview slice): array of observation times + (secs) relative to the model + initialization time (t0). + - t0 (str): Initialization time (t0). + - now (float): Current time, seconds since in- + itialization time (t0). + - previous_persisted_outflow (numpy.float32): Persisted outflow value from + last timestep. 
+ - obs_lookback_hours (int): Maximum allowable lookback time + when searching for new outflow + observations + - update_time (numpy.float32): time to search for new observ- + ations, seconds since model + initialization time (t0) + - update_time_interval (float): Time interval from current to + next update time (secs) + + Returns + ------- + - outflow (float): Persisted reservoir outflow rate (m3/sec) + """ + + # LOG.debug('Great Lakes data assimilation for lake_id: %s at time %s from run start' % (lake_number, now)) + + # set new values as old values to start: + new_assimilated_outflow = previous_assimilated_outflow + new_assimilated_time = previous_assimilated_time + new_update_time = update_time + + # determine which climatology value to use based on model time + t0_datetime = datetime.strptime(t0, '%Y-%m-%d_%H:%M:%S') + now_datetime = t0_datetime + timedelta(seconds=now) + month_idx = now_datetime.month - 1 # subtract 1 for python indexing + climatology_outflow = climatology_outflows[month_idx] + + if np.isnan(previous_assimilated_outflow): + previous_assimilated_outflow = climatology_outflow + + if now >= update_time: + # LOG.debug( + # 'Looking for observation to assimilate...' + # ) + + # initialize variable to store assimilated observations. We initialize + # as np.nan, so that if no good quality observations are found, we can + # easily catch it. + obs = np.nan + + # identify location of gage_time that is nearest to, but not greater + # than the update time + t_idxs = np.nonzero(((now - gage_time) >= 0))[0] + if len(t_idxs)>0: + t_idx = t_idxs[-1] + + # record good observation to obs + obs = gage_obs[t_idx] + + # determine how many seconds prior to the update_time the + # observation was taken + t_obs = gage_time[t_idx] + gage_lookback_seconds = now - t_obs + + if np.isnan(obs): + ''' + - If no good observation is found, then we do not update the + update time. 
Consequently we will continue to search for a good + observation at each model timestep, before updating the update + time. + ''' + # LOG.debug( + # 'No good observation found, persisting previously assimilated flow' + # ) + outflow = previous_assimilated_outflow + + elif gage_lookback_seconds > (persistence_limit*60*60*24): + ''' + If a good observation was found, but the time difference between + the current model time and the observation timestamp is greater than + the persistence limit, default to climatology. + ''' + outflow = climatology_outflow + + else: + ''' + A good observation is found and it is within the persistence limit. + ''' + outflow = obs + new_assimilated_outflow = obs + new_assimilated_time = t_obs + new_update_time = update_time + update_time_interval + + else: + outflow = previous_assimilated_outflow + + if (now - previous_assimilated_time) > (persistence_limit*60*60*24): + ''' + If the time difference between the current model time and the + observation timestamp is greater than + the persistence limit, default to climatology. 
+ ''' + outflow = climatology_outflow + + return outflow, new_assimilated_outflow, new_assimilated_time, new_update_time \ No newline at end of file diff --git a/test/LowerColorado_TX/test_AnA_V4_NHD.yaml b/test/LowerColorado_TX/test_AnA_V4_NHD.yaml index b8a906517..c53cd8412 100644 --- a/test/LowerColorado_TX/test_AnA_V4_NHD.yaml +++ b/test/LowerColorado_TX/test_AnA_V4_NHD.yaml @@ -95,9 +95,9 @@ compute_parameters: reservoir_parameter_file : domain/reservoir_index_AnA.nc reservoir_persistence_da: #---------- - reservoir_persistence_usgs : False - reservoir_persistence_usace : False - reservoir_persistence_greatLake : True + reservoir_persistence_usgs : True + reservoir_persistence_usace : True + reservoir_persistence_greatLake : False reservoir_rfc_da: #---------- reservoir_rfc_forecasts : False diff --git a/test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml b/test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml index 0e00ef5cd..6105673af 100644 --- a/test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml +++ b/test/LowerColorado_TX_v4/test_AnA_V4_HYFeature.yaml @@ -113,7 +113,7 @@ compute_parameters: #---------- reservoir_persistence_usgs : True reservoir_persistence_usace : True - reservoir_persistence_greatLake : True + reservoir_persistence_greatLake : False reservoir_rfc_da: #---------- reservoir_rfc_forecasts : True