From bd546065eb150f14103481f54c3bf5629f1711b1 Mon Sep 17 00:00:00 2001 From: rup-narayan-rajbanshi Date: Wed, 12 Feb 2025 17:15:11 +0545 Subject: [PATCH] Add impact items for ifrc-event transformer --- pystac_monty/extension.py | 2 + pystac_monty/sources/ifrc.py | 166 ----------------- pystac_monty/sources/ifrc_events.py | 269 +++++++++++++++++++++++++++ tests/extensions/test_ifrc_events.py | 15 +- 4 files changed, 277 insertions(+), 175 deletions(-) delete mode 100644 pystac_monty/sources/ifrc.py create mode 100644 pystac_monty/sources/ifrc_events.py diff --git a/pystac_monty/extension.py b/pystac_monty/extension.py index b18a33e..4018c8a 100644 --- a/pystac_monty/extension.py +++ b/pystac_monty/extension.py @@ -160,6 +160,7 @@ class MontyImpactType(StringEnum): TOTAL_AFFECTED = "affected_total" DIRECTLY_AFFECTED = "affected_direct" INDIRECTLY_AFFECTED = "affected_indirect" + POTENTIALLY_AFFECTED = "potentially_affected" DEATH = "death" MISSING = "missing" INJURED = "injured" @@ -178,6 +179,7 @@ class MontyImpactType(StringEnum): EXTERNALLY_DISPLACED_PERSONS = "displaced_external" TOTAL_DISPLACED_PERSONS = "displaced_total" ALERTSCORE = "alertscore" + HIGHEST_RISK = "highest_risk" class MontyImpactTypeLabel(Mapping): diff --git a/pystac_monty/sources/ifrc.py b/pystac_monty/sources/ifrc.py deleted file mode 100644 index 2843d5b..0000000 --- a/pystac_monty/sources/ifrc.py +++ /dev/null @@ -1,166 +0,0 @@ -import json -from datetime import datetime -from typing import Any, Dict, List -from pystac import Collection, Item -from pystac_monty.hazard_profiles import MontyHazardProfiles -import pytz - -import requests - -from pystac_monty.extension import ( - MontyExtension, -) - -from pystac_monty.hazard_profiles import HazardProfiles -from pystac_monty.geocoding import MontyGeoCoder -from pystac_monty.sources.common import MontyDataSource - -STAC_EVENT_ID_PREFIX = "ifrcevent-event-" -STAC_IMPACT_ID_PREFIX = "ifrcevent-impact-" - - -class IFRCEventDataSource(MontyDataSource): - def __init__(self, source_url: str, data: str): - super().__init__(source_url, data) - self.source_url = source_url - self.data = json.loads(data)[:5] - - def get_data(self) -> dict: - """Get the event detail data.""" - return self.data - - -class IFRCEventTransformer(): - - ifrc_events_collection_id = "ifrcevent-events" - ifrc_events_collection_url = "https://raw.githubusercontent.com/IFRCGo/monty-stac-extension/refs/heads/feature/collection-ifrc-event/examples/ifrcevent-events/ifrcevent_events.json" - ifrc_hazards_collection_id = "ifrcevent-hazards" - ifrc_hazards_collection_url = "https://raw.githubusercontent.com/IFRCGo/monty-stac-extension/refs/heads/feature/collection-ifrc-event/examples/ifrcevent-hazards/ifrcevent_hazards.json" - ifrc_impacts_collection_id = "ifrcevent-impacts" - ifrc_impacts_collection_url = "https://raw.githubusercontent.com/IFRCGo/monty-stac-extension/refs/heads/feature/collection-ifrc-event/examples/ifrcevent-impacts/ifrcevent_impacts.json" - hazard_profiles = MontyHazardProfiles() - - def __init__(self, data: IFRCEventDataSource, geocoder: MontyGeoCoder): - self.data = data - self.geocoder = geocoder - if not self.geocoder: - raise ValueError("Geocoder is required for IFRC events transformer") - - def get_event_collection(self) -> Collection: - """Get event collection""" - response = requests.get(self.ifrc_events_collection_url) - collection_dict = json.loads(response.text) - return Collection.from_dict(collection_dict) - - def make_items(self) -> List[Item]: - """Create items""" - items = [] - - event_items = self.make_source_event_items() - items.extend(event_items) - - # impact_items = self.make_impact_items() - # items.extend(impact_items) - - return items - - def make_source_event_items(self) -> List[Item]: - """Create ifrc event item""" - items = [] - ifrc_data: List[Dict[str, Any]] = self.data.get_data() - # if not ifrc_data: - # return [] - - for data in ifrc_data: - item = self.make_source_event_item(data=data) - items.append(item) - return items - - def make_source_event_item(self, data: dict) -> Item: - """Create an event item""" - geometry = None - bbox = None - geom_data = self.geocoder.get_geometry_by_country_name(data["countries"][0]["iso3"]) - - # Filter out relevant disaster types - monty_accepted_disaster_types = { - "Earthquake", "Cyclone", "Volcanic Eruption", "Tsunami", "Flood", - "Cold Wave", "Fire", "Heat Wave", "Drought", "Storm Surge", - "Landslide", "Flash Flood" - } - - # if data["dtype"]["name"] not in monty_accepted_disaster_types: - # return [] - - # if data["appeals"]['atype'] not in {0, 1}: - # return [] - - if geom_data: - geometry = geom_data["geometry"] - bbox = geom_data["bbox"] - - start_date = datetime.fromisoformat(data["disaster_start_date"]) - # Create item - item = Item( - id=f"{STAC_EVENT_ID_PREFIX}{data["id"]}", - geometry=geometry, - bbox=bbox, - datetime=start_date, - properties={ - "title": data["name"], - "description": data["summary"] - }, - ) - - # Add Monty extension - MontyExtension.add_to(item) - monty = MontyExtension.ext(item) - monty.episode_number = 1 # IFRC DREF doesn't have episodes - monty.hazard_codes = self.map_ifrc_to_hazard_codes(data["dtype"]["name"]) - # monty.country_codes = data["country"]["iso3"] - monty.country_codes = [country["iso3"] for country in data["countries"]] - # monty.compute_and_set_correlation_id(hazard_profiles=self.hazard_profiles) - - # Set collection and roles - item.set_collection(self.get_event_collection()) - item.properties["roles"] = ["source", "event"] - - return item - - def map_ifrc_to_hazard_codes(self, classification_key: str) -> List[str]: - """ - Map IFRC DREF & EA classification key to UNDRR-ISC 2020 Hazard codes - - Args: - classification_key: dtype name (e.g., 'Flood') - - Returns: - List of UNDRR-ISC hazard codes - """ - - # if not classification_key: - # return [] - - key = classification_key.lower() - - # IFRC DREF hazards classification mapping to UNDRR-ISC codes - mapping = { - "Earthquake": ["GH0001", "GH0002", "GH0003", "GH0004", "GH0005"], - "Cyclone": ["MH0030", "MH0031", "MH0032"], - "Volcanic Eruption": ["GH009", "GH0013", "GH0014", "GH0015", "GH0016"], - "Tsunami": ["MH0029", "GH0006"], - "Flood": ["FL"], # General flood - "Cold Wave": "MH0040", - "Fire": ["FR"], - "Heat Wave": ["MH0047"], - "Drought": ["MH0035"], - "Storm Surge": ["MH0027"], - "Landslide": ["GH0007"], - "Flash Flood": ["MH0006"], - "Epidemic": ["EP"], # General epidemic - } - - if key in mapping: - return mapping[key] - - return [] diff --git a/pystac_monty/sources/ifrc_events.py b/pystac_monty/sources/ifrc_events.py new file mode 100644 index 0000000..a2b287c --- /dev/null +++ b/pystac_monty/sources/ifrc_events.py @@ -0,0 +1,269 @@ +import json +import requests +from datetime import datetime +from typing import Any, Dict, List +from pystac import Collection, Item +from pystac_monty.extension import ( + ImpactDetail, + MontyEstimateType, + MontyExtension, + MontyImpactExposureCategory, + MontyImpactType, +) +from pystac_monty.geocoding import MontyGeoCoder +from pystac_monty.hazard_profiles import MontyHazardProfiles +from pystac_monty.sources.common import MontyDataSource + +STAC_EVENT_ID_PREFIX = "ifrcevent-event-" +STAC_IMPACT_ID_PREFIX = "ifrcevent-impact-" + + +class IFRCEventDataSource(MontyDataSource): + def __init__(self, source_url: str, data: str): + super().__init__(source_url, data) + self.source_url = source_url + self.data = json.loads(data) + + def get_data(self) -> dict: + """Get the event detail data.""" + return self.data + + +class IFRCEventTransformer: + ifrcevent_events_collection_id = "ifrcevent-events" + ifrcevent_events_collection_url = "https://raw.githubusercontent.com/IFRCGo/monty-stac-extension/refs/heads/feature/collection-ifrc-event/examples/ifrcevent-events/ifrcevent_events.json" + ifrcevent_impacts_collection_id = "ifrcevent-impacts" + ifrcevent_impacts_collection_url = "https://raw.githubusercontent.com/IFRCGo/monty-stac-extension/refs/heads/feature/collection-ifrc-event/examples/ifrcevent-impacts/ifrcevent_impacts.json" + + hazard_profiles = MontyHazardProfiles() + + def __init__(self, data: IFRCEventDataSource, geocoder: MontyGeoCoder): + self.data = data + self.geocoder = geocoder + if not self.geocoder: + raise ValueError("Geocoder is required for IFRC events transformer") + + def get_event_collection(self) -> Collection: + """Get event collection""" + response = requests.get(self.ifrcevent_events_collection_url) + collection_dict = json.loads(response.text) + return Collection.from_dict(collection_dict) + + def get_impact_collection(self) -> Collection: + """Get event collection""" + response = requests.get(self.ifrcevent_impacts_collection_url) + collection_dict = json.loads(response.text) + return Collection.from_dict(collection_dict) + + def make_items(self) -> List[Item]: + """Create items""" + items = [] + + event_items = self.make_source_event_items() + items.extend(event_items) + + impact_items = self.make_impact_items() + items.extend(impact_items) + + return items + + def make_source_event_items(self) -> List[Item]: + """Create ifrc event item""" + items = [] + ifrc_data: List[Dict[str, Any]] = self.data.get_data() + + for data in ifrc_data: + item = self.make_source_event_item(data=data) + items.append(item) + return items + + def make_source_event_item(self, data: dict) -> Item: + """Create an event item""" + geometry = None + bbox = None + geom_data = self.geocoder.get_geometry_by_country_name(data["countries"][0]["name"]) + + + if geom_data: + geometry = geom_data["geometry"] + bbox = geom_data["bbox"] + + start_date = datetime.fromisoformat(data["disaster_start_date"]) + # Create item + item = Item( + id=f"{STAC_EVENT_ID_PREFIX}{data["id"]}", + geometry=geometry, + bbox=bbox, + datetime=start_date, + properties={ + "title": data["name"], + "description": data.get("summary") if data.get("summary") != "" else "NA" + }, + ) + + # Add Monty extension + MontyExtension.add_to(item) + monty = MontyExtension.ext(item) + monty.episode_number = 1 # IFRC DREF doesn't have episodes + monty.hazard_codes = self.map_ifrc_to_hazard_codes(data["dtype"]["name"]) + monty.country_codes = [country["iso3"] for country in data["countries"]] + monty.compute_and_set_correlation_id(hazard_profiles=self.hazard_profiles) + + # Set collection and roles + item.set_collection(self.get_event_collection()) + item.properties["roles"] = ["source", "event"] + + return item + + def map_ifrc_to_hazard_codes(self, classification_key: str) -> List[str]: + """ + Map IFRC DREF & EA classification key to UNDRR-ISC 2020 Hazard codes + + Args: + classification_key: dtype name (e.g., 'Flood') + + Returns: + List of UNDRR-ISC hazard codes + """ + + # IFRC DREF hazards classification mapping to UNDRR-ISC codes + mapping = { + "Earthquake": ["GH0001", "GH0002", "GH0003", "GH0004", "GH0005"], + "Cyclone": ["MH0030", "MH0031", "MH0032"], + "Volcanic Eruption": ["GH009", "GH0013", "GH0014", "GH0015", "GH0016"], + "Tsunami": ["MH0029", "GH0006"], + "Flood": ["FL"], # General flood + "Cold Wave": "MH0040", + "Fire": ["FR"], + "Heat Wave": ["MH0047"], + "Drought": ["MH0035"], + "Storm Surge": ["MH0027"], + "Landslide": ["GH0007"], + "Flash Flood": ["MH0006"], + "Epidemic": ["nat-bio-epi-dis"], # General epidemic + } + + if classification_key in mapping: + return mapping[classification_key] + + return [] + + def make_impact_items(self) -> List[Item]: + """Create impact items""" + items = [] + + impact_field_category_map = { + ("num_dead", "gov_num_dead", "other_num_dead"): (MontyImpactExposureCategory.ALL_PEOPLE, MontyImpactType.DEATH), + ("num_displaced", "gov_num_displaced", "other_num_displaced"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.TOTAL_DISPLACED_PERSONS, + ), + ("num_injured", "gov_num_injured", "other_num_injured"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.INJURED, + ), + ("num_missing", "gov_num_missing", "other_num_missing"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.MISSING, + ), + ("num_affected", "gov_num_affected", "other_num_affected"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.TOTAL_AFFECTED, + ), + ("num_assisted", "gov_num_assisted", "other_num_assisted"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.ASSISTED, + ), + ("num_potentially_affected", "gov_num_potentially_affected", "other_num_potentially_affected"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.POTENTIALLY_AFFECTED, + ), + ("num_highest_risk", "gov_num_highest_risk", "other_num_highest_risk"): ( + MontyImpactExposureCategory.ALL_PEOPLE, + MontyImpactType.HIGHEST_RISK, + ), + } + + ifrcevent_data = self.check_and_get_ifrcevent_data() + + event_items = self.make_source_event_items() + for event_item, src_data in zip(event_items, ifrcevent_data): + impact_item = event_item.clone() + + for impact_field, (category, impact_type) in impact_field_category_map.items(): + startdate = datetime.fromisoformat(src_data["disaster_start_date"]) + + impact_item.id = f"{STAC_IMPACT_ID_PREFIX}-{src_data["id"]}" + impact_item.startdate = startdate + impact_item.properties["start_datetime"] = startdate.isoformat() + impact_item.properties["roles"] = ["source", "impact"] + impact_item.set_collection(self.get_impact_collection()) + + monty = MontyExtension.ext(impact_item) + + # only save impact value if not null + value = None + for field in impact_field: + if src_data["field_reports"][0][field]: # take impact value from latest field report + value = src_data["field_reports"][0][field] + break + + if not value: + continue + + monty.impact_detail = self.get_impact_details(category, impact_type, value) + + items.append(impact_item) + return items + + def get_impact_details(self, category, impact_type, value, unit=None): + """Returns the impact details""" + return ImpactDetail( + category=category, + type=impact_type, + value=value, + unit=unit, + estimate_type=MontyEstimateType.PRIMARY, + ) + + def check_accepted_disaster_types(self, disaster): + # Filter out relevant disaster types + monty_accepted_disaster_types = [ + "Earthquake", + "Cyclone", + "Volcanic Eruption", + "Tsunami", + "Flood", + "Cold Wave", + "Fire", + "Heat Wave", + "Drought", + "Storm Surge", + "Landslide", + "Flash Flood", + ] + return disaster in monty_accepted_disaster_types + + def check_and_get_ifrcevent_data(self) -> list[Any]: + """Validate the source fields""" + ifrcevent_data: List[Dict[str, Any]] = self.data.get_data() + + filtered_ifrcevent_data = [] + if not ifrcevent_data: + print(f"No IFRC-Event data found in {self.data.get_source_url()}") + return [] + + for item in ifrcevent_data: + if not item["appeals"]: + continue + + if item["appeals"][0]["atype"] not in {0, 1}: + continue + + if item.get("dtype"): + if not self.check_accepted_disaster_types(item.get("dtype")["name"]): + continue + + filtered_ifrcevent_data.append(item) + + return filtered_ifrcevent_data diff --git a/tests/extensions/test_ifrc_events.py b/tests/extensions/test_ifrc_events.py index fdf9608..a841ecd 100644 --- a/tests/extensions/test_ifrc_events.py +++ b/tests/extensions/test_ifrc_events.py @@ -2,16 +2,13 @@ import json from os import makedirs -from typing import List, TypedDict +from typing import List, TypedDict from unittest import TestCase from parameterized import parameterized -from pystac_monty.geocoding import MockGeocoder -from pystac_monty.sources.ifrc_events import ( - IfrcEventsDataSource, - IfrcEventsTransformer -) +from pystac_monty.geocoding import MockGeocoder +from pystac_monty.sources.ifrc_events import IfrcEventsDataSource, IfrcEventsTransformer from tests.conftest import get_data_file @@ -22,12 +19,12 @@ class IfrcEventsScenario(TypedDict): hurricane_beryl_data: IfrcEventsScenario = { "name": "Hurricane Beryl - Saint Vincent and the Grenadines, Grenada, Barbados and Jamaica", - "event_url": "https://alpha-1-api.ifrc-go.dev.datafriendlyspace.org/api/v2/event/7046/?format=json" + "event_url": "https://alpha-1-api.ifrc-go.dev.datafriendlyspace.org/api/v2/event/7046/?format=json", } food_security_data: IfrcEventsScenario = { "name": "Food Security", - "event_url": "https://alpha-1-api.ifrc-go.dev.datafriendlyspace.org/api/v2/event/2139/?format=json" + "event_url": "https://alpha-1-api.ifrc-go.dev.datafriendlyspace.org/api/v2/event/2139/?format=json", } @@ -37,7 +34,7 @@ def load_scenarios( transformers: List[IfrcEventsTransformer] = [] for scenario in scenarios: - event_url = scenario.get('event_url', None) + event_url = scenario.get("event_url", None) geocoder = MockGeocoder() if event_url is None: