Skip to content

Commit

Permalink
First pass of validation rules for FOCUS 1.0 (#102)
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kwan <[email protected]>
Co-authored-by: Varun Mittal <[email protected]>
  • Loading branch information
joshk0 and varunmittal91 authored Nov 22, 2023
1 parent 6fe474a commit 935eb6f
Show file tree
Hide file tree
Showing 149 changed files with 986 additions and 250 deletions.
1 change: 1 addition & 0 deletions .github/workflows/coverage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ jobs:
run: |
poetry config virtualenvs.create true --local
poetry config virtualenvs.in-project true --local
poetry lock
- uses: actions/cache@v3
name: Define a cache for the virtual environment based on the dependencies lock file
with:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ jobs:
run: |
poetry config virtualenvs.create true --local
poetry config virtualenvs.in-project true --local
poetry lock
- uses: actions/cache@v3
name: Define a cache for the virtual environment based on the dependencies lock file
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/unittest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:

strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
python-version: [ "3.8", "3.9", "3.10", "3.11" ]

steps:
- uses: actions/checkout@v3
Expand Down
20 changes: 19 additions & 1 deletion focus_validator/config_objects/common.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from enum import Enum
from typing import List, Literal

from pydantic import BaseModel
import sqlglot
from pydantic import BaseModel, field_validator


class AllowNullsCheck(BaseModel):
Expand All @@ -12,6 +13,22 @@ class ValueInCheck(BaseModel):
value_in: List[str]


class SQLQueryCheck(BaseModel):
    """Check configuration holding a raw SQL query.

    The query is validated at model-construction time so that downstream
    consumers can rely on it returning exactly one aliased column named
    ``check_output``.
    """

    # Raw SQL text supplied by the rule config; validated below.
    sql_query: str

    @field_validator("sql_query")
    def check_sql_query(cls, sql_query):
        """Ensure the query's only aliased output column is 'check_output'.

        Raises:
            ValueError: if the aliased columns are anything other than
                exactly ``["check_output"]``.
        """
        returned_columns = [
            column.alias
            for column in sqlglot.parse_one(sql_query).find_all(sqlglot.exp.Alias)
        ]

        # Raise explicitly instead of using ``assert``: asserts are stripped
        # under ``python -O``, which would silently disable this validation.
        # pydantic reports a ValueError from a validator as a normal
        # validation failure, same as a failed assert.
        if returned_columns != ["check_output"]:
            raise ValueError(
                "SQL query must only return a column called 'check_output'"
            )
        return sql_query


SIMPLE_CHECKS = Literal["check_unique", "column_required"]


Expand All @@ -20,6 +37,7 @@ class DataTypes(Enum):
DECIMAL = "decimal"
DATETIME = "datetime"
CURRENCY_CODE = "currency-code"
STRINGIFIED_JSON_OBJECT = "stringified-json-object"


class DataTypeCheck(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
from itertools import groupby
from typing import Dict, List, Optional, Set, Union

import pandas as pd
import pandera as pa
import sqlglot
from pandera.api.pandas.types import PandasDtypeInputTypes

from focus_validator.config_objects import ChecklistObject, InvalidRule, Rule
Expand All @@ -10,11 +13,25 @@
ChecklistObjectStatus,
DataTypeCheck,
DataTypes,
SQLQueryCheck,
ValueInCheck,
)
from focus_validator.config_objects.override import Override
from focus_validator.exceptions import FocusNotImplementedError

# Name of a synthetic column appended before grouping; without it pandas'
# default groupby collapses rows and loses their original row numbers.
GROUP_INDEX_COLUMN = "group_index_column"


def __groupby_fnc__(df: pd.DataFrame, column_alias: List[str]):
    """Group *df* by *column_alias* while keeping every row distinct.

    A monotonically increasing row-number column is appended to *df*
    (in place) and included in the grouping keys, so each row lands in
    its own group and rows with null grouping keys are preserved
    (``dropna=False``). Intended for use with pandera's check_sql_query.
    """
    row_numbers = list(range(len(df)))
    df[GROUP_INDEX_COLUMN] = row_numbers
    grouping_columns = column_alias + [GROUP_INDEX_COLUMN]
    return df.groupby(grouping_columns, dropna=False)


class FocusToPanderaSchemaConverter:
@staticmethod
Expand All @@ -40,6 +57,19 @@ def __generate_pandera_check__(rule: Rule, check_id):
return pa.Check.check_value_in(
allowed_values=check.value_in, error=error_string
)
elif isinstance(check, SQLQueryCheck):
column_alias = [
column.alias_or_name
for column in sqlglot.parse_one(check.sql_query).find_all(
sqlglot.exp.Column
)
]
return pa.Check.check_sql_query(
sql_query=check.sql_query,
error=error_string,
column_alias=column_alias,
groupby=lambda df: __groupby_fnc__(df=df, column_alias=column_alias),
)
elif isinstance(check, AllowNullsCheck):
return pa.Check.check_not_null(
error=error_string, ignore_na=False, allow_nulls=check.allow_nulls
Expand Down Expand Up @@ -77,6 +107,14 @@ def __generate_column_definition__(
error=f"{rule.check_id}:::Ensures that column is of {data_type.value} type.",
)
)
elif data_type == DataTypes.STRINGIFIED_JSON_OBJECT:
pandera_type = None
column_checks.append(
pa.Check.check_stringified_json_object_dtype(
ignore_na=True,
error=f"{rule.check_id}:::Ensures that column is of {data_type.value} type.",
)
)
else:
pandera_type = pa.String

Expand Down Expand Up @@ -151,7 +189,7 @@ def generate_pandera_schema(
for rule in rules:
if isinstance(rule, InvalidRule):
checklist[rule.rule_path] = ChecklistObject(
check_name=rule.rule_path,
check_name=os.path.splitext(os.path.basename(rule.rule_path))[0],
column_id="Unknown",
error=f"{rule.error_type}: {rule.error}",
status=ChecklistObjectStatus.ERRORED,
Expand Down Expand Up @@ -180,4 +218,7 @@ def generate_pandera_schema(
overrides=overrides,
schema_dict=schema_dict,
)
return pa.DataFrameSchema(schema_dict, strict=False), checklist
return (
pa.DataFrameSchema(schema_dict, strict=False),
checklist,
)
25 changes: 19 additions & 6 deletions focus_validator/config_objects/rule.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
from typing import Optional, Union

import yaml
Expand All @@ -8,6 +9,7 @@
AllowNullsCheck,
ChecklistObjectStatus,
DataTypeCheck,
SQLQueryCheck,
ValueInCheck,
generate_check_friendly_name,
)
Expand All @@ -27,7 +29,9 @@ class Rule(BaseModel):

check_id: str
column_id: str
check: Union[SIMPLE_CHECKS, AllowNullsCheck, ValueInCheck, DataTypeCheck]
check: Union[
SIMPLE_CHECKS, AllowNullsCheck, ValueInCheck, DataTypeCheck, SQLQueryCheck
]

check_friendly_name: Optional[
str
Expand All @@ -46,6 +50,8 @@ def root_val(cls, values):
"""
Root validator that checks for all options passed in the config and generate missing options.
"""
if values is None:
values = {}

check = values.get("check")
check_friendly_name = values.get("check_friendly_name")
Expand All @@ -59,17 +65,19 @@ def root_val(cls, values):
check_type_friendly_name = check.__class__.__name__
values["check_type_friendly_name"] = check_type_friendly_name

if check_friendly_name is None and column_id is not None:
values["check_friendly_name"] = generate_check_friendly_name(
check=check, column_id=column_id
)
if check_friendly_name is None and column_id is not None:
values["check_friendly_name"] = generate_check_friendly_name(
check=check, column_id=column_id
)

return values

@staticmethod
def load_yaml(
rule_path, column_namespace: Optional[str] = None
) -> Union["Rule", InvalidRule]:
rule_path_basename = os.path.splitext(os.path.basename(rule_path))[0]

try:
with open(rule_path, "r") as f:
rule_obj = yaml.safe_load(f)
Expand All @@ -81,10 +89,15 @@ def load_yaml(
):
rule_obj["column"] = f"{column_namespace}:{rule_obj['column']}"

if isinstance(rule_obj, dict) and "check_id" not in rule_obj:
rule_obj["check_id"] = rule_path_basename

return Rule.model_validate(rule_obj)
except Exception as e:
return InvalidRule(
rule_path=rule_path, error=str(e), error_type=e.__class__.__name__
rule_path=rule_path_basename,
error=str(e),
error_type=e.__class__.__name__,
)


Expand Down
4 changes: 2 additions & 2 deletions focus_validator/data_loaders/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ def __init__(self, data_filename):
def find_data_loader(self):
    """Pick a data-loader class based on the file's detected MIME type.

    Returns CSVDataLoader for text/CSV files and ParquetDataLoader for
    Apache Parquet files.

    Raises:
        FocusNotImplementedError: for any other detected file type.
    """
    file_mime_type = get_file_mime_type(self.data_filename)

    # Note: the pasted diff interleaved the pre- and post-change lines of
    # this method; this is the reconstructed post-change version, which
    # also accepts the "CSV ASCII text" MIME description.
    if file_mime_type in ["ASCII text", "CSV text", "CSV ASCII text"]:
        return CSVDataLoader
    elif file_mime_type == "Apache Parquet":
        return ParquetDataLoader
    else:
        raise FocusNotImplementedError(
            msg=f"Validator for file_type '{file_mime_type}' not implemented yet."
        )

def load(self):
Expand Down
2 changes: 1 addition & 1 deletion focus_validator/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def main():
help="Allow transitional rules in validation",
)
parser.add_argument(
"--validate-version", default="0.5", help="Version of FOCUS to validate against"
"--validate-version", default="1.0", help="Version of FOCUS to validate against"
)
parser.add_argument(
"--rule-set-path",
Expand Down
60 changes: 35 additions & 25 deletions focus_validator/outputter/outputter_console.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import math

import pandas as pd
from tabulate import tabulate

from focus_validator.config_objects import Rule
from focus_validator.rules.spec_rules import ValidationResult
Expand Down Expand Up @@ -55,41 +55,51 @@ def __restructure_check_list__(result_set: ValidationResult):

def write(self, result_set: ValidationResult):
self.result_set = result_set

checklist = self.__restructure_check_list__(result_set)
print("Checklist:")
print(tabulate(checklist, headers="keys", tablefmt="psql"))

if result_set.failure_cases is not None:
aggregated_failures = result_set.failure_cases.groupby(by=['Check Name', 'Column', 'Description'], as_index=False).aggregate(lambda x: maybe_collapse_range(x.unique().tolist()))
aggregated_failures = result_set.failure_cases.groupby(
by=["Check Name", "Description"], as_index=False
).aggregate(lambda x: collapse_occurrence_range(x.unique().tolist()))

print("Checks summary:")
print(
tabulate(
tabular_data=aggregated_failures, # type: ignore
headers="keys",
tablefmt="psql",
print("Errors encountered:")
for _, fail in aggregated_failures.iterrows():
print(
f'{fail["Check Name"]} failed:\n\tDescription: {fail["Description"]}\n\tRows: {fail["Row #"] if fail["Row #"] else "(whole file)"}\n\tExample values: {fail["Values"] if fail["Values"] else "(none)"}\n'
)
)
print("Validation failed!")
else:
print("Validation succeeded.")

def collapse_occurrence_range(occurrence_range: list):
    """Collapse a list of row numbers into a compact range string.

    Consecutive integers are folded into ``"start-end"`` spans, e.g.
    ``[1, 2, 3, 5]`` -> ``"1-3,5"``. Non-numeric values cannot be
    collapsed and are returned comma-joined in their original order.
    A single NaN or None entry yields ``""`` (no row information).

    Note: the pasted diff interleaved the old ``maybe_collapse_range``
    with this replacement; this is the reconstructed new version.
    """
    start = None  # first value of the run currently being collapsed
    i = None  # last value folded into the current run
    collapsed = []

    # Edge case: a lone NaN/None means there are no specific rows.
    if len(occurrence_range) == 1:
        if isinstance(occurrence_range[0], float) and math.isnan(occurrence_range[0]):
            return ""
        if occurrence_range[0] is None:
            return ""

    for n in sorted(occurrence_range):
        if not isinstance(n, int) and not (isinstance(n, float) and not math.isnan(n)):
            # Non-numeric content: emit the values verbatim.
            return ",".join([str(x) for x in occurrence_range])
        elif i is None:
            start = i = int(n)
        elif n == i + 1:
            # Still contiguous: extend the current run.
            i = int(n)
        else:
            # Run broken: flush it. This must be a plain ``else`` — the
            # original ``elif i:`` silently dropped values whenever the
            # previous run ended at row 0, because 0 is falsy.
            if i == start:
                collapsed.append(f"{start}")
            else:
                collapsed.append(f"{start}-{i}")
            start = i = int(n)

    # Flush the final pending run, if any.
    if start is not None:
        if i == start:
            collapsed.append(f"{start}")
        else:
            collapsed.append(f"{start}-{i}")

    return ",".join(collapsed)
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-M002-0001
column_id: AmortizedCost
check:
data_type: decimal
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-M002-0002
column_id: AmortizedCost
check:
allow_nulls: false
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-M002-0003
column_id: AmortizedCost
check:
column_required
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-D014-0001
column_id: AvailabilityZone
check:
data_type: string
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-D014-0002
column_id: AvailabilityZone
check:
allow_nulls: true
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-M001-0001
column_id: BilledCost
check:
data_type: decimal
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-M001-0002
column_id: BilledCost
check:
allow_nulls: false
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-M001-0003
column_id: BilledCost
check:
column_required
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-D010-0001
column_id: BilledCurrency
check:
data_type: currency-code
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-D010-0002
column_id: BilledCurrency
check:
allow_nulls: false
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-D010-0003
column_id: BilledCurrency
check:
column_required
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
check_id: FV-D006-0001
column_id: BillingAccountId
check:
data_type: string
Loading

0 comments on commit 935eb6f

Please sign in to comment.