Skip to content

Commit

Permalink
Fix: fixed sector issue
Browse files (browse the repository at this point in the history)
  • Loading branch information
HemanthM005 committed Jun 5, 2023
1 parent 811ed1a commit 2ea53ef
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 12 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ repos:
- id: black
language_version: python3
- repo: https://github.com/pycqa/flake8
rev: 4.0.1
rev: 3.9.0
hooks:
- id: flake8
- repo: https://github.com/timothycrosley/isort
rev: 5.9.3
rev: 5.12.0
hooks:
- id: isort
15 changes: 11 additions & 4 deletions app/api/api_v1/routers/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from app.models.enums import ExpectationResultType
from app.models.metadata_gsheet import MetadataGsheetRequest
from app.utils.common import read_dataset
from app.utils.gsheets import get_records_from_gsheets
from app.utils.metadata import metadata_expectation_suite

Expand All @@ -20,16 +21,22 @@ async def execute_metadata_expectation_from_file(
ExpectationResultType.SUMMARY,
description="Level of Details for a Expectation result",
),
datasets: UploadFile = File(...),
file: UploadFile = File(...),
):

# read the dataset from uploaded CSV file
logger.info(f"dataset: {datasets.filename}")
df = pd.read_csv(datasets.file)
logger.info(f"dataset: {file.filename}")
dataset = await read_dataset(file, is_file=True)
# df = pd.read_csv(datasets.file)

# # metadata expectation
# expectation = await metadata_expectation_suite(
# df, result_type, dataset_name=datasets.filename
# )

# metadata expectation
expectation = await metadata_expectation_suite(
df, result_type, dataset_name=datasets.filename
dataset, result_type, dataset_name=file.filename
)

return expectation
Expand Down
1 change: 0 additions & 1 deletion app/core/sector.csv
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,3 @@ Youth and Sports
Banking
Trade
Water Resources
Youth and Sports
20 changes: 18 additions & 2 deletions app/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ def get_encoding(obj):


async def read_dataset(
source: str, s3_client=None, bucket_name: Union[str, None] = None, **kwargs
source: str,
s3_client=None,
bucket_name: Union[str, None] = None,
is_file: bool = False,
**kwargs,
) -> ge.dataset.pandas_dataset.PandasDataset:
if s3_client:
# dataset should be downloaded from s3 storage
Expand All @@ -42,7 +46,19 @@ async def read_dataset(
finally:
response.close()
response.release_conn()

elif is_file:
try:
file = source.file.read()
dataset = ge.read_csv(BytesIO(file))
logger.info(f"Dataset read from : {source.filename}")
except UnicodeDecodeError:
encoding = get_encoding(obj=file)
dataset = ge.read_csv(BytesIO(file), encoding=encoding)
logger.info(
f"Dataset read from : {source.filename} with non-utf8 encoding"
)
except Exception as e:
logger.info(f"Error reading Dataset from : {source.filename}: {e}")
else:
session = kwargs.pop("session")
try:
Expand Down
16 changes: 14 additions & 2 deletions app/utils/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,13 @@ async def sector_expectation_suite(dataset, result_format):
expectation_suite = await modify_sector_expectation_suite(
sector_column, result_format
)

# convert pandas dataset to great_expectations dataset
ge_pandas_dataset = ge.from_pandas(
dataset, expectation_suite=expectation_suite
)
validation = ge_pandas_dataset.validate()

validation = ge_pandas_dataset.validate()
validation_ui_name = (
validation["results"][0]["expectation_config"]["meta"][
"expectation_name"
Expand Down Expand Up @@ -123,11 +124,13 @@ async def organization_expectation_suite(dataset, result_format):
expectation_suite = await modify_organization_expectation_suite(
organization_column, result_format
)

# convert pandas dataset to great_expectations dataset
ge_pandas_dataset = ge.from_pandas(
dataset, expectation_suite=expectation_suite
)
validation = ge_pandas_dataset.validate()

validation_ui_name = (
validation["results"][0]["expectation_config"]["meta"][
"expectation_name"
Expand Down Expand Up @@ -433,8 +436,17 @@ async def metadata_expectation_suite(
if isinstance(dataset, str):
dataset = await read_dataset(dataset)

# Dataset modification for sector expectation suite
dataset_sector = dataset.copy()
# explode the dataset based on sector column
dataset_sector["sectors"] = dataset_sector["sectors"].apply(
lambda x: x.split(",")
)
dataset_sector = dataset_sector.explode("sectors").reset_index(drop=True)
dataset_sector["sectors"] = dataset_sector["sectors"].str.strip()

expectations = await asyncio.gather(
sector_expectation_suite(dataset, result_format),
sector_expectation_suite(dataset_sector, result_format),
organization_expectation_suite(dataset, result_format),
short_form_expectation_suite(dataset, result_format),
unit_expectation_suite(dataset, result_format),
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ line-length = 79
include = '\.pyi?$'
exclude = '''
/(
.eggs
.eggs
| .git
| .venv
| .cache
Expand Down

0 comments on commit 2ea53ef

Please sign in to comment.