Skip to content

Commit

Permalink
Clean and improve the importer task
Browse files Browse the repository at this point in the history
- add TypedDict to better understand the data
- clean some fields
  • Loading branch information
tudoramariei committed Feb 22, 2024
1 parent 5b13122 commit facdae0
Showing 1 changed file with 56 additions and 14 deletions.
70 changes: 56 additions & 14 deletions backend/importer/tasks/processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import csv
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, TypedDict, Union

from django.apps import apps as django_apps
from django.conf import settings
Expand All @@ -15,9 +15,32 @@
logger = logging.getLogger(__name__)


def parse_imported_date(date_str: str):
class ImporterPostFieldType(TypedDict, total=False):
    """Describe the post-import processing configured for one import type.

    Both keys are optional: import types without any post-processing declare
    an empty dictionary (``"fields_post": {}``) and the consumers read the keys
    with ``.get(...)``, so the type is declared with ``total=False``.
    """

    # the field that will be used as an identifier to find the object
    identifier: str

    # a dictionary mapping a field name to the function that alters/processes it
    # (`Callable` is the type; the builtin `callable` is a function, not a type)
    fields: Dict[str, Callable[..., Any]]


class ImporterType(TypedDict):
    """Describe how the CSV rows of one import type are read and transformed."""

    # the header that will be used in case the CSV file does not have one
    default_header: str

    # a dictionary mapping a field name to the function that alters/processes its raw value
    # (`Callable` is the type; the builtin `callable` is a function, not a type)
    fields_mapping: Dict[str, Callable[[str], Any]]

    # a list of fields that should be ignored
    ignore_fields: List[str]

    # a dictionary with the fields that need to be altered by a function after the import
    # e.g., adding a many-to-many relationship
    fields_post: ImporterPostFieldType


def parse_imported_date(date_str: str) -> datetime:
    """Convert a Unix timestamp given as text into a timezone-aware datetime.

    The aware value is built directly from the epoch timestamp. Going through
    a naive local-time datetime first only yields the right instant when the
    process timezone matches the one used to make the value aware, and local
    wall-clock times around DST changes are ambiguous or non-existent.

    Args:
        date_str: the timestamp in seconds since the epoch, e.g. ``"1708560000.0"``.

    Returns:
        The corresponding aware datetime, expressed in UTC.

    Raises:
        ValueError: if ``date_str`` cannot be parsed as a float.
    """
    # local import so the block does not depend on a new module-level name
    from datetime import timezone as dt_timezone

    timestamp = float(date_str)

    return datetime.fromtimestamp(timestamp, tz=dt_timezone.utc)

Expand Down Expand Up @@ -106,6 +129,24 @@ def map_county(county: str) -> str:
return county_mapping.get(county, county)


def clean_bank_account(value: str) -> str:
    """Normalise a bank account number by removing whitespace and upper-casing it.

    The cleaned value is always returned; when it is not exactly 24 characters
    long a warning is logged, but nothing is raised.
    """
    # split() without a separator drops every run of whitespace, including
    # leading and trailing ones, so joining the pieces leaves none behind
    without_whitespace = "".join(value.split())
    value = without_whitespace.upper()

    expected_length = 24
    if len(value) == expected_length:
        return value

    logger.warning(f"Invalid bank account number: {value}")
    return value


def clean_registration(value: str) -> str:
    """Normalise a registration number by removing whitespace and upper-casing it.

    The cleaned value is always returned. A warning is logged when its length
    does not match the expected one: 10 characters when it starts with "RO",
    8 characters otherwise.
    """
    value = "".join(value.split()).upper()

    # The length check must sit inside each branch: comparing the result of
    # `not startswith(...) and len(...)` with 8 is always true for "RO" values,
    # which flagged every prefixed number as invalid.
    expected_length = 10 if value.startswith("RO") else 8
    if len(value) != expected_length:
        logger.warning(f"Invalid registration number: {value}")

    return value


def ngo_slugs_to_ids(ngo_slugs: str) -> List[int]:
ngo_slugs = ngo_slugs.split(",")
ngo_ids = []
Expand All @@ -119,18 +160,19 @@ def ngo_slugs_to_ids(ngo_slugs: str) -> List[int]:
return ngo_ids


IMPORT_DETAILS = {
IMPORT_DETAILS: Dict[str, ImporterType] = {
ImportModelTypeChoices.NGO.value: {
"default_header": (
"slug,has_special_status,description,name,bank_account,registration_number,address,county,"
"active_region,phone,website,email,is_verified,date_created,is_active,logo_url,is_accepting_forms"
),
"fields_mapping": {
"date_created": parse_imported_date,
"bank_account": lambda x: x.replace(" ", ""),
"address": lambda x: x.strip(),
"county": map_county,
"active_region": map_county,
"bank_account": clean_bank_account,
"registration_number": clean_registration,
},
"ignore_fields": [],
"fields_post": {},
Expand All @@ -142,9 +184,9 @@ def ngo_slugs_to_ids(ngo_slugs: str) -> List[int]:
"fields_mapping": {
"date_created": parse_imported_date,
"date_updated": parse_imported_date,
"first_name": lambda x: x[:150],
"last_name": lambda x: x[:150],
"email": lambda x: x[:150],
"first_name": lambda x: x[:150].strip(),
"last_name": lambda x: x[:150].strip(),
"email": lambda x: x[:150].strip(),
},
"ignore_fields": ["old_id"],
"fields_post": {},
Expand Down Expand Up @@ -173,7 +215,7 @@ def ngo_slugs_to_ids(ngo_slugs: str) -> List[int]:
}


def process_import_task(import_id):
def process_import_task(import_id) -> ImportJob:
logger.info(f"Processing import {import_id}")

import_obj = ImportJob.objects.get(id=import_id)
Expand All @@ -191,7 +233,7 @@ def process_import_task(import_id):
return import_obj


def run_import(import_obj: ImportJob):
def run_import(import_obj: ImportJob) -> None:
raw_data: List[Dict] = extract_data_from_csv(import_obj)

import_model_name = ImportModelTypeChoices(import_obj.import_type).value
Expand All @@ -212,12 +254,12 @@ def run_import(import_obj: ImportJob):
import_obj.save()


def import_data_into_db(import_obj: ImportJob, import_data: Dict[str, Union[List[Dict], Dict]], import_model):
def import_data_into_db(import_obj: ImportJob, import_data: Dict[str, Union[List[Dict], Dict]], import_model) -> None:
batch_number = 0
items_imported = 0
items_successfully_imported = 0

post_import_details = IMPORT_DETAILS[import_obj.import_type]["fields_post"]
post_import_details = IMPORT_DETAILS[import_obj.import_type].get("fields_post", {})
post_data_identifier = post_import_details.get("identifier", None)

existing_items: List[Any] = []
Expand Down Expand Up @@ -290,13 +332,13 @@ def process_raw_data(
ngos_slug_ids: Dict = dict(Ngo.objects.all().values_list("slug", "id"))

post_data = {}
post_data_identifier = field_details["fields_post"].get("identifier", None)
post_data_identifier = field_details.get("fields_post", {}).get("identifier", None)

processed_data: List[Dict] = []
for index, raw_item in enumerate(raw_data):
item = {}
for field, value in raw_item.items():
if field in field_details["ignore_fields"]:
if field in field_details.get("ignore_fields", []):
continue
elif field in field_details.get("fields_post", {}).get("fields", {}).keys():
post_data[item[post_data_identifier]] = {field: value}
Expand Down

0 comments on commit facdae0

Please sign in to comment.