Skip to content

Commit

Permalink
Update pdc transformation
Browse files Browse the repository at this point in the history
  • Loading branch information
sudan45 authored and Rup-Narayan-Rajbanshi committed Feb 14, 2025
1 parent 2922d89 commit b6c8eb8
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 45 deletions.
7 changes: 2 additions & 5 deletions apps/etl/extraction/sources/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def store_extraction_data(


def store_pdc_exposure_data(
response, source=None, validate_source_func=None, instance_id=None, parent_id=None, hazard_type=None
response, source=None, validate_source_func=None, instance_id=None, parent_id=None, hazard_type=None, metadata=None
):
file_extension = "json"
file_name = f"{instance_id}pdc.{file_extension}"
Expand All @@ -95,10 +95,7 @@ def store_pdc_exposure_data(
resp_code=200,
status=ExtractionData.Status.SUCCESS,
hazard_type=hazard_type,
metadata={
"uuid": response["uuid"],
"exosure": str(response["exposure"].keys()),
},
metadata=metadata,
)

content_file = ContentFile(data)
Expand Down
66 changes: 33 additions & 33 deletions apps/etl/extraction/sources/pdc/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
store_pdc_exposure_data,
)
from apps.etl.models import ExtractionData, HazardType
from apps.etl.transform.sources.pdc import PDCTransformHandler

logger = logging.getLogger(__name__)

