Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extraction of ibtracs #127

Draft
wants to merge 2 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions apps/etl/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@

@admin.register(ExtractionData)
class ExtractionDataAdmin(admin.ModelAdmin):
def get_readonly_fields(self, request, obj=None):
# Use the model's fields to populate readonly_fields
if obj: # If the object exists (edit page)
return [field.name for field in self.model._meta.fields]
return []
# def get_readonly_fields(self, request, obj=None):
# # Use the model's fields to populate readonly_fields
# if obj: # If the object exists (edit page)
# return [field.name for field in self.model._meta.fields]
# return []

list_display = (
"id",
Expand All @@ -22,6 +22,7 @@ def get_readonly_fields(self, request, obj=None):
"source_validation_status",
"hazard_type",
"created_at",
"resp_data",
)
list_filter = ("status",)
autocomplete_fields = ["parent"]
Expand Down
2 changes: 1 addition & 1 deletion apps/etl/etl_tasks/gdacs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import requests
from celery import chain, shared_task

from apps.etl.extraction.sources.base.utils import store_extraction_data
from apps.etl.extraction.sources.base.extract import Extraction
from apps.etl.extraction.sources.base.utils import store_extraction_data
from apps.etl.extraction.sources.gdacs.extract import (
fetch_event_data,
fetch_gdacs_geometry_data,
Expand Down
13 changes: 13 additions & 0 deletions apps/etl/etl_tasks/noaa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from celery import chain, shared_task

from apps.etl.extraction.sources.noaa_IBTrACS.extract import (
import_hazard_data as import_noaa_data,
)
from apps.etl.transform.sources.noaa import transform_event_data


@shared_task
def import_noaa_hazard_data(**kwargs):
    """Run the NOAA IBTrACS pipeline: extract, then transform.

    Chains the extraction task into the transformation task so the
    extraction id returned by ``import_noaa_data`` is passed as the
    ``event_extraction_id`` argument of ``transform_event_data``.
    """
    # The draft version skipped extraction and called
    # transform_event_data(25) with a hard-coded extraction id (debug
    # leftover). Chain the real tasks instead, mirroring the gdacs flow;
    # `chain` is already imported at the top of this module.
    chain(import_noaa_data.s(), transform_event_data.s()).apply_async()
3 changes: 2 additions & 1 deletion apps/etl/extraction/sources/base/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import hashlib

from django.core.files.base import ContentFile

from apps.etl.models import ExtractionData


Expand All @@ -12,6 +13,7 @@ def hash_file_content(content):
file_hash = hashlib.sha256(content).hexdigest()
return file_hash


def manage_duplicate_file_content(source, hash_content, instance, response_data, file_name):
"""
if duplicate file content exists then do not create a new file, but point the url to
Expand Down Expand Up @@ -73,4 +75,3 @@ def store_extraction_data(
file_name=file_name,
)
return gdacs_instance

2 changes: 1 addition & 1 deletion apps/etl/extraction/sources/gdacs/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from celery import shared_task
from pydantic import ValidationError

from apps.etl.extraction.sources.base.utils import store_extraction_data
from apps.etl.extraction.sources.base.extract import Extraction
from apps.etl.extraction.sources.base.utils import store_extraction_data
from apps.etl.extraction.sources.gdacs.validators.gdacs_eventsdata import (
GDacsEventDataValidator,
)
Expand Down
2 changes: 1 addition & 1 deletion apps/etl/extraction/sources/glide/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def import_hazard_data(self, hazard_type: str, hazard_type_str: str, **kwargs):
# glide_url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear=2024&frommonth=10&fromday=01&toyear=2024&frommonth=12&today=31&events={hazard_type}" # noqa: E501
glide_url = f"https://www.glidenumber.net/glide/jsonglideset.jsp?fromyear={yesterday.year}&frommonth={yesterday.month}&fromday={yesterday.day}&toyear={today.year}&frommonth={today.month}&today={today.day}&events={hazard_type}" # noqa: E501

# Create a Extraction object in the begining
# Create a Extraction object in the beginning
instance_id = kwargs.get("instance_id", None)
retry_count = kwargs.get("retry_count", None)

Expand Down
Empty file.
62 changes: 62 additions & 0 deletions apps/etl/extraction/sources/noaa_IBTrACS/extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import logging
from datetime import datetime, timedelta

import requests
from celery import shared_task

from apps.etl.extraction.sources.base.extract import Extraction
from apps.etl.extraction.sources.base.utils import store_extraction_data
from apps.etl.models import ExtractionData, HazardType

logger = logging.getLogger(__name__)


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
def import_hazard_data(self, **kwargs):
    """
    Import active tropical-cyclone track data from the NOAA IBTrACS archive.

    Creates an ExtractionData row (or, on retry, reuses the one whose id is
    passed in ``kwargs["instance_id"]``), pulls the active-storms CSV, and
    stores the response on that row via ``store_extraction_data``.
    Network failures are retried up to ``max_retries`` times.

    Returns the id of the ExtractionData instance.
    """
    logger.info(f"Importing {HazardType.CYCLONE} data")

    noaa_active_event_data = "https://www.ncei.noaa.gov/data/international-best-track-archive-for-climate-stewardship-ibtracs/v04r01/access/csv/ibtracs.ACTIVE.list.v04r01.csv"  # noqa E261
    # Create an Extraction object at the beginning; on retry these kwargs
    # let us reuse the same row instead of creating a duplicate.
    instance_id = kwargs.get("instance_id", None)
    retry_count = kwargs.get("retry_count", None)

    noaa_instance = (
        ExtractionData.objects.get(id=instance_id)
        if instance_id
        else ExtractionData.objects.create(
            source=ExtractionData.Source.IBTRACS,
            status=ExtractionData.Status.PENDING,
            source_validation_status=ExtractionData.ValidationStatus.NO_VALIDATION,
            hazard_type=HazardType.CYCLONE,
            attempt_no=0,
            resp_code=0,
        )
    )

    noaa_url = noaa_active_event_data

    # Extract the data from the api.
    noaa_extraction = Extraction(url=noaa_url)
    response = None
    try:
        response = noaa_extraction.pull_data(
            source=ExtractionData.Source.IBTRACS,
            ext_object_id=noaa_instance.id,
            retry_count=retry_count if retry_count else 1,
        )
    except requests.exceptions.RequestException as exc:
        # Retry with the same ExtractionData row so all attempts
        # accumulate on a single record.
        self.retry(exc=exc, kwargs={"instance_id": noaa_instance.id, "retry_count": self.request.retries})

    if response:
        # Save the extracted data onto the existing NOAA extraction object.
        noaa_instance = store_extraction_data(
            response=response,
            source=ExtractionData.Source.IBTRACS,
            validate_source_func=None,
            instance_id=noaa_instance.id,
        )
    logger.info(f"{HazardType.CYCLONE} data imported successfully")
    return noaa_instance.id
9 changes: 6 additions & 3 deletions apps/etl/load/sources/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import requests
import uuid
from celery.utils.log import get_task_logger
from django.core.management.base import BaseCommand

Expand All @@ -9,6 +10,7 @@


def send_post_request_to_stac_api(result, collection_id):
print("...........", collection_id)
try:
# url = f"http://montandon-eoapi-stage.ifrc.org/stac/collections/{collection_id}/items"
url = f"https://montandon-eoapi-1.ifrc-go.dev.togglecorp.com/stac/collections/{collection_id}/items"
Expand All @@ -24,14 +26,15 @@ def load_data(django_command: BaseCommand | None = None):
"""Load data into STAC"""
logger.info("Loading data into Stac")

transformed_items = PyStacLoadData.objects.filter(load_status=PyStacLoadData.LoadStatus.PENDING)
transformed_items = PyStacLoadData.objects.exclude(load_status=PyStacLoadData.LoadStatus.SUCCESS)

bulk_mgr = BulkUpdateManager(["load_status"], chunk_size=1000)
for item in transformed_items.iterator():
# TODO Remove this after sucessfull testing
# item.item["id"] = f"{item.item['collection']}-{uuid.uuid4()}"
item.item["id"] = f"{item.item['collection']}-{uuid.uuid4()}"

response = send_post_request_to_stac_api(item.item, f"{item.collection_id}")
print(response)

# Set the loading status of item.
if response and response.status_code == 200:
Expand All @@ -52,7 +55,7 @@ def load_data(django_command: BaseCommand | None = None):
)

if django_command is not None:
django_command.stdout.write(django_command.ERROR(f"Fail to load item {item.id}"))
django_command.stdout.write(django_command.style.ERROR(f"Fail to load item {item.id}"))

bulk_mgr.done()

Expand Down
18 changes: 18 additions & 0 deletions apps/etl/management/commands/extract_ibtracs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import logging

from django.core.management.base import BaseCommand

from apps.etl.etl_tasks.noaa import import_noaa_hazard_data
from apps.etl.models import HazardType

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    help = "Import data from noaa api"

    def handle(self, *args, **options):
        # Calls the shared task function directly (no .delay), so the
        # NOAA IBTrACS pipeline runs synchronously in this process.
        import_noaa_hazard_data()



18 changes: 18 additions & 0 deletions apps/etl/migrations/0014_alter_extractiondata_hazard_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 5.1.5 on 2025-01-20 11:39

from django.db import migrations, models


class Migration(migrations.Migration):
    # Auto-generated (Django 5.1.5): extends the `choices` on
    # ExtractionData.hazard_type to the full hazard vocabulary; the column
    # itself (CharField, max_length=100, blank=True) is otherwise unchanged.

    dependencies = [
        ('etl', '0013_glidetransformation'),
    ]

    operations = [
        migrations.AlterField(
            model_name='extractiondata',
            name='hazard_type',
            field=models.CharField(blank=True, choices=[('EQ', 'Earthquake'), ('FL', 'Flood'), ('TC', 'Cyclone'), ('EP', 'Epidemic'), ('FI', 'Food Insecurity'), ('SS', 'Storm Surge'), ('DR', 'Drought'), ('TS', 'Tsunami'), ('CD', 'Cyclonic Wind'), ('WF', 'WildFire'), ('VO', 'Volcano'), ('CW', 'Cold Wave'), ('CE', 'Complex Emergency'), ('EC', 'Extratropical Cyclone'), ('ET', 'Extreme temperature'), ('FA', 'Famine'), ('FR', 'Fire'), ('FF', 'Flash Flood'), ('HT', 'Heat Wave'), ('IN', 'Insect Infestation'), ('LS', 'Land Slide'), ('MS', 'Mud Slide'), ('ST', 'Severe Local Strom'), ('SL', 'Slide'), ('AV', 'Snow Avalanche'), ('AC', 'Tech. Disaster'), ('TO', 'Tornado'), ('VW', 'Violent Wind'), ('WV', 'Wave/Surge')], max_length=100, verbose_name='hazard type'),
        ),
    ]
14 changes: 14 additions & 0 deletions apps/etl/migrations/0023_merge_20250121_0920.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Generated by Django 5.1.5 on 2025-01-21 09:20

from django.db import migrations


class Migration(migrations.Migration):
    # Auto-generated merge migration: reconciles the divergent 0014
    # (hazard_type) and 0022 (transform.is_loaded) branches. Performs no
    # schema operations.

    dependencies = [
        ('etl', '0014_alter_extractiondata_hazard_type'),
        ('etl', '0022_alter_transform_is_loaded'),
    ]

    operations = [
    ]
2 changes: 1 addition & 1 deletion apps/etl/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class Status(models.IntegerChoices):
hazard_type = models.CharField(max_length=100, verbose_name=_("hazard type"), choices=HazardType.choices, blank=True)

def __str__(self):
return str(self.id)
return f"{self.get_source_display()} - {self.id}"


class Transform(Resource):
Expand Down
5 changes: 5 additions & 0 deletions apps/etl/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def extract_desinventar_data():
call_command("extract_desinventar_data")


@shared_task
def extract_noaa_data():
    """Celery entry point: run the `extract_ibtracs` management command."""
    call_command("extract_ibtracs")


@shared_task
def load_data():
call_command("load_data_to_stac")
61 changes: 61 additions & 0 deletions apps/etl/transform/sources/noaa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging
import uuid
from celery import shared_task
from pystac_monty.sources.noaa import NoaaIbtracsSource, NoaaIbtracsTransformer

from apps.etl.models import ExtractionData, Transform, PyStacLoadData
from apps.etl.utils import read_file_data
from main.managers import BulkCreateManager

logger = logging.getLogger(__name__)

noaa_item_type_map = {
"ibtracs-events": PyStacLoadData.ItemType.EVENT,
"ibtracs-hazards": PyStacLoadData.ItemType.HAZARD,
}


@shared_task
def transform_event_data(event_extraction_id):
    """Transform extracted NOAA IBTrACS data into STAC items.

    Loads the ExtractionData row identified by ``event_extraction_id``,
    runs the pystac-monty IBTrACS transformer over its stored response
    data, and bulk-creates one PyStacLoadData row per resulting STAC item
    (events and hazards), each with load_status PENDING.

    Raises:
        Any exception from the transformer, after marking the Transform
        row FAILED.
    """
    logger.info("Transform started for event data for noaa")
    noaa_instance = ExtractionData.objects.get(id=event_extraction_id)
    transform_obj = Transform.objects.create(
        extraction=noaa_instance,
        status=Transform.Status.PENDING,
    )

    bulk_mgr = BulkCreateManager(chunk_size=1000)
    try:
        transformer = NoaaIbtracsTransformer(
            NoaaIbtracsSource(source_url=noaa_instance.url, data=noaa_instance.resp_data)
        )
        transformed_event_items = transformer.make_items()

        transform_obj.status = Transform.Status.SUCCESS
        transform_obj.save(update_fields=["status"])
    except Exception:
        logger.error("Noaa transformation failed", exc_info=True, extra={"extraction_id": noaa_instance.id})
        transform_obj.status = Transform.Status.FAILED
        transform_obj.save(update_fields=["status"])
        # Bare raise preserves the original traceback.
        raise

    for item in transformed_event_items:
        item_type = noaa_item_type_map[item.collection_id]
        transformed_item_dict = item.to_dict()
        # Tag each item with a unique ETL id for downstream traceability.
        transformed_item_dict["properties"]["monty:etl_id"] = str(uuid.uuid4())
        bulk_mgr.add(
            PyStacLoadData(
                transform_id=transform_obj,
                item=transformed_item_dict,
                collection_id=item.collection_id,
                item_type=item_type,
                load_status=PyStacLoadData.LoadStatus.PENDING,
            )
        )

    bulk_mgr.done()

    # NOTE(review): is_loaded is set although items are only queued with
    # LoadStatus.PENDING at this point — confirm the flag means "ready to
    # load" rather than "loaded into STAC".
    transform_obj.is_loaded = True
    transform_obj.save(update_fields=["is_loaded"])

    logger.info("Transformation ended for noaa data")
60 changes: 60 additions & 0 deletions libs/emdat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import pandas as pd
from shapely.geometry import Point, mapping
from typing import List, Optional
from pystac import Item

STAC_EVENT_ID_PREFIX = "emdat-"

class EmdatProcessor:
    """Build STAC event and hazard items from EMDAT tabular data.

    NOTE(review): relies on ``self.data`` (a pandas DataFrame of EMDAT
    rows) and ``self.get_event_collection()``, neither of which is defined
    in this class — presumably supplied by a subclass or mixin; confirm.
    """

    def make_source_event_items(self) -> List:
        """Create source event items from EMDAT data, skipping bad rows."""
        event_items = []
        for _, row in self.data.iterrows():
            try:
                event_item = self._create_event_item_from_row(row)
                if event_item:
                    event_items.append(event_item)
            except Exception as e:
                print(f"Error creating event item for SID {row.get('SID', 'unknown')}: {str(e)}")
                # Skip only the failing row. The original `break` aborted
                # the whole loop on the first bad row, silently dropping
                # every remaining event.
                continue

        return event_items

    def _create_event_item_from_row(self, row):
        """Build a STAC Item from one EMDAT row, or None if it has no SID."""
        if pd.isna(row.get("SID")):
            return None

        # Derive a point geometry (and degenerate bbox) from LAT/LON when
        # both coordinates are present; otherwise leave geometry as None.
        geometry = None
        bbox = None
        if geometry is None and not pd.isna(row.get("LAT")) and not pd.isna(row.get("LON")):
            point = Point(float(row["LON"]), float(row["LAT"]))
            geometry = mapping(point)
            bbox = [float(row["LON"]), float(row["LAT"]), float(row["LON"]), float(row["LAT"])]

        item = Item(
            id=f"{STAC_EVENT_ID_PREFIX}{row['SID']}",
            geometry=geometry,
            bbox=bbox,
            # TODO(review): placeholder datetime — pystac.Item expects a
            # datetime object, not a string; derive from the row's date
            # fields instead.
            datetime="2022-02-02",
            properties={},
        )
        item.set_collection(self.get_event_collection())

        return item

    def make_hazard_event_items(self) -> List:
        """Create hazard items based on event items."""
        hazard_items = []
        event_items = self.make_source_event_items()

        for event_item in event_items:
            hazard_item = self._create_hazard_item_from_event(event_item)
            if hazard_item:
                hazard_items.append(hazard_item)

        return hazard_items

    def _create_hazard_item_from_event(self, event_item: Item) -> Optional[Item]:
        # Implementation for creating hazard item from event item
        pass
2 changes: 1 addition & 1 deletion libs/pystac-monty
Loading