Great Lakes Data Assimilation (#808)
* update creation of great lakes climatology

* pass great lakes DA info through to compute

* update properties handling great lakes info

* set default empty df for great lakes info

* preprocess great lakes DA info, create 'update_after_compute()' function

* create new get_timeslice_obs function specific for great lakes

* add option for great lakes DA

* function for performing great lakes DA

* finalize great_lakes.update_after_compute() function

* define after compute functions for great lakes DA

* update _prep_reservoir_da_dataframes() to include great lakes

* refine passing objects to great lakes da module

* finalize da function for great lakes

* add default empty dataframe for gl_climatology_df_sub

* add column names for empty data frame gl_df_sub

* set default empty data frames for great lakes dfs

* move _canadian_gage_df slot from HYFeatures to AbstractNetwork

* check if list of new great_lakes_param_dfs is empty before trying to concat

* set default _canadian_gage_link_df in NHDNetwork

* update DA settings for test cases

* add empty placeholders in diffusive results for great lakes DA
shorvath-noaa authored Jul 31, 2024
1 parent 0e60959 commit a7260b7
Showing 12 changed files with 818 additions and 253 deletions.
12 changes: 11 additions & 1 deletion src/troute-network/troute/AbstractNetwork.py
@@ -27,14 +27,16 @@ class AbstractNetwork(ABC):
__slots__ = ["_dataframe", "_waterbody_connections", "_gages",
"_terminal_codes", "_connections", "_waterbody_df",
"_waterbody_types_df", "_waterbody_type_specified", "_link_gage_df",
"_canadian_gage_link_df",
"_independent_networks", "_reaches_by_tw", "_flowpath_dict",
"_reverse_network", "_q0", "_t0", "_link_lake_crosswalk",
"_usgs_lake_gage_crosswalk", "_usace_lake_gage_crosswalk", "_rfc_lake_gage_crosswalk",
"_qlateral", "_break_segments", "_segment_index", "_coastal_boundary_depth_df",
"supernetwork_parameters", "waterbody_parameters","data_assimilation_parameters",
"restart_parameters", "compute_parameters", "forcing_parameters",
"hybrid_parameters", "preprocessing_parameters", "output_parameters",
"verbose", "showtiming", "break_points", "_routing", "_nexus_dict", "_poi_nex_dict"]
"verbose", "showtiming", "break_points", "_routing", "_gl_climatology_df", "_nexus_dict", "_poi_nex_dict"]


def __init__(self, from_files=True, value_dict={}):

@@ -410,6 +412,14 @@ def refactored_reaches(self):
@property
def unrefactored_topobathy_df(self):
return self._routing.unrefactored_topobathy_df

@property
def great_lakes_climatology_df(self):
return self._gl_climatology_df

@property
def canadian_gage_df(self):
return self._canadian_gage_link_df


def set_synthetic_wb_segments(self, synthetic_wb_segments, synthetic_wb_id_offset):
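The two new properties give downstream code read access to the Great Lakes inputs. A minimal usage sketch, not part of the diff (`network` stands for any constructed network object):

# Hypothetical usage: 'network' is a built HYFeaturesNetwork or NHDNetwork instance.
gl_climatology = network.great_lakes_climatology_df  # monthly outflow climatology; empty df when GL DA is off
canadian_gages = network.canadian_gage_df            # Canadian gage <> link crosswalk; an empty placeholder for now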
302 changes: 220 additions & 82 deletions src/troute-network/troute/DataAssimilation.py

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions src/troute-network/troute/HYFeaturesNetwork.py
@@ -216,7 +216,7 @@ class HYFeaturesNetwork(AbstractNetwork):
"""
"""
__slots__ = ["_upstream_terminal", "_nexus_latlon", "_duplicate_ids_df"]
__slots__ = ["_upstream_terminal", "_nexus_latlon", "_duplicate_ids_df",]

def __init__(self,
supernetwork_parameters,
@@ -340,6 +340,7 @@ def gages(self):
def waterbody_null(self):
return np.nan #pd.NA


def preprocess_network(self, flowpaths, nexus):
self._dataframe = flowpaths

@@ -541,9 +542,13 @@ def preprocess_waterbodies(self, lakes, nexus):
self.waterbody_dataframe,
gl_wbody_df
]
- )
+ ).sort_index()

self._gl_climatology_df = get_great_lakes_climatology()

else:
gl_dict = {}
self._gl_climatology_df = pd.DataFrame()

self._waterbody_types_df = pd.DataFrame(
data = 1,
@@ -569,6 +574,7 @@ def preprocess_waterbodies(self, lakes, nexus):
self._waterbody_type_specified = False
self._link_lake_crosswalk = None
self._duplicate_ids_df = pd.DataFrame()
self._gl_climatology_df = pd.DataFrame()

self._dataframe = self.dataframe.drop('waterbody', axis=1).drop_duplicates()

@@ -605,6 +611,10 @@ def preprocess_data_assimilation(self, network):
.rename_axis(None, axis=0).to_dict()
)

#FIXME: temporary solution, add canadian gage crosswalk dataframe. This should come from
# the hydrofabric.
self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link')

# Find furthest downstream gage and create our lake_gage_df to make crosswalk dataframes.
lake_gage_hydroseq_df = gages_df[~gages_df['lake_id'].isnull()][['lake_id', 'value', 'hydroseq']].rename(columns={'value': 'gages'})
lake_gage_hydroseq_df['lake_id'] = lake_gage_hydroseq_df['lake_id'].astype(int)
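The Canadian gage crosswalk added above is an empty placeholder until the hydrofabric supplies real data. For illustration only, a populated frame would presumably take this shape (gage and link IDs invented):

import pandas as pd

# Hypothetical contents; real gage <> link pairs should come from the hydrofabric.
canadian_gage_link_df = (
    pd.DataFrame({'gages': ['02GG003', '02GG013'],  # invented WSC-style gage IDs
                  'link': [1234567, 7654321]})      # invented network segment IDs
    .set_index('link')
)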
2 changes: 2 additions & 0 deletions src/troute-network/troute/NHDNetwork.py
@@ -64,6 +64,8 @@ def __init__(
print("... in %s seconds." % (time.time() - start_time))

self._flowpath_dict = {}
self._gl_climatology_df = pd.DataFrame()
self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link')

super().__init__()

90 changes: 90 additions & 0 deletions src/troute-network/troute/nhd_io.py
@@ -1249,6 +1249,96 @@ def get_obs_from_timeslices(
return observation_df_new


def get_GL_obs_from_timeslices(
    crosswalk_df,
    timeslice_files,
    crosswalk_gage_field='gages',
    crosswalk_dest_field='link',
    qc_threshold=1,
    cpu_pool=1,
):
    """
    Read observations from TimeSlice files and organize into a Pandas DataFrame.
    This is specific to observations needed for Great Lakes data assimilation,
    so no interpolation happens.

    Arguments
    ---------
    - crosswalk_df (Pandas DataFrame): gage <> link crosswalk data
    - timeslice_files (list of PosixPath): Full paths to existing TimeSlice files
    - crosswalk_gage_field (str): fieldname of gage ID data in crosswalk dataframe
    - crosswalk_dest_field (str): fieldname of destination data in the
                                  crosswalk dataframe. For streamflow DA,
                                  this is the field containing segment IDs.
                                  For reservoir DA, this is the field
                                  containing waterbody IDs.
    - qc_threshold (int): Numerical observation quality
                          threshold. Observations with quality
                          flags less than this value will be
                          removed and replaced with nan.
    - cpu_pool (int): Number of CPUs used for parallel
                      TimeSlice reading

    Returns
    -------
    - observation_df (Pandas DataFrame): quality-controlled observations,
                                         indexed by the crosswalk destination field
    """
    # open TimeSlice files, organize data into dataframes
    with Parallel(n_jobs=cpu_pool) as parallel:
        jobs = []
        for f in timeslice_files:
            jobs.append(delayed(_read_timeslice_file)(f))
        timeslice_dataframes = parallel(jobs)

    all_empty = all(df.empty for frames in timeslice_dataframes for df in frames)
    if all_empty:
        LOG.debug(f'{crosswalk_gage_field} DataFrames are empty, check timeslice files.')
        return pd.DataFrame()

    # create lists of observations and obs quality dataframes returned
    # from _read_timeslice_file
    timeslice_obs_frames = []
    timeslice_qual_frames = []
    for d in timeslice_dataframes:
        timeslice_obs_frames.append(d[0])   # TimeSlice gage observation
        timeslice_qual_frames.append(d[1])  # TimeSlice observation qual

    # concatenate dataframes
    timeslice_obs_df = pd.concat(timeslice_obs_frames, axis=1)
    timeslice_qual_df = pd.concat(timeslice_qual_frames, axis=1)

    # Link <> gage crosswalk data
    df = crosswalk_df.reset_index()
    df = df.set_index(crosswalk_gage_field)
    df.index = df.index.str.strip()

    # join crosswalk data with timeslice data, indexed on crosswalk destination field
    observation_df = (df.join(timeslice_obs_df).
                      reset_index().
                      set_index(crosswalk_dest_field).
                      select_dtypes(include='number'))

    observation_qual_df = (df.join(timeslice_qual_df).
                           reset_index().
                           set_index(crosswalk_dest_field).
                           select_dtypes(include='number'))

    # ---- Laugh testing ------
    # screen-out erroneous qc flags
    observation_qual_df = (observation_qual_df.
                           mask(observation_qual_df < 0, np.nan).
                           mask(observation_qual_df > 1, np.nan)
                           )

    # screen-out poor quality flow observations
    observation_df = (observation_df.
                      mask(observation_qual_df < qc_threshold, np.nan).
                      mask(observation_df <= 0, np.nan)
                      )

    return observation_df


def get_param_str(target_file, param):
# TODO: remove this duplicate function
return get_attribute(target_file, param)
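A hedged usage sketch of the new reader follows; the directory, glob pattern, and crosswalk are stand-ins, and only get_GL_obs_from_timeslices itself comes from this commit:

from pathlib import Path
import troute.nhd_io as nhd_io

# Assumed inputs: a gage <> link crosswalk and a folder of TimeSlice files
# (the '*.usgsTimeSlice.ncdf' naming pattern is an assumption).
timeslice_files = sorted(Path('/path/to/timeslices').glob('*.usgsTimeSlice.ncdf'))
gl_obs_df = nhd_io.get_GL_obs_from_timeslices(
    crosswalk_df=network.canadian_gage_df,  # 'network' as in the sketches above
    timeslice_files=timeslice_files,
    qc_threshold=1,
    cpu_pool=4,
)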
17 changes: 9 additions & 8 deletions src/troute-network/troute/rfc_lake_gage_crosswalk.py
@@ -1,4 +1,5 @@
import pandas as pd
import numpy as np

def get_rfc_lake_gage_crosswalk():
"""
@@ -77,14 +78,14 @@ def get_rfc_lake_gage_crosswalk():

def get_great_lakes_climatology():

- df = pd.DataFrame(
- {
- 'Month': ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"],
- 'SMR': [1946, 1907, 1879, 1929, 2105, 2202, 2306, 2378, 2343, 2282, 2235, 2046],
- 'SCR': [4623, 4518, 4946, 5217, 5410, 5494, 5546, 5555, 5519, 5475, 5424, 5253],
- 'NIR': [5630, 5523, 5673, 5921, 6179, 6172, 6089, 5977, 5839, 5751, 5757, 5771],
- 'SLR': [6380, 6561, 6875, 7159, 7418, 7547, 7500, 7360, 7161, 6954, 6852, 6725]
- }
+ outflow_data = np.array([[1946, 1907, 1879, 1929, 2105, 2202, 2306, 2378, 2343, 2282, 2235, 2046],
+                          [4623, 4518, 4946, 5217, 5410, 5494, 5546, 5555, 5519, 5475, 5424, 5253],
+                          [5630, 5523, 5673, 5921, 6179, 6172, 6089, 5977, 5839, 5751, 5757, 5771],
+                          [6380, 6561, 6875, 7159, 7418, 7547, 7500, 7360, 7161, 6954, 6852, 6725]])
+ df = (
+     pd.DataFrame(data=outflow_data, index=[4800002,4800004,4800006,4800007],
+                  columns=["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"])
+     .astype('float32')
+ )
  )

return df
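The rewrite replaces the old month-keyed table (columns 'SMR', 'SCR', 'NIR', 'SLR', presumably the St. Marys, St. Clair, Niagara, and St. Lawrence rivers) with a frame indexed by the four Great Lakes waterbody IDs and one float32 column per month, so a climatology lookup is a plain .loc:

# Climatological July outflow for waterbody 4800002 (first row of the array above):
clim_df = get_great_lakes_climatology()
july_outflow = clim_df.loc[4800002, 'Jul']  # -> 2306.0, presumably in cms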
12 changes: 12 additions & 0 deletions src/troute-nwm/src/nwm_routing/__main__.py
@@ -230,6 +230,9 @@ def main_v04(argv):
data_assimilation.reservoir_usace_param_df,
data_assimilation.reservoir_rfc_df,
data_assimilation.reservoir_rfc_param_df,
data_assimilation.great_lakes_df,
data_assimilation.great_lakes_param_df,
network.great_lakes_climatology_df,
data_assimilation.assimilation_parameters,
assume_short_ts,
return_courant,
@@ -1141,6 +1144,9 @@ def nwm_route(
reservoir_usace_param_df,
reservoir_rfc_df,
reservoir_rfc_param_df,
great_lakes_df,
great_lakes_param_df,
great_lakes_climatology_df,
da_parameter_dict,
assume_short_ts,
return_courant,
@@ -1231,6 +1237,9 @@
reservoir_usace_param_df,
reservoir_rfc_df,
reservoir_rfc_param_df,
great_lakes_df,
great_lakes_param_df,
great_lakes_climatology_df,
da_parameter_dict,
assume_short_ts,
return_courant,
@@ -1675,6 +1684,9 @@ def main_v03(argv):
reservoir_usace_param_df,
pd.DataFrame(), #empty dataframe for RFC data...not needed unless running via BMI
pd.DataFrame(), #empty dataframe for RFC param data...not needed unless running via BMI
pd.DataFrame(), #empty dataframe for great lakes data...
pd.DataFrame(), #empty dataframe for great lakes param data...
pd.DataFrame(), #empty dataframe for great lakes climatology data...
da_parameter_dict,
assume_short_ts,
return_courant,
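The three new arguments thread the Great Lakes observation, parameter, and climatology frames through nwm_route into the compute step; the DA logic itself lives in the DataAssimilation.py diff, which is not rendered above. As a rough, hypothetical illustration of the kind of fallback these inputs enable (not the committed implementation):

def gl_outflow_estimate(obs_series, clim_df, lake_id, month_abbr):
    # Illustrative sketch only: prefer the most recent valid observation
    # for a lake; otherwise fall back to the monthly climatological outflow.
    valid = obs_series.dropna()
    if not valid.empty:
        return float(valid.iloc[-1])
    return float(clim_df.loc[lake_id, month_abbr])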