From e76857772f3f0f4549f08502649864bbaa4e1ede Mon Sep 17 00:00:00 2001 From: Anuja Venkatachalam Date: Mon, 27 May 2024 12:07:05 +0530 Subject: [PATCH] Delete src/epipipeline/preprocess/dengue/functions directory deleting the old version of the functions directory as they have been incorporated in the pipeline in codebase --- .../dengue/functions/map_columns.py | 21 ------- .../dengue/functions/set_headers.py | 63 ------------------- .../dengue/functions/split_workbook_NVBDCP.py | 58 ----------------- 3 files changed, 142 deletions(-) delete mode 100644 src/epipipeline/preprocess/dengue/functions/map_columns.py delete mode 100644 src/epipipeline/preprocess/dengue/functions/set_headers.py delete mode 100644 src/epipipeline/preprocess/dengue/functions/split_workbook_NVBDCP.py diff --git a/src/epipipeline/preprocess/dengue/functions/map_columns.py b/src/epipipeline/preprocess/dengue/functions/map_columns.py deleted file mode 100644 index 37080fb..0000000 --- a/src/epipipeline/preprocess/dengue/functions/map_columns.py +++ /dev/null @@ -1,21 +0,0 @@ -import re - -def map_columns(colname:str, map_dict: dict) -> str: - """This function standardises column names using mapping in config file - - Args: - colname (str): Current column in DataFrame - map (dict): Dictionary mapping of preprocessed col names to standardised col names - - Returns: - str: Standardised column name - """ - - colname=re.sub(r"[^\w\s]","", colname.lstrip().rstrip().lower()) - colname=re.sub(r"(\s+)"," ", colname) - colname=re.sub(r"\s","_", colname) - - for key, values in map_dict.items(): - if colname in values: - return key - return colname diff --git a/src/epipipeline/preprocess/dengue/functions/set_headers.py b/src/epipipeline/preprocess/dengue/functions/set_headers.py deleted file mode 100644 index 848eb02..0000000 --- a/src/epipipeline/preprocess/dengue/functions/set_headers.py +++ /dev/null @@ -1,63 +0,0 @@ -import pandas as pd -import re - - -def search_header(L: list, pivot_col_name) -> bool: - """This function identifies the header row in a dataframe - - Args: - L (list): Current list of Dataframe headers - pivot_col_name (str): Column name) is that a stable header - - Returns: - bool: Whether header was identified - """ - assert isinstance(L,list) and isinstance(pivot_col_name,str), "Invalid input" - - header_search=True - pivot_col_name=pivot_col_name.lstrip().rstrip() - for column in L: - if re.search(pivot_col_name, str(column), re.IGNORECASE): - header_search=False - break - return header_search - -def set_headers(df: pd.DataFrame, pivot_column: str, col_start_index: int, col_start_value): - """_summary_ - - Args: - df (pd.DataFrame): DataFrame - pivot_column (str): Name of the Stable Column used to identify the header - col_start_index (int): Index of the column used to identify the dataframe start row (start at 0) - col_start_value (_type_): Str/Int value of col_start_index column to indicate start of dataframe - """ - assert isinstance(df, pd.DataFrame) and isinstance(pivot_column, str) and isinstance(col_start_index, int) and col_start_index in range(0,len(df)+1), "Invalid input" - -# sets the dataframe's row start - i=0 - while search_header(list(df.columns), pivot_column) and (i<6): # change pivot column here, if needed - df.columns=df.iloc[i,:] - i+=1 - df.drop(axis=0, index=[n for n in range(i)], inplace=True) - -# forward fills for nan columns after the correct columns are identified - for i in range(1,len(df.columns)): - if (re.search("Unnamed", str(df.columns[i]), re.IGNORECASE)) or (re.search("NaN", str(df.columns[i]), re.IGNORECASE)): - df.columns.values[i]=df.columns.values[i-1] - -# identify where data starts based on a column and value input - e.g., S.No. is a digit - start_index=df[df.iloc[:,col_start_index]==col_start_value].index[0] - 1 - -# upward fills merged columns after the correct columns are identified - for row in range(start_index): - row_data=df.iloc[i].to_list() - for i in range(len(row_data)): - if not re.search("nan",str(row_data[i]), re.IGNORECASE): - merge_col=re.sub(r"[\,\.\-\d\(\)\s\*\-\_]+", "", str(row_data[i])).lower() - df.columns.values[i]=df.columns.values[i]+merge_col - -# drops headers - df.drop(axis=0, index=[i for i in range(1, start_index+1)], inplace=True) - - return (df) - diff --git a/src/epipipeline/preprocess/dengue/functions/split_workbook_NVBDCP.py b/src/epipipeline/preprocess/dengue/functions/split_workbook_NVBDCP.py deleted file mode 100644 index 4b94849..0000000 --- a/src/epipipeline/preprocess/dengue/functions/split_workbook_NVBDCP.py +++ /dev/null @@ -1,58 +0,0 @@ -import pandas as pd -import os -import re -import datetime - -def split_workbook_NVBDCP(workbook_name:str) -> bool: - """This function splits the NVBDCP Raw Line list into individual csvs by source. - - Args: - workbook_name (str): Name of Raw Excel Workbook - - Returns: - bool: Whether all sheets have been processed - """ - - assert isinstance(workbook_name,str) and re.search(".xls", workbook_name), "Invalid input" - - processed=True - - wb=pd.ExcelFile(workbook_name) - - for sheet in wb.sheet_names: - if re.search("PCMC", sheet, re.IGNORECASE): - df=pd.read_excel(workbook_name, sheet_name=sheet) - path=os.path.join(os.curdir, "PCMC") - try: - os.mkdir(path) - except FileExistsError: # if directory exists, just save file - df.to_csv(os.path.join(path, sheet+".csv"), index=False) - else: - df.to_csv(os.path.join(path, sheet+".csv"), index=False) - - elif re.search("PMC", sheet, re.IGNORECASE): - df=pd.read_excel(workbook_name, sheet_name=sheet) - path=os.path.join(os.curdir, "PMC") - try: - os.mkdir(path) - except FileExistsError: # if directory exists, just save file - df.to_csv(os.path.join(path, sheet+".csv"), index=False) - else: - df.to_csv(os.path.join(path, sheet+".csv"), index=False) - - elif re.search("PR|Rural", sheet, re.IGNORECASE): - df=pd.read_excel(workbook_name, sheet_name=sheet) - path=os.path.join(os.curdir, "Pune Rural") - try: - os.mkdir(path) - except FileExistsError: # if directory exists, just save file - df.to_csv(os.path.join(path, sheet+".csv"), index=False) - else: - df.to_csv(os.path.join(path, sheet+".csv"), index=False) - else: - processed=False - log=open("error_log.txt", "a") - log.write(f"\nDateTime:{datetime.datetime.now()}") - log.write(f"\nCheck source for sheet {sheet}, and process manually.\n") - log.close() - return(processed) \ No newline at end of file