Skip to content

Commit

Permalink
Fix PDC pipeline: pass extraction record IDs (not model instances) to transform tasks, rename `gdacs_instance` to the source-neutral `extraction_instance`, and make `Extraction.headers` optional.
Browse files Browse the repository at this point in the history
  • Loading branch information
Rup-Narayan-Rajbanshi committed Feb 24, 2025
1 parent 5260b98 commit 3794662
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 16 deletions.
2 changes: 1 addition & 1 deletion apps/etl/extraction/sources/base/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class Extraction:
def __init__(self, url: str, headers: str):
def __init__(self, url: str, headers: str = None):
self.headers = headers
self.url = url

Expand Down
22 changes: 11 additions & 11 deletions apps/etl/extraction/sources/base/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ def store_extraction_data(
resp_data = response.pop("resp_data")

# save the additional response data after the data is fetched from api.
gdacs_instance = ExtractionData.objects.get(id=instance_id)
extraction_instance = ExtractionData.objects.get(id=instance_id)
for key, value in response.items():
setattr(gdacs_instance, key, value)
gdacs_instance.save()
setattr(extraction_instance, key, value)
extraction_instance.save()

# save parent id if it is child extraction object
if parent_id:
gdacs_instance.parent_id = parent_id
gdacs_instance.save(update_fields=["parent_id"])
extraction_instance.parent_id = parent_id
extraction_instance.save(update_fields=["parent_id"])

# Validate the non empty response data.
if resp_data and not response["resp_code"] == 204:
Expand All @@ -65,22 +65,22 @@ def store_extraction_data(
# if the validate function requires hazard type as argument pass it as argument else don't.
if validate_source_func:
if requires_hazard_type:
gdacs_instance.source_validation_status = validate_source_func(resp_data_content, hazard_type)["status"]
gdacs_instance.content_validation = validate_source_func(resp_data_content, hazard_type)["validation_error"]
extraction_instance.source_validation_status = validate_source_func(resp_data_content, hazard_type)["status"]
extraction_instance.content_validation = validate_source_func(resp_data_content, hazard_type)["validation_error"]
else:
gdacs_instance.source_validation_status = validate_source_func(resp_data_content)["status"]
gdacs_instance.content_validation = validate_source_func(resp_data_content)["validation_error"]
extraction_instance.source_validation_status = validate_source_func(resp_data_content)["status"]
extraction_instance.content_validation = validate_source_func(resp_data_content)["validation_error"]

# manage duplicate file content.
hash_content = hash_file_content(resp_data_content)
manage_duplicate_file_content(
source=source,
hash_content=hash_content,
instance=gdacs_instance,
instance=extraction_instance,
response_data=resp_data_content,
file_name=file_name,
)
return gdacs_instance
return extraction_instance


def store_pdc_exposure_data(
Expand Down
2 changes: 1 addition & 1 deletion apps/etl/extraction/sources/pdc/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_hazard_details(self, extraction_id, **kwargs):
hazard_type=HAZARD_TYPE_MAP.get(hazard["type_ID"]),
metadata={"exposure_id": exposure_id, "uuid": hazard["uuid"]},
)
PDCTransformHandler.task(exposure_detail)
PDCTransformHandler.task(exposure_detail.id)
except Exception as exc:
self.retry(exc=exc, kwargs={"instance_id": instance_id.id, "retry_count": self.request.retries})

Expand Down
2 changes: 1 addition & 1 deletion apps/etl/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from apps.etl.etl_tasks.desinventar import import_desinventar_data # noqa: F401
from apps.etl.etl_tasks.emdat import extract_and_transform_emdat_data # noqa: F401
from apps.etl.etl_tasks.gdacs import import_hazard_data # noqa: F401
from apps.etl.etl_tasks.gdacs import ext_and_transform_gdacs_data # noqa: F401
from apps.etl.etl_tasks.glide import import_glide_hazard_data # noqa: F401
from apps.etl.extraction.sources.gdacs.extract import ( # noqa: F401
fetch_event_data,
Expand Down
3 changes: 1 addition & 2 deletions apps/etl/transform/sources/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def get_schema_data(cls, extraction_obj: ExtractionData):
@classmethod
def handle_transformation(cls, extraction_id):
logger.info("Transformation started")
extraction_obj = ExtractionData.objects.filter(id=extraction_id.id).first()
extraction_obj = ExtractionData.objects.filter(id=extraction_id).first()

transform_obj = Transform.objects.create(
extraction=extraction_obj,
Expand Down Expand Up @@ -66,7 +66,6 @@ def load_stac_item_to_queue(cls, transform_items, transform_obj_id):
transform_obj = Transform.objects.filter(id=transform_obj_id).first()
bulk_mgr = BulkCreateManager(chunk_size=1000)
for item in transform_items:
print(item.collection_id)
item_type = ITEM_TYPE_COLLECTION_ID_MAP[item.collection_id]
transformed_item_dict = item.to_dict()
transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4())
Expand Down

0 comments on commit 3794662

Please sign in to comment.