Great Lakes Data Assimilation #808

Merged: 36 commits, Jul 31, 2024

Commits (36)
959c422
update creation of great lakes climatology
shorvath-noaa Jul 12, 2024
c8c1eec
pass great lakes DA info through to compute
shorvath-noaa Jul 12, 2024
88a966b
update properties handling great lakes info
shorvath-noaa Jul 12, 2024
e02b44b
set default empty df for great lakes info
shorvath-noaa Jul 12, 2024
595f024
preprocess great lakes DA info, create 'update_after_compute()' function
shorvath-noaa Jul 12, 2024
ffe1267
create new get_timeslice_obs function specific for great lakes
shorvath-noaa Jul 12, 2024
53c0202
add option for great lakes DA
shorvath-noaa Jul 12, 2024
5fda225
function for performing great lakes DA
shorvath-noaa Jul 12, 2024
2986bcf
finalize great_lakes.update_after_compute() function
shorvath-noaa Jul 17, 2024
e68dd58
define after compute functions for great lakes DA
shorvath-noaa Jul 19, 2024
c2ac770
update _prep_reservoir_da_dataframes() to include great lakes
shorvath-noaa Jul 19, 2024
f650959
refine passing objects to great lakes da module
shorvath-noaa Jul 19, 2024
5ae2357
finalize da function for great lakes
shorvath-noaa Jul 19, 2024
e617b7e
update creation of great lakes climatology
shorvath-noaa Jul 12, 2024
ae75cb7
pass great lakes DA info through to compute
shorvath-noaa Jul 12, 2024
b48f1c4
update properties handling great lakes info
shorvath-noaa Jul 12, 2024
7b3476c
set default empty df for great lakes info
shorvath-noaa Jul 12, 2024
960aa7a
preprocess great lakes DA info, create 'update_after_compute()' function
shorvath-noaa Jul 12, 2024
2d5975a
create new get_timeslice_obs function specific for great lakes
shorvath-noaa Jul 12, 2024
316edec
add option for great lakes DA
shorvath-noaa Jul 12, 2024
78b0cb2
function for performing great lakes DA
shorvath-noaa Jul 12, 2024
23a6ec2
finalize great_lakes.update_after_compute() function
shorvath-noaa Jul 17, 2024
ba667fe
define after compute functions for great lakes DA
shorvath-noaa Jul 19, 2024
1cbc1d8
update _prep_reservoir_da_dataframes() to include great lakes
shorvath-noaa Jul 19, 2024
aac5702
refine passing objects to great lakes da module
shorvath-noaa Jul 19, 2024
1e9a5b0
finalize da function for great lakes
shorvath-noaa Jul 19, 2024
7e0a7b3
merge changes
shorvath-noaa Jul 19, 2024
c34a95a
add default empty dataframe for gl_climatology_df_sub
shorvath-noaa Jul 19, 2024
14b8cf2
add column names for empty data frame gl_df_sub
shorvath-noaa Jul 19, 2024
09d0129
set default empty data frames for great lakes dfs
shorvath-noaa Jul 19, 2024
7e45245
move _canadian_gage_df slot from HYFeatures to AbstractNetwork
shorvath-noaa Jul 19, 2024
a46d407
check if list of new great_lakes_param_dfs is empty before trying to …
shorvath-noaa Jul 19, 2024
ecbb855
set default _canadian_gage_link_df in NHDNetwork
shorvath-noaa Jul 19, 2024
b4e17dc
update DA settings for test cases
shorvath-noaa Jul 19, 2024
79d4888
add empty placeholders in diffusive results for great lakes DA
shorvath-noaa Jul 19, 2024
efd8ad7
Merge branch 'master' into great_lakes_DA
shorvath-noaa Jul 29, 2024
12 changes: 11 additions & 1 deletion src/troute-network/troute/AbstractNetwork.py
@@ -27,14 +27,16 @@ class AbstractNetwork(ABC):
__slots__ = ["_dataframe", "_waterbody_connections", "_gages",
"_terminal_codes", "_connections", "_waterbody_df",
"_waterbody_types_df", "_waterbody_type_specified", "_link_gage_df",
"_canadian_gage_link_df",
"_independent_networks", "_reaches_by_tw", "_flowpath_dict",
"_reverse_network", "_q0", "_t0", "_link_lake_crosswalk",
"_usgs_lake_gage_crosswalk", "_usace_lake_gage_crosswalk", "_rfc_lake_gage_crosswalk",
"_qlateral", "_break_segments", "_segment_index", "_coastal_boundary_depth_df",
"supernetwork_parameters", "waterbody_parameters","data_assimilation_parameters",
"restart_parameters", "compute_parameters", "forcing_parameters",
"hybrid_parameters", "preprocessing_parameters", "output_parameters",
"verbose", "showtiming", "break_points", "_routing", "_nexus_dict", "_poi_nex_dict"]
"verbose", "showtiming", "break_points", "_routing", "_gl_climatology_df", "_nexus_dict", "_poi_nex_dict"]


def __init__(self, from_files=True, value_dict={}):

@@ -410,6 +412,14 @@ def refactored_reaches(self):
@property
def unrefactored_topobathy_df(self):
return self._routing.unrefactored_topobathy_df

@property
def great_lakes_climatology_df(self):
return self._gl_climatology_df

@property
def canadian_gage_df(self):
return self._canadian_gage_link_df


def set_synthetic_wb_segments(self, synthetic_wb_segments, synthetic_wb_id_offset):
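The two properties added above are thin read-only accessors over the new _gl_climatology_df and _canadian_gage_link_df slots. A minimal usage sketch, assuming a network object built from any AbstractNetwork subclass; the helper name here is illustrative and not part of the PR:

def summarize_great_lakes_inputs(network):
    # Illustrative only: report whether the new Great Lakes inputs are populated.
    gl_climatology = network.great_lakes_climatology_df
    canadian_gages = network.canadian_gage_df
    print(f"Great Lakes climatology rows: {len(gl_climatology)}")
    print(f"Canadian gage-link crosswalk rows: {len(canadian_gages)}")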
302 changes: 220 additions & 82 deletions src/troute-network/troute/DataAssimilation.py

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions src/troute-network/troute/HYFeaturesNetwork.py
Original file line number Diff line number Diff line change
@@ -216,7 +216,7 @@ class HYFeaturesNetwork(AbstractNetwork):
"""
"""
__slots__ = ["_upstream_terminal", "_nexus_latlon", "_duplicate_ids_df"]
__slots__ = ["_upstream_terminal", "_nexus_latlon", "_duplicate_ids_df",]

def __init__(self,
supernetwork_parameters,
@@ -340,6 +340,7 @@ def gages(self):
def waterbody_null(self):
return np.nan #pd.NA


def preprocess_network(self, flowpaths, nexus):
self._dataframe = flowpaths

@@ -541,9 +542,13 @@ def preprocess_waterbodies(self, lakes, nexus):
self.waterbody_dataframe,
gl_wbody_df
]
)
).sort_index()

self._gl_climatology_df = get_great_lakes_climatology()

else:
gl_dict = {}
self._gl_climatology_df = pd.DataFrame()

self._waterbody_types_df = pd.DataFrame(
data = 1,
@@ -569,6 +574,7 @@ def preprocess_waterbodies(self, lakes, nexus):
self._waterbody_type_specified = False
self._link_lake_crosswalk = None
self._duplicate_ids_df = pd.DataFrame()
self._gl_climatology_df = pd.DataFrame()

self._dataframe = self.dataframe.drop('waterbody', axis=1).drop_duplicates()

@@ -605,6 +611,10 @@ def preprocess_data_assimilation(self, network):
.rename_axis(None, axis=0).to_dict()
)

#FIXME: temporary solution, add canadian gage crosswalk dataframe. This should come from
# the hydrofabric.
self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link')

# Find furthest downstream gage and create our lake_gage_df to make crosswalk dataframes.
lake_gage_hydroseq_df = gages_df[~gages_df['lake_id'].isnull()][['lake_id', 'value', 'hydroseq']].rename(columns={'value': 'gages'})
lake_gage_hydroseq_df['lake_id'] = lake_gage_hydroseq_df['lake_id'].astype(int)
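As the FIXME above notes, the Canadian gage crosswalk is currently an empty frame with 'gages' and 'link' columns indexed by 'link'. Below is a sketch of the shape a populated crosswalk would presumably take once it is sourced from the hydrofabric; the gage and link IDs are invented for illustration:

import pandas as pd

# Hypothetical values only; the real crosswalk is expected to come from the hydrofabric.
canadian_gage_link_df = (
    pd.DataFrame({
        'gages': ['02GE003', '02HA006'],   # illustrative Canadian gage IDs
        'link':  [1234567, 7654321],       # illustrative segment IDs
    })
    .set_index('link')
)
print(canadian_gage_link_df)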
2 changes: 2 additions & 0 deletions src/troute-network/troute/NHDNetwork.py
@@ -64,6 +64,8 @@ def __init__(
print("... in %s seconds." % (time.time() - start_time))

self._flowpath_dict = {}
self._gl_climatology_df = pd.DataFrame()
self._canadian_gage_link_df = pd.DataFrame(columns=['gages','link']).set_index('link')

super().__init__()

90 changes: 90 additions & 0 deletions src/troute-network/troute/nhd_io.py
@@ -1249,6 +1249,96 @@ def get_obs_from_timeslices(
return observation_df_new


Review comment (Contributor), on the new get_GL_obs_from_timeslices function:

This function is very similar to the get_obs_from_timeslices function. Is it possible to merge these two?

Reply (Contributor Author):

Yeah, it's actually identical, just without the interpolation step. We could merge them, but I do want to avoid interpolating for the Great Lakes, so we'd have to alter the original function to only interpolate under certain conditions. I chose this method for now for simplicity, but I'm open to re-working this function.

(A sketch of that conditional-interpolation idea appears after the function below.)

def get_GL_obs_from_timeslices(
crosswalk_df,
timeslice_files,
crosswalk_gage_field='gages',
crosswalk_dest_field='link',
qc_threshold=1,
cpu_pool=1,
):
"""
Read observations from TimeSlice files and organize into a Pandas DataFrame.
This is specific to observations needed for Great Lakes data assimilation,
so no interpolation happens.
Arguments
--------
- crosswalk_gage_field (str): fieldname of gage ID data in crosswalk dataframe
- crosswalk_dest_field (str): fieldname of destination data in link_gage_df.
For streamflow DA, this is the field
containing segment IDs. For reservoir DA,
this is the field containing waterbody IDs.
- timeslice_files (list of PosixPath): Full paths to existing TimeSlice files
- qc_threshold (int): Numerical observation quality
threshold. Observations with quality
flags less than this value will be
removed and replaced with nan.
- cpu_pool (int): Number of CPUs used for parallel
TimeSlice reading
Returns
-------
- observation_df (Pandas DataFrame): quality-screened gage observations, indexed by the crosswalk destination field
Notes
-----
"""
# open TimeSlice files, organize data into dataframes
with Parallel(n_jobs=cpu_pool) as parallel:
jobs = []
for f in timeslice_files:
jobs.append(delayed(_read_timeslice_file)(f))
timeslice_dataframes = parallel(jobs)

all_empty = all(df.empty for frames in timeslice_dataframes for df in frames)
if all_empty:
LOG.debug(f'{crosswalk_gage_field} DataFrames are empty, check timeslice files.')
return pd.DataFrame()

# create lists of observations and obs quality dataframes returned
# from _read_timeslice_file
timeslice_obs_frames = []
timeslice_qual_frames = []
for d in timeslice_dataframes:
timeslice_obs_frames.append(d[0]) # TimeSlice gage observation
timeslice_qual_frames.append(d[1]) # TimeSlice observation qual

# concatenate dataframes
timeslice_obs_df = pd.concat(timeslice_obs_frames, axis = 1)
timeslice_qual_df = pd.concat(timeslice_qual_frames, axis = 1)

# Link <> gage crosswalk data
df = crosswalk_df.reset_index()
df = df.set_index(crosswalk_gage_field)
df.index = df.index.str.strip()
# join crosswalk data with timeslice data, indexed on crosswalk destination field
observation_df = (df.join(timeslice_obs_df).
reset_index().
set_index(crosswalk_dest_field).
select_dtypes(include='number'))

observation_qual_df = (df.join(timeslice_qual_df).
reset_index().
set_index(crosswalk_dest_field).
select_dtypes(include='number'))

# ---- Laugh testing ------
# screen-out erroneous qc flags
observation_qual_df = (observation_qual_df.
mask(observation_qual_df < 0, np.nan).
mask(observation_qual_df > 1, np.nan)
)

# screen-out poor quality flow observations
observation_df = (observation_df.
mask(observation_qual_df < qc_threshold, np.nan).
mask(observation_df <= 0, np.nan)
)

return observation_df
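As mentioned in the review exchange above, here is one sketch of how the two TimeSlice readers could be merged by making interpolation conditional. This is not code from the PR: it reuses get_GL_obs_from_timeslices for the shared read/join/QC work, and _interpolate_observations is a hypothetical placeholder for the interpolation logic that the existing get_obs_from_timeslices performs.

def get_obs_from_timeslices_merged(
    crosswalk_df,
    timeslice_files,
    crosswalk_gage_field='gages',
    crosswalk_dest_field='link',
    qc_threshold=1,
    cpu_pool=1,
    interpolate=True,   # hypothetical flag: pass False for Great Lakes DA
):
    # shared reading, crosswalk join, and QC screening
    observation_df = get_GL_obs_from_timeslices(
        crosswalk_df,
        timeslice_files,
        crosswalk_gage_field=crosswalk_gage_field,
        crosswalk_dest_field=crosswalk_dest_field,
        qc_threshold=qc_threshold,
        cpu_pool=cpu_pool,
    )
    if interpolate:
        # streamflow/reservoir DA would interpolate to a regular time axis here;
        # Great Lakes DA deliberately skips this step
        observation_df = _interpolate_observations(observation_df)  # hypothetical helper
    return observation_df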


def get_param_str(target_file, param):
# TODO: remove this duplicate function
return get_attribute(target_file, param)
17 changes: 9 additions & 8 deletions src/troute-network/troute/rfc_lake_gage_crosswalk.py
@@ -1,4 +1,5 @@
import pandas as pd
import numpy as np

def get_rfc_lake_gage_crosswalk():
"""
@@ -77,14 +78,14 @@ def get_rfc_lake_gage_crosswalk():

def get_great_lakes_climatology():

df = pd.DataFrame(
{
'Month': ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"],
'SMR': [1946, 1907, 1879, 1929, 2105, 2202, 2306, 2378, 2343, 2282, 2235, 2046],
'SCR': [4623, 4518, 4946, 5217, 5410, 5494, 5546, 5555, 5519, 5475, 5424, 5253],
'NIR': [5630, 5523, 5673, 5921, 6179, 6172, 6089, 5977, 5839, 5751, 5757, 5771],
'SLR': [6380, 6561, 6875, 7159, 7418, 7547, 7500, 7360, 7161, 6954, 6852, 6725]
}
outflow_data = np.array([[1946, 1907, 1879, 1929, 2105, 2202, 2306, 2378, 2343, 2282, 2235, 2046],
[4623, 4518, 4946, 5217, 5410, 5494, 5546, 5555, 5519, 5475, 5424, 5253],
[5630, 5523, 5673, 5921, 6179, 6172, 6089, 5977, 5839, 5751, 5757, 5771],
[6380, 6561, 6875, 7159, 7418, 7547, 7500, 7360, 7161, 6954, 6852, 6725]])
df = (
pd.DataFrame(data=outflow_data, index=[4800002,4800004,4800006,4800007],
columns=["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"])
.astype('float32')
)

return df
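The rewritten climatology table is indexed by the four Great Lakes waterbody IDs (4800002, 4800004, 4800006, 4800007) with one float32 column per month, so a climatological outflow lookup becomes a plain .loc call. A small usage sketch, assuming the import path implied by the file location above:

from troute.rfc_lake_gage_crosswalk import get_great_lakes_climatology

climatology_df = get_great_lakes_climatology()

# climatological July outflow for waterbody 4800002 (first row of the table above)
july_outflow = climatology_df.loc[4800002, 'Jul']
print(july_outflow)  # 2306.0, per the values in the diff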
12 changes: 12 additions & 0 deletions src/troute-nwm/src/nwm_routing/__main__.py
@@ -230,6 +230,9 @@ def main_v04(argv):
data_assimilation.reservoir_usace_param_df,
data_assimilation.reservoir_rfc_df,
data_assimilation.reservoir_rfc_param_df,
data_assimilation.great_lakes_df,
data_assimilation.great_lakes_param_df,
network.great_lakes_climatology_df,
data_assimilation.assimilation_parameters,
assume_short_ts,
return_courant,
@@ -1141,6 +1144,9 @@ def nwm_route(
reservoir_usace_param_df,
reservoir_rfc_df,
reservoir_rfc_param_df,
great_lakes_df,
great_lakes_param_df,
great_lakes_climatology_df,
da_parameter_dict,
assume_short_ts,
return_courant,
@@ -1231,6 +1237,9 @@
reservoir_usace_param_df,
reservoir_rfc_df,
reservoir_rfc_param_df,
great_lakes_df,
great_lakes_param_df,
great_lakes_climatology_df,
da_parameter_dict,
assume_short_ts,
return_courant,
@@ -1675,6 +1684,9 @@ def main_v03(argv):
reservoir_usace_param_df,
pd.DataFrame(), #empty dataframe for RFC data...not needed unless running via BMI
pd.DataFrame(), #empty dataframe for RFC param data...not needed unless running via BMI
pd.DataFrame(), #empty dataframe for great lakes data...
pd.DataFrame(), #empty dataframe for great lakes param data...
pd.DataFrame(), #empty dataframe for great lakes climatology data...
da_parameter_dict,
assume_short_ts,
return_courant,