Merge branch 'main' into task/vite-update
brettedw authored Jan 30, 2025
2 parents b22b01a + d27e0c8 commit 7b8c7b9
Showing 8 changed files with 625 additions and 605 deletions.
8 changes: 7 additions & 1 deletion .vscode/launch.json
@@ -139,7 +139,13 @@
"type": "python",
"request": "launch",
"module": "app.auto_spatial_advisory.critical_hours",
"console": "integratedTerminal"
"console": "integratedTerminal",
"args": [
"-r",
"3517"
// "-z",
// "85"
]
},
{
"name": "sfms raster processor job",
119 changes: 109 additions & 10 deletions api/app/auto_spatial_advisory/critical_hours.py
@@ -1,19 +1,24 @@
import argparse
import asyncio
from collections import defaultdict
from datetime import date, datetime, timedelta
import logging
import math
from typing import Dict, List, Tuple, Any
import numpy as np
import os
import json
import sys
from collections import defaultdict
from datetime import date, datetime, timedelta, timezone
from time import perf_counter
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple

import numpy as np
from aiohttp import ClientSession
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from aiobotocore.client import AioBaseClient

from app import configure_logging
from app.auto_spatial_advisory.debug_critical_hours import get_critical_hours_json_from_s3
from app.auto_spatial_advisory.run_type import RunType
from app.db.crud.auto_spatial_advisory import (
get_all_sfms_fuel_type_records,
@@ -33,17 +38,20 @@
from app.schemas.observations import WeatherStationHourlyReadings
from app.stations import get_stations_asynchronously
from app.utils.geospatial import PointTransformer
from app.utils.s3 import get_client
from app.utils.time import get_hour_20_from_date, get_julian_date
from app.wildfire_one import wfwx_api
from app.wildfire_one.schema_parsers import WFWXWeatherStation
from pydantic_core import to_jsonable_python

logger = logging.getLogger(__name__)

DAYS_TO_RETAIN = 21


@dataclass(frozen=True)
class CriticalHoursInputs:
class CriticalHoursInputs(BaseModel):
"""
Encapsulates the dailies, yesterday dailies and hourlies for a set of stations required for calculating critical hours.
Encapsulates the dailies, yesterday dailies, and hourlies for a set of stations required for calculating critical hours.
Since daily data comes from WF1 as JSON, we treat the values as Any types for now.
"""

@@ -52,6 +60,17 @@ class CriticalHoursInputs:
hourly_observations_by_station_code: Dict[int, WeatherStationHourlyReadings]


class CriticalHoursIO(BaseModel):
fuel_types_by_area: Dict[str, float]
wfwx_stations: List[WFWXWeatherStation]
critical_hours_inputs: CriticalHoursInputs
critical_hours_by_zone_and_fuel_type: Dict[int, Dict[str, List]]


class CriticalHoursIOByZone(BaseModel):
critical_hours_by_zone: Dict[int, CriticalHoursIO]
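
Because these are pydantic v2 models, the whole per-zone payload round-trips through JSON; store_critical_hours_inputs_outputs and start_critical_hours below rely on exactly this. A minimal sketch, where data stands in for a Dict[int, CriticalHoursIO] such as the critical_hours_inputs_by_zone mapping built below (data and the zone id 20 are placeholders):

import json
from pydantic_core import to_jsonable_python

# data: Dict[int, CriticalHoursIO] -- placeholder for illustration.
# Serialize: pydantic models -> plain python -> JSON string.
json_data = json.dumps(to_jsonable_python(data), indent=2)

# Deserialize: JSON -> typed models again via pydantic validation.
restored = CriticalHoursIOByZone(critical_hours_by_zone=json.loads(json_data))
zone_data = restored.critical_hours_by_zone[20]  # 20 is an invented zone id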


def determine_start_time(times: list[float]) -> float:
"""
Returns a single start time based on a naive heuristic.
@@ -132,6 +151,35 @@ async def save_critical_hours(db_session: AsyncSession, zone_unit_id: int, criti
await save_all_critical_hours(db_session, critical_hours_to_save)


async def apply_retention_policy(client: AioBaseClient, bucket: str):
today = datetime.now(timezone.utc).date()
retention_date = today - timedelta(days=DAYS_TO_RETAIN)
logger.info(f"Applying critical hours s3 retention policy. Data older than {DAYS_TO_RETAIN} days is being deleted.")

prefix = "critical_hours/"

res = await client.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter="/")

if "CommonPrefixes" in res:
for folder in res["CommonPrefixes"]:
folder_name = folder["Prefix"].split("/")[-2] # folder name (YYYY-MM-DD)

try:
folder_date = datetime.strptime(folder_name, "%Y-%m-%d").date()
except ValueError:
logger.error(f"{folder["Prefix"]} - {folder_name} - could not be parsed to datetime object")
continue

if folder_date < retention_date:
# Delete the objects in this folder
res_objects = await client.list_objects_v2(Bucket=bucket, Prefix=folder["Prefix"])
if "Contents" in res_objects:
for obj in res_objects["Contents"]:
s3_key = obj["Key"]
await client.delete_object(Bucket=bucket, Key=s3_key)
logger.info(f"Deleted folder {folder_name}")


def calculate_wind_speed_result(yesterday: dict, raw_daily: dict) -> WindResult:
"""
Calculates new FWIs based on observed and forecast daily data from WF1.
Expand Down Expand Up @@ -368,6 +416,7 @@ async def calculate_critical_hours_by_zone(db_session: AsyncSession, header: dic
:param for_date: The date critical hours are being calculated for.
"""
critical_hours_by_zone_and_fuel_type = defaultdict(str, defaultdict(list))
critical_hours_inputs_by_zone: Dict[int, CriticalHoursIO] = {}
for zone_key in stations_by_zone.keys():
advisory_fuel_stats = await get_fuel_type_stats_in_advisory_area(db_session, zone_key, run_parameters_id)
fuel_types_by_area = get_fuel_types_by_area(advisory_fuel_stats)
@@ -383,10 +432,44 @@
if len(critical_hours_by_fuel_type) > 0:
critical_hours_by_zone_and_fuel_type[zone_key] = critical_hours_by_fuel_type

critical_hours_input_output = CriticalHoursIO(
fuel_types_by_area=fuel_types_by_area,
wfwx_stations=wfwx_stations,
critical_hours_inputs=critical_hours_inputs,
critical_hours_by_zone_and_fuel_type=critical_hours_by_zone_and_fuel_type,
)
critical_hours_inputs_by_zone[zone_key] = critical_hours_input_output

await store_critical_hours_inputs_outputs(critical_hours_inputs_by_zone, for_date, run_parameters_id)

for zone_id, critical_hours_by_fuel_type in critical_hours_by_zone_and_fuel_type.items():
await save_critical_hours(db_session, zone_id, critical_hours_by_fuel_type, run_parameters_id)


async def store_critical_hours_inputs_outputs(critical_hours_data: Dict[int, CriticalHoursIO], for_date: date, run_parameters_id: int):
async with get_client() as (client, bucket):
await apply_retention_policy(client, bucket)

if critical_hours_data:
try:
json_data = json.dumps(to_jsonable_python(critical_hours_data), indent=2)

# JSON inputs/outputs are stored at critical_hours/{for_date}/{run_parameters_id}_critical_hours.json
key = f"critical_hours/{for_date.isoformat()}/{run_parameters_id}_critical_hours.json"

logger.info(f"Writing {key} to s3")
await client.put_object(
Bucket=bucket,
Key=key,
Body=json_data,
ContentType="application/json",
)
except Exception as e:
logger.error(f"Error converting critical hours data to json - {e}")


async def calculate_critical_hours(run_type: RunType, run_datetime: datetime, for_date: date):
"""
Entry point for calculating critical hours.
@@ -434,7 +517,22 @@ async def calculate_critical_hours(run_type: RunType, run_datetime: datetime, fo
async def start_critical_hours(args: argparse.Namespace):
async with get_async_write_session_scope() as db_session:
run_parameters = await get_run_parameters_by_id(db_session, int(args.run_parameters_id))
await calculate_critical_hours(run_parameters[0].run_type, run_parameters[0].run_datetime, run_parameters[0].for_date)

if not run_parameters:
return

critical_hours_json = await get_critical_hours_json_from_s3(run_parameters)
if not critical_hours_json:
await calculate_critical_hours(run_parameters[0].run_type, run_parameters[0].run_datetime, run_parameters[0].for_date)
return

critical_hours_data = CriticalHoursIOByZone(critical_hours_by_zone=critical_hours_json).critical_hours_by_zone
zones = [int(args.fire_zone)] if args.fire_zone else critical_hours_data.keys()

for zone in zones:
zone_data = critical_hours_data.get(zone)
if zone_data:
calculate_critical_hours_by_fuel_type(zone_data.wfwx_stations, zone_data.critical_hours_inputs, zone_data.fuel_types_by_area, run_parameters[0].for_date)
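
Together with the new -z flag, this lets a stored run be replayed for a single zone without refetching WF1 data. A minimal sketch of driving the entry point directly, reusing the example ids from the launch.json change (both illustrative):

import argparse
import asyncio

# Replay zone 85 of stored run 3517; if no snapshot exists in S3,
# start_critical_hours falls back to a full calculation.
args = argparse.Namespace(run_parameters_id="3517", fire_zone="85")
asyncio.run(start_critical_hours(args))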


def main():
@@ -443,6 +541,7 @@ def main():
logger.debug("Begin calculating critical hours.")
parser = argparse.ArgumentParser(description="Process critical hours from command line")
parser.add_argument("-r", "--run_parameters_id", help="The id of the run parameters of interest from the run_parameters table")
parser.add_argument("-z", "--fire_zone", help="Fire zone to process if there is data stored in s3")
args = parser.parse_args()

loop = asyncio.new_event_loop()
16 changes: 16 additions & 0 deletions api/app/auto_spatial_advisory/debug_critical_hours.py
@@ -0,0 +1,16 @@
import json
from app.db.models.auto_spatial_advisory import RunParameters
from app.utils.s3 import get_client, object_exists_v2


async def get_critical_hours_json_from_s3(run_params: RunParameters):
async with get_client() as (client, bucket):
key = f"critical_hours/{run_params[0].for_date.isoformat()}/{run_params[0].id}_critical_hours.json"
exists = await object_exists_v2(key)
if exists:
res = await client.get_object(Bucket=bucket, Key=key)
body = await res["Body"].read()
json_content = json.loads(body)
return json_content
else:
print("Critical hours json not found")
13 changes: 6 additions & 7 deletions api/app/tests/auto_spatial_advisory/test_critical_hours.py
@@ -10,14 +10,14 @@
dirname = os.path.dirname(__file__)
dailies_fixture = os.path.join(dirname, "wf1-dailies.json")
hourlies_fixture = os.path.join(dirname, "wf1-hourlies.json")
mock_station = WFWXWeatherStation(wfwx_id="bb7cb089-286a-4734-e053-1d09228eeca8", code=169, name="UPPER FULTON", latitude=55.03395, longitude=-126.799667, elevation=900, zone_code=45)
mock_station = WFWXWeatherStation(wfwx_id="bb7cb089-286a-4734-e053-1d09228eeca8", code=169, name="UPPER FULTON", latitude=55.03395, longitude=-126.799667, elevation=900, zone_code="45")


def test_check_station_valid():
with open(dailies_fixture, "r") as dailies:
with open(dailies_fixture, "r") as dailies, open(hourlies_fixture, "r") as hourlies:
raw_dailies = json.load(dailies)["_embedded"]["dailies"]
dailies_by_station_id = {raw_dailies[0]["stationId"]: raw_dailies[0]}
hourlies_by_station_code = {raw_dailies[0]["stationData"]["stationCode"]: []}
hourlies_by_station_code = json.load(hourlies)
assert (
check_station_valid(
mock_station,
@@ -39,12 +39,12 @@ def test_check_station_invalid_missing_indices(index_key):
:param index_key: DMC, DC or FFMC key for WF1 daily
"""
with open(dailies_fixture, "r") as dailies:
with open(dailies_fixture, "r") as dailies, open(hourlies_fixture, "r") as hourlies:
raw_dailies = json.load(dailies)["_embedded"]["dailies"]
daily = raw_dailies[0]
daily[index_key] = None
dailies_by_station_id = {raw_dailies[0]["stationId"]: daily}
hourlies_by_station_code = {raw_dailies[0]["stationData"]["stationCode"]: []}
hourlies_by_station_code = json.load(hourlies)
assert (
check_station_valid(
mock_station,
@@ -61,9 +61,8 @@ def test_check_station_invalid_missing_daily():
When a station daily is missing for a station, it is invalid
"""
with open(hourlies_fixture, "r") as hourlies:
raw_hourlies = json.load(hourlies)["_embedded"]["hourlies"]
dailies_by_station_id = {}
hourlies_by_station_code = {raw_hourlies[0]["stationData"]["stationCode"]: raw_hourlies[0]}
hourlies_by_station_code = json.load(hourlies)
assert (
check_station_valid(
mock_station,