Update ArcGIS geometry in events
sudan45 authored and Rup-Narayan-Rajbanshi committed Feb 14, 2025
1 parent b6c8eb8 commit 85642e7
Showing 12 changed files with 175 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
@@ -29,6 +29,11 @@ env:
GFD_CREDENTIAL: dummy-value
GFD_SERVICE_ACCOUNT: dummy-value

PDC_BASE_URL: https://pdc.dummy.com
PDC_AUTHORIZATION_KEY: dummy-value
ARC_DOMAIN: https://arc.dummy.com
ARC_USERNAME: dummy-username
ARC_PASSWORD: dummy-password

jobs:
pre_commit_checks:
12 changes: 12 additions & 0 deletions apps/etl/etl_tasks/pdc.py
@@ -0,0 +1,12 @@
from celery import shared_task

from apps.etl.extraction.sources.pdc.extract import (
get_hazard_details,
import_hazard_data,
)


@shared_task
def extract_and_transform_pdc_data():
extraction_id = import_hazard_data()
get_hazard_details(extraction_id)
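
For context, a minimal sketch of how this new task chain might be dispatched, either in-process (as the updated management command below does) or through the Celery broker; illustrative only, not part of the commit:

from apps.etl.etl_tasks.pdc import extract_and_transform_pdc_data

# Run synchronously in the current process:
extract_and_transform_pdc_data()

# Or queue it through the Celery broker for a worker to pick up:
extract_and_transform_pdc_data.delay()
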
14 changes: 14 additions & 0 deletions apps/etl/extraction/sources/base/utils.py
@@ -1,6 +1,8 @@
import hashlib
import json
import os

from django.conf import settings
from django.core.files.base import ContentFile

from apps.etl.models import ExtractionData
@@ -103,3 +105,15 @@ def store_pdc_exposure_data(
instance.resp_data.save(content_file.name, content_file)

return instance


def store_geojson_file(response, source=None, validate_source_func=None, instance_id=None, hazard_type=None, metadata=None):
file_extension = "geojson"
file_name = f"{instance_id}pdc.{file_extension}"
instance = ExtractionData.objects.get(id=instance_id.id)
file_path = os.path.join(settings.MEDIA_ROOT, "source_raw_data", file_name)
with open(file_path, "w") as f:
json.dump(response, f)
instance.metadata["geojson_file_path"] = file_path
instance.save()
return instance.id
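
A hedged sketch of how the file written by store_geojson_file might be read back; the helper name is hypothetical, but the metadata key matches the one set above and consumed by the transform handler later in this commit:

import json

from apps.etl.models import ExtractionData


def load_stored_geojson(extraction_id):
    # Fetch the extraction row, then load the GeoJSON that
    # store_geojson_file dumped to disk and recorded in metadata.
    instance = ExtractionData.objects.get(id=extraction_id)
    with open(instance.metadata["geojson_file_path"]) as f:
        return json.load(f)
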
60 changes: 33 additions & 27 deletions apps/etl/extraction/sources/pdc/extract.py
@@ -8,10 +8,12 @@
from apps.etl.extraction.sources.base.extract import Extraction
from apps.etl.extraction.sources.base.utils import (
store_extraction_data,
store_geojson_file,
store_pdc_exposure_data,
)
from apps.etl.models import ExtractionData, HazardType
from apps.etl.transform.sources.pdc import PDCTransformHandler
from apps.etl.utils import AccessTokenManager

logger = logging.getLogger(__name__)

@@ -21,7 +23,6 @@
"EARTHQUAKE": HazardType.EARTHQUAKE,
"EXTREMETEMPERATURE": HazardType.EXTREME_TEMPERATURE,
"FLOOD": HazardType.FLOOD,
# "HIGHSURF": HazardType.OTHER,
"HIGHWIND": HazardType.WIND,
"LANDSLIDE": HazardType.LANDSLIDE,
"SEVEREWEATHER": HazardType.OTHER,
@@ -39,36 +40,41 @@
def get_hazard_details(self, extraction_id, **kwargs):
instance_id = ExtractionData.objects.get(id=extraction_id)
response_data = json.loads(instance_id.resp_data.read())
geo = AccessTokenManager(requests.Session())
for hazard in response_data:
if hazard["type_ID"] not in HAZARD_TYPE_MAP.keys():
continue
r = requests.get(
f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure",
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
for exposure_id in r.json():
if ExtractionData.objects.filter(
metadata__exposure_id=exposure_id,
source=ExtractionData.Source.PDC,
status=ExtractionData.Status.SUCCESS,
metadata__uuid=hazard["uuid"],
).exists():
try:
geo_json_file = geo.get_polygon(hazard["uuid"])
store_geojson_file(geo_json_file, instance_id=instance_id)
if hazard["type_ID"] not in HAZARD_TYPE_MAP.keys():
continue
detail_url = f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure/{exposure_id}"
detail_response = requests.get(
url=detail_url,
r = requests.get(
f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure",
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
exposure_detail = store_pdc_exposure_data(
response=detail_response.json(),
source=ExtractionData.Source.PDC,
validate_source_func=None,
parent_id=instance_id.id,
hazard_type=HAZARD_TYPE_MAP.get(hazard["type_ID"]),
metadata={"exposure_id": exposure_id, "uuid": hazard["uuid"]},
)
PDCTransformHandler.task(exposure_detail)
return None
for exposure_id in r.json():
if ExtractionData.objects.filter(
metadata__exposure_id=exposure_id,
source=ExtractionData.Source.PDC,
status=ExtractionData.Status.SUCCESS,
metadata__uuid=hazard["uuid"],
).exists():
continue
detail_url = f"{settings.PDC_BASE_URL}/hazard/{hazard['uuid']}/exposure/{exposure_id}"
detail_response = requests.get(
url=detail_url,
headers={"Authorization": f"Bearer {settings.PDC_AUTHORIZATION_KEY}"},
)
exposure_detail = store_pdc_exposure_data(
response=detail_response.json(),
source=ExtractionData.Source.PDC,
validate_source_func=None,
parent_id=instance_id.id,
hazard_type=HAZARD_TYPE_MAP.get(hazard["type_ID"]),
metadata={"exposure_id": exposure_id, "uuid": hazard["uuid"]},
)
PDCTransformHandler.task(exposure_detail)
except Exception as exc:
self.retry(exc=exc, kwargs={"instance_id": instance_id.id, "retry_count": self.request.retries})


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
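The handler above leans on Celery's bound-task retry pattern: self.retry re-raises the exception and reschedules the task until max_retries is exhausted. A self-contained sketch of that pattern (hypothetical task name and URL parameter, not part of the commit):

import requests
from celery import shared_task


@shared_task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_with_retry(self, url):
    # On failure, hand the exception to self.retry so Celery requeues the
    # task after default_retry_delay seconds, up to max_retries attempts.
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        raise self.retry(exc=exc)
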
1 change: 0 additions & 1 deletion apps/etl/load/sources/base.py
@@ -20,7 +20,6 @@ def send_post_request_to_stac_api(result, collection_id):

response = requests.post(url, json=result, headers={"Content-Type": "application/json"})
response.raise_for_status()
print(response)
return response
except requests.exceptions.RequestException as e:
print(f"Error posting data for {collection_id}: {e}")
6 changes: 3 additions & 3 deletions apps/etl/management/commands/extract_pdc_data.py
@@ -2,8 +2,7 @@

from django.core.management.base import BaseCommand

# TODO we need to extract command from etl_tasks
from apps.etl.extraction.sources.pdc.extract import import_hazard_data
from apps.etl.etl_tasks.pdc import extract_and_transform_pdc_data

logger = logging.getLogger(__name__)

@@ -12,4 +11,5 @@ class Command(BaseCommand):
help = "Import data from pdc api"

def handle(self, *args, **options):
import_hazard_data()
# extract_and_transform_pdc_data.delay()
extract_and_transform_pdc_data()
7 changes: 1 addition & 6 deletions apps/etl/transform/sources/pdc.py
@@ -15,21 +15,16 @@ class PDCTransformHandler(BaseTransformerHandler):
@classmethod
def get_schema_data(cls, extraction_obj: ExtractionData):
source_url = extraction_obj.url
print("---------->", extraction_obj)
data = {
"hazards_file_path": extraction_obj.parent.resp_data.path,
"exposure_timestamp": extraction_obj.metadata["exposure_id"],
"uuid": extraction_obj.metadata["uuid"],
"exposure_detail_file_path": extraction_obj.resp_data.path,
"geojson_file_path": "",
"geojson_file_path": extraction_obj.parent.metadata["geojson_file_path"] or None,
}

return cls.transformer_schema(source_url=source_url, data=json.dumps(data))

@staticmethod
@app.task
def task(extraction_id):
return PDCTransformHandler().handle_transformation(extraction_id)


# from apps.etl.extraction.sources.pdc.extract import get_hazard_details
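
One note on the metadata lookup above: indexing metadata["geojson_file_path"] directly would raise KeyError on extraction rows created before this commit added the key. A defensive variant (an illustrative helper, not part of the commit) falls back to None via .get():

from apps.etl.models import ExtractionData


def get_geojson_path(extraction_obj: ExtractionData):
    # .get() returns None when the key is absent, e.g. on rows written
    # before geojson_file_path existed, so no KeyError is raised.
    return extraction_obj.parent.metadata.get("geojson_file_path")
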
82 changes: 82 additions & 0 deletions apps/etl/utils.py
@@ -1,5 +1,9 @@
import datetime
import logging

import requests
from django.conf import settings

logger = logging.getLogger(__name__)


@@ -10,3 +14,81 @@ def read_file_data(file):
with file.open() as data_file:
data = data_file.read()
return data


ARC_GIS_DEFAULT_PARAMS = {
"geometryType": "esriGeometryEnvelope",
"spatialRel": "esriSpatialRelIntersects",
"returnGeometry": True,
"returnTrueCurves": False,
"returnIdsOnly": False,
"returnCountOnly": False,
"returnZ": False,
"returnM": False,
"returnDistinctValues": False,
"returnExtentOnly": False,
"featureEncoding": "esriDefault",
"f": "geojson",
"text": "",
"objectIds": "",
"time": "",
"geometry": "",
"inSR": "",
"relationParam": "",
"outFields": "",
"maxAllowableOffset": "",
"geometryPrecision": "",
"outSR": "",
"having": "",
"orderByFields": "",
"groupByFieldsForStatistics": "",
"outStatistics": "",
"gdbVersion": "",
"historicMoment": "",
"resultOffset": "",
"resultRecordCount": "",
"queryByDistance": "",
"datumTransformation": "",
"parameterValues": "",
"rangeValues": "",
"quantizationParameters": "",
}


class AccessTokenManager:
def __init__(self, session: requests.Session):
self.token_expires = None
self.access_token = None
self.session = session
self.update() # Ensure the token is fetched on initialization

def get_access_token(self):
login_url = "https://partners.pdc.org/arcgis/tokens/generateToken"
data = {
"f": "json",
"username": settings.ARC_USERNAME,
"password": settings.ARC_PASSWORD,
"referer": "https://www.arcgis.com",
}
login_response = self.session.post(login_url, data=data, allow_redirects=True).json()
self.access_token = login_response["token"]
self.token_expires = datetime.datetime.fromtimestamp(login_response["expires"] / 1000)
return self.access_token, self.token_expires

def update(self):
if self.token_expires is None or datetime.datetime.now() >= self.token_expires:
self.get_access_token()
self.session.headers.update({"Authorization": f"Bearer {self.access_token}"})

def get_polygon(self, uuid):
url = settings.ARC_DOMAIN
response = self.session.post(
url=url,
data={
**ARC_GIS_DEFAULT_PARAMS,
"where": f"hazard_uuid IN ('{uuid}')",
"outFields": "hazard_uuid,type_id",
},
)

return response.json()
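
A hedged usage sketch of AccessTokenManager; the settings values are assumed to be configured, and the hazard UUID is a placeholder:

import requests

from apps.etl.utils import AccessTokenManager

# The constructor fetches a token immediately and attaches it to the
# shared session as a Bearer header via update().
manager = AccessTokenManager(requests.Session())

# Refresh the token if it has expired, then fetch the hazard polygon
# as GeoJSON from the ArcGIS feature service at settings.ARC_DOMAIN.
manager.update()
geojson = manager.get_polygon("00000000-0000-0000-0000-000000000000")
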
4 changes: 4 additions & 0 deletions helm/templates/config/secret.yaml
@@ -21,6 +21,10 @@ stringData:
GFD_CREDENTIAL: {{ required "secrets.GFD_CREDENTIAL" .Values.secrets.GFD_CREDENTIAL | quote }}
GFD_SERVICE_ACCOUNT: {{ required "secrets.GFD_SERVICE_ACCOUNT" .Values.secrets.GFD_SERVICE_ACCOUNT | quote }}
IFRC_DATA_URL: {{ required "secrets.IFRC_DATA_URL" .Values.secrets.IFRC_DATA_URL | quote }}
PDC_AUTHORIZATION_KEY: {{ required "secrets.PDC_AUTHORIZATION_KEY" .Values.secrets.PDC_AUTHORIZATION_KEY | quote }}
PDC_BASE_URL: {{ required "secrets.PDC_BASE_URL" .Values.secrets.PDC_BASE_URL | quote }}
ARC_USERNAME: {{ required "secrets.ARC_USERNAME" .Values.secrets.ARC_USERNAME | quote }}
ARC_PASSWORD: {{ required "secrets.ARC_PASSWORD" .Values.secrets.ARC_PASSWORD | quote }}

# Database
{{- if .Values.postgresql.enabled }}
6 changes: 6 additions & 0 deletions helm/values-test.yaml
@@ -39,6 +39,12 @@ secrets:
GFD_CREDENTIAL: dummy-gfd-cred
GFD_SERVICE_ACCOUNT: dummy-gfd-service-ac
IFRC_DATA_URL: dummy-url
PDC_AUTHORIZATION_KEY: dummy-key
PDC_BASE_URL: https://pdc.dummy.com
ARC_DOMAIN: https://arc.dummy.com
ARC_USERNAME: dummy-username
ARC_PASSWORD: dummy-password

secretsAdditional:
ENABLE_MAGIC_SECRET: "true"
MAGIC_KEY: to-much-fun
4 changes: 4 additions & 0 deletions helm/values.yaml
@@ -157,6 +157,10 @@ env:
AZURE_STORAGE_STATIC_CONTAINER:
# ETL Load config
EOAPI_DOMAIN:
PDC_BASE_URL:
PDC_AUTHORIZATION_KEY:
ARC_DOMAIN:
ARC_USERNAME:
# NOTE: Used to pass additional configs to api/worker containers
# NOTE: Not used by azure vault
envAdditional:
11 changes: 11 additions & 0 deletions main/settings.py
@@ -77,6 +77,9 @@
IFRC_DATA_URL=str,
PDC_BASE_URL=(str, "https://sentry.pdc.org/hp_srv/services"),
PDC_AUTHORIZATION_KEY=str,
ARC_DOMAIN=str,
ARC_USERNAME=str,
ARC_PASSWORD=str,
)

GFD_SERVICE_ACCOUNT = env("GFD_SERVICE_ACCOUNT")
@@ -97,6 +100,10 @@

EOAPI_DOMAIN = env("EOAPI_DOMAIN")

ARC_DOMAIN = env("ARC_DOMAIN")
ARC_USERNAME = env("ARC_USERNAME")
ARC_PASSWORD = env("ARC_PASSWORD")

TIME_ZONE = env("DJANGO_TIME_ZONE")

SECRET_KEY = env("DJANGO_SECRET_KEY")
@@ -351,6 +358,10 @@
"task": "apps.etl.etl_tasks.ext_and_transform_ifrcevent_latest_data",
"schedule": crontab(minute=0, hour=0), # This task execute daily at 12 AM (UTC)
},
"import_pdc_data": {
"task": "apps.etl.tasks.extract_pdc_data",
"schedule": crontab(minute=0, hour=0), # This task execute daily at 12 AM (UTC)
},
"load_data_to_stac": {
"task": "apps.etl.tasks.load_data",
"schedule": crontab(minute=0, hour=0), # TODO Set time to run this job
