diff --git a/apps/etl/admin.py b/apps/etl/admin.py
index d43a7ee..b09040d 100644
--- a/apps/etl/admin.py
+++ b/apps/etl/admin.py
@@ -6,11 +6,11 @@
 @admin.register(ExtractionData)
 class ExtractionDataAdmin(admin.ModelAdmin):
-    def get_readonly_fields(self, request, obj=None):
-        # Use the model's fields to populate readonly_fields
-        if obj:  # If the object exists (edit page)
-            return [field.name for field in self.model._meta.fields]
-        return []
+    # def get_readonly_fields(self, request, obj=None):
+    #     # Use the model's fields to populate readonly_fields
+    #     if obj:  # If the object exists (edit page)
+    #         return [field.name for field in self.model._meta.fields]
+    #     return []
 
     list_display = (
         "id",
@@ -22,6 +22,7 @@ def get_readonly_fields(self, request, obj=None):
         "source_validation_status",
         "hazard_type",
         "created_at",
+        "resp_data",
     )
     list_filter = ("status",)
     autocomplete_fields = ["parent"]
diff --git a/apps/etl/etl_tasks/gdacs.py b/apps/etl/etl_tasks/gdacs.py
index d24599a..a519ca9 100644
--- a/apps/etl/etl_tasks/gdacs.py
+++ b/apps/etl/etl_tasks/gdacs.py
@@ -5,8 +5,8 @@
 import requests
 from celery import chain, shared_task
 
-from apps.etl.extraction.sources.base.utils import store_extraction_data
 from apps.etl.extraction.sources.base.extract import Extraction
+from apps.etl.extraction.sources.base.utils import store_extraction_data
 from apps.etl.extraction.sources.gdacs.extract import (
     fetch_event_data,
     fetch_gdacs_geometry_data,
diff --git a/apps/etl/etl_tasks/noaa.py b/apps/etl/etl_tasks/noaa.py
new file mode 100644
index 0000000..4be58d8
--- /dev/null
+++ b/apps/etl/etl_tasks/noaa.py
@@ -0,0 +1,16 @@
+from celery import chain, shared_task
+
+from apps.etl.extraction.sources.noaa_IBTrACS.extract import (
+    import_hazard_data as import_noaa_data,
+)
+from apps.etl.transform.sources.noaa import transform_event_data
+
+
+@shared_task
+def import_noaa_hazard_data(**kwargs):
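+    # Run the extraction first, then pass the resulting extraction id into
+    # the transform task via a Celery chain.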
+    chain(
+        import_noaa_data.s(),
+        transform_event_data.s(),
+    ).apply_async()
diff --git a/apps/etl/extraction/sources/base/utils.py b/apps/etl/extraction/sources/base/utils.py
index 6991872..b876275 100644
--- a/apps/etl/extraction/sources/base/utils.py
+++ b/apps/etl/extraction/sources/base/utils.py
@@ -1,6 +1,7 @@
 import hashlib
 
 from django.core.files.base import ContentFile
+
 from apps.etl.models import ExtractionData
 
@@ -12,6 +13,7 @@ def hash_file_content(content):
     file_hash = hashlib.sha256(content).hexdigest()
     return file_hash
 
+
 def manage_duplicate_file_content(source, hash_content, instance, response_data, file_name):
     """
     if duplicate file content exists then do not create a new file, but point the url to
@@ -73,4 +75,3 @@ def store_extraction_data(
         file_name=file_name,
     )
     return gdacs_instance
-
diff --git a/apps/etl/extraction/sources/gdacs/extract.py b/apps/etl/extraction/sources/gdacs/extract.py
index 0f014c5..614cb76 100644
--- a/apps/etl/extraction/sources/gdacs/extract.py
+++ b/apps/etl/extraction/sources/gdacs/extract.py
@@ -7,8 +7,8 @@
 from celery import shared_task
 from pydantic import ValidationError
 
-from apps.etl.extraction.sources.base.utils import store_extraction_data
 from apps.etl.extraction.sources.base.extract import Extraction
+from apps.etl.extraction.sources.base.utils import store_extraction_data
 from apps.etl.extraction.sources.gdacs.validators.gdacs_eventsdata import (
     GDacsEventDataValidator,
 )
diff --git a/apps/etl/extraction/sources/glide/extract.py b/apps/etl/extraction/sources/glide/extract.py
index 06c855b..8025b8d 100644
--- a/apps/etl/extraction/sources/glide/extract.py
+++ b/apps/etl/extraction/sources/glide/extract.py
@@ -24,7 +24,7 @@ def import_hazard_data(self, hazard_type: str, hazard_type_str: str, **kwargs):
     # glide_url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear=2024&frommonth=10&fromday=01&toyear=2024&frommonth=12&today=31&events={hazard_type}"  # noqa: E501
     glide_url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={yesterday.year}&frommonth={yesterday.month}&fromday={yesterday.day}&toyear={today.year}&frommonth={today.month}&today={today.day}&events={hazard_type}"  # noqa: E501
 
-    # Create a Extraction object in the begining
+    # Create an Extraction object in the beginning
     instance_id = kwargs.get("instance_id", None)
     retry_count = kwargs.get("retry_count", None)
 
diff --git a/apps/etl/extraction/sources/noaa_IBTrACS/__init__.py b/apps/etl/extraction/sources/noaa_IBTrACS/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/apps/etl/extraction/sources/noaa_IBTrACS/extract.py b/apps/etl/extraction/sources/noaa_IBTrACS/extract.py
new file mode 100644
index 0000000..a457715
--- /dev/null
+++ b/apps/etl/extraction/sources/noaa_IBTrACS/extract.py
@@ -0,0 +1,65 @@
+import logging
+
+import requests
+from celery import shared_task
+
+from apps.etl.extraction.sources.base.extract import Extraction
+from apps.etl.extraction.sources.base.utils import store_extraction_data
+from apps.etl.models import ExtractionData, HazardType
+
+logger = logging.getLogger(__name__)
+
+
+@shared_task(bind=True, max_retries=3, default_retry_delay=5)
+def import_hazard_data(self, **kwargs):
+    """
+    Import hazard data from the NOAA IBTrACS API
+    """
+    logger.info(f"Importing {HazardType.CYCLONE} data")
+
+    noaa_active_event_data = "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r01/access/csv/ibtracs.ACTIVE.list.v04r01.csv"  # noqa: E501
+
+    # Create an Extraction object in the beginning
+    instance_id = kwargs.get("instance_id", None)
+    retry_count = kwargs.get("retry_count", None)
+
+    noaa_instance = (
+        ExtractionData.objects.get(id=instance_id)
+        if instance_id
+        else ExtractionData.objects.create(
+            source=ExtractionData.Source.IBTRACS,
+            status=ExtractionData.Status.PENDING,
+            source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
+            hazard_type=HazardType.CYCLONE,
+            attempt_no=0,
+            resp_code=0,
+        )
+    )
+
+    noaa_url = noaa_active_event_data
+
+    # Extract the data from the API.
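+    # pull_data records response metadata on the ExtractionData row; on network
+    # errors the task retries itself with the same instance_id to reuse that row.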
+    noaa_extraction = Extraction(url=noaa_url)
+    response = None
+    try:
+        response = noaa_extraction.pull_data(
+            source=ExtractionData.Source.IBTRACS,
+            ext_object_id=noaa_instance.id,
+            retry_count=retry_count if retry_count else 1,
+        )
+    except requests.exceptions.RequestException as exc:
+        self.retry(exc=exc, kwargs={"instance_id": noaa_instance.id, "retry_count": self.request.retries})
+
+    if response:
+        # Save the extracted data into the existing NOAA extraction object
+        noaa_instance = store_extraction_data(
+            response=response,
+            source=ExtractionData.Source.IBTRACS,
+            validate_source_func=None,
+            instance_id=noaa_instance.id,
+        )
+
+    logger.info(f"{HazardType.CYCLONE} data imported successfully")
+    return noaa_instance.id
diff --git a/apps/etl/load/sources/base.py b/apps/etl/load/sources/base.py
index 263ef0b..6562692 100644
--- a/apps/etl/load/sources/base.py
+++ b/apps/etl/load/sources/base.py
@@ -1,4 +1,5 @@
 import requests
+import uuid
 
 from celery.utils.log import get_task_logger
 from django.core.management.base import BaseCommand
@@ -9,6 +10,7 @@
 
 def send_post_request_to_stac_api(result, collection_id):
+    logger.debug("Posting item to STAC collection %s", collection_id)
     try:
         # url = f"http://montandon-eoapi-stage.ifrc.org/stac/collections/{collection_id}/items"
         url = f"https://montandon-eoapi-1.ifrc-go.dev.togglecorp.com/stac/collections/{collection_id}/items"
@@ -24,14 +26,16 @@ def load_data(django_command: BaseCommand | None = None):
     """Load data into STAC"""
     logger.info("Loading data into Stac")
 
-    transformed_items = PyStacLoadData.objects.filter(load_status=PyStacLoadData.LoadStatus.PENDING)
+    transformed_items = PyStacLoadData.objects.exclude(load_status=PyStacLoadData.LoadStatus.SUCCESS)
     bulk_mgr = BulkUpdateManager(["load_status"], chunk_size=1000)
 
     for item in transformed_items.iterator():
         # TODO Remove this after successful testing
-        # item.item["id"] = f"{item.item['collection']}-{uuid.uuid4()}"
+        item.item["id"] = f"{item.item['collection']}-{uuid.uuid4()}"
         response = send_post_request_to_stac_api(item.item, f"{item.collection_id}")
+        logger.debug("STAC API response: %s", response)
 
         # Set the loading status of item.
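+        # Only a 200 from the STAC API marks the item as successfully loaded.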
         if response and response.status_code == 200:
@@ -52,7 +56,7 @@ def load_data(django_command: BaseCommand | None = None):
             )
 
             if django_command is not None:
-                django_command.stdout.write(django_command.ERROR(f"Fail to load item {item.id}"))
+                django_command.stdout.write(django_command.style.ERROR(f"Failed to load item {item.id}"))
 
     bulk_mgr.done()
diff --git a/apps/etl/management/commands/extract_ibtracs.py b/apps/etl/management/commands/extract_ibtracs.py
new file mode 100644
index 0000000..52e750f
--- /dev/null
+++ b/apps/etl/management/commands/extract_ibtracs.py
@@ -0,0 +1,12 @@
+from django.core.management.base import BaseCommand
+
+from apps.etl.etl_tasks.noaa import import_noaa_hazard_data
+
+
+class Command(BaseCommand):
+    help = "Import data from the NOAA IBTrACS API"
+
+    def handle(self, *args, **options):
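+        # Delegate to the shared Celery task so manual runs and the beat
+        # schedule exercise the same code path.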
+        import_noaa_hazard_data()
diff --git a/apps/etl/migrations/0014_alter_extractiondata_hazard_type.py b/apps/etl/migrations/0014_alter_extractiondata_hazard_type.py
new file mode 100644
index 0000000..5c90c03
--- /dev/null
+++ b/apps/etl/migrations/0014_alter_extractiondata_hazard_type.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.1.5 on 2025-01-20 11:39
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('etl', '0013_glidetransformation'),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name='extractiondata',
+            name='hazard_type',
+            field=models.CharField(blank=True, choices=[('EQ', 'Earthquake'), ('FL', 'Flood'), ('TC', 'Cyclone'), ('EP', 'Epidemic'), ('FI', 'Food Insecurity'), ('SS', 'Storm Surge'), ('DR', 'Drought'), ('TS', 'Tsunami'), ('CD', 'Cyclonic Wind'), ('WF', 'WildFire'), ('VO', 'Volcano'), ('CW', 'Cold Wave'), ('CE', 'Complex Emergency'), ('EC', 'Extratropical Cyclone'), ('ET', 'Extreme temperature'), ('FA', 'Famine'), ('FR', 'Fire'), ('FF', 'Flash Flood'), ('HT', 'Heat Wave'), ('IN', 'Insect Infestation'), ('LS', 'Land Slide'), ('MS', 'Mud Slide'), ('ST', 'Severe Local Strom'), ('SL', 'Slide'), ('AV', 'Snow Avalanche'), ('AC', 'Tech. Disaster'), ('TO', 'Tornado'), ('VW', 'Violent Wind'), ('WV', 'Wave/Surge')], max_length=100, verbose_name='hazard type'),
+        ),
+    ]
diff --git a/apps/etl/migrations/0023_merge_20250121_0920.py b/apps/etl/migrations/0023_merge_20250121_0920.py
new file mode 100644
index 0000000..3a6f2e1
--- /dev/null
+++ b/apps/etl/migrations/0023_merge_20250121_0920.py
@@ -0,0 +1,14 @@
+# Generated by Django 5.1.5 on 2025-01-21 09:20
+
+from django.db import migrations
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('etl', '0014_alter_extractiondata_hazard_type'),
+        ('etl', '0022_alter_transform_is_loaded'),
+    ]
+
+    operations = [
+    ]
diff --git a/apps/etl/models.py b/apps/etl/models.py
index e006d05..5948427 100644
--- a/apps/etl/models.py
+++ b/apps/etl/models.py
@@ -102,7 +102,7 @@ class Status(models.IntegerChoices):
     hazard_type = models.CharField(max_length=100, verbose_name=_("hazard type"), choices=HazardType.choices, blank=True)
 
     def __str__(self):
-        return str(self.id)
+        return f"{self.get_source_display()} - {self.id}"
 
 
 class Transform(Resource):
diff --git a/apps/etl/tasks.py b/apps/etl/tasks.py
index f1c6e76..e96e41b 100644
--- a/apps/etl/tasks.py
+++ b/apps/etl/tasks.py
@@ -35,6 +35,11 @@ def extract_desinventar_data():
     call_command("extract_desinventar_data")
 
 
+@shared_task
+def extract_noaa_data():
+    call_command("extract_ibtracs")
+
+
 @shared_task
 def load_data():
     call_command("load_data_to_stac")
diff --git a/apps/etl/transform/sources/noaa.py b/apps/etl/transform/sources/noaa.py
new file mode 100644
index 0000000..a840721
--- /dev/null
+++ b/apps/etl/transform/sources/noaa.py
@@ -0,0 +1,63 @@
+import logging
+import uuid
+
+from celery import shared_task
+from pystac_monty.sources.noaa import NoaaIbtracsSource, NoaaIbtracsTransformer
+
+from apps.etl.models import ExtractionData, Transform, PyStacLoadData
+from main.managers import BulkCreateManager
+
+logger = logging.getLogger(__name__)
+
+noaa_item_type_map = {
+    "ibtracs-events": PyStacLoadData.ItemType.EVENT,
+    "ibtracs-hazards": PyStacLoadData.ItemType.HAZARD,
+}
+
+
+@shared_task
+def transform_event_data(event_extraction_id):
+    logger.info("Transform started for NOAA event data")
+    noaa_instance = ExtractionData.objects.get(id=event_extraction_id)
+    transform_obj = Transform.objects.create(
+        extraction=noaa_instance,
+        status=Transform.Status.PENDING,
+    )
+
+    bulk_mgr = BulkCreateManager(chunk_size=1000)
+    try:
+        transformer = NoaaIbtracsTransformer(
+            NoaaIbtracsSource(source_url=noaa_instance.url, data=noaa_instance.resp_data)
+        )
+        transformed_event_items = transformer.make_items()
+
+        transform_obj.status = Transform.Status.SUCCESS
+        transform_obj.save(update_fields=["status"])
+    except Exception as e:
+        logger.error("NOAA transformation failed", exc_info=True, extra={"extraction_id": noaa_instance.id})
+        transform_obj.status = Transform.Status.FAILED
+        transform_obj.save(update_fields=["status"])
+        raise e
+
+    for item in transformed_event_items:
+        item_type = noaa_item_type_map[item.collection_id]
+        transformed_item_dict = item.to_dict()
+        transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4())
+        bulk_mgr.add(
+            PyStacLoadData(
+                transform_id=transform_obj,
+                item=transformed_item_dict,
+                collection_id=item.collection_id,
+                item_type=item_type,
+                load_status=PyStacLoadData.LoadStatus.PENDING,
+            )
+        )
+
+    bulk_mgr.done()
+
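+    # Mark the transform as handed off; the loader updates each item's
+    # load_status individually when it posts to the STAC API.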
+    transform_obj.is_loaded = True
+    transform_obj.save(update_fields=["is_loaded"])
+
+    logger.info("Transformation ended for NOAA data")
diff --git a/libs/emdat.py b/libs/emdat.py
new file mode 100644
index 0000000..2f9a680
--- /dev/null
+++ b/libs/emdat.py
@@ -0,0 +1,63 @@
+from datetime import datetime
+from typing import List, Optional
+
+import pandas as pd
+from pystac import Item
+from shapely.geometry import Point, mapping
+
+STAC_EVENT_ID_PREFIX = "emdat-"
+
+
+class EmdatProcessor:
+    def make_source_event_items(self) -> List[Item]:
+        """Create source event items from EMDAT data"""
+        event_items = []
+        # `self.data` is expected to be a DataFrame containing the EMDAT data
+        for _, row in self.data.iterrows():
+            try:
+                event_item = self._create_event_item_from_row(row)
+                if event_item:
+                    event_items.append(event_item)
+            except Exception as e:
+                print(f"Error creating event item for SID {row.get('SID', 'unknown')}: {str(e)}")
+                continue  # skip the bad row instead of aborting the whole run
+
+        return event_items
+
+    def _create_event_item_from_row(self, row):
+        if pd.isna(row.get("SID")):
+            return None
+
+        geometry = None
+        bbox = None
+        if not pd.isna(row.get("LAT")) and not pd.isna(row.get("LON")):
+            point = Point(float(row["LON"]), float(row["LAT"]))
+            geometry = mapping(point)
+            bbox = [float(row["LON"]), float(row["LAT"]), float(row["LON"]), float(row["LAT"])]
+
+        item = Item(
+            id=f"{STAC_EVENT_ID_PREFIX}{row['SID']}",
+            geometry=geometry,
+            bbox=bbox,
+            datetime=datetime(2022, 2, 2),  # TODO: placeholder; derive from the EMDAT date columns
+            properties={},
+        )
+        item.set_collection(self.get_event_collection())
+
+        return item
+
+    def make_hazard_event_items(self) -> List[Item]:
+        """Create hazard items based on event items"""
+        hazard_items = []
+        event_items = self.make_source_event_items()
+
+        for event_item in event_items:
+            hazard_item = self._create_hazard_item_from_event(event_item)
+            if hazard_item:
+                hazard_items.append(hazard_item)
+
+        return hazard_items
+
+    def _create_hazard_item_from_event(self, event_item: Item) -> Optional[Item]:
+        # Implementation for creating hazard item from event item
+        pass
diff --git a/libs/pystac-monty b/libs/pystac-monty
index 936d495..9720a9e 160000
--- a/libs/pystac-monty
+++ b/libs/pystac-monty
@@ -1 +1 @@
-Subproject commit 936d495bd02e52849d873205e74632bb590b9ef4
+Subproject commit 9720a9e1d7a6587f473e710d3c4b89d38ae101ad
diff --git a/main/settings.py b/main/settings.py
index 389bc20..3b7b750 100644
--- a/main/settings.py
+++ b/main/settings.py
@@ -176,4 +176,8 @@
         "task": "apps.etl.tasks.load_data",
         "schedule": crontab(minute=0, hour=0),  # TODO Set time to run this job
     },
+    "import_noaa_data": {
+        "task": "apps.etl.tasks.extract_noaa_data",
+        "schedule": crontab(minute=0, hour=0),  # This task executes daily at 12 AM (UTC)
+    },
 }