Skip to content

Commit

Permalink
Requester refactor (#375)
Browse files Browse the repository at this point in the history
* Improvements to error handling

Signed-off-by: Joe Moorhouse <[email protected]>

* Refactor requester for FastAPI / make serialization optional

Signed-off-by: Joe Moorhouse <[email protected]>

* Linting

Signed-off-by: Joe Moorhouse <[email protected]>

* No calc details for empty impact

Signed-off-by: Joe Moorhouse <[email protected]>

---------

Signed-off-by: Joe Moorhouse <[email protected]>
  • Loading branch information
joemoorhouse authored Dec 28, 2024
1 parent 0cd4085 commit fe55378
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 92 deletions.
72 changes: 40 additions & 32 deletions src/physrisk/api/v1/common.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
from typing import Dict, List, Optional, Sequence, Union
from typing import Annotated, Dict, List, Optional, Sequence, Union

import numpy as np
from pydantic import BaseModel, ConfigDict, Field
import numpy.typing as npt
from pydantic import BaseModel, ConfigDict, Field, PlainSerializer


class TypedArray(np.ndarray):
@classmethod
def __get_validators__(cls):
yield cls.validate_type
# def deserialize_list(list: list) -> npt.NDArray:
# """Deserialize a list into a numpy array."""
# return np.array(list)

@classmethod
def validate_type(cls, val):
return np.array(val, dtype=cls.inner_type) # type: ignore

def serialize_array(array: npt.NDArray) -> str:
"""Serialize a numpy array into a list."""
return array.tolist()

class ArrayMeta(type):
def __getitem__(cls, t):
return type("Array", (TypedArray,), {"inner_type": t})


class Array(np.ndarray, metaclass=ArrayMeta):
pass
NDArray = Annotated[
npt.NDArray,
# AfterValidator(deserialize_list),
PlainSerializer(serialize_array, return_type=list),
]


class Asset(BaseModel):
Expand All @@ -32,7 +31,7 @@ class Asset(BaseModel):
(or equivalent value, e.g. by reducing expenses or increasing sales).
"""

model_config = ConfigDict(extra="allow")
# model_config = ConfigDict(extra="allow")

asset_class: str = Field(
description="name of asset class; corresponds to physrisk class names, e.g. PowerGeneratingAsset"
Expand All @@ -44,14 +43,29 @@ class Asset(BaseModel):
)
location: Optional[str] = Field(
None,
description="Location (e.g. Africa, Asia, Europe, Global, Oceania, North America, South America)",
description="Location (e.g. Africa, Asia, Europe, North America, Oceania, South America); ",
)
capacity: Optional[float] = Field(None, description="Power generation capacity")
attributes: Optional[Dict[str, str]] = Field(
None,
description="Bespoke attributes (e.g. number of storeys, structure type, occupancy type)",
)

model_config = {
"extra": "allow",
"json_schema_extra": {
"examples": [
{
"asset_class": "RealEstateAsset",
"type": "Buildings/Industrial",
"latitude": 22.2972,
"longitude": 91.8062,
"location": "Asia",
}
]
},
}


class Assets(BaseModel):
"""Defines a collection of assets."""
Expand Down Expand Up @@ -109,8 +123,8 @@ class ExceedanceCurve(BaseModel):
"""General exceedance curve (e.g. hazazrd, impact)."""

model_config = ConfigDict(arbitrary_types_allowed=True)
values: np.ndarray = Field(default_factory=lambda: np.zeros(10), description="")
exceed_probabilities: np.ndarray = Field(
values: NDArray = Field(default_factory=lambda: np.zeros(10), description="")
exceed_probabilities: NDArray = Field(
default_factory=lambda: np.zeros(10), description=""
)

Expand All @@ -119,22 +133,18 @@ class Distribution(BaseModel):
"""General exceedance curve (e.g. hazazrd, impact)."""

model_config = ConfigDict(arbitrary_types_allowed=True)
bin_edges: np.ndarray = Field(default_factory=lambda: np.zeros(11), description="")
probabilities: np.ndarray = Field(
default_factory=lambda: np.zeros(10), description=""
)
bin_edges: NDArray = Field(default_factory=lambda: np.zeros(11), description="")
probabilities: NDArray = Field(default_factory=lambda: np.zeros(10), description="")


class HazardEventDistrib(BaseModel):
"""Intensity curve of an acute hazard."""

model_config = ConfigDict(arbitrary_types_allowed=True)
intensity_bin_edges: np.ndarray = Field(
default_factory=lambda: np.zeros(10), description=""
)
probabilities: np.ndarray = Field(
intensity_bin_edges: NDArray = Field(
default_factory=lambda: np.zeros(10), description=""
)
probabilities: NDArray = Field(default_factory=lambda: np.zeros(10), description="")
path: List[str] = Field([], description="Path to the hazard indicator data source.")


Expand Down Expand Up @@ -166,12 +176,10 @@ class VulnerabilityDistrib(BaseModel):
"""Defines a vulnerability matrix."""

model_config = ConfigDict(arbitrary_types_allowed=True)
intensity_bin_edges: np.ndarray = Field(
default_factory=lambda: np.zeros(10), description=""
)
impact_bin_edges: np.ndarray = Field(
intensity_bin_edges: NDArray = Field(
default_factory=lambda: np.zeros(10), description=""
)
prob_matrix: np.ndarray = Field(
impact_bin_edges: NDArray = Field(
default_factory=lambda: np.zeros(10), description=""
)
prob_matrix: NDArray = Field(default_factory=lambda: np.zeros(10), description="")
8 changes: 6 additions & 2 deletions src/physrisk/data/pregenerated_hazard_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,13 @@ def _get_hazard_data_batch(
try:
hazard_data_provider = self.hazard_data_providers[hazard_type]
except Exception:
raise Exception(
no_provider_err = Exception(
f"no hazard data provider for hazard type {hazard_type.__name__}."
)
for req in batch:
responses[req] = HazardDataFailedResponse(no_provider_err)
return

if indicator_data(hazard_type, indicator_id) == IndicatorData.EVENT:
intensities, return_periods, units, path = (
hazard_data_provider.get_data(
Expand Down Expand Up @@ -138,7 +142,7 @@ def _get_hazard_data_batch(
)
except Exception as err:
# e.g. the requested data is unavailable
for _, req in enumerate(batch):
for req in batch:
failed_response = HazardDataFailedResponse(err)
responses[req] = failed_response
failures.append(failed_response)
Expand Down
5 changes: 5 additions & 0 deletions src/physrisk/kernel/hazard_event_distrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,8 @@ def prob(self) -> np.ndarray:
@property
def path(self) -> List[str]:
return self.__path


class EmptyHazardEventDistrib(HazardEventDistrib):
def __init__(self):
pass
9 changes: 5 additions & 4 deletions src/physrisk/kernel/impact_distrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ def __init__(
path: List[str],
impact_type: ImpactType = ImpactType.damage,
):
"""Create a new asset event distribution.
"""Create a new impact distribution.
Args:
hazard_type: Type of hazard.
impact_bins: non-decreasing impact bin bounds
prob: probabilities with size [len(intensity_bins) - 1]
path: path to the hazard indicator data source
impact_bins: Non-decreasing impact bin bounds.
prob: Probabilities with size [len(impact_bins) - 1].
path: Path to the hazard indicator data source.
impact_type: Type of impact: damage or disruption.
"""
self.__hazard_type = hazard_type
self.__impact_bins = np.array(impact_bins)
Expand Down
5 changes: 5 additions & 0 deletions src/physrisk/kernel/vulnerability_distrib.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,8 @@ def intensity_bins(self) -> np.ndarray:
@property
def prob_matrix(self) -> np.ndarray:
return self._prob_matrix


class EmptyVulnerabilityDistrib(VulnerabilityDistrib):
def __init__(self):
pass
6 changes: 4 additions & 2 deletions src/physrisk/kernel/vulnerability_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
from scipy import stats

import physrisk.data.static.vulnerability
from physrisk.kernel.impact_distrib import ImpactDistrib, ImpactType
from physrisk.kernel.impact_distrib import EmptyImpactDistrib, ImpactDistrib, ImpactType

from ..api.v1.common import VulnerabilityCurve, VulnerabilityCurves
from .assets import Asset
from .curve import ExceedanceCurve
from .hazard_event_distrib import HazardEventDistrib
from .hazard_model import HazardDataRequest, HazardDataResponse, HazardEventDataResponse
from .vulnerability_distrib import VulnerabilityDistrib
from .vulnerability_distrib import EmptyVulnerabilityDistrib, VulnerabilityDistrib
from .vulnerability_matrix_provider import VulnMatrixProvider

PLUGINS = dict() # type:ignore
Expand Down Expand Up @@ -187,6 +187,8 @@ def get_impact_details(
event_data_responses: the responses to the requests made by get_data_requests, in the same order.
"""
vulnerability_dist, event_dist = self.get_distributions(asset, data_responses)
if isinstance(vulnerability_dist, EmptyVulnerabilityDistrib):
return EmptyImpactDistrib(), vulnerability_dist, event_dist
impact_prob = vulnerability_dist.prob_matrix.T @ event_dist.prob
return (
ImpactDistrib(
Expand Down
100 changes: 57 additions & 43 deletions src/physrisk/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import json
from importlib import import_module
from pathlib import PosixPath
from typing import Any, Dict, List, Optional, Sequence, Type, cast
from typing import Any, Dict, List, Optional, Sequence, Type, Union, cast

import numpy as np

import physrisk.data.image_creator
import physrisk.data.static.example_portfolios
from physrisk.api.v1.common import Distribution, ExceedanceCurve, VulnerabilityDistrib
from physrisk.api.v1.exposure_req_resp import (
Expand Down Expand Up @@ -100,67 +101,76 @@ def __init__(
self.zarr_reader = reader

def get(self, *, request_id, request_dict):
# the hazard model can depend

if request_id == "get_hazard_data":
request = HazardDataRequest(**request_dict)
hazard_model = self.hazard_model_factory.hazard_model(
interpolation=request.interpolation,
provider_max_requests=request.provider_max_requests,
)
return json.dumps(
_get_hazard_data(request, hazard_model=hazard_model).model_dump()
) # , allow_nan=False)
self.get_hazard_data(request).model_dump() # , allow_nan=False)
)
elif request_id == "get_hazard_data_availability":
request = HazardAvailabilityRequest(**request_dict)
return json.dumps(
_get_hazard_data_availability(
request, self.inventory, self.colormaps
).model_dump()
)
return json.dumps(self.get_hazard_data_availability(request).model_dump())
elif request_id == "get_hazard_data_description":
request = HazardDescriptionRequest(**request_dict)
return json.dumps(_get_hazard_data_description(request).dict())
return json.dumps(self.get_hazard_data_description(request).model_dump())
elif request_id == "get_asset_exposure":
request = AssetExposureRequest(**request_dict)
hazard_model = self.hazard_model_factory.hazard_model(
interpolation=request.calc_settings.hazard_interp,
provider_max_requests=request.provider_max_requests,
)
return json.dumps(
_get_asset_exposures(request, hazard_model).model_dump(
exclude_none=True
)
self.get_asset_exposures(request).model_dump(exclude_none=True)
)
elif request_id == "get_asset_impact":
request = AssetImpactRequest(**request_dict)
hazard_model = self.hazard_model_factory.hazard_model(
interpolation=request.calc_settings.hazard_interp,
provider_max_requests=request.provider_max_requests,
)
vulnerability_models = (
self.vulnerability_models_factory.vulnerability_models()
)
measure_calculators = self.measures_factory.calculators(request.use_case_id)
return dumps(
_get_asset_impacts(
request, hazard_model, vulnerability_models, measure_calculators
).model_dump()
)
return dumps(self.get_asset_impacts(request).model_dump())
elif request_id == "get_example_portfolios":
return dumps(_get_example_portfolios())
else:
raise ValueError(f"request type '{request_id}' not found")

def get_image(self, *, request_dict):
def get_hazard_data(self, request: HazardDataRequest):
hazard_model = self.hazard_model_factory.hazard_model(
interpolation=request.interpolation,
provider_max_requests=request.provider_max_requests,
)
return _get_hazard_data(request, hazard_model=hazard_model)

def get_hazard_data_availability(self, request: HazardAvailabilityRequest):
return _get_hazard_data_availability(request, self.inventory, self.colormaps)

def get_hazard_data_description(self, request: HazardDescriptionRequest):
return _get_hazard_data_description(request, self.inventory_reader)

def get_asset_exposures(self, request: AssetExposureRequest):
hazard_model = self.hazard_model_factory.hazard_model(
interpolation=request.calc_settings.hazard_interp,
provider_max_requests=request.provider_max_requests,
)
return _get_asset_exposures(request, hazard_model)

def get_asset_impacts(self, request: AssetImpactRequest) -> AssetImpactResponse:
hazard_model = self.hazard_model_factory.hazard_model(
interpolation=request.calc_settings.hazard_interp,
provider_max_requests=request.provider_max_requests,
)
vulnerability_models = self.vulnerability_models_factory.vulnerability_models()
measure_calculators = self.measures_factory.calculators(request.use_case_id)
return _get_asset_impacts(
request, hazard_model, vulnerability_models, measure_calculators
)

def get_image(self, request_or_dict: Union[HazardImageRequest, Dict]):
if isinstance(request_or_dict, Dict):
request = HazardImageRequest(**request_or_dict)
else:
request = request_or_dict

inventory = self.inventory
zarr_reader = self.zarr_reader
request = HazardImageRequest(**request_dict)

if not _read_permitted(
request.group_ids, inventory.resources[request.resource]
):
raise PermissionError()
model = inventory.resources[request.resource]
assert model.map is not None
len(PosixPath(model.map.path).parts)
path = (
str(PosixPath(model.path).with_name(model.map.path))
Expand All @@ -170,13 +180,17 @@ def get_image(self, *, request_dict):
colormap = (
request.colormap
if request.colormap is not None
else model.map.colormap.name
else (model.map.colormap.name if model.map.colormap is not None else "None")
)
creator = ImageCreator(zarr_reader) # store=ImageCreator.test_store(path))
return creator.convert(
path,
colormap=colormap,
tile=request.tile,
tile=None
if request.tile is None
else physrisk.data.image_creator.Tile(
request.tile.x, request.tile.y, request.tile.z
),
min_value=request.min_value,
max_value=request.max_value,
)
Expand Down Expand Up @@ -213,7 +227,7 @@ def default(self, obj):


def dumps(dict):
return json.dumps(dict, cls=NumpyArrayEncoder)
return json.dumps(dict) # , cls=NumpyArrayEncoder)


def _read_permitted(group_ids: List[str], resource: HazardResource):
Expand Down Expand Up @@ -472,6 +486,9 @@ def compile_asset_impacts(
ordered_impacts[asset] = []
for k, value in impacts.items():
for v in value:
if isinstance(v.impact, EmptyImpactDistrib):
continue

if include_calc_details:
if v.event is not None and v.vulnerability is not None:
hazard_exceedance = v.event.to_exceedance_curve()
Expand All @@ -496,9 +513,6 @@ def compile_asset_impacts(
else:
calc_details = None

if isinstance(v.impact, EmptyImpactDistrib):
continue

impact_exceedance = v.impact.to_exceedance_curve()
key = APIImpactKey(
hazard_type=k.hazard_type.__name__,
Expand Down
Loading

0 comments on commit fe55378

Please sign in to comment.