Skip to content

Commit

Permalink
Merge pull request #8 from RENCI/getStationData
Browse files Browse the repository at this point in the history
Made changes to get_station_data and associated function, including n…
  • Loading branch information
PhillipsOwen authored Jan 24, 2024
2 parents 67cc01a + a8cf70a commit bc68321
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
3 changes: 2 additions & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ max-args=15
max-locals=20
min-public-methods=0
fail-under=9.5
extension-pkg-allow-list=pydantic
extension-pkg-allow-list=pydantic
max-branches=20
63 changes: 54 additions & 9 deletions src/common/pg_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def get_station_data(self, **kwargs) -> str:
:return:
"""
# get forecast data
forecast_data = self.get_forecast_station_data(kwargs['station_name'], kwargs['time_mark'], kwargs['data_source'])
forecast_data = self.get_forecast_station_data(kwargs['station_name'], kwargs['time_mark'], kwargs['data_source'], kwargs['instance_name'])

# derive start date from the time mark
start_date = (datetime.fromisoformat(kwargs['time_mark']) - timedelta(4)).isoformat()
Expand All @@ -290,7 +290,7 @@ def get_station_data(self, **kwargs) -> str:

# get nowcast data_source from forecast data_source
# check if data_source is tropical
if kwargs['data_source'][:2] == 'al':
if kwargs['forcing_metaclass'] == 'tropical':
# if tropical split data source and replace second value (OFCL) with NOWCAST
source_parts = kwargs['data_source'].split('_')
source_parts[1] = 'NOWCAST'
Expand All @@ -299,17 +299,30 @@ def get_station_data(self, **kwargs) -> str:
# if synoptic split data source and replace fist value (GFSFORECAST) with NOWCAST
nowcast_source = 'NOWCAST_' + "_".join(kwargs['data_source'].split('_')[1:])

# get obs and nowcast data
obs_data = self.get_obs_station_data(kwargs['station_name'], start_date, end_date, nowcast_source)
# get obs data
obs_data = self.get_obs_station_data(kwargs['station_name'], start_date, end_date)

# drop empty columns
empty_cols = [col for col in obs_data.columns if obs_data[col].isnull().all()]
obs_data.drop(empty_cols, axis=1, inplace=True)

# get nowcast data
nowcast_data = self.get_nowcast_station_data(kwargs['station_name'], start_date, end_date, nowcast_source, kwargs['instance_name'])

# If nowcast data exists merge it with obs data
if not nowcast_data.empty:
obs_data = obs_data.merge(nowcast_data, on='time_stamp', how='outer')

# replace any None values with np.nan, in both DataFrames
forecast_data.fillna(value=np.nan)
obs_data.fillna(value=np.nan)

# replace any -99999 values with np.nan, in both DataFrames
fcols = forecast_data.columns.tolist()
forecast_data[fcols] = forecast_data[fcols].replace([-99999], np.nan)
ocols = obs_data.columns.tolist()
obs_data[ocols] = obs_data[ocols].replace([-99999], np.nan)

# convert all values after the time mark to nan, in obs data, except in the time_stamp and tidal_predictions columns
for col in obs_data.columns:
if col not in ('time_stamp', 'tidal_predictions'):
Expand Down Expand Up @@ -353,21 +366,53 @@ def get_station_data(self, **kwargs) -> str:
# return the data to the caller
return station_df.to_csv(index=False)

def get_forecast_station_data(self, station_name, time_mark, data_source) -> pd.DataFrame:
def get_forecast_station_data(self, station_name, time_mark, data_source, instance_name) -> pd.DataFrame:
"""
Gets the forcast station data
:param station_name:
:param time_mark:
:param data_source:
:param instance_name:
:return:
"""
# init the return value:
ret_val: pd.DataFrame = pd.DataFrame()

# Run query
sql = f"SELECT * FROM get_forecast_timeseries_station_data(_station_name := '{station_name}', _timemark := '{time_mark}', " \
f"_data_source := '{data_source}')"
f"_data_source := '{data_source}', _source_instance := '{instance_name}')"

# get the info
station_data = self.exec_sql('apsviz_gauges', sql)

# was it successful
if station_data != -1:
# convert query output to Pandas dataframe
ret_val = pd.DataFrame.from_dict(station_data, orient='columns')
else:
ret_val = pd.DataFrame(None)

# Return Pandas dataframe
return ret_val

def get_nowcast_station_data(self, station_name, start_date, end_date, data_source, instance_name) -> pd.DataFrame:
"""
Gets the forcast station data
:param station_name:
:param start_date:
:param end_data:
:param data_source:
:param instance_name:
:return:
"""
# init the return value:
ret_val: pd.DataFrame = pd.DataFrame()

# Run query
sql = f"SELECT * FROM get_nowcast_timeseries_station_data(_station_name := '{station_name}', _start_date := '{start_date}', " \
f"_end_date := '{end_date}', _data_source := '{data_source}', _source_instance := '{instance_name}')"

# get the info
station_data = self.exec_sql('apsviz_gauges', sql)
Expand All @@ -382,7 +427,7 @@ def get_forecast_station_data(self, station_name, time_mark, data_source) -> pd.
# Return Pandas dataframe
return ret_val

def get_obs_station_data(self, station_name, start_date, end_date, nowcast_source) -> pd.DataFrame:
def get_obs_station_data(self, station_name, start_date, end_date) -> pd.DataFrame:
"""
Gets the observed station data.
Expand All @@ -396,8 +441,8 @@ def get_obs_station_data(self, station_name, start_date, end_date, nowcast_sourc
ret_val: pd.DataFrame = pd.DataFrame()

# build the query
sql = f"SELECT * FROM get_obs_timeseries_station_data(_station_name := '{station_name}', _start_date := '{start_date}', _end_date := " \
f"'{end_date}', _nowcast_source := '{nowcast_source}')"
sql = f"SELECT * FROM get_obs_timeseries_station_data(_station_name := '{station_name}', _start_date := '{start_date}', " \
f"_end_date := '{end_date}')"

# get the info
station_data = self.exec_sql('apsviz_gauges', sql)
Expand Down
7 changes: 4 additions & 3 deletions src/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ async def get_terria_map_catalog_data_file(file_name: Union[str, None] = Query(d

@APP.get('/get_station_data', status_code=200, response_model=None, response_class=PlainTextResponse)
def get_station_data(station_name: Union[str, None] = Query(default=None), time_mark: Union[str, None] = Query(default=None),
data_source: Union[str, None] = Query(default=None)) -> PlainTextResponse:
data_source: Union[str, None] = Query(default=None), instance_name: Union[str, None] = Query(default=None),
forcing_metaclass: Union[str, None] = Query(default=None)) -> PlainTextResponse:
"""
Returns the CSV formatted observational station.
Expand All @@ -412,12 +413,12 @@ def get_station_data(station_name: Union[str, None] = Query(default=None), time_
# example input - station name: 8651370, timemark: 2023-08-24T00:00:00, data_source: GFSFORECAST_WNAT_53K_V1.0
try:
# validate the input. nothing is optional
if station_name or time_mark or data_source:
if station_name or time_mark or data_source or instance_name or forcing_metaclass:
# init the kwargs variable
kwargs: dict = {}

# create the param list
params: list = ['station_name', 'time_mark', 'data_source']
params: list = ['station_name', 'time_mark', 'data_source', 'instance_name', 'forcing_metaclass']

# loop through the SP params passed in
for param in params:
Expand Down

0 comments on commit bc68321

Please sign in to comment.