From 0867f1f376a3c8b6ca7e79a41e05617018412b43 Mon Sep 17 00:00:00 2001 From: venu-sambarapu-DS Date: Tue, 23 Apr 2024 13:43:24 +0530 Subject: [PATCH] fix/metadata_from_g_sheet: Added checks suggested in google sheet --- app/core/config.py | 80 +++++++++--- app/utils/column_mapping.py | 14 +-- app/utils/common.py | 26 ++++ app/utils/metadata.py | 245 +++++++++++++++++++++++++----------- 4 files changed, 264 insertions(+), 101 deletions(-) diff --git a/app/core/config.py b/app/core/config.py index 35011e6..5a66f3f 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -60,6 +60,9 @@ class Settings(BaseSettings): SERVICE_ACCOUNT_CONF: Dict[str, str] = {"": ""} GSHEET_SCOPES: List[str] = ["https://www.googleapis.com/auth/spreadsheets"] + # Metadata File Parameters + METADATA_COLUMN_ORDER_STRING = "" + class Config: env_file = ".env" @@ -397,7 +400,7 @@ class MetadataSettings(BaseSettings): SECTOR_KEYWORD = "sector" ORGANIZATION_KEYWORD = "organization" - SHORT_FORM_KEYWORD = "short_form" + # SHORT_FORM_KEYWORD = "short_form" DESCRIPTION_KEYWORD = "description" DATASET_NAME_FOR_FACTLY_KEYWORD = "dataset_name_for_factly" @@ -412,66 +415,107 @@ class MetadataSettings(BaseSettings): VARIABLE_MEASURED_KEYWORD = "variable_measured" DATA_NEXT_UPDATE_KEYWORD = "data_next_update" SOURCE_KEYWORD = "source" - SECTOR_EXPECTATION = { + DATASET_NAME_FOR_FACTLY_EXPECTATION = { "data_asset_type": None, - "expectation_suite_name": "sector_expectation_suite", + "expectation_suite_name": "dataset_name_for_factly_expectation_suite", "expectations": [ { - "expectation_type": "expect_column_values_to_be_in_set", + "expectation_type": "expect_column_value_lengths_to_be_between", "kwargs": { - "column": "sector", - "value_set": [], + "column": "dataset_name_for_factly", + "min_value": 5, + "max_value": 200, "result_format": "SUMMARY", }, "meta": { - "expectation_name": "Sector Name in set of values", + "expectation_name": "Dataset Name For Factly Length", "cleaning_pdf_link": 
"https://wp.me/ad1WQ9-dvg", - "expectation_error_message": "Sector Name should be from the Data Dictionary", + "expectation_error_message": "Dataset Name For Factly Length should be less than 200", }, } ], } - ORGANIZATION_EXPECTATION = { + DESCRIPTION_EXPECTATION = { "data_asset_type": None, - "expectation_suite_name": "organization_expectation_suite", + "expectation_suite_name": "description_expectation_suite", + "expectations": [ + { + "expectation_type": "expect_column_value_lengths_to_be_between", + "kwargs": { + "column": "description", + "min_value": 50, + "max_value": 5000, + "result_format": "SUMMARY", + }, + "meta": { + "expectation_name": "Description Length", + "cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg", + "expectation_error_message": "Description should be grater than 50", + }, + } + ], + } + SECTOR_EXPECTATION = { + "data_asset_type": None, + "expectation_suite_name": "sector_expectation_suite", "expectations": [ { "expectation_type": "expect_column_values_to_be_in_set", "kwargs": { - "column": "organization", + "column": "sector", "value_set": [], "result_format": "SUMMARY", }, "meta": { - "expectation_name": "Organization Name in set of values", + "expectation_name": "Sector Name in set of values", "cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg", - "expectation_error_message": "Organization Name should be from the Data Dictionary", + "expectation_error_message": "Sector Name should be from the Data Dictionary", }, } ], } - SHORT_FORM_EXPECTATION = { + ORGANIZATION_EXPECTATION = { "data_asset_type": None, - "expectation_suite_name": "short_form_expectation_suite", + "expectation_suite_name": "organization_expectation_suite", "expectations": [ { "expectation_type": "expect_column_values_to_be_in_set", "kwargs": { - "column": "short_form", + "column": "organization", "value_set": [], "result_format": "SUMMARY", }, "meta": { - "expectation_name": "Short Form in set of values", + "expectation_name": "Organization Name in set of values", 
"cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg", - "expectation_error_message": "Short Form should be from the Data Dictionary", + "expectation_error_message": "Organization Name should be from the Data Dictionary", }, } ], } + # SHORT_FORM_EXPECTATION = { + # "data_asset_type": None, + # "expectation_suite_name": "short_form_expectation_suite", + # "expectations": [ + # { + # "expectation_type": "expect_column_values_to_be_in_set", + # "kwargs": { + # "column": "short_form", + # "value_set": [], + # "result_format": "SUMMARY", + # }, + # "meta": { + # "expectation_name": "Short Form in set of values", + # "cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg", + # "expectation_error_message": "Short Form should be from the Data Dictionary", + # }, + # } + # ], + # } + FREQUENCY_OF_UPDATE_EXPECTATION = { "data_asset_type": None, "expectation_suite_name": "frequency_of_update_expectation_suite", diff --git a/app/utils/column_mapping.py b/app/utils/column_mapping.py index 098e0bb..cedf571 100644 --- a/app/utils/column_mapping.py +++ b/app/utils/column_mapping.py @@ -170,9 +170,9 @@ async def find_metadata_columns(columns: set): organization_pattern = re.compile( r".*({}).*".format(metadata_settings.ORGANIZATION_KEYWORD) ) - short_form_pattern = re.compile( - r".*({}).*".format(metadata_settings.SHORT_FORM_KEYWORD) - ) + # short_form_pattern = re.compile( + # r".*({}).*".format(metadata_settings.SHORT_FORM_KEYWORD) + # ) description_pattern = re.compile( r".*({}).*".format(metadata_settings.DESCRIPTION_KEYWORD) ) @@ -217,9 +217,9 @@ async def find_metadata_columns(columns: set): organization_column, columns = extract_pattern_from_columns( columns, organization_pattern ) - short_form_column, columns = extract_pattern_from_columns( - columns, short_form_pattern - ) + # short_form_column, columns = extract_pattern_from_columns( + # columns, short_form_pattern + # ) description_column, columns = extract_pattern_from_columns( columns, description_pattern ) @@ -261,7 +261,7 @@ 
async def find_metadata_columns(columns: set): return { "sector": list(sector_column), "organization": list(organization_column), - "short_form": list(short_form_column), + # "short_form": list(short_form_column), "description": list(description_column), "tags": list(tags_column), "temporal_coverage": list(temporal_coverage_column), diff --git a/app/utils/common.py b/app/utils/common.py index e6c702e..5a4228e 100644 --- a/app/utils/common.py +++ b/app/utils/common.py @@ -112,6 +112,18 @@ async def modify_default_expectation_suite( return expectation_suite +async def modify_column_order_expectation_suite( + expectation_suite: dict, column_order: list +): + modified_expectations = [] + for expectation in expectation_suite["expectations"]: + if expectation["expectation_type"] == "expect_table_columns_to_match_ordered_list": + expectation["kwargs"]["column_list"] = column_order + modified_expectations.append(expectation) + expectation_suite["expectations"] = modified_expectations + return expectation_suite + + async def modify_values_to_be_in_between( changed_config: dict, default_config: str ): @@ -126,6 +138,20 @@ async def modify_values_to_be_in_between( return default_config +async def modify_values_length_to_be_between( + changed_config: dict, default_config: str +): + for expectation in default_config["expectations"]: + if ( + expectation["expectation_type"] + == "expect_column_value_lengths_to_be_between" + ): + expectation["kwargs"].update( + changed_config["expect_column_value_lengths_to_be_between"] + ) + return default_config + + async def modify_values_to_be_in_set( changed_config: dict, default_config: str ): diff --git a/app/utils/metadata.py b/app/utils/metadata.py index c9699be..0b91d67 100644 --- a/app/utils/metadata.py +++ b/app/utils/metadata.py @@ -9,8 +9,10 @@ from app.utils.common import ( modify_values_to_be_in_set, modify_values_to_match_regex_list, + # modify_column_order_expectation_suite, read_dataset, read_pandas_dataset, + 
modify_values_length_to_be_between, ) from app.utils.general import general_metadata_expectation_suite from app.utils.tags import tags_expectation_suite @@ -20,9 +22,140 @@ meta_data_setting = MetadataSettings() +async def check_column_order(dataset): + results = {} + settings.METADATA_COLUMN_ORDER_STRING.split(",") + column_order_list = settings.METADATA_COLUMN_ORDER_STRING.split(",") + validation = dataset.expect_table_columns_to_match_ordered_list(column_order_list) + results["Expect Table Columns To Match The Given List"] = validation + return jsonable_encoder(results) + + +async def modify_dataset_name_for_factly_expectation_suite( + column_name: str, result_format: str +): + default_expectation_suite = meta_data_setting.DATASET_NAME_FOR_FACTLY_EXPECTATION + changed_config = { + "expect_column_value_lengths_to_be_between": { + "min_value": 5, + "max_value": 200, + "column": column_name, + "result_format": result_format, + } + } + changed_expectation_suite = await modify_values_length_to_be_between( + changed_config, default_expectation_suite + ) + return changed_expectation_suite + + +async def dataset_name_for_factly_expectation_suite(dataset, result_format): + """Expectation to check dataset name for factly length in specific range + + Expectation is on whether the dataset name for factly length lies in the range of 5 to 200 characters + Flag if it is outside the range.
+ + Args: + dataset (Data-frame): Read metadata csv using Pandas Data-frame + result_format (str): SUMMARY + + Returns: + Dict: Dictionary of Expectations + """ + results = {} + mapped_columns = await find_metadata_columns(set(dataset.columns)) + sector_column = mapped_columns["dataset_name_for_factly"][0] + + expectation_suite = await modify_dataset_name_for_factly_expectation_suite( + sector_column, result_format + ) + # convert pandas dataset to great_expectations dataset + ge_pandas_dataset = ge.from_pandas( + dataset, expectation_suite=expectation_suite + ) + validation = ge_pandas_dataset.validate() + validation_ui_name = ( + validation["results"][0]["expectation_config"]["meta"][ + "expectation_name" + ] + + " - " + + validation["results"][0]["expectation_config"]["_kwargs"]["column"] + ) + results[validation_ui_name] = validation + + return jsonable_encoder(results) + + +async def modify_description_expectation_suite( + column_name: str, result_format: str +): + default_expectation_suite = meta_data_setting.DESCRIPTION_EXPECTATION + changed_config = { + "expect_column_value_lengths_to_be_between": { + "min_value": 50, + "max_value": 5000, + "column": column_name, + "result_format": result_format, + } + } + changed_expectation_suite = await modify_values_length_to_be_between( + changed_config, default_expectation_suite + ) + return changed_expectation_suite + + +async def description_expectation_suite(dataset, result_format): + """Expectation to check description length in specific range + + Expectation is on whether the description length lies in the range of 50 to 5000 characters + Flag if it is outside the range.
+ + Args: + dataset (Data-frame): Read metadata csv using Pandas Data-frame + result_format (str): SUMMARY + + Returns: + Dict: Dictionary of Expectations + """ + results = {} + mapped_columns = await find_metadata_columns(set(dataset.columns)) + sector_column = mapped_columns["description"][0] + + expectation_suite = await modify_description_expectation_suite( + sector_column, result_format + ) + # convert pandas dataset to great_expectations dataset + ge_pandas_dataset = ge.from_pandas( + dataset, expectation_suite=expectation_suite + ) + + validation = ge_pandas_dataset.validate() + validation_ui_name = ( + validation["results"][0]["expectation_config"]["meta"][ + "expectation_name" + ] + + " - " + + validation["results"][0]["expectation_config"]["_kwargs"]["column"] + ) + results[validation_ui_name] = validation + + return jsonable_encoder(results) + + async def modify_sector_expectation_suite( column_name: str, result_format: str ): + """ + Summary: Modify the default sector expectation suite using + sector.csv file in app.core + + Args: + column_name (str): Metadata column the sector expectation is applied to + result_format (str): SUMMARY + + Returns: + Dict: Modified sector expectation suite (TODO confirm against function body) + """ default_expectation_suite = meta_data_setting.SECTOR_EXPECTATION @@ -139,7 +272,7 @@ async def organization_expectation_suite(dataset, result_format): + validation["results"][0]["expectation_config"]["_kwargs"]["column"] ) results[validation_ui_name] = validation - + # print(jsonable_encoder(results)) return jsonable_encoder(results) @@ -166,41 +299,41 @@ async def modify_short_form_expectation_suite( return changed_expectation_suite -async def short_form_expectation_suite(dataset, result_format): - """Expectation to check if Short Form values are in short_form.csv +# async def short_form_expectation_suite(dataset, result_format): +# """Expectation to check if Short Form values are in short_form.csv - Expectation is on whether every value present in short form column of metadata - csv is in short_form.csv file or
not +# Expectation is on whether every value present in short form column of metadata +# csv is in short_form.csv file or not - Args: - dataset (Dataframe): Read metadata csv using Pandas Dataframe - result_format (str): SUMMARY +# Args: +# dataset (Dataframe): Read metadata csv using Pandas Dataframe +# result_format (str): SUMMARY - Returns: - Dict: Dictionary of Expectations - """ - results = {} - mapped_columns = await find_metadata_columns(set(dataset.columns)) - short_form_column = mapped_columns["short_form"][0] +# Returns: +# Dict: Dictionary of Expectations +# """ +# results = {} +# mapped_columns = await find_metadata_columns(set(dataset.columns)) +# short_form_column = mapped_columns["short_form"][0] - expectation_suite = await modify_short_form_expectation_suite( - short_form_column, result_format - ) - # convert pandas dataset to great_expectations dataset - ge_pandas_dataset = ge.from_pandas( - dataset, expectation_suite=expectation_suite - ) - validation = ge_pandas_dataset.validate() - validation_ui_name = ( - validation["results"][0]["expectation_config"]["meta"][ - "expectation_name" - ] - + " - " - + validation["results"][0]["expectation_config"]["_kwargs"]["column"] - ) - results[validation_ui_name] = validation +# expectation_suite = await modify_short_form_expectation_suite( +# short_form_column, result_format +# ) +# # convert pandas dataset to great_expectations dataset +# ge_pandas_dataset = ge.from_pandas( +# dataset, expectation_suite=expectation_suite +# ) +# validation = ge_pandas_dataset.validate() +# validation_ui_name = ( +# validation["results"][0]["expectation_config"]["meta"][ +# "expectation_name" +# ] +# + " - " +# + validation["results"][0]["expectation_config"]["_kwargs"]["column"] +# ) +# results[validation_ui_name] = validation - return jsonable_encoder(results) +# return jsonable_encoder(results) async def modify_frequency_of_update_expectation_suite( @@ -419,48 +552,6 @@ async def 
time_saved_in_hours_expectation_suite(dataset, result_format): return response -async def description_expectation_suite(dataset, result_format): - """Expectation to check description in specific range - - Expectation is on whether description lies in the range of 50 to 5000 characters - Flag if its outside the range. - - Args: - dataset (Dataframe): Read metadata csv using Pandas Dataframe - result_format (str): SUMMARY - - Returns: - Dict: Dictionary of Expectations - """ - mapped_columns = await find_metadata_columns(set(dataset.columns)) - description_column = mapped_columns["description"][0] - expectation_name = meta_data_setting.DESCRIPTION_KEYWORD.format( - column=description_column - ) - - ge_pandas_dataset = ge.from_pandas(dataset) - - expectation = ge_pandas_dataset.expect_column_values_to_be_between( - column=description_column, - min_value=50, - max_value=5000, - catch_exceptions=True, - result_format=result_format, - ) - - expectation_dict = expectation.to_json_dict() - expectation_dict["expectation_config"]["meta"] = { - "cleaning_pdf_link": settings.DATA_CLEANING_GUIDE_LINK, - "expectation_name": expectation_name, - } - response = { - expectation_dict["expectation_config"]["meta"][ - "expectation_name" - ]: expectation_dict - } - return response - - async def metadata_expectation_suite( dataset, result_format, dataset_name: str ): @@ -477,7 +568,7 @@ async def metadata_expectation_suite( """ if isinstance(dataset, str): dataset = await read_dataset(dataset) - + # print(dir(dataset)) # Dataset modification for sector expectation suite dataset_sector = dataset.copy() # explode the dataset based on sector column @@ -488,10 +579,12 @@ async def metadata_expectation_suite( dataset_sector["sectors"] = dataset_sector["sectors"].str.strip() expectations = await asyncio.gather( + check_column_order(dataset), sector_expectation_suite(dataset_sector, result_format), organization_expectation_suite(dataset, result_format), - short_form_expectation_suite(dataset, 
result_format), - # description_expectation_suite(dataset, result_format), + # short_form_expectation_suite(dataset, result_format), + description_expectation_suite(dataset, result_format), + dataset_name_for_factly_expectation_suite(dataset, result_format), unit_expectation_suite(dataset, result_format), tags_expectation_suite(dataset, result_format), frequency_of_update_expectation_suite(dataset, result_format),