Skip to content

Commit

Permalink
Isolate Towns Fund transform modules by round and update ingest dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
wjrm500 committed Oct 15, 2024
1 parent 8f86de9 commit 4fa6b35
Show file tree
Hide file tree
Showing 11 changed files with 3,323 additions and 88 deletions.
4 changes: 2 additions & 2 deletions data_store/controllers/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def ingest(
if fund_name == "Towns Fund":
if not isinstance(ingest_dependencies, TFIngestDependencies):
raise ValueError("Ingest dependencies should be of type TFIngestDependencies")
transformed_data = ingest_dependencies.transform(workbook_data, reporting_round)
transformed_data = ingest_dependencies.transform(workbook_data)
tf_validate(
transformed_data,
workbook_data,
Expand All @@ -137,7 +137,7 @@ def ingest(
if error_messages:
raise ValidationError(error_messages)
coerce_data(tables, ingest_dependencies.extract_process_validate_schema)
transformed_data = ingest_dependencies.transform(tables, reporting_round)
transformed_data = ingest_dependencies.transform(tables)
except InitialValidationError as e:
return build_validation_error_response(initial_validation_messages=e.error_messages)
except OldValidationError as validation_error:
Expand Down
8 changes: 5 additions & 3 deletions data_store/controllers/ingest_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from data_store.transformation.pathfinders.pf_transform_r2 import transform as pf_r2_transform
from data_store.transformation.towns_fund.tf_transform_r3 import transform as tf_r3_transform
from data_store.transformation.towns_fund.tf_transform_r4 import transform as tf_r4_transform
from data_store.transformation.towns_fund.tf_transform_r5 import transform as tf_r5_transform
from data_store.transformation.towns_fund.tf_transform_r6 import transform as tf_r6_transform
from data_store.validation.initial_validation.checks import Check
from data_store.validation.initial_validation.schemas import (
PF_ROUND_1_INIT_VAL_SCHEMA,
Expand Down Expand Up @@ -54,7 +56,7 @@ class IngestDependencies(ABC):

initial_validation_schema: list[Check]
table_to_load_function_mapping: dict[str, Callable]
transform: Callable[[dict[str, pd.DataFrame], int], dict[str, pd.DataFrame]]
transform: Callable[[dict[str, pd.DataFrame]], dict[str, pd.DataFrame]]


@dataclass
Expand Down Expand Up @@ -120,7 +122,7 @@ def ingest_dependencies_factory(fund: str, reporting_round: int) -> IngestDepend
)
case ("Towns Fund", 5):
return TFIngestDependencies(
transform=tf_r4_transform,
transform=tf_r5_transform,
validation_schema=TF_ROUND_4_VAL_SCHEMA,
initial_validation_schema=TF_ROUND_5_INIT_VAL_SCHEMA,
messenger=TFMessenger(),
Expand All @@ -129,7 +131,7 @@ def ingest_dependencies_factory(fund: str, reporting_round: int) -> IngestDepend
)
case ("Towns Fund", 6):
return TFIngestDependencies(
transform=tf_r4_transform,
transform=tf_r6_transform,
validation_schema=TF_ROUND_4_VAL_SCHEMA,
initial_validation_schema=TF_ROUND_6_INIT_VAL_SCHEMA,
messenger=TFMessenger(),
Expand Down
2 changes: 1 addition & 1 deletion data_store/transformation/pathfinders/pf_transform_r1.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
}


def transform(df_dict: dict[str, pd.DataFrame], reporting_round: int) -> dict[str, pd.DataFrame]:
def transform(df_dict: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
"""
Transform the data extracted from the Excel file into a format that can be loaded into the database.
Expand Down
2 changes: 1 addition & 1 deletion data_store/transformation/pathfinders/pf_transform_r2.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
}


def transform(df_dict: dict[str, pd.DataFrame], reporting_round: int) -> dict[str, pd.DataFrame]:
def transform(df_dict: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
"""
Transform the data extracted from the Excel file into a format that can be loaded into the database.
Expand Down
41 changes: 9 additions & 32 deletions data_store/transformation/towns_fund/tf_transform_r3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""
Methods specifically for extracting data from Towns Fund Round 3 reporting template used for Reporting Round
1 October 2022 to 31 March 2023.
Methods used to extract and transform data from the Towns Fund Round 3 reporting template.
"""

import typing
Expand All @@ -25,7 +24,7 @@
)


def transform(df_ingest: dict[str, pd.DataFrame], reporting_round: int = 3) -> dict[str, pd.DataFrame]:
def transform(df_ingest: dict[str, pd.DataFrame]) -> dict[str, pd.DataFrame]:
"""
Extract data from Towns Fund Reporting Template into column headed Pandas DataFrames.
Expand Down Expand Up @@ -72,11 +71,7 @@ def transform(df_ingest: dict[str, pd.DataFrame], reporting_round: int = 3) -> d
df_ingest["4a - Funding Profiles"],
project_lookup,
)
towns_fund_extracted["Funding"] = extract_funding_data(
df_ingest["4a - Funding Profiles"],
project_lookup,
reporting_round,
)
towns_fund_extracted["Funding"] = extract_funding_data(df_ingest["4a - Funding Profiles"], project_lookup)
towns_fund_extracted["Private Investments"] = extract_psi(df_ingest["4b - PSI"], project_lookup)
towns_fund_extracted["Output_Data"] = extract_outputs(df_ingest["5 - Project Outputs"], project_lookup)
towns_fund_extracted["Outputs_Ref"] = extract_output_categories(towns_fund_extracted["Output_Data"])
Expand Down Expand Up @@ -339,7 +334,7 @@ def extract_programme_progress(df_data: pd.DataFrame, programme_id: str) -> pd.D
return df_data


def extract_project_progress(df_data: pd.DataFrame, project_lookup: dict, reporting_round: int = 3) -> pd.DataFrame:
def extract_project_progress(df_data: pd.DataFrame, project_lookup: dict) -> pd.DataFrame:
"""
Extract Project progress rows from a DataFrame.
Expand All @@ -351,8 +346,7 @@ def extract_project_progress(df_data: pd.DataFrame, project_lookup: dict, report
:param round_four: if True, ingest two additional columns
:return: A new DataFrame containing the extracted project progress rows.
"""
# if round 4 or 5, ingest two additional columns
df_data = df_data.iloc[18:39, 2:15] if reporting_round >= 4 else df_data.iloc[18:39, 2:13]
df_data = df_data.iloc[18:39, 2:13]
df_data = df_data.rename(columns=df_data.iloc[0].to_dict()).iloc[1:]
df_data = drop_empty_rows(df_data, ["Project Name"])
df_data["Project ID"] = df_data["Project Name"].map(project_lookup)
Expand Down Expand Up @@ -521,7 +515,7 @@ def extract_funding_comments(df_input: pd.DataFrame, project_lookup: dict) -> pd
return df_fund_comments


def extract_funding_data(df_input: pd.DataFrame, project_lookup: dict, reporting_round: int = 3) -> pd.DataFrame:
def extract_funding_data(df_input: pd.DataFrame, project_lookup: dict) -> pd.DataFrame:
"""
Extract funding data (excluding comments) from a DataFrame.
Expand Down Expand Up @@ -623,14 +617,7 @@ def extract_funding_data(df_input: pd.DataFrame, project_lookup: dict, reporting
df_funding.drop(unused_mask.index, inplace=True)

if fund_type == "HS":
# Round 3 collects up to H2 23/24, round 4 collects up to H1 24/25, round 5 collects up to H2 24/25
start_date_cut_off_mapping = {
3: datetime(2023, 10, 1),
4: datetime(2024, 4, 1),
5: datetime(2024, 10, 1),
6: datetime(2025, 4, 1),
}
start_date_cut_off = start_date_cut_off_mapping[reporting_round]
start_date_cut_off = datetime(2023, 10, 1)
unused_fhsf_mask = df_funding.loc[
# drop unused FHSF Questions
(
Expand Down Expand Up @@ -702,12 +689,9 @@ def extract_psi(df_psi: pd.DataFrame, project_lookup: dict) -> pd.DataFrame:
return df_psi


def extract_risks(
df_risk: pd.DataFrame, project_lookup: dict, programme_id: str, reporting_round: int = 3
) -> pd.DataFrame:
def extract_risks(df_risk: pd.DataFrame, project_lookup: dict, programme_id: str) -> pd.DataFrame:
"""
Extract Programme specific risk register rows from a DataFrame.
Input dataframe is parsed from Excel spreadsheet: "Towns Fund reporting template".
Specifically Risk Register work sheet, parsed as dataframe.
Expand Down Expand Up @@ -740,14 +724,7 @@ def extract_risks(
]
df_risk_all.drop(["Pre-mitigated Raw Total Score", "Post-mitigated Raw Total Score"], axis=1, inplace=True)
df_risk_all.columns = pd.Index(risk_columns)
if reporting_round == 3:
# Round 3 ingests were completed using the behaviour of discarding any rows with no Risk Name
# This is preserved to ensure previously valid R3 subs remain valid
df_risk_all = drop_empty_rows(df_risk_all, ["RiskName"])
else:
# Round 4 and 5 ingests behaviour requires all non id columns to be empty in order to drop the row
drop_if_all_empty = [column for column in risk_columns if column not in ["Programme ID", "Project ID"]]
df_risk_all = drop_empty_rows(df_risk_all, drop_if_all_empty)
df_risk_all = drop_empty_rows(df_risk_all, ["RiskName"])
return df_risk_all


Expand Down
Loading

0 comments on commit 4fa6b35

Please sign in to comment.