From 2ea53ef7876409afbe0e900fa8b0123195925231 Mon Sep 17 00:00:00 2001
From: Hemanth Manikala
Date: Mon, 5 Jun 2023 23:22:28 +0530
Subject: [PATCH 1/2] Fix: sector validation issue

---
 .pre-commit-config.yaml            |  4 ++--
 app/api/api_v1/routers/metadata.py | 15 +++++++++++----
 app/core/sector.csv                |  1 -
 app/utils/common.py                | 20 ++++++++++++++++++--
 app/utils/metadata.py              | 16 ++++++++++++++--
 pyproject.toml                     |  2 +-
 6 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index aebb248..f9d834e 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -5,10 +5,10 @@ repos:
       - id: black
         language_version: python3
   - repo: https://github.com/pycqa/flake8
-    rev: 4.0.1
+    rev: 3.9.0
    hooks:
       - id: flake8
   - repo: https://github.com/timothycrosley/isort
-    rev: 5.9.3
+    rev: 5.12.0
     hooks:
       - id: isort
\ No newline at end of file
diff --git a/app/api/api_v1/routers/metadata.py b/app/api/api_v1/routers/metadata.py
index 88582d5..c04c3a8 100644
--- a/app/api/api_v1/routers/metadata.py
+++ b/app/api/api_v1/routers/metadata.py
@@ -6,6 +6,7 @@
 from app.models.enums import ExpectationResultType
 from app.models.metadata_gsheet import MetadataGsheetRequest
+from app.utils.common import read_dataset
 from app.utils.gsheets import get_records_from_gsheets
 from app.utils.metadata import metadata_expectation_suite
 
 
@@ -20,16 +21,22 @@ async def execute_metadata_expectation_from_file(
         ExpectationResultType.SUMMARY,
         description="Level of Details for a Expectation result",
     ),
-    datasets: UploadFile = File(...),
+    file: UploadFile = File(...),
 ):
     # read the dataset from uploaded CSV file
-    logger.info(f"dataset: {datasets.filename}")
-    df = pd.read_csv(datasets.file)
+    logger.info(f"dataset: {file.filename}")
+    dataset = await read_dataset(file, is_file=True)
+    # df = pd.read_csv(datasets.file)
+
+    # # metadata expectation
+    # expectation = await metadata_expectation_suite(
+    #     df, result_type, dataset_name=datasets.filename
+    # )
 
     # metadata expectation
     expectation = await metadata_expectation_suite(
-        df, result_type, dataset_name=datasets.filename
+        dataset, result_type, dataset_name=file.filename
     )
     return expectation
diff --git a/app/core/sector.csv b/app/core/sector.csv
index bcfb9f6..d3ab573 100644
--- a/app/core/sector.csv
+++ b/app/core/sector.csv
@@ -50,4 +50,3 @@ Youth and Sports
 Banking
 Trade
 Water Resources
-Youth and Sports
diff --git a/app/utils/common.py b/app/utils/common.py
index b4b25d3..03b9922 100644
--- a/app/utils/common.py
+++ b/app/utils/common.py
@@ -25,7 +25,11 @@ def get_encoding(obj):
 
 
 async def read_dataset(
-    source: str, s3_client=None, bucket_name: Union[str, None] = None, **kwargs
+    source: str,
+    s3_client=None,
+    bucket_name: Union[str, None] = None,
+    is_file: bool = False,
+    **kwargs,
 ) -> ge.dataset.pandas_dataset.PandasDataset:
     if s3_client:
         # dataset should be downloaded from s3 storage
@@ -42,7 +46,19 @@ async def read_dataset(
         finally:
             response.close()
             response.release_conn()
-
+    elif is_file:
+        try:
+            file = source.file.read()
+            dataset = ge.read_csv(BytesIO(file))
+            logger.info(f"Dataset read from : {source.filename}")
+        except UnicodeDecodeError:
+            encoding = get_encoding(obj=file)
+            dataset = ge.read_csv(BytesIO(file), encoding=encoding)
+            logger.info(
+                f"Dataset read from : {source.filename} with non-utf8 encoding"
+            )
+        except Exception as e:
+            logger.info(f"Error reading Dataset from : {source.filename}: {e}")
     else:
         session = kwargs.pop("session")
         try:
diff --git a/app/utils/metadata.py b/app/utils/metadata.py
index a372bb8..ba3cac2 100644
--- a/app/utils/metadata.py
+++ b/app/utils/metadata.py
@@ -62,12 +62,13 @@ async def sector_expectation_suite(dataset, result_format):
     expectation_suite = await modify_sector_expectation_suite(
         sector_column, result_format
     )
+    # convert pandas dataset to great_expectations dataset
     ge_pandas_dataset = ge.from_pandas(
         dataset, expectation_suite=expectation_suite
     )
-    validation = ge_pandas_dataset.validate()
+    validation = ge_pandas_dataset.validate()
     validation_ui_name = (
         validation["results"][0]["expectation_config"]["meta"][
             "expectation_name"
         ]
@@ -123,11 +124,13 @@ async def organization_expectation_suite(dataset, result_format):
     expectation_suite = await modify_organization_expectation_suite(
         organization_column, result_format
     )
+    # convert pandas dataset to great_expectations dataset
     ge_pandas_dataset = ge.from_pandas(
         dataset, expectation_suite=expectation_suite
     )
     validation = ge_pandas_dataset.validate()
+
     validation_ui_name = (
         validation["results"][0]["expectation_config"]["meta"][
             "expectation_name"
         ]
@@ -433,8 +436,17 @@ async def metadata_expectation_suite(
     if isinstance(dataset, str):
         dataset = await read_dataset(dataset)
 
+    # Dataset modification for sector expectation suite
+    dataset_sector = dataset.copy()
+    # explode the dataset based on sector column
+    dataset_sector["sectors"] = dataset_sector["sectors"].apply(
+        lambda x: x.split(",")
+    )
+    dataset_sector = dataset_sector.explode("sectors").reset_index(drop=True)
+    dataset_sector["sectors"] = dataset_sector["sectors"].str.strip()
+
     expectations = await asyncio.gather(
-        sector_expectation_suite(dataset, result_format),
+        sector_expectation_suite(dataset_sector, result_format),
         organization_expectation_suite(dataset, result_format),
         short_form_expectation_suite(dataset, result_format),
         unit_expectation_suite(dataset, result_format),
diff --git a/pyproject.toml b/pyproject.toml
index 65a6727..6e92f19 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,7 +32,7 @@ line-length = 79
 include = '\.pyi?$'
 exclude = '''
 /(
-    .eggs
+    .eggs
   | .git
   | .venv
   | .cache

From 3ff518ab097b06c9fff7c57de375446ed6893c46 Mon Sep 17 00:00:00 2001
From: HemanthM005
Date: Tue, 19 Mar 2024 13:36:15 +0530
Subject: [PATCH 2/2] Feat: Added expectations for validity and metafacts

---
 .pre-commit-config.yaml                 |   6 +-
 app/core/config.py                      |  45 ++++++-
 app/expectations/custom_expectations.py |  56 +++++++++
 app/utils/common.py                     |  14 +++
 app/utils/general.py                    | 155 +++++++++++++++++++++++-
 app/utils/metadata.py                   |  43 +++++++
 6 files changed, 311 insertions(+), 8 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f9d834e..9645fdd 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,14 +1,14 @@
 repos:
   - repo: https://github.com/psf/black
-    rev: 22.3.0
+    rev: 24.3.0
     hooks:
       - id: black
         language_version: python3
   - repo: https://github.com/pycqa/flake8
-    rev: 3.9.0
+    rev: 7.0.0
     hooks:
       - id: flake8
   - repo: https://github.com/timothycrosley/isort
-    rev: 5.12.0
+    rev: 5.13.2
     hooks:
       - id: isort
\ No newline at end of file
diff --git a/app/core/config.py b/app/core/config.py
index 653ec0b..4c1b628 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -16,9 +16,11 @@ class Settings(BaseSettings):
     MODE: str = "development"
     DOCS_URL: str = "/api/docs"
     EXAMPLE_FOLDER: str = "/Users/somitragupta/factly/news-room-datasets"
-    EXAMPLE_URL: str = "/Users/somitragupta/factly/factly-datasets/projects/rbi/\
+    EXAMPLE_URL: str = (
+        "/Users/somitragupta/factly/factly-datasets/projects/rbi/\
 data/processed/1_timeseries/5_handbook-of-statistics-on-the-indian-economy/\
 hbs-mb-scb-select-aggregates-weekly/output.csv"
+    )
     EXAMPLE_URL_COUNTRY: str = """https://storage.factly.org/mande/\
 edu-ministry/data/processed/statistics/1_AISHE_report/19_enrolment_foreign/output.csv"""
     EXAMPLE_URL_STATE: str = """https://storage.factly.org/mande/edu-ministry/data/\
@@ -298,7 +300,7 @@ class NoteSettings(BaseSettings):
         {
             "expectation_type": "expect_column_values_to_match_regex_list",
             "kwargs": {
-                "column": "unit",
+                "column": "note",
                 "regex_list": [",?.+?:[^,]+[,]?"],
                 "result_format": "SUMMARY",
             },
@@ -314,7 +316,33 @@ class NoteSettings(BaseSettings):
 
 
 class CustomExpectationsSettings(BaseSettings):
+    NULL_DATETIME_VALUE_NAME: str = "Null date values Flag - {column}"
+    NULL_DATETIME_VALUE_MSG: str = (
+        "Null values should not be permitted for datetime values"
+    )
+    NUMERIC_COLUMNS_TYPES = ["float64", "int64"]
+    NUMERIC_VALUES_PATTERN = re.compile(r"^-?\d+(\.\d{1,2})?$")
+    NUMERIC_EXPECTATION_NAME: str = (
+        "Numeric values in specific pattern - {column}"
+    )
+    NUMERIC_EXPECTATION_ERR_MSG: str = (
+        "Numeric values should be in a proper format, both integer and float (rounded off to two decimal places)"
+    )
+
+    NEGATIVE_NUMERIC_VALUES_PATTERN = re.compile(r"^-\d+(\.\d{1,})?$")
+    NEGATIVE_NUMERIC_EXPECTATION_NAME: str = (
+        "Negative Numeric values Flag - {column}"
+    )
+    NEGATIVE_NUMERIC_EXPECTATION_ERR_MSG: str = (
+        "Flag numeric values that are negative"
+    )
+
+    COLUMN_NAMES_PATTERN = re.compile(r"^[a-z]+(?:_[a-z]+)*$")
+    COLUMN_NAMES_EXPECTATION_NAME: str = "Column names in specific pattern"
+    COLUMN_NAMES_EXPECTATION_ERR_MSG: str = (
+        "Column names should be in lower case and separated by underscores - {column}"
+    )
 
     TRAIL_OR_LEAD_WHITESPACE_PATTERN = re.compile(r"^\s+.*|.*\s+$")
     LEADING_TRAILING_WHITE_SPACE_EXPECTATION_NAME: str = (
@@ -334,7 +362,9 @@ class CustomExpectationsSettings(BaseSettings):
     SPECIAL_CHARACTER_EXPECTATION_NAME: str = (
         "No special characters in Columns"
     )
-    SPECIAL_CHARACTER_EXPECTATION_ERR_MSG: str = "There should be no special character in the category name and measured value, like Telangana** , and any additional information should be captured in notes instead of using a special character"
+    SPECIAL_CHARACTER_EXPECTATION_ERR_MSG: str = (
+        "There should be no special character in the category name and measured value, like Telangana** , and any additional information should be captured in notes instead of using a special character"
+    )
 
     BRACKET_PATTERN = re.compile(r".*([\[\(].+?[\)\]]).*")
     BRACKETS_EXPECTATION_NAME: str = "No unnecessary brackets in Categories"
@@ -358,7 +388,9 @@ class CustomExpectationsSettings(BaseSettings):
 
     MINIMUM_DATASET_OBSERVATION_THRESH: int = 10
     OBSERVATIONS_MORE_THAN_THRESH_NAME: str = "Minimum required observation"
-    OBSERVATIONS_MORE_THAN_THRESH_MSG: str = "Generally the datasets must be more a threshold number of observation ({thresh})"
+    OBSERVATIONS_MORE_THAN_THRESH_MSG: str = (
+        "Generally the datasets must be more a threshold number of observation ({thresh})"
+    )
 
 
 class MetadataSettings(BaseSettings):
@@ -504,6 +536,11 @@ class MetadataSettings(BaseSettings):
         ],
     }
 
+    DESCRIPTION_NAME: str = "Description"
+    DESCRIPTION_ERROR_MSG: str = (
+        "Description should be in the range of 50 to 5000 characters"
+    )
+
     TIME_SAVED_IN_HOURS_NAME: str = "Null values in columns - {column}"
     TIME_SAVED_IN_HOURS_MSG: str = (
         "Null values should not present in these columns"
     )
diff --git a/app/expectations/custom_expectations.py b/app/expectations/custom_expectations.py
index a748573..65b163a 100644
--- a/app/expectations/custom_expectations.py
+++ b/app/expectations/custom_expectations.py
@@ -1,6 +1,8 @@
+import logging
 from datetime import date
 
 import numpy as np
+import pandas as pd
 from great_expectations.dataset import MetaPandasDataset, PandasDataset
 
 from app.core.config import CustomExpectationsSettings
@@ -8,6 +10,7 @@
 custom_expectation_settings = CustomExpectationsSettings()
 
 CURRENT_YEAR = str(date.today().year)
+logging.basicConfig(level=logging.INFO)
 
 
 class GenericCustomExpectations(PandasDataset):
@@ -78,3 +81,56 @@ def expect_multicolumn_dataset_to_have_more_than_x_rows(self, column_list):
             ),
             length,
         )
+
+    @MetaPandasDataset.multicolumn_map_expectation
+    def expect_numerical_values_to_be_in_specific_pattern(
+        self,
+        column_list,
+        pattern=custom_expectation_settings.NUMERIC_VALUES_PATTERN,
+        meta={
+            "expectation_name": "Numeric values in specific pattern",
+        },
+        include_meta=True,
+    ):
+        bool_list = column_list.applymap(
+            lambda x: True if pattern.match(str(x)) else False
+        )
+        return bool_list[bool_list.columns[0]]
+
+    @MetaPandasDataset.multicolumn_map_expectation
+    def flag_negative_numerical_values(
+        self,
+        column_list,
+        pattern=custom_expectation_settings.NEGATIVE_NUMERIC_VALUES_PATTERN,
+        meta={
+            "expectation_name": "Negative Numeric values Flag",
+        },
+        include_meta=True,
+    ):
+        bool_list = column_list.applymap(
+            lambda x: False if pattern.match(str(x)) else True
+        )
+        return bool_list[bool_list.columns[0]]
+
+    @MetaPandasDataset.multicolumn_map_expectation
+    def expect_column_names_to_be_in_specific_pattern(
+        self,
+        column_list,
+        pattern=custom_expectation_settings.COLUMN_NAMES_PATTERN,
+        meta={
+            "expectation_name": "Values in specific pattern",
+        },
+        include_meta=True,
+        find_columns=False,
+    ):
+        boolean_list = pd.Series(column_list.columns).apply(
+            lambda x: True if pattern.match(str(x)) else False
+        )
+        # improper_column_list = [
+        #     column
+        #     for column, boolean in zip(column_list.columns, boolean_list)
+        #     if not boolean
+        # ]
+        # logging.info(boolean_list.all())
+
+        return boolean_list.all()
diff --git a/app/utils/common.py b/app/utils/common.py
index 03b9922..e6c702e 100644
--- a/app/utils/common.py
+++ b/app/utils/common.py
@@ -112,6 +112,20 @@ async def modify_default_expectation_suite(
     return expectation_suite
 
 
+async def modify_values_to_be_in_between(
+    changed_config: dict, default_config: str
+):
+    for expectation in default_config["expectations"]:
+        if (
+            expectation["expectation_type"]
+            == "expect_column_values_to_be_between"
+        ):
+            expectation["kwargs"].update(
+                changed_config["expect_column_values_to_be_between"]
+            )
+    return default_config
+
+
 async def modify_values_to_be_in_set(
     changed_config: dict, default_config: str
 ):
diff --git a/app/utils/general.py b/app/utils/general.py
index 0e418d6..9918c3d 100644
--- a/app/utils/general.py
+++ b/app/utils/general.py
@@ -10,7 +10,10 @@
 )
 from app.expectations.custom_expectations import GenericCustomExpectations
 from app.models.enums import ExpectationResultType
-from app.utils.column_mapping import find_metadata_columns
+from app.utils.column_mapping import (
+    find_datetime_columns,
+    find_metadata_columns,
+)
 
 settings = Settings()
 custom_settings = CustomExpectationsSettings()
@@ -191,6 +194,106 @@ async def null_not_in_columns(dataset, result_format, column, column_type):
     return response
 
 
+async def null_not_in_datetime_columns(dataset, result_format, column):
+    expectation_name = custom_settings.NULL_DATETIME_VALUE_NAME.format(
+        column=column
+    )
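+    # display name and error message that will be attached to this check's result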
+    expectation_error_message = custom_settings.NULL_DATETIME_VALUE_MSG
+
+    ge_pandas_dataset = ge.from_pandas(
+        dataset, dataset_class=GenericCustomExpectations
+    )
+    expectation = ge_pandas_dataset.expect_column_values_to_not_be_null(
+        column=column,
+        catch_exceptions=True,
+        result_format=result_format,
+    )
+    expectation_dict = expectation.to_json_dict()
+    expectation_dict["expectation_config"]["meta"] = {
+        "cleaning_pdf_link": settings.DATA_CLEANING_GUIDE_LINK,
+        "expectation_name": expectation_name,
+        "expectation_error_message": expectation_error_message,
+    }
+    if "unexpected_index_list" in expectation_dict["result"]:
+        expectation_dict["result"]["unexpected_list"] = (
+            dataset.iloc[
+                expectation_dict["result"]["unexpected_index_list"], :
+            ]
+            .fillna("")
+            .to_dict(orient="records")
+        )
+    # partial unexpected index list is not made with this expectation but is required by the studio
+    if (
+        "partial_unexpected_index_list" not in expectation_dict["result"]
+        and result_format == ExpectationResultType.COMPLETE
+    ):
+        expectation_dict["result"][
+            "partial_unexpected_index_list"
+        ] = expectation_dict["result"]["unexpected_index_list"][
+            : min(20, len(expectation_dict["result"]["unexpected_index_list"]))
+        ]
+    response = {
+        expectation_dict["expectation_config"]["meta"][
+            "expectation_name"
+        ]: expectation_dict
+    }
+    return response
+
+
+async def numeric_values_expectation_suite(
+    dataset, result_format, numeric_column
+):
+    ge_pandas_dataset = ge.from_pandas(
+        dataset, dataset_class=GenericCustomExpectations
+    )
+    expectation = (
+        ge_pandas_dataset.expect_numerical_values_to_be_in_specific_pattern(
+            column_list=[numeric_column],
+            result_format=result_format,
+        )
+    )
+    expectation_dict = expectation.to_json_dict()
+    expectation_dict["expectation_config"]["meta"] = {
+        "cleaning_pdf_link": settings.DATA_CLEANING_GUIDE_LINK,
+        "expectation_name": custom_settings.NUMERIC_EXPECTATION_NAME.format(
+            column=numeric_column
+        ),
+        "expectation_error_message": custom_settings.NUMERIC_EXPECTATION_ERR_MSG,
+    }
+    response = {
+        expectation_dict["expectation_config"]["meta"][
+            "expectation_name"
+        ]: expectation_dict
+    }
+    return response
+
+
+async def negative_numeric_values_expectation_suite(
+    dataset, result_format, numeric_column
+):
+    ge_pandas_dataset = ge.from_pandas(
+        dataset, dataset_class=GenericCustomExpectations
+    )
+    expectation = ge_pandas_dataset.flag_negative_numerical_values(
+        column_list=[numeric_column],
+        result_format=result_format,
+    )
+    expectation_dict = expectation.to_json_dict()
+    expectation_dict["expectation_config"]["meta"] = {
+        "cleaning_pdf_link": settings.DATA_CLEANING_GUIDE_LINK,
+        "expectation_name": custom_settings.NEGATIVE_NUMERIC_EXPECTATION_NAME.format(
+            column=numeric_column
+        ),
+        "expectation_error_message": custom_settings.NEGATIVE_NUMERIC_EXPECTATION_ERR_MSG,
+    }
+    response = {
+        expectation_dict["expectation_config"]["meta"][
+            "expectation_name"
+        ]: expectation_dict
+    }
+    return response
+
+
 async def observation_more_than_thresh_expectation_suite(
     dataset, result_format
 ):
@@ -219,6 +322,38 @@ async def observation_more_than_thresh_expectation_suite(
     return response
 
 
+async def column_names_expectation_suite(dataset, result_format):
+    ge_pandas_dataset = ge.from_pandas(
+        dataset, dataset_class=GenericCustomExpectations
+    )
+    expectation = (
+        ge_pandas_dataset.expect_column_names_to_be_in_specific_pattern(
+            column_list=dataset.columns.tolist(),
+            result_format=result_format,
+        )
+    )
+    expectation_dict = expectation.to_json_dict()
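+    # attach report metadata: the data-cleaning guide link, the expectation's display name, and its error message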
+    expectation_dict["expectation_config"]["meta"] = {
+        "cleaning_pdf_link": settings.DATA_CLEANING_GUIDE_LINK,
+        "expectation_name": custom_settings.COLUMN_NAMES_EXPECTATION_NAME,
+        "expectation_error_message": custom_settings.COLUMN_NAMES_EXPECTATION_ERR_MSG.format(
+            column=dataset.columns.tolist()
+        ),
+    }
+    response = {
+        expectation_dict["expectation_config"]["meta"][
+            "expectation_name"
+        ]: expectation_dict
+    }
+    response[custom_settings.COLUMN_NAMES_EXPECTATION_NAME]["result"][
+        "partial_unexpected_index_list"
+    ] = []
+    response[custom_settings.COLUMN_NAMES_EXPECTATION_NAME]["result"][
+        "partial_unexpected_list"
+    ] = []
+    return response
+
+
 async def general_table_expectation_suite(dataset, result_format):
     """Chaining all general expectaion suites for Datasets
@@ -241,6 +376,7 @@ async def general_table_expectation_suite(dataset, result_format):
         for numeric_column in numeric_columns
         if numeric_column not in custom_settings.UNIT_NOTE_COLUMNS
     ]
+    datetime_columns = await find_datetime_columns(set(dataset.columns))
 
     expectations = await asyncio.gather(
         duplicates_expectation_suite(dataset, result_format),
@@ -253,6 +389,23 @@ async def general_table_expectation_suite(dataset, result_format):
             null_not_in_columns(dataset, result_format, col, "numeric")
             for col in numeric_columns
         ],
+        *[
+            null_not_in_datetime_columns(dataset, result_format, col)
+            for _, col_set in datetime_columns.items()
+            if len(col_set) > 0
+            for col in col_set
+        ],
+        *[
+            numeric_values_expectation_suite(dataset, result_format, col)
+            for col in numeric_columns
+        ],
+        *[
+            negative_numeric_values_expectation_suite(
+                dataset, result_format, col
+            )
+            for col in numeric_columns
+        ],
+        column_names_expectation_suite(dataset, result_format),
         observation_more_than_thresh_expectation_suite(dataset, result_format),
     )
     expectations = ChainMap(*expectations)
diff --git a/app/utils/metadata.py b/app/utils/metadata.py
index ba3cac2..c9699be 100644
--- a/app/utils/metadata.py
+++ b/app/utils/metadata.py
@@ -419,6 +419,48 @@ async def time_saved_in_hours_expectation_suite(dataset, result_format):
     return response
 
 
+async def description_expectation_suite(dataset, result_format):
+    """Expectation to check that the description length is within a specific range
+
+    The expectation checks whether the description lies in the range of 50 to 5000 characters
+    and flags it if it is outside that range.
+
+    Args:
+        dataset (Dataframe): Read metadata csv using Pandas Dataframe
+        result_format (str): SUMMARY
+
+    Returns:
+        Dict: Dictionary of Expectations
+    """
+    mapped_columns = await find_metadata_columns(set(dataset.columns))
+    description_column = mapped_columns["description"][0]
+    expectation_name = meta_data_setting.DESCRIPTION_NAME
+
+    ge_pandas_dataset = ge.from_pandas(dataset)
+
+    expectation = ge_pandas_dataset.expect_column_value_lengths_to_be_between(
+        column=description_column,
+        min_value=50,
+        max_value=5000,
+        catch_exceptions=True,
+        result_format=result_format,
+    )
+
+    expectation_dict = expectation.to_json_dict()
+    expectation_dict["expectation_config"]["meta"] = {
+        "cleaning_pdf_link": settings.DATA_CLEANING_GUIDE_LINK,
+        "expectation_name": expectation_name,
+    }
+    response = {
+        expectation_dict["expectation_config"]["meta"][
+            "expectation_name"
+        ]: expectation_dict
+    }
+    return response
+
+
 async def metadata_expectation_suite(
     dataset, result_format, dataset_name: str
 ):
@@ -449,6 +491,7 @@ async def metadata_expectation_suite(
         sector_expectation_suite(dataset_sector, result_format),
         organization_expectation_suite(dataset, result_format),
         short_form_expectation_suite(dataset, result_format),
+        # description_expectation_suite(dataset, result_format),
         unit_expectation_suite(dataset, result_format),
         tags_expectation_suite(dataset, result_format),
         frequency_of_update_expectation_suite(dataset, result_format),