Merge pull request #15 from d3b-center/feature/update_validation
🚚 update cerberus validation
HuangXiaoyan0106 authored Sep 16, 2024
2 parents 55d26dc + 035772c commit 72387e4
Showing 9 changed files with 1,097 additions and 104 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -0,0 +1 @@
+include d3b_dff_cli/modules/validation/validation_rules_schema.json
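Because check_manifest.py now reads the validation schema from inside the installed package, the JSON file must be declared as package data; MANIFEST.in alone only covers source distributions. A minimal sketch of the companion setuptools configuration (a hypothetical fragment; the project's actual setup.py is not part of this diff):

    # Hypothetical setup.py fragment: include_package_data makes setuptools
    # ship the MANIFEST.in-listed JSON file inside wheels and installs too.
    from setuptools import setup, find_packages

    setup(
        name="d3b-dff-cli",  # assumed project name, matching the package directory
        packages=find_packages(),
        include_package_data=True,
    )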
8 changes: 0 additions & 8 deletions d3b_dff_cli/cli.py
@@ -76,14 +76,6 @@ def create_parser():
     manifest_parser = validation_subparsers.add_parser(
         "manifest", help="Manifest validation based on defined rules."
     )
-    manifest_parser.add_argument(
-        "-rules", help="Formatted JSON file defining validation rules.", required=True
-    )
-    manifest_parser.add_argument(
-        "-rule_type",
-        help="Specific type of validation rule defined in the json rule file.",
-        required=True,
-    )
     manifest_parser.add_argument(
         "-manifest_file",
         help="Manifest based on the d3b genomics manifest template.",
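With -rules and -rule_type removed, the subcommand resolves the bundled schema on its own and needs only the manifest path. A hypothetical invocation (the console-script name is assumed from the package name, not shown in this diff):

    d3b_dff_cli validation manifest -manifest_file Genomics_Manifest.xlsx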
84 changes: 84 additions & 0 deletions d3b_dff_cli/modules/validation/cerberus_custom_checks.py
@@ -0,0 +1,84 @@
+from cerberus import Validator
+import warnings
+
+# Suppress specific UserWarnings from Cerberus
+warnings.filterwarnings("ignore", category=UserWarning, module="cerberus.validator")
+
+class CustomValidator(Validator):
+    def __init__(self, schema, rules=None, *args, **kwargs):
+        """
+        Initialize the CustomValidator with schema and optional custom rules.
+        """
+        super().__init__(schema, *args, **kwargs)
+        self.custom_rules = rules or {}
+
+    def _check_dependencies(self, field, document):
+        """
+        Check if the field's dependencies are met.
+        """
+        dependencies = self.schema.get(field, {}).get('dependencies', {})
+        for dependency_field, allowed_values in dependencies.items():
+            dependency_value = document.get(dependency_field)
+            if isinstance(allowed_values, list):
+                if dependency_value not in allowed_values:
+                    return False
+            else:
+                if dependency_value != allowed_values:
+                    return False
+        return True
+
+    def _validate_custom_rules(self, field, value):
+        """
+        Apply custom validation rules that are beyond the default Cerberus validation.
+        """
+        if field == 'file_name':
+            file_format = self.document.get('file_format')
+            if file_format:
+                extensions = self.custom_rules.get('file_name_extensions', {})
+                expected_extension = extensions.get(file_format)
+                if expected_extension and not value.lower().endswith(expected_extension):
+                    self._error(field, f"{field} must end with {expected_extension} for file_format '{file_format}'.")
+                    return False
+
+        if field == 'file_size':
+            file_format = self.document.get('file_format')
+            experiment = self.document.get("experiment_strategy")
+            byte_cutoff_general = self.custom_rules.get('file_size_byte_cutoff', {}).get('general_cutoff')
+            byte_cutoff_wgs_wxs = self.custom_rules.get('file_size_byte_cutoff', {}).get('wgs_wxs_cutoff')
+            dependencies_format = self.custom_rules.get('file_size_byte_cutoff', {}).get('dependencies', {}).get('file_format', [])
+
+            minimum_value = byte_cutoff_wgs_wxs if experiment in ["wgs", "wxs", "wes"] else byte_cutoff_general
+
+            if file_format in dependencies_format:
+                if value < minimum_value:
+                    self._error(field, f"[Warning] must be at least {minimum_value} for file_format '{file_format}'.")
+                    return False
+
+        return True
+
+    def validate(self, document, *args, **kwargs):
+        """
+        Override validate method to first check dependencies, then apply default and custom validation.
+        """
+        self.document = document
+
+        # Prepare filtered document with fields that meet dependencies
+        filtered_document = {}
+        for field in self.schema:
+            if self._check_dependencies(field, document):
+                filtered_document[field] = document.get(field)
+
+        # Perform default validation
+        super().validate(filtered_document, *args, **kwargs)
+
+        for field, value in filtered_document.items():
+            self._validate_custom_rules(field, value)
+
+        # Filter and return errors
+        print_errors = {field: errors for field, errors in self.errors.items() if field in filtered_document}
+
+        # Determine overall validity based on errors
+        is_valid = not bool(print_errors)
+
+        return is_valid, print_errors
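A minimal usage sketch of the new validator; the schema and custom rules below are illustrative stand-ins, not the shipped validation_rules_schema.json:

    from d3b_dff_cli.modules.validation.cerberus_custom_checks import CustomValidator

    # Hypothetical schema and custom rules, for illustration only.
    schema = {
        "file_format": {"type": "string", "allowed": ["fastq", "bam", "cram"]},
        "file_name": {"type": "string"},
    }
    custom_rules = {"file_name_extensions": {"bam": ".bam", "cram": ".cram"}}

    v = CustomValidator(schema, custom_rules)
    is_valid, errors = v.validate({"file_format": "bam", "file_name": "sample.cram"})
    print(is_valid)  # False: 'sample.cram' does not end with '.bam'
    print(errors)    # roughly {'file_name': ["file_name must end with .bam for file_format 'bam'."]}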
195 changes: 106 additions & 89 deletions d3b_dff_cli/modules/validation/check_manifest.py
@@ -1,110 +1,127 @@
-import json
 import argparse
 import pandas as pd
-import re
-
-# Define a function to perform validation
-def validate_row(row, rules):
-    error_messages = []
-    for rule in rules:
-        conditions = rule.get("conditions", [])
-        consequences = rule.get("consequences", [])
-        condition_met = all(
-            str(row.get(cond["column"])).lower() in map(str.lower, cond.get("equals").split(","))
-            for cond in conditions
-        )
-        if condition_met:
-            for consequence in consequences:
-                col = consequence.get("column")
-                op_value = consequence.get("equals")
-                is_empty = consequence.get("empty")
-
-                cell_value = row.get(col)
-                if is_empty and pd.isna(cell_value):
-                    error_messages.append(f"*{col}*: cannot be empty.")
-                else:
-                    cell_value = str(cell_value).lower()
-
-                    if op_value != "" and op_value is not None:
-                        allowed_values = op_value.split(",")
-                        if len(allowed_values) > 1:
-                            if cell_value not in map(str.lower, allowed_values):
-                                error_messages.append(f"*{col}*: must be one of {', '.join(allowed_values)}.")
-                        else:
-                            if cell_value != op_value.lower():
-                                error_messages.append(f"*{col}*: must be {op_value}.")
-
-                    # Check if file_name ends with a valid extension
-                    if col == "file_name" and "ends_with" in consequence:
-                        format = conditions[0].get("equals")
-                        valid_extensions = consequence["ends_with"].split(",")
-                        if not any(cell_value.lower().endswith(ext.strip()) for ext in valid_extensions):
-                            error_messages.append(f"*file_format* is: {format}, but *{col}* is: {cell_value}, which must end with: {', '.join(valid_extensions)}")
-
-                    # Check if file_format is "FASTQ", "BAM", or "CRAM" and file_size > specified value
-                    if col == "file_size" and row.get("file_format", "").lower() in ["fastq", "bam", "cram"]:
-                        general_cutoff = consequence.get("general_byte_cutoff")
-                        wgs_wxs_cutoff = consequence.get("wgs_wxs_byte_cutoff")
-                        if general_cutoff:
-                            experiment = row.get("experiment_strategy", "").lower()
-                            if experiment in ["wgs", "wxs", "wes"]:
-                                minimum_value = float(wgs_wxs_cutoff)
-                            else:
-                                minimum_value = float(general_cutoff)
-
-                            if pd.notna(cell_value):
-                                try:
-                                    size_byte = float(cell_value)
-                                    if size_byte < minimum_value:
-                                        error_messages.append(f"Warning: *{col}* less than {minimum_value}")
-
-                                except ValueError:
-                                    error_messages.append(f"*{col}*: {cell_value} is not a valid value")
-
-    if error_messages:
-        return False, error_messages  # Return all error messages for this row
-    else:
-        return True, None
+import json
+import os
+from .cerberus_custom_checks import CustomValidator
 
-def main(args):
-    rule_type = args.rule_type
-    rules_json = args.rules
-    manifest = args.manifest_file
+wk_dir = os.path.dirname(os.path.abspath(__file__))
+validation_schema = os.path.join(wk_dir, "validation_rules_schema.json")
 
-    file_extension = manifest.split('.')[-1].lower()
+def load_data(manifest_file):
+    """
+    Load data from a manifest file and convert strings to lowercase.
+    """
+    file_extension = manifest_file.split('.')[-1].lower()
     if file_extension == 'csv':
-        manifest_data = pd.read_csv(manifest)
+        manifest_data = pd.read_csv(manifest_file)
     elif file_extension == 'tsv':
-        manifest_data = pd.read_csv(manifest, delimiter='\t')
+        manifest_data = pd.read_csv(manifest_file, delimiter='\t')
     elif file_extension in ['xls', 'xlsx']:
-        xlsx = pd.ExcelFile(manifest)
+        xlsx = pd.ExcelFile(manifest_file)
         if len(xlsx.sheet_names) == 1:
             manifest_data = pd.read_excel(xlsx)
         elif "Genomics_Manifest" in xlsx.sheet_names:
-            manifest_data = pd.read_excel(xlsx, "Genomics_Manifest")
+            manifest_data = pd.read_excel(xlsx, sheet_name="Genomics_Manifest")
         else:
-            raise ValueError(f"Genomics_Manifest sheet not found in {manifest}")
+            raise ValueError(f"Sheet 'Genomics_Manifest' not found in {manifest_file}")
     else:
         raise ValueError("Unsupported file format. Please provide a CSV, TSV, or Excel file.")
 
-    with open(rules_json, "r") as json_file:
-        validation_rules = json.load(json_file)[rule_type]
+    manifest_data = manifest_data.apply(lambda col: col.astype(str).str.lower() if col.dtype.name in ['object'] else col)
+    return manifest_data
 
+def convert_schema_to_lowercase(schema):
+    """
+    Convert all string values in the schema to lowercase.
+    """
+    for k, v in schema.items():
+        if isinstance(v, dict):
+            convert_schema_to_lowercase(v)
+        elif isinstance(v, list):
+            schema[k] = [item.lower() if isinstance(item, str) else item for item in v]
+        elif isinstance(v, str):
+            schema[k] = v.lower()
+    return schema
+
-    # Iterate through each row in the DataFrame and perform validation
-    validation_failed = False
-    for index, row in manifest_data.iterrows():
-        is_valid, messages = validate_row(row, validation_rules)
+def validate_data(df, schema_json):
+    """
+    Validate the DataFrame against the schema.
+    """
+    valid = True
+    errors = []
+    custom_rules = schema_json.get('custom_rules', {})
+
+    for index, row in df.iterrows():
+        experiment_strategy = row.get("experiment_strategy", "").lower()
+        platform = row.get("platform", "").lower()
+
+        if platform == "pacbio":
+            rule_type = "pacbio_longread_rules"
+        else:
+            if experiment_strategy in ["wgs", "wxs", "wes", "target sequencing", "panel", "target"]:
+                rule_type = "DNAseq_rules"
+            elif experiment_strategy in ["rna-seq", "rnaseq", "mirna-seq", "mirnaseq"]:
+                rule_type = "RNAseq_rules"
+            elif experiment_strategy in ["scrna-seq", "snrna-seq", "scrnaseq", "snrnaseq"]:
+                rule_type = "single_cell_rules"
+            elif experiment_strategy in ["methylation", "methylation microarray"]:
+                rule_type = "methylation_rules"
+            else:
+                raise ValueError(f"Unsupported experiment_strategy for Row {index + 1}")
+
+        schema = schema_json.get(rule_type, {})
+
+        # Filter out fields not in the schema fields for combined manifest
+        row_dict = row.to_dict()
+        filtered_row_dict = {k: v for k, v in row_dict.items() if k in schema}
+
+        v = CustomValidator(schema, custom_rules)
+        (is_valid, out_error) = v.validate(filtered_row_dict)
+
         if not is_valid:
-            error_message = "Validation Failed For Row {0}:\n{1}".format(index + 1, '\n'.join(messages))
-            print(error_message, "\n")
-            validation_failed = True
-    if not validation_failed:
-        print("Validation Passed: All rows are valid.")
+            valid = False
+            errors.append({
+                'row': index + 1,
+                'errors': out_error
+            })
+
+    return valid, errors
 
+def main(args):
+    """
+    Main function to load schema, validate data, and print the validation report.
+    """
+    with open(validation_schema, 'r') as f:
+        schema = json.load(f)
+
+    schema_json = convert_schema_to_lowercase(schema)
+
+    # Load and preprocess the data
+    df = load_data(args.manifest_file)
+
+    # Validate the data
+    valid, errors = validate_data(df, schema_json)
+
+    # Print validation report
+    if valid:
+        print("====Validation Passed====\n All rows are valid.")
+    else:
+        # Check whether every message is a warning
+        only_warnings = all("Warning" in field_error for error in errors for field_errors in error['errors'].values() for field_error in field_errors)
+
+        if only_warnings:
+            print("====Validation Warnings====")
+        else:
+            print("====Validation Failed====")
+
+        for error in errors:
+            print(f"Row {error['row']}:")
+            for field, field_errors in error['errors'].items():
+                for field_error in field_errors:
+                    print(f"  {field}: {field_error}")
+
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Validate a manifest based on defined rules.")
-    parser.add_argument("-rules", help="Formatted JSON file defining validation rules.", required=True)
-    parser.add_argument("-rule_type", help="Specific type of validation rule defined in the json rule file.")
-    parser.add_argument("-manifest_file", help="Manifest based on the d3b genomics manifest template.")
+    parser.add_argument("-manifest_file", required=True, help="Path to the manifest file (CSV/Excel).")
     args = parser.parse_args()
     main(args)
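The new flow can also be exercised directly. A sketch under the assumption that manifest.csv is a small test manifest with the columns referenced above (experiment_strategy, platform, file_format, file_name, file_size):

    import json
    from d3b_dff_cli.modules.validation.check_manifest import (
        load_data, convert_schema_to_lowercase, validate_data, validation_schema,
    )

    # Load the bundled schema and lowercase its string values, as main() does.
    with open(validation_schema) as f:
        schema_json = convert_schema_to_lowercase(json.load(f))

    df = load_data("manifest.csv")  # hypothetical test manifest
    valid, errors = validate_data(df, schema_json)
    for error in errors:
        print(error['row'], error['errors'])

Or via the module's own argparse entry point (python -m keeps the relative import of CustomValidator working):

    python -m d3b_dff_cli.modules.validation.check_manifest -manifest_file manifest.csv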