Expand All @@ -20,7 +21,7 @@
"EARTHQUAKE": HazardType.EARTHQUAKE,
"EXTREMETEMPERATURE": HazardType.EXTREME_TEMPERATURE,
"FLOOD": HazardType.FLOOD,
"HIGHSURF": HazardType.OTHER,
# "HIGHSURF": HazardType.OTHER,
"HIGHWIND": HazardType.WIND,
"LANDSLIDE": HazardType.LANDSLIDE,
"SEVEREWEATHER": HazardType.OTHER,
Expand All @@ -35,24 +36,39 @@


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
def get_hazard_details(self, hazard_detail, **kwargs):
r = requests.get(
f"{settings.PDC_BASE_URL}/hazard/{hazard_detail['uuid']}/exposure",
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
exposure = {}
for exposure_id in r.json():
print(f"Progress: {exposure_id}")
detail_url = f"{settings.PDC_BASE_URL}/hazard/{hazard_detail['uuid']}/exposure/{exposure_id}"
detail_response = requests.get(
url=detail_url,
def get_hazard_details(self, extraction_id, **kwargs):
instance_id = ExtractionData.objects.get(id=extraction_id)
response_data = json.loads(instance_id.resp_data.read())
for hazard in response_data:
if hazard["type_ID"] not in HAZARD_TYPE_MAP.keys():
continue
r = requests.get(
f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure",
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
print(detail_response.text)
if detail_response.status_code == 200:
# exposure.update({exposure_id: detail_response.json()})
exposure[exposure_id] = detail_response.json()
return {**hazard_detail, "exposure": exposure}
for exposure_id in r.json():
if ExtractionData.objects.filter(
metadata__exposure_id=exposure_id,
source=ExtractionData.Source.PDC,
status=ExtractionData.Status.SUCCESS,
metadata__uuid=hazard["uuid"],
).exists():
continue
detail_url = f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure/{exposure_id}"
detail_response = requests.get(
url=detail_url,
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
exposure_detail = store_pdc_exposure_data(
response=detail_response.json(),
source=ExtractionData.Source.PDC,
validate_source_func=None,
parent_id=instance_id.id,
hazard_type=HAZARD_TYPE_MAP.get(hazard["type_ID"]),
metadata={"exposure_id": exposure_id, "uuid": hazard["uuid"]},
)
PDCTransformHandler.task(exposure_detail)
return None


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
Expand Down Expand Up @@ -103,22 +119,6 @@ def import_hazard_data(self, **kwargs):
validate_source_func=None,
instance_id=pdc_instance.id,
)
if pdc_instance.resp_code == 200:
response_data = json.loads(pdc_instance.resp_data.read())
for hazard in response_data:
if hazard["type_ID"] not in HAZARD_TYPE_MAP.keys():
continue
hazard_detail = get_hazard_details(hazard_detail=hazard)
store_pdc_exposure_data(
response=hazard_detail,
source=ExtractionData.Source.PDC,
validate_source_func=None,
parent_id=pdc_instance.id,
instance_id=pdc_instance.id,
hazard_type=HAZARD_TYPE_MAP[hazard["type_ID"]],
)

logger.info("PDC data imported sucessfully")
return pdc_instance.id

logger.info("PDC data import failed")
2 changes: 2 additions & 0 deletions apps/etl/load/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def send_post_request_to_stac_api(result, collection_id):

response = requests.post(url, json=result, headers={"Content-Type": "application/json"})
response.raise_for_status()
print(response)
return response
except requests.exceptions.RequestException as e:
print(f"Error posting data for {collection_id}: {e}")
Expand All @@ -34,6 +35,7 @@ def load_data(django_command: BaseCommand | None = None):
bulk_mgr = BulkUpdateManager(["load_status"], chunk_size=1000)
for item in transformed_items.iterator():
# TODO Remove this after sucessfull testing
# import uuid
# item.item["id"] = f"{item.item['collection']}-{uuid.uuid4()}"

response = send_post_request_to_stac_api(item.item, f"{item.collection_id}")
Expand Down
10 changes: 7 additions & 3 deletions apps/etl/transform/sources/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@
logger = logging.getLogger(__name__)

ITEM_TYPE_COLLECTION_ID_MAP = {
"idu-events": PyStacLoadData.ItemType.EVENT,
"idu-impacts": PyStacLoadData.ItemType.IMPACT,
"idmc-idu-events": PyStacLoadData.ItemType.EVENT,
"idmc-idu-impacts": PyStacLoadData.ItemType.IMPACT,
"idmc-events": PyStacLoadData.ItemType.EVENT,
"idmc-gidd-impacts": PyStacLoadData.ItemType.IMPACT,
"gfd-events": PyStacLoadData.ItemType.EVENT,
"gfd-impacts": PyStacLoadData.ItemType.IMPACT,
"gfd-hazards": PyStacLoadData.ItemType.HAZARD,
"ifrcevent-events": PyStacLoadData.ItemType.EVENT,
"ifrcevent-impacts": PyStacLoadData.ItemType.IMPACT,
"pdc-events": PyStacLoadData.ItemType.EVENT,
"pdc-hazards": PyStacLoadData.ItemType.HAZARD,
"pdc-impacts": PyStacLoadData.ItemType.IMPACT,
}


Expand All @@ -30,7 +33,7 @@ def get_schema_data(cls, extraction_obj: ExtractionData):
@classmethod
def handle_transformation(cls, extraction_id):
logger.info("Transformation started")
extraction_obj = ExtractionData.objects.filter(id=extraction_id).first()
extraction_obj = ExtractionData.objects.filter(id=extraction_id.id).first()

transform_obj = Transform.objects.create(
extraction=extraction_obj,
Expand Down Expand Up @@ -63,6 +66,7 @@ def load_stac_item_to_queue(cls, transform_items, transform_obj_id):
transform_obj = Transform.objects.filter(id=transform_obj_id).first()
bulk_mgr = BulkCreateManager(chunk_size=1000)
for item in transform_items:
print(item.collection_id)
item_type = ITEM_TYPE_COLLECTION_ID_MAP[item.collection_id]
transformed_item_dict = item.to_dict()
transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4())
Expand Down
20 changes: 18 additions & 2 deletions apps/etl/transform/sources/pdc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
import json

from pystac_monty.sources.pdc import PDCDataSource, PDCTransformer

from apps.etl.models import ExtractionData
from main.celery import app

from .handler import BaseTransformerHandler
Expand All @@ -9,11 +14,22 @@ class PDCTransformHandler(BaseTransformerHandler):

@classmethod
def get_schema_data(cls, extraction_obj: ExtractionData):
data = extraction_obj.resp_data.file.url
source_url = extraction_obj.url
print("---------->", extraction_obj)
data = {
"hazards_file_path": extraction_obj.parent.resp_data.path,
"exposure_timestamp": extraction_obj.metadata["exposure_id"],
"uuid": extraction_obj.metadata["uuid"],
"exposure_detail_file_path": extraction_obj.resp_data.path,
"geojson_file_path": "",
}

return cls.transformer_schema(source_url=extraction_obj.url, data=data)
return cls.transformer_schema(source_url=source_url, data=json.dumps(data))

@staticmethod
@app.task
def task(extraction_id):
return PDCTransformHandler().handle_transformation(extraction_id)


# from apps.etl.extraction.sources.pdc.extract import get_hazard_details
4 changes: 2 additions & 2 deletions main/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@
EMDAT_AUTHORIZATION_KEY=str,
IDMC_CLIENT_ID=str,
IDMC_DATA_URL=(str, "https://helix-tools-api.idmcdb.org"),
# ETL Load configs
EOAPI_DOMAIN=str, # http://montandon-eoapi.ifrc.org
GFD_CREDENTIAL=str,
GFD_SERVICE_ACCOUNT=str,
IFRC_DATA_URL=str,
PDC_BASE_URL=(str, "https://sentry.pdc.org/hp_srv/services"),
PDC_AUTHORIZATION_KEY=str,
# ETL Load configs
EOAPI_DOMAIN=str, # http://montandon-eoapi.ifrc.org
)

GFD_SERVICE_ACCOUNT = env("GFD_SERVICE_ACCOUNT")
Expand Down

0 comments on commit b6c8eb8

Please sign in to comment.