Changes from all commits (72 commits)
f1a951f  Add setup for straberry django graphql (sandeshit, Apr 24, 2025)
6c2689d  Add dataloaders,enums,filters and queries (sandeshit, Apr 24, 2025)
b589902  Add utility files for strawberry and graphql (sandeshit, Apr 24, 2025)
cb33a9f  Updated uv files to include the strabery graphql and drf (sandeshit, Apr 29, 2025)
c71ef0b  Queries for extraction data (sandeshit, Apr 29, 2025)
c8448ac  Update the enums for extraction table (sandeshit, Apr 29, 2025)
b1edce5  Fix linter issues in graphql file (sandeshit, Apr 29, 2025)
992fd09  Include the django debug toolbar (sandeshit, Apr 29, 2025)
4e2becc  Fix pre-commit issues (sandeshit, Apr 29, 2025)
4d6732c  Query for all extraction data (sandeshit, Apr 29, 2025)
4898297  Created graphql schema and command (sandeshit, Apr 29, 2025)
ace11e4  Add pagination on query and filter (sandeshit, Apr 29, 2025)
5443faa  Rename the enums (sandeshit, Apr 30, 2025)
b6389a6  Query for count trace id (sandeshit, Apr 30, 2025)
c2f74bb  Add slicing to query (sandeshit, Apr 30, 2025)
ec549a7  (Fix):pagination query for all three tables (sandeshit, Apr 30, 2025)
8f0687c  Fix(graphql): remove depricated code (sandeshit, May 5, 2025)
fd56d78  feature(cors): setup cors (sandeshit, May 5, 2025)
4c92984  fix(query): add authentication (sandeshit, May 5, 2025)
3d4bc4f  feat(app): add user app and model (sandeshit, May 8, 2025)
fd46df4  fix(schema): add mutation in schema (sandeshit, May 8, 2025)
aa6996e  fix(migrations): separate user migration in two steps (sandeshit, May 10, 2025)
df189ff  chore(strawberry): update base util files (sandeshit, May 12, 2025)
4663abb  feat(mutation): add mutation to accept trace id as input (sandeshit, May 13, 2025)
966c54d  feat(api): add query for unique counts (sandeshit, May 20, 2025)
b6054ac  Add mutation to retrigger extraction, transformation. (Rup-Narayan-Rajbanshi, May 14, 2025)
a3e0e99  add file input_types.py for graphql mutations. (Rup-Narayan-Rajbanshi, May 21, 2025)
d88ebcf  feat(etl): add query for item count by source (sandeshit, May 22, 2025)
9bd6afb  fix(filter): add custom filter for source (sandeshit, May 22, 2025)
26fa553  fix(filter) add enum as the filter (sandeshit, May 26, 2025)
e5f8b4d  Add sources into extraction retrigger, and transformation retrigger. (Rup-Narayan-Rajbanshi, May 30, 2025)
a2f041e  retrigger Gdacs pipeline (Rup-Narayan-Rajbanshi, Jun 4, 2025)
a973296  add retrigger for USGS pipeline (Rup-Narayan-Rajbanshi, Jun 12, 2025)
6043eeb  regrigger pdc pipeline. (Rup-Narayan-Rajbanshi, Jun 16, 2025)
4e002c7  remove celery chain in remaining sources (Rup-Narayan-Rajbanshi, Jun 17, 2025)
3fd2387  chore: add response type for retrigger mutation (Rup-Narayan-Rajbanshi, Jun 23, 2025)
847c79c  fix: rename pdc latest extraction function (Rup-Narayan-Rajbanshi, Jun 25, 2025)
ae771fd  feat: add stats into dashboard (Rup-Narayan-Rajbanshi, Jun 26, 2025)
c88d4a8  fix: fix issue after merge. (Rup-Narayan-Rajbanshi, Jun 27, 2025)
d25eab3  fix: fix usgs proxy (Rup-Narayan-Rajbanshi, Jun 30, 2025)
e7d782b  Merge pull request #303 from IFRCGo/fix/usgs-proxy (sandeshit, Jul 1, 2025)
c0a70f8  fix(pdc): update pagination (#304) (sudan45, Jul 1, 2025)
86bc87b  fix: fix type for trace_id, and transform_id in retrigger api (Rup-Narayan-Rajbanshi, Jul 1, 2025)
17430fa  fix(queue): update base queue name from extraction to default (sudan45, Jul 1, 2025)
1eb30a9  fix(queue): add queue in pdc (sudan45, Jul 1, 2025)
3fbcca6  fix(pdc): change geojson path (#307) (sudan45, Jul 4, 2025)
561f9cb  fix: fix max retry to fail (#314) (Rup-Narayan-Rajbanshi, Jul 10, 2025)
d6f2636  performance(sentry): remove ratelimiterror in sentry (sudan45, Jul 16, 2025)
d6b16a3  fix(query): optimized the N+1 queries (sudan45, Jul 4, 2025)
78cd360  fix(performance): update n+1 queries in pystac (sudan45, Jul 14, 2025)
e96fee7  performance(query): optimize file size n+1 (sudan45, Jul 16, 2025)
e688f67  fix(gdacs): fix gdacs retrigger (#319) (Rup-Narayan-Rajbanshi, Jul 21, 2025)
b6ebd91  fix(exceptions): add timepout exception (sudan45, Jul 22, 2025)
88ee875  fixup! fix(exceptions): add timepout exception (sudan45, Jul 22, 2025)
7bbe892  fix(exceptions): add timepout exception in emdat (sudan45, Jul 22, 2025)
96e9264  fix(exceptions): add timepout exception in emdat (sudan45, Jul 22, 2025)
77bd707  fix(performance): handle nonetype for gdacs" (#324) (sudan45, Jul 24, 2025)
f5b90c2  chore(retrigger): add validation and task id and status as response. (Rup-Narayan-Rajbanshi, Jul 3, 2025)
cde37f3  fix(performance):handle error for missing response type (sudan45, Jul 24, 2025)
d528452  fixup! fix(performance):handle error for missing response type (sudan45, Jul 24, 2025)
6c1b072  fixup! fix(performance):handle error for missing response type (sudan45, Jul 24, 2025)
f467517  fix(gdacs):handle geometry data (sudan45, Jul 24, 2025)
ae4290b  fix(gdacs): update geometry from retrigger (sudan45, Jul 25, 2025)
7437472  update(pystac) (sudan45, Jul 25, 2025)
4533296  Chore/cleanup (#325) (sandeshit, Jul 25, 2025)
c945bbc  Project/e2e tests (#344) (Shhhhhubh, Sep 8, 2025)
d2c307a  CronJob task handler (#347) (ranjan-stha, Oct 14, 2025)
6cf3d92  Fix(CronJob): Add the sentry configs (#348) (ranjan-stha, Oct 14, 2025)
da115e3  Fix(helm values): update the replica count for workers (#350) (ranjan-stha, Oct 15, 2025)
fd7b3b1  Fix(Queue): add a new queue to load the stac items (#351) (ranjan-stha, Oct 15, 2025)
7971326  Feat(celery): add the celery lock to load the STAC items to STAC serv… (ranjan-stha, Oct 16, 2025)
fd56b12  Fix(CronJob): update the cronjob option field (ranjan-stha, Oct 30, 2025)
.dockerignore (2 additions, 0 deletions)

@@ -130,4 +130,6 @@ dmypy.json
 
 # editors
 .idea/
 
+# FIXME: This is temporary
+assets/
.github/workflows/ci.yml (16 additions, 7 deletions)

@@ -23,7 +23,7 @@ env:
   COMPOSE_FILE: docker-compose.yml:gh-docker-compose.yml
   DJANGO_SECRET_KEY: "ci-test-insecure-django-secret-key"
   # ETL app configs
-  GEOCODER_URL: https://geocoder.dummy.com
+  GEOCODER_URL: http://geocoding-service:8001
 
 jobs:
   pre_commit_checks:
@@ -56,7 +56,7 @@ jobs:
     steps:
       - uses: actions/checkout@main
         with:
-          submodules: true
+          submodules: recursive
 
       - name: Login to GitHub Container Registry
         uses: docker/login-action@v3
@@ -118,7 +118,10 @@ jobs:
         env:
           DOCKER_IMAGE_BACKEND: ${{ steps.prep.outputs.tagged_image }}
        run: |
-          touch .env && docker compose run --rm web bash -c 'wait-for-it db:5432 && ./manage.py check'
+          touch .env
+          docker compose run --rm web bash -c 'wait-for-it db:5432 && ./manage.py check'
+          # Another wait for geocoding-service (Move this to wait-for-it using custom python script)
+          sleep 60
 
       - name: 🕮 Validate if there are no pending django migrations.
         env:
@@ -129,10 +132,16 @@
             exit 1;
           }
 
-      # - name: 🤞 Run Test 🧪 & Publish coverage to code climate
-      #   env:
-      #     DOCKER_IMAGE_BACKEND: ${{ steps.prep.outputs.tagged_image }}
-      #   run: docker compose run --rm web /code/scripts/run_tests.sh
+      - name: 🤞 Run Test 🧪 & Publish coverage to code climate
+        env:
+          DOCKER_IMAGE_BACKEND: ${{ steps.prep.outputs.tagged_image }}
+        run: |
+          docker compose up -d web geocoding-service
+          # Wait for 30 seconds to ensure services are initialized
+          echo "Waiting for 30 seconds for services to initialize..."
+          sleep 30
+          # Run the tests
+          docker compose exec web pytest --cov-report term --cov=apps apps/etl/tests/sources/*.py
 
       - name: 🐳 Docker push
         if: ${{ inputs.push_docker_image }}
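The fixed `sleep 60` and `sleep 30` waits above are acknowledged as temporary by the in-diff comment, which proposes a custom Python wait script. A minimal sketch of what that script could look like, assuming the service is reachable at geocoding-service:8001 (per the GEOCODER_URL set above); an invocation like `python wait_for.py geocoding-service 8001` would mirror the existing `wait-for-it db:5432`:

```python
#!/usr/bin/env python3
"""Poll a service until it accepts TCP connections, instead of a blind sleep.

Sketch for the FIXME in ci.yml above; host/port come from the command line.
"""
import socket
import sys
import time


def wait_for(host: str, port: int, timeout: float = 60.0, interval: float = 2.0) -> bool:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True  # service is accepting connections
        except OSError:
            time.sleep(interval)  # not up yet; retry until the deadline
    return False


if __name__ == "__main__":
    host, port = sys.argv[1], int(sys.argv[2])
    sys.exit(0 if wait_for(host, port) else 1)
```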
.gitignore (3 additions, 0 deletions)

@@ -135,3 +135,6 @@ dmypy.json
 assets/
 media/
 *.gpkg
+
+apps/etl/dataset
+.DS_Store
Dockerfile (2 additions, 0 deletions)

@@ -22,6 +22,7 @@ RUN --mount=type=cache,target=/root/.cache/uv \
     apt-get update -y \
     && apt-get install -y --no-install-recommends \
         # Build required packages
+        build-essential libgdal-dev \
         gcc libc-dev gdal-bin libproj-dev \
         # Helper packages
         procps \
@@ -30,6 +31,7 @@ RUN --mount=type=cache,target=/root/.cache/uv \
     && uv sync --frozen --no-install-project --all-groups \
     # Clean-up
     && apt-get remove -y gcc libc-dev libproj-dev \
+        build-essential libgdal-dev \
     && apt-get autoremove -y \
     && rm -rf /var/lib/apt/lists/*
apps/etl/dataloaders.py (new file, 43 additions)

@@ -0,0 +1,43 @@
import typing

from asgiref.sync import sync_to_async
from django.db import models
from django.utils.functional import cached_property
from strawberry.dataloader import DataLoader

from apps.etl.models import ExtractionData, PyStacLoadData

DjangoModel = typing.TypeVar("DjangoModel", bound=models.Model)


def load_model_objects(
    Model: typing.Type[DjangoModel],
    keys: list[int],
) -> list[DjangoModel]:
    qs = Model.objects.filter(id__in=keys)
    _map = {obj.pk: obj for obj in qs}
    return [_map[key] for key in keys]


if typing.TYPE_CHECKING:
    from apps.etl.types import ExtractionDataType, PyStacLoadDataType


def load_extraction(keys: list[int]) -> list["ExtractionDataType"]:
    return load_model_objects(ExtractionData, keys)  # type: ignore[reportReturnType]


def load_pystac(keys: list[int]) -> list["PyStacLoadDataType"]:
    return load_model_objects(PyStacLoadData, keys)  # type: ignore[reportReturnType]


class ExtractionDataLoader:
    @cached_property
    def load_data(self):
        return DataLoader(load_fn=sync_to_async(load_extraction))


class PystacDataLoader:
    @cached_property
    def load_data(self):
        return DataLoader(load_fn=sync_to_async(load_pystac))
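One behavioural note: `load_model_objects` indexes `_map[key]` directly, so any key with no matching row raises `KeyError` rather than returning `None`. For context, here is a hypothetical resolver using one of these loaders; the context wiring (`info.context["extraction_loader"]`) is an assumption, since the schema side is not part of this diff:

```python
# Hypothetical usage sketch (not from this PR): resolve a related
# ExtractionData through the batched loader instead of one query per parent.
import typing

import strawberry
from strawberry.types import Info

from apps.etl.dataloaders import ExtractionDataLoader

if typing.TYPE_CHECKING:
    from apps.etl.types import ExtractionDataType


@strawberry.type
class TransformDataType:
    extraction_id: strawberry.Private[int]

    @strawberry.field
    async def extraction(self, info: Info) -> "ExtractionDataType":
        # Every extraction_id collected during this resolve pass is batched
        # into a single ExtractionData.objects.filter(id__in=[...]) query.
        loader: ExtractionDataLoader = info.context["extraction_loader"]  # assumed context key
        return await loader.load_data.load(self.extraction_id)
```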
apps/etl/enums.py (new file, 24 additions)

@@ -0,0 +1,24 @@
import strawberry

from apps.etl.models import ExtractionData, PyStacLoadData, Status
from utils.strawberry.enums import get_enum_name_from_django_field

DataStatusTypeEnum = strawberry.enum(Status, name="DataStatusTypeEnum")
ExtractionValidationTypeEnum = strawberry.enum(
    ExtractionData.ValidationStatus, name="ExtractionDataValidationStatusTypeEnum"
)
SourceTypeEnum = strawberry.enum(ExtractionData.Source, name="SourceTypeEnum")
PyStacLoadDataStatusEnum = strawberry.enum(PyStacLoadData.Status, name="PyStacLoadDataStatusEnum")
PyStacLoadDataItemTypeEnum = strawberry.enum(PyStacLoadData.ItemType, name="PyStacLoadDataItemTypeEnum")


enum_map = {
    get_enum_name_from_django_field(field): enum
    for field, enum in (
        (ExtractionData.source, SourceTypeEnum),
        (ExtractionData.status, DataStatusTypeEnum),
        (ExtractionData.source_validation_status, ExtractionValidationTypeEnum),
        (PyStacLoadData.status, PyStacLoadDataStatusEnum),
        (PyStacLoadData.item_type, PyStacLoadDataItemTypeEnum),
    )
}
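`get_enum_name_from_django_field` is not shown in this diff; presumably `enum_map` keys a shared registry so the strawberry utilities can look up which GraphQL enum is registered for a given Django model field. A hypothetical filter input showing the exported enums in use (the real filters are added elsewhere in this PR):

```python
# Hypothetical filter input (not from this PR), illustrating how the
# strawberry enums above would surface in the GraphQL schema.
import strawberry

from apps.etl.enums import DataStatusTypeEnum, SourceTypeEnum


@strawberry.input
class ExtractionDataFilterInput:
    source: SourceTypeEnum | None = None  # optional filter by data source
    status: DataStatusTypeEnum | None = None  # optional filter by status
```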
apps/etl/etl_tasks/gidd.py (2 additions, 6 deletions)

@@ -1,7 +1,6 @@
-from celery import chain, shared_task
+from celery import shared_task
 
 from apps.etl.extraction.sources.gidd.extract import GIDDExtraction, GIDDExtractionMetadata, GIDDExtractionMetadataType
-from apps.etl.transform.sources.gidd import GIDDTransformHandler
 from main.configs import etl_config
 
@@ -12,7 +11,4 @@ def ext_and_transform_gidd_latest_data():
     extraction_obj = GIDDExtraction.init_extraction(
         metadata=GIDDExtractionMetadata(url=url, type=GIDDExtractionMetadataType.QUERY), add_to_queue=False
     )
-    chain(
-        GIDDExtraction.task.s(extraction_obj.id),
-        GIDDTransformHandler.task.s(),
-    ).apply_async()
+    GIDDExtraction.task.delay(extraction_obj.id)
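This diff, together with the GLIDE and IDU diffs below, applies the same change from commit 4e002c7 ("remove celery chain in remaining sources"): the caller no longer wires extraction to transformation with an explicit Celery chain and instead queues only the extraction task. Presumably the transform is now dispatched from inside the extraction task on success; that handler is outside this diff, so the contrast below is a sketch of the assumed hand-off, not repo code:

```python
from celery import chain

# Before: the caller chained the two tasks explicitly.
chain(
    GIDDExtraction.task.s(extraction_obj.id),
    GIDDTransformHandler.task.s(),
).apply_async()

# After: only extraction is queued here; the transform dispatch is assumed
# to happen inside the extraction task once it completes successfully.
GIDDExtraction.task.delay(extraction_obj.id)
```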
apps/etl/etl_tasks/glide.py (2 additions, 6 deletions)

@@ -1,6 +1,6 @@
 from datetime import datetime, timedelta
 
-from celery import chain, shared_task
+from celery import shared_task
 
 from apps.etl.extraction.sources.glide.extract import (
     GlideExtraction,
@@ -9,7 +9,6 @@
     GlideExtractionParamsMetadata,
 )
 from apps.etl.models import ExtractionData, HazardType
-from apps.etl.transform.sources.glide import GlideTransformHandler
 from main.configs import etl_config
 
 GLIDE_HAZARDS = [
@@ -66,11 +65,8 @@ def _ext_and_transform_glide_historical_data(hazard_type: HazardType):
             ),
             add_to_queue=False,
         )
-        chain(
-            GlideExtraction.task.s(extraction_object.id),
-            GlideTransformHandler.task.s(),
-        ).apply_async()
 
+        GlideExtraction.task.delay(extraction_object.id)
         start_date = end_date + timedelta(days=1)
apps/etl/etl_tasks/idu.py (4 additions, 4 deletions)

@@ -1,9 +1,8 @@
 import logging
 
-from celery import chain, shared_task
+from celery import shared_task
 
 from apps.etl.extraction.sources.idu.extract import IDUExtraction, IDUExtractionMetadata, IDUExtractionMetadataType
-from apps.etl.transform.sources.idu import IDUTransformHandler
 from main.configs import etl_config
 
 logger = logging.getLogger(__name__)
@@ -16,7 +15,7 @@ def ext_and_transform_idu_historical_data():
     extraction_obj = IDUExtraction.init_extraction(
         metadata=IDUExtractionMetadata(url=url, type=IDUExtractionMetadataType.QUERY), add_to_queue=False
     )
-    chain(IDUExtraction.task.s(extraction_obj.id), IDUTransformHandler.task.s()).apply_async()
+    IDUExtraction.task.delay(extraction_obj.id)
 
 
 @shared_task
@@ -27,4 +26,5 @@ def ext_and_transform_idu_latest_data():
     extraction_obj = IDUExtraction.init_extraction(
         metadata=IDUExtractionMetadata(url=url, type=IDUExtractionMetadataType.QUERY), add_to_queue=False
     )
-    chain(IDUExtraction.task.s(extraction_obj.id), IDUTransformHandler.task.s()).apply_async()
+
+    IDUExtraction.task.delay(extraction_obj.id)
apps/etl/etl_tasks/pdc.py (21 additions, 16 deletions)

@@ -16,7 +16,7 @@
 
 
 @shared_task
-def extract_and_transform_pdc_data():
+def extract_and_transform_pdc_latest_data():
     data_url = f"{etl_config.PDC_SENTRY_BASE_URL}/hp_srv/services/hazards/t/json/search_hazard"
     pdc_latest_extraction = ExtractionData.objects.filter(
         source=ExtractionData.Source.PDC,
@@ -28,21 +28,26 @@ def extract_and_transform_pdc_data():
     else:
         start_date = datetime.strptime(str(etl_config.PDC_START_DATE), "%Y-%m-%d")
 
-    data = PdcHazardInputMetadata(
-        pagination=Pagination(page=1, pagesize=100),
-        restrictions=[
-            [
-                Restriction(searchType="GREATER_THAN", createDate=str(int(start_date.timestamp() * 1000))),  # type: ignore
-            ]
-        ],
-    )
-    PDCExtractionV2.init_extraction(
-        metadata=PDCExtractionMetadata(
-            hazard=data,
-            url=data_url,
-            type=PDCExtractionMetaDataType.HAZARD,
-        ),
-    )
+    end_date = datetime.now()
+
+    for event in HAZARD_TYPE_MAP.keys():
+        data = PdcHazardInputMetadata(
+            pagination=Pagination(page=1, pagesize=100),
+            restrictions=[
+                [
+                    Restriction(searchType="GREATER_THAN", createDate=str(int(start_date.timestamp() * 1000))),  # type: ignore
+                    Restriction(searchType="EQUALS", typeId=event),  # type: ignore
+                    Restriction(searchType="LESS_THAN", createDate=str(int(end_date.timestamp() * 1000))),  # type: ignore
+                ]
+            ],
+        )
+        PDCExtractionV2.init_extraction(
+            metadata=PDCExtractionMetadata(
+                hazard=data,
+                url=data_url,
+                type=PDCExtractionMetaDataType.HAZARD,
+            ),
+        )
 
 
 def extract_and_transform_historical_pdc_data():
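Two details in this rewrite: the single open-ended GREATER_THAN search becomes one bounded search per hazard type in HAZARD_TYPE_MAP, and the createDate restrictions are epoch-millisecond strings. Because start_date and end_date are naive datetimes, `.timestamp()` resolves them in the worker's local timezone, so the encoded value is host-dependent; a quick check of the conversion using an aware datetime for a stable result:

```python
# Sanity check of the createDate encoding used above: the PDC restrictions
# carry epoch-millisecond strings. An aware UTC datetime gives a stable value;
# a naive one (as in the task) would depend on the worker's local timezone.
from datetime import datetime, timezone

start_date = datetime(2025, 1, 1, tzinfo=timezone.utc)
create_date = str(int(start_date.timestamp() * 1000))
print(create_date)  # "1735689600000"
```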
apps/etl/etl_tasks/usgs.py (8 additions, 2 deletions)

@@ -1,5 +1,5 @@
 import logging
-from datetime import date, timedelta
+from datetime import date, datetime, timedelta
 
 from celery import shared_task
 
@@ -30,7 +30,13 @@ def ext_and_transform_usgs_latest_data():
     else:
         start_date = etl_config.USGS_START_DATE
 
-    url = f"{etl_config.USGS_DATA_URL}/fdsnws/event/1/query?format=geojson&starttime={start_date.strftime('%Y-%m-%d')}"
+    end_date = datetime.now().date()
+
+    url = (
+        f"{etl_config.USGS_DATA_URL}/fdsnws/event/1/query?format=geojson"
+        f"&starttime={start_date.strftime('%Y-%m-%d')}"
+        f"&endtime={end_date.strftime('%Y-%m-%d')}"
+    )
 
     USGSExtraction.init_extraction(
         metadata=USGSExtractionMetadata(
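The rebuilt URL follows the public USGS FDSN event API, and adding endtime makes each scheduled pull cover a fixed window rather than everything since start_date. A standalone sketch of the same request (the production host comes from etl_config.USGS_DATA_URL; earthquake.usgs.gov is assumed here):

```python
# Standalone sketch of the query the new URL performs (not repo code).
import requests

url = (
    "https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson"
    "&starttime=2025-01-01&endtime=2025-01-02"
)
resp = requests.get(url, timeout=30)
resp.raise_for_status()
features = resp.json()["features"]  # GeoJSON FeatureCollection of events
print(f"{len(features)} events in the requested window")
```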