Skip to content

Commit

Permalink
Save each hazard detail as an instance
Browse files Browse the repository at this point in the history
  • Loading branch information
sudan45 authored and Rup-Narayan-Rajbanshi committed Feb 14, 2025
1 parent 04aa30e commit 2922d89
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 59 deletions.
15 changes: 13 additions & 2 deletions apps/etl/extraction/sources/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,24 @@ def store_extraction_data(
return gdacs_instance


def store_pdc_exposure_data(response, source=None, validate_source_func=None, instance_id=None, parent_id=None):
def store_pdc_exposure_data(
response, source=None, validate_source_func=None, instance_id=None, parent_id=None, hazard_type=None
):
file_extension = "json"
file_name = f"{instance_id}pdc.{file_extension}"
data = json.dumps(response).encode("utf-8")

instance = ExtractionData.objects.create(
parent_id=parent_id, source=source, attempt_no=1, resp_code=200, status=ExtractionData.Status.SUCCESS
parent_id=parent_id,
source=source,
attempt_no=1,
resp_code=200,
status=ExtractionData.Status.SUCCESS,
hazard_type=hazard_type,
metadata={
"uuid": response["uuid"],
"exosure": str(response["exposure"].keys()),
},
)

content_file = ContentFile(data)
Expand Down
105 changes: 50 additions & 55 deletions apps/etl/extraction/sources/pdc/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,61 +10,49 @@
store_extraction_data,
store_pdc_exposure_data,
)
from apps.etl.models import ExtractionData
from apps.etl.models import ExtractionData, HazardType

logger = logging.getLogger(__name__)

HAZARD_TYPES = [
"AVALANCHE",
"DROUGHT",
"EARTHQUAKE",
"EXTREMETEMPERATURE",
"FLOOD",
"HIGHSURF",
"HIGHWIND",
"LANDSLIDE",
"SEVEREWEATHER",
"STORM",
"TORNADO",
"CYCLONE",
"TSUNAMI",
"VOLCANO",
"WILDFIRE",
"WINTERSTORM",
]

HAZARD_TYPE_MAP = {
"AVALANCHE": HazardType.OTHER,
"DROUGHT": HazardType.DROUGHT,
"EARTHQUAKE": HazardType.EARTHQUAKE,
"EXTREMETEMPERATURE": HazardType.EXTREME_TEMPERATURE,
"FLOOD": HazardType.FLOOD,
"HIGHSURF": HazardType.OTHER,
"HIGHWIND": HazardType.WIND,
"LANDSLIDE": HazardType.LANDSLIDE,
"SEVEREWEATHER": HazardType.OTHER,
"STORM": HazardType.STORM,
"TORNADO": HazardType.TORNADO,
"CYCLONE": HazardType.CYCLONE,
"TSUNAMI": HazardType.TSUNAMI,
"VOLCANO": HazardType.VOLCANO,
"WILDFIRE": HazardType.WILDFIRE,
"WINTERSTORM": HazardType.OTHER,
}


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
def get_exposure_data(self, hazard_list, **kwargs):
timestamps = []
exposure_data = {}
for count, hazard in enumerate(hazard_list[190:220]):
print(f'Progress: {count+1}/{len(hazard_list) } hazard uuid is {hazard["uuid"]}')
# for hazard in hazard_list:
# TODO we need to handle category id RESPONSE in future for now ignore this
if hazard["type_ID"] not in HAZARD_TYPES:
print("Skipping EXERCISE and RESPONSE category")
continue
exposure_url = f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure"
response = requests.get(
url=exposure_url,
def get_hazard_details(self, hazard_detail, **kwargs):
r = requests.get(
f"{settings.PDC_BASE_URL}/hazard/{hazard_detail['uuid']}/exposure",
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
exposure = {}
for exposure_id in r.json():
print(f"Progress: {exposure_id}")
detail_url = f"{settings.PDC_BASE_URL}/hazard/{hazard_detail['uuid']}/exposure/{exposure_id}"
detail_response = requests.get(
url=detail_url,
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
if response.status_code == 200:
for count, exposure_id in enumerate(response.json()):
# for exposure_id in response.json():
print(
f'Progress: {count+1}/{len(response.json())} exposure id is {exposure_id} hazard uuid is {hazard["uuid"]} hazard event is {hazard["type_ID"]}'
)
detail_url = f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure/{exposure_id}"
detail_response = requests.get(
url=detail_url,
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
timestamps.append({exposure_id: detail_response.json()})
exposure_data[hazard["uuid"]] = timestamps
return exposure_data
print(detail_response.text)
if detail_response.status_code == 200:
# exposure.update({exposure_id: detail_response.json()})
exposure[exposure_id] = detail_response.json()
return {**hazard_detail, "exposure": exposure}


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
Expand Down Expand Up @@ -117,13 +105,20 @@ def import_hazard_data(self, **kwargs):
)
if pdc_instance.resp_code == 200:
response_data = json.loads(pdc_instance.resp_data.read())
data = get_exposure_data(response_data)
store_pdc_exposure_data(
response=data,
source=ExtractionData.Source.PDC,
validate_source_func=None,
parent_id=pdc_instance.id,
instance_id=pdc_instance.id,
)
for hazard in response_data:
if hazard["type_ID"] not in HAZARD_TYPE_MAP.keys():
continue
hazard_detail = get_hazard_details(hazard_detail=hazard)
store_pdc_exposure_data(
response=hazard_detail,
source=ExtractionData.Source.PDC,
validate_source_func=None,
parent_id=pdc_instance.id,
instance_id=pdc_instance.id,
hazard_type=HAZARD_TYPE_MAP[hazard["type_ID"]],
)

logger.info("PDC data imported sucessfully")
return pdc_instance.id

logger.info("PDC data import failed")
1 change: 1 addition & 0 deletions apps/etl/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def extract_desinventar_data():
def extract_emdat_data():
call_command("extract_emdat_data")


@shared_task
def extract_pdc_data():
call_command("extract_pdc_data")
Expand Down
5 changes: 3 additions & 2 deletions apps/etl/transform/sources/pdc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from main.celery import app

from .handler import BaseTransformerHandler


Expand All @@ -7,8 +9,7 @@ class PDCTransformHandler(BaseTransformerHandler):

@classmethod
def get_schema_data(cls, extraction_obj: ExtractionData):
with extraction_obj.resp_data.open() as file_data:
data = file_data.read()
data = extraction_obj.resp_data.file.url

return cls.transformer_schema(source_url=extraction_obj.url, data=data)

Expand Down

0 comments on commit 2922d89

Please sign in to comment.