Skip to content

Commit

Permalink
Merge pull request #122 from HydrologicEngineeringCenter/CritScript
Browse files Browse the repository at this point in the history
Add shef_critfile_import
  • Loading branch information
tlelv authored Jan 7, 2025
2 parents 5945971 + af8cad9 commit 3dd203a
Show file tree
Hide file tree
Showing 4 changed files with 313 additions and 0 deletions.
1 change: 1 addition & 0 deletions cwms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from cwms.api import *
from cwms.catalog.catalog import *
from cwms.datafile_imports.shef_critfile_import import *
from cwms.forecast.forecast_instance import *
from cwms.forecast.forecast_spec import *
from cwms.levels.location_levels import *
Expand Down
126 changes: 126 additions & 0 deletions cwms/datafile_imports/shef_critfile_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import re
from typing import Dict, List

import pandas as pd

from cwms.timeseries.timeseries import (
timeseries_group_df_to_json,
update_timeseries_groups,
)


def import_critfile_to_ts_group(
    file_path: str,
    office_id: str,
    group_id: str = "SHEF Data Acquisition",
    category_id: str = "Data Acquisition",
    group_office_id: str = "CWMS",
    replace_assigned_ts: bool = False,
) -> None:
    """
    Processes a .crit file and saves the information to the SHEF Data Acquisition time series group.

    Parameters
    ----------
    file_path : str
        Path to the .crit file.
    office_id : str
        The ID of the office associated with the specified timeseries.
    group_id : str, optional
        The specified group associated with the timeseries data. Defaults to "SHEF Data Acquisition".
    category_id : str, optional
        The category ID that contains the timeseries group. Defaults to "Data Acquisition".
    group_office_id : str, optional
        The specified office group associated with the timeseries data. Defaults to "CWMS".
    replace_assigned_ts : bool, optional
        Specifies whether to unassign all existing time series before assigning new time series
        specified in the content body. Default is False.

    Returns
    -------
    None
    """

    def parse_crit_file(path: str) -> List[Dict[str, str]]:
        """
        Parses a .crit file into a list of records with timeseries ID and alias.

        Parameters
        ----------
        path : str
            Path to the .crit file.

        Returns
        -------
        List[Dict[str, str]]
            A list of dictionaries with "Alias" and "Timeseries ID" as keys.
        """
        parsed_data: List[Dict[str, str]] = []
        with open(path, "r") as file:
            for line in file:
                # Ignore comment lines and empty lines
                if line.startswith("#") or not line.strip():
                    continue

                # Expected line shape: "<alias>=<timeseries id>;<trailer>"
                # (trailer is typically the time zone; it is folded into the alias).
                match = re.match(r"([^=]+)=([^;]+);(.+)", line.strip())
                if match:
                    alias = match.group(1).strip()
                    timeseries_id = match.group(2).strip()
                    alias2 = match.group(3).strip()
                    parsed_data.append(
                        {
                            "Alias": alias + ":" + alias2,
                            "Timeseries ID": timeseries_id,
                        }
                    )

        return parsed_data

    # Parse the file and get the parsed data
    parsed_data = parse_crit_file(file_path)

    # Build the DataFrame in a single step instead of concatenating one row
    # at a time (per-row pd.concat is quadratic in the number of rows).
    df = pd.DataFrame(
        {
            "office-id": [office_id] * len(parsed_data),
            "timeseries-id": [record["Timeseries ID"] for record in parsed_data],
            "alias-id": [record["Alias"] for record in parsed_data],
        }
    )

    # Generate JSON dictionary
    json_dict = timeseries_group_df_to_json(df, group_id, group_office_id, category_id)

    update_timeseries_groups(
        group_id=group_id,
        office_id=office_id,
        replace_assigned_ts=replace_assigned_ts,
        data=json_dict,
    )
115 changes: 115 additions & 0 deletions cwms/timeseries/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,121 @@
from cwms.cwms_types import JSON, Data


def update_timeseries_groups(
    data: JSON,
    group_id: str,
    office_id: str,
    replace_assigned_ts: Optional[bool] = False,
) -> None:
    """
    Updates the timeseries group identified by the provided group ID and office ID.

    Parameters
    ----------
    data : JSON dictionary
        Timeseries group data to be stored.
    group_id : str
        The ID of the timeseries group to update.
    office_id : str
        The ID of the office associated with the specified timeseries group.
    replace_assigned_ts : bool, optional
        Specifies whether to unassign all existing time series before assigning
        new time series specified in the content body. Default is False.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If `group_id` or `office_id` is empty or None.
    """
    if not group_id:
        raise ValueError("Cannot update a timeseries group without an id")
    if not office_id:
        raise ValueError("Cannot update a timeseries group without an office id")

    endpoint = f"timeseries/group/{group_id}"
    params = {
        "replace-assigned-ts": replace_assigned_ts,
        "office": office_id,
    }

    api.patch(endpoint=endpoint, data=data, params=params, api_version=1)


