Commit

Merge branch 'main' into fix/sheet_issues
paul-tharun authored Mar 19, 2024
2 parents 3ff518a + 06ef96a commit 2a121b0
Showing 17 changed files with 1,859 additions and 162 deletions.
Binary file modified .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion README.md
@@ -85,7 +85,7 @@ Suppose , if there is no data-quality check after, data-cleaning then there woul
So, ideally, the dataset should look like:

|sector|organization|short_form|....|time_saved_in_hours|price|
-|-|-|-|....|-|-|
+|-|-|-|-|-|-|
|Education|All India Survey on Higher Education|AISHE|....|4|1996|

If there were a tool where a user could upload a `metadata/csv` file and surface its potential problems, the metadata sheet could be revisited and updated properly (a rough sketch of such a check is shown below).
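As a rough illustration of the kind of check such a tool could run, here is a minimal, hypothetical sketch (not this repository's implementation; the column names come from the example table above):

```python
import pandas as pd

# Columns the example metadata sheet above is expected to carry.
REQUIRED_COLUMNS = {"sector", "organization", "short_form", "time_saved_in_hours", "price"}


def report_metadata_problems(csv_path: str) -> list:
    """Return human-readable data-quality problems found in a metadata CSV."""
    df = pd.read_csv(csv_path)
    problems = []
    # Flag required columns that are missing entirely.
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        problems.append(f"missing columns: {sorted(missing)}")
    # Flag numeric columns containing empty or non-numeric values.
    for col in ("time_saved_in_hours", "price"):
        if col in df.columns and pd.to_numeric(df[col], errors="coerce").isna().any():
            problems.append(f"empty or non-numeric values in '{col}'")
    return problems
```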
Binary file modified app/.DS_Store
Binary file not shown.
15 changes: 0 additions & 15 deletions app/api/api_v1/routers/dataset.py
@@ -51,21 +51,6 @@ async def execute_dataset_expectation(request: Request):

@router.post(
    "/expectation/datasets/",
-    # response_model=Dict[
-    #     str,
-    #     Dict[
-    #         str,
-    #         Union[
-    #             List[GeneralTableExpectation],
-    #             RegexPatternExpectation,
-    #             RegexMatchList,
-    #             ColumnValuesToBeInSet,
-    #             DateStrftimePattern,
-    #         ],
-    #     ],
-    # ],
-    # response_model_exclude_none=True,
-    # response_model_exclude_unset=True,
    summary="Execute all possible expectation to a dataset",
)
async def execute_dataset_expectation_post(
160 changes: 49 additions & 111 deletions app/api/api_v1/routers/s3_checks.py
@@ -1,56 +1,48 @@
from typing import List, Union

-from fastapi import APIRouter, Form, HTTPException, status
-from fastapi.logger import logger
+from fastapi import APIRouter, Body, HTTPException, status

from app.core.config import Settings
-from app.models.s3_checks import s3FileCheck, s3FileCheckResponse
-from app.utils.s3_checks import get_s3_resource
+from app.models.s3_checks import s3FileKeyCheckRequest, s3FileKeysCheckRequest
+from app.utils.s3_checks import (
+    check_file_metadata,
+    check_files_metadata,
+    get_s3_resource,
+)

s3_router = router = APIRouter()
settings = Settings()


-@router.post("/files/key/", response_model=s3FileCheck)
+@router.post(
+    "/files/key/",
+)
async def check_if_file_exist_in_bucket(
-    file_key: str = Form(...),
-    bucket: str = Form(...),
-    s3_access_key: Union[str, None] = Form(
-        None,
-        description="S3 access key. If None then take the default one from env variables",
-    ),
-    s3_secret_key: Union[str, None] = Form(
-        None,
-        description="S3 secret key. If None then take the default one from env variables",
-    ),
-    s3_endpoint_url: Union[str, None] = Form(
-        None,
-        description="S3 endpoint url key. If None then take the default one from env variables",
-    ),
-    resource: Union[str, None] = Form(
-        None,
-        description="S3 resource. If None then take the default one from env variables",
-    ),
+    request: s3FileKeyCheckRequest = Body(
+        None, examples=s3FileKeyCheckRequest.Config.schema_extra["examples"]
+    )
):
    """
    Check if file exist in bucket
    """
    s3_access_key = (
        settings.S3_SOURCE_ACCESS_KEY
-        if s3_access_key is None
-        else s3_access_key
+        if request.s3_access_key is None
+        else request.s3_access_key
    )
    s3_secret_key = (
        settings.S3_SOURCE_SECRET_KEY
-        if s3_secret_key is None
-        else s3_secret_key
+        if request.s3_secret_key is None
+        else request.s3_secret_key
    )
    s3_endpoint_url = (
        settings.S3_SOURCE_ENDPOINT_URL
-        if s3_endpoint_url is None
-        else s3_endpoint_url
+        if request.s3_endpoint_url is None
+        else request.s3_endpoint_url
    )
-    resource = settings.S3_SOURCE_RESOURCE if resource is None else resource
+    resource = (
+        settings.S3_SOURCE_RESOURCE
+        if request.resource is None
+        else request.resource
+    )
    try:
        s3_resource = get_s3_resource(
            s3_access_key=s3_access_key,
@@ -64,62 +56,40 @@ async def check_if_file_exist_in_bucket(
detail=f"Error connecting to S3: {e}",
)
else:
is_exist = False
bucket = s3_resource.Bucket(bucket)
if bucket in s3_resource.buckets.all():
objs = list(bucket.objects.filter(Prefix=file_key))
logger.info(f"Checking if file exist in bucket: {file_key}")
if len(objs) == 1 and objs[0].key == file_key:
is_exist = True
return {
"file_key": file_key,
"is_exists": is_exist,
}
else:
logger.info(f"Bucket {bucket} does not exist")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Bucket does not exist",
)
file_metadata = await check_file_metadata(
s3_resource, request.file_key
)
return file_metadata


@router.post("/files", response_model=s3FileCheckResponse)
@router.post(
"/files",
)
async def check_if_files_exist_in_bucket(
file_keys: List[str] = Form(...),
bucket: str = Form(...),
s3_access_key: Union[str, None] = Form(
None,
description="S3 access key. If None then take the default one from env variables",
),
s3_secret_key: Union[str, None] = Form(
None,
description="S3 secret key. If None then take the default one from env variables",
),
s3_endpoint_url: Union[str, None] = Form(
None,
description="S3 endpoint url . If None then take the default one from env variables",
),
resource: Union[str, None] = Form(
None,
description="S3 resource. If None then take the default one from env variables",
),
request: s3FileKeysCheckRequest = Body(
None, examples=s3FileKeysCheckRequest.Config.schema_extra["examples"]
)
):
s3_access_key = (
settings.S3_SOURCE_ACCESS_KEY
if s3_access_key is None
else s3_access_key
if request.s3_access_key is None
else request.s3_access_key
)
s3_secret_key = (
settings.S3_SOURCE_SECRET_KEY
if s3_secret_key is None
else s3_secret_key
if request.s3_secret_key is None
else request.s3_secret_key
)
s3_endpoint_url = (
settings.S3_SOURCE_ENDPOINT_URL
if s3_endpoint_url is None
else s3_endpoint_url
if request.s3_endpoint_url is None
else request.s3_endpoint_url
)
resource = (
settings.S3_SOURCE_RESOURCE
if request.resource is None
else request.resource
)
resource = settings.S3_SOURCE_RESOURCE if resource is None else resource
try:
s3_resource = get_s3_resource(
s3_access_key=s3_access_key,
@@ -134,40 +104,8 @@ async def check_if_files_exist_in_bucket(
        )
    else:
-        # TODO : Check how Form is combining all the strings inside list
-        if len(file_keys) == 1:
-            file_keys = [key for key in file_keys[0].split(",")]
-        file_keys_set = set(file_keys)
-
-        bucket = s3_resource.Bucket(bucket)
-
-        if bucket in s3_resource.buckets.all():
-            logger.debug("Bucket exists: {}".format(bucket))
-
-            folders = set([key.split("/")[0] for key in file_keys_set])
-            all_s3_objects = []
-            for folder in folders:
-                objects_in_folder = [
-                    {"key": obj.key, "size": obj.size}
-                    for obj in bucket.objects.filter(Prefix=folder).all()
-                ]
-                all_s3_objects = all_s3_objects + objects_in_folder
-
-            existing_keys = file_keys_set.intersection(
-                [obj["key"] for obj in all_s3_objects]
-            )
-            non_existing_keys = set(file_keys).difference(existing_keys)
-            existing_file_details = [
-                obj for obj in all_s3_objects if obj["key"] in existing_keys
-            ]
-
-            return {
-                "exists": existing_file_details,
-                "non_exists": list(non_existing_keys),
-            }
-        else:
-            logger.info(f"Bucket {bucket} does not exist")
-            raise HTTPException(
-                status_code=status.HTTP_400_BAD_REQUEST,
-                detail="Bucket does not exist",
-            )
+        files_metadata = await check_files_metadata(
+            session=s3_resource, file_keys=request.file_keys
+        )
+
+        return files_metadata
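The refactor above replaces the per-field `Form(...)` parameters with a single Pydantic model posted as a JSON body. The models (`s3FileKeyCheckRequest`, `s3FileKeysCheckRequest`) live in `app/models/s3_checks.py` and are not shown in this commit; the following is only a plausible sketch inferred from the fields the handlers read (how the bucket is now resolved is not visible in this diff):

```python
from typing import List, Optional

from pydantic import BaseModel


class s3FileKeyCheckRequest(BaseModel):
    file_key: str
    # Optional credentials; the handler falls back to env-based settings when None.
    s3_access_key: Optional[str] = None
    s3_secret_key: Optional[str] = None
    s3_endpoint_url: Optional[str] = None
    resource: Optional[str] = None

    class Config:
        # Referenced by the route as Config.schema_extra["examples"].
        schema_extra = {
            "examples": {
                "default": {
                    "summary": "Check a single key",
                    "value": {"file_key": "folder/file.csv"},  # hypothetical key
                }
            }
        }


class s3FileKeysCheckRequest(BaseModel):
    file_keys: List[str]
    s3_access_key: Optional[str] = None
    s3_secret_key: Optional[str] = None
    s3_endpoint_url: Optional[str] = None
    resource: Optional[str] = None
```

One practical consequence: callers switch from multipart form data to JSON, e.g. `requests.post(f"{base_url}/files/key/", json={"file_key": "folder/file.csv"})`, and the removed comma-splitting TODO workaround for `Form`-encoded lists is no longer needed.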
62 changes: 55 additions & 7 deletions app/core/config.py
@@ -178,7 +178,7 @@ class GeographySettings(BaseSettings):

    COUNTRY_KEYWORD = "country"
    STATE_KEYWORD = "state"
-    CITY_KEYWORD = "city"
+    DISTRICT_KEYWORD = "district"
    COUNTRY_EXPECTATION = {
        "data_asset_type": None,
        "expectation_suite_name": "country_expectation_suite",
@@ -200,7 +200,7 @@ class GeographySettings(BaseSettings):
    }
    STATE_EXPECTATION = {
        "data_asset_type": None,
-        "expectation_suite_name": "date_expectation_suite",
+        "expectation_suite_name": "state_expectation_suite",
        "expectations": [
            {
                "expectation_type": "expect_column_values_to_be_in_set",
@@ -217,21 +217,21 @@ class GeographySettings(BaseSettings):
            }
        ],
    }
-    CITY_EXPECTATION = {
+    DISTRICT_EXPECTATION = {
        "data_asset_type": None,
-        "expectation_suite_name": "city_expectation_suite",
+        "expectation_suite_name": "district_expectation_suite",
        "expectations": [
            {
                "expectation_type": "expect_column_values_to_be_in_set",
                "kwargs": {
-                    "column": "city",
+                    "column": "district",
                    "value_set": [],
                    "result_format": "SUMMARY",
                },
                "meta": {
-                    "expectation_name": "City Name",
+                    "expectation_name": "District Name",
                    "cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg",
-                    "expectation_error_message": "City Name should be from the Data Dictionary",
+                    "expectation_error_message": "District Name should be from the Data Dictionary",
                },
            }
        ],
@@ -590,3 +590,51 @@ class TagsSettings(BaseSettings):
            }
        ],
    }
+
+
+class InsuranceCompanySettings(BaseSettings):
+
+    INSURANCE_COMPANY_NAME_KEYWORD: str = "insurance_company"
+    INSURANCE_COMPANY_NAME_EXPECTATION = {
+        "data_asset_type": None,
+        "expectation_suite_name": "insurance_company_name_expectation_suite",
+        "expectations": [
+            {
+                "expectation_type": "expect_column_values_to_be_in_set",
+                "kwargs": {
+                    "column": "insurance_company",
+                    "value_set": [],
+                    "result_format": "SUMMARY",
+                },
+                "meta": {
+                    "expectation_name": "Insurance Company Name",
+                    "cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg",
+                    "expectation_error_message": "Insurance Company Name should be from the Data Dictionary",
+                },
+            }
+        ],
+    }
+
+
+class PsuCompanySettings(BaseSettings):
+
+    PSU_COMPANY_NAME_KEYWORD: str = "psu_companies"
+    PSU_COMPANY_NAME_EXPECTATION = {
+        "data_asset_type": None,
+        "expectation_suite_name": "psu_company_name_expectation_suite",
+        "expectations": [
+            {
+                "expectation_type": "expect_column_values_to_be_in_set",
+                "kwargs": {
+                    "column": "psu_companies",
+                    "value_set": [],
+                    "result_format": "SUMMARY",
+                },
+                "meta": {
+                    "expectation_name": "PSU Company Name",
+                    "cleaning_pdf_link": "https://wp.me/ad1WQ9-dvg",
+                    "expectation_error_message": "PSU Company Name should be from the Data Dictionary",
+                },
+            }
+        ],
+    }
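Both new classes follow the pattern of the existing geography and tags settings: `value_set` starts empty and is presumably populated from the data dictionary before validation runs. As a stand-alone sketch of what such an `expect_column_values_to_be_in_set` suite enforces (the real validation is performed by Great Expectations; the insurer names below are invented examples):

```python
# Mirrors the semantics of `expect_column_values_to_be_in_set` outside
# Great Expectations, using a trimmed copy of the new settings dict.
INSURANCE_COMPANY_NAME_EXPECTATION = {
    "expectation_type": "expect_column_values_to_be_in_set",
    "kwargs": {
        "column": "insurance_company",
        # Filled at runtime from the data dictionary (hypothetical values):
        "value_set": ["LIC", "New India Assurance", "Oriental Insurance"],
    },
    "meta": {
        "expectation_error_message": "Insurance Company Name should be from the Data Dictionary",
    },
}


def unexpected_values(values, expectation):
    """Return the values that violate the expectation (empty list = pass)."""
    allowed = set(expectation["kwargs"]["value_set"])
    return [v for v in values if v not in allowed]


bad = unexpected_values(["LIC", "Unknown Insurer"], INSURANCE_COMPANY_NAME_EXPECTATION)
if bad:
    # A non-empty result is what surfaces the expectation_error_message.
    print(INSURANCE_COMPANY_NAME_EXPECTATION["meta"]["expectation_error_message"], bad)
```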