def timeseries_group_df_to_json(
    data: pd.DataFrame,
    group_id: str,
    office_id: str,
    category_id: str,
) -> JSON:
    """
    Converts a dataframe to a json dictionary in the correct format.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing timeseries information. Required columns:
        "office-id", "timeseries-id". Optional columns: "alias-id",
        "attribute", "ts-code". The input dataframe is not modified.
    group_id : str
        The group ID for the timeseries.
    office_id : str
        The ID of the office associated with the specified timeseries.
    category_id : str
        The ID of the category associated with the group.

    Returns
    -------
    JSON
        JSON dictionary of the timeseries group data.

    Raises
    ------
    TypeError
        If a required column is missing from `data`.
    ValueError
        If a required column contains null/NaN values.
    """
    df = data.copy()
    required_columns = ["office-id", "timeseries-id"]
    optional_columns = ["alias-id", "attribute", "ts-code"]
    for column in required_columns:
        if column not in df.columns:
            raise TypeError(
                f"{column} is a required column in data when posting as a dataframe"
            )

    if df[required_columns].isnull().any().any():
        raise ValueError(
            f"Null/NaN values found in required columns: {required_columns}. "
        )

    # Fill optional columns with default values if missing
    if "alias-id" not in df.columns:
        df["alias-id"] = None
    if "attribute" not in df.columns:
        df["attribute"] = 0

    # Replace NaN with None for optional columns. The assignment must target
    # the working copy `df` — the original wrote into the caller's `data`,
    # which both mutated the input and left NaN values in `df`.
    for column in optional_columns:
        if column in df.columns:
            df[column] = df[column].where(pd.notnull(df[column]), None)

    # Build the list of time-series entries; "ts-code" is included
    # only for rows where it is present and non-null.
    assigned_time_series = df.apply(
        lambda entry: {
            "office-id": entry["office-id"],
            "timeseries-id": entry["timeseries-id"],
            "alias-id": entry["alias-id"],
            "attribute": entry["attribute"],
            **(
                {"ts-code": entry["ts-code"]}
                if "ts-code" in entry and pd.notna(entry["ts-code"])
                else {}
            ),
        },
        axis=1,
    ).tolist()

    # Construct the final JSON dictionary
    json_dict = {
        "office-id": office_id,
        "id": group_id,
        "time-series-category": {"office-id": office_id, "id": category_id},
        "assigned-time-series": assigned_time_series,
    }

    return json_dict


def get_timeseries_group(group_id: str, category_id: str, office_id: str) -> Data:
"""Retreives time series stored in the requested time series group
Expand Down
71 changes: 71 additions & 0 deletions tests/timeseries/timeseries_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,77 @@ def init_session():
cwms.api.init_session(api_root=_MOCK_ROOT)


def test_update_timeseries_groups(requests_mock):
    # Register the exact PATCH URL the client is expected to hit.
    patch_url = (
        f"{_MOCK_ROOT}/timeseries/group/USGS%20TS%20Data%20Acquisition"
        "?replace-assigned-ts=True&office=CWMS"
    )
    requests_mock.patch(patch_url, status_code=200)

    timeseries.update_timeseries_groups(
        data=_TS_GROUP,
        group_id="USGS TS Data Acquisition",
        office_id="CWMS",
        replace_assigned_ts=True,
    )

    # Exactly one PATCH request should have been issued.
    assert requests_mock.called
    assert requests_mock.call_count == 1


def test_timeseries_group_df_to_json_valid_data():
    data = pd.DataFrame(
        {
            "office-id": ["office123", "office456"],
            "timeseries-id": ["ts1", "ts2"],
            "alias-id": [None, "alias2"],
            "attribute": [0, 10],
            "ts-code": ["code1", None],
        }
    )

    # Drop rows missing required values, then normalize NaN -> None
    # in the optional columns before conversion.
    data = data.dropna(subset=["office-id", "timeseries-id"])
    for col in ("alias-id", "ts-code"):
        if col in data.columns:
            data[col] = data[col].where(pd.notnull(data[col]), None)

    expected_json = {
        "office-id": "office123",
        "id": "group123",
        "time-series-category": {
            "office-id": "office123",
            "id": "cat123",
        },
        "assigned-time-series": [
            {
                "office-id": "office123",
                "timeseries-id": "ts1",
                "alias-id": None,
                "attribute": 0,
                "ts-code": "code1",
            },
            {
                "office-id": "office456",
                "timeseries-id": "ts2",
                "alias-id": "alias2",
                "attribute": 10,
            },
        ],
    }

    assert (
        timeseries.timeseries_group_df_to_json(data, "group123", "office123", "cat123")
        == expected_json
    )


def test_timeseries_df_to_json():
test_json = {
"name": "TestLoc.Stage.Inst.1Hour.0.Testing",
Expand Down

0 comments on commit 3dd203a

Please sign in to comment.