Skip to content

Commit

Permalink
Add a range-validation feature (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhuppmann authored Mar 3, 2025
1 parent 4e5d9a9 commit 3c0943d
Show file tree
Hide file tree
Showing 13 changed files with 143 additions and 88 deletions.
1 change: 1 addition & 0 deletions nomenclature/processor/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class AggregationItem(BaseModel):

class Aggregator(Processor):
"""Aggregation or renaming of an IamDataFrame on a `dimension`"""

file: FilePath
dimension: str
aggregate: list[AggregationItem]
Expand Down
151 changes: 102 additions & 49 deletions nomenclature/processor/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
from pathlib import Path

import yaml
from pandas import concat
import pandas as pd
from pyam import IamDataFrame
from pyam.logging import adjust_log_level
from pydantic import computed_field, field_validator, model_validator
from pydantic import (
BaseModel,
ConfigDict,
computed_field,
field_validator,
model_validator,
Field,
)

from nomenclature.definition import DataStructureDefinition
from nomenclature.error import ErrorCollector
Expand All @@ -25,11 +32,20 @@ class WarningEnum(str, Enum):
low = "low"


class DataValidationCriteria(IamcDataFilter):
class DataValidationCriteria(BaseModel):
model_config = ConfigDict(extra="forbid")

warning_level: WarningEnum = WarningEnum.error

@property
def criteria(self):
pass

def __str__(self):
return ", ".join([f"{key}: {value}" for key, value in self.criteria.items()])


class DataValidationCriteriaValue(DataValidationCriteria):
class DataValidationValue(DataValidationCriteria):
value: float
rtol: float = 0.0
atol: float = 0.0
Expand Down Expand Up @@ -65,7 +81,10 @@ def criteria(self):
)


class DataValidationCriteriaBounds(DataValidationCriteria):
class DataValidationBounds(DataValidationCriteria):
# allow extra but raise error to guard against multiple criteria
model_config = ConfigDict(extra="allow")

upper_bound: float | None = None
lower_bound: float | None = None

Expand All @@ -75,6 +94,14 @@ def check_validation_criteria_exist(self):
raise ValueError("No validation criteria provided: " + str(self.criteria))
return self

@model_validator(mode="after")
def check_validation_multiple_criteria(self):
if self.model_extra:
raise ValueError(
"Must use either bounds, range or value, found: " + str(self.criteria)
)
return self

@property
def validation_args(self):
return self.criteria
Expand All @@ -86,67 +113,96 @@ def criteria(self):
)


class DataValidationCriteriaMultiple(IamcDataFilter):
validation: (
list[DataValidationCriteriaValue | DataValidationCriteriaBounds] | None
) = None
class DataValidationRange(DataValidationCriteria):
range: list[float] = Field(..., min_length=2, max_length=2)

@field_validator("range", mode="after")
def check_range_is_valid(cls, value: list[float]):
if value[0] > value[1]:
raise ValueError(
"Validation 'range' must be given as `(lower_bound, upper_bound)`, "
"found: " + str(value)
)
return value

@computed_field
def upper_bound(self) -> float:
return self.range[1]

@computed_field
def lower_bound(self) -> float:
return self.range[0]

@property
def validation_args(self):
"""Attributes used for validation (as bounds)."""
return self.model_dump(
exclude_none=True,
exclude_unset=True,
exclude=["warning_level", "range"],
)

@property
def criteria(self):
return self.model_dump(
exclude_none=True,
exclude_unset=True,
exclude=["warning_level", "lower_bound", "upper_bound"],
)


class DataValidationItem(IamcDataFilter):
validation: list[DataValidationValue | DataValidationRange | DataValidationBounds]

@model_validator(mode="after")
def check_warnings_order(self):
"""Check if warnings are set in descending order of severity."""
if self.validation != sorted(self.validation, key=lambda c: c.warning_level):
raise ValueError(
f"Validation criteria for {self.criteria} not"
f"Validation criteria for {self.criteria} not sorted"
" in descending order of severity."
)
else:
return self

@property
def criteria(self):
def filter_args(self):
"""Attributes used for validation (as specified in the file)."""
return self.model_dump(
exclude_none=True, exclude_unset=True, exclude=["validation"]
)

def __str__(self):
return ", ".join([f"{key}: {value}" for key, value in self.filter_args.items()])


class DataValidator(Processor):
"""Processor for validating IAMC datapoints"""

criteria_items: list[DataValidationCriteriaMultiple]
criteria_items: list[DataValidationItem]
file: Path

@field_validator("criteria_items", mode="before")
def check_criteria(cls, v):
for item in v:
for criterion in item["validation"]:
has_bounds = any(c in criterion for c in ["upper_bound", "lower_bound"])
has_values = any(c in criterion for c in ["value", "atol", "rtol"])
if has_bounds and has_values:
raise ValueError(
f"Cannot use bounds and value-criteria simultaneously: {criterion}"
)
return v

@classmethod
def from_file(cls, file: Path | str) -> "DataValidator":
with open(file, "r", encoding="utf-8") as f:
content = yaml.safe_load(f)
criteria_items = []
for item in content:
filter_args = {k: item[k] for k in item if k in IamcDataFilter.model_fields}
criteria_args = {
k: item[k]
for k in item
if k not in IamcDataFilter.model_fields and k != "validation"
}
if "validation" in item:
for criterion in item["validation"]:
criterion.update(filter_args)
else:
item["validation"] = [{**filter_args, **criteria_args}]
criteria_items.append({k: item[k] for k in item if k not in criteria_args})
return cls(file=file, criteria_items=criteria_items)
# handling of simple case where filter and criteria args are given at the same level
if "validation" not in item:
filter_args = {
k: item[k] for k in item if k in IamcDataFilter.model_fields
}
criteria_args = [
{
k: item[k]
for k in item
if k not in IamcDataFilter.model_fields and k != "validation"
}
]
item = dict(**filter_args, validation=criteria_args)
criteria_items.append(item)
return cls(file=file, criteria_items=criteria_items) # type: ignore

def apply(self, df: IamDataFrame) -> IamDataFrame:
"""Validates data in IAMC format according to specified criteria.
Expand All @@ -172,33 +228,30 @@ def apply(self, df: IamDataFrame) -> IamDataFrame:

with adjust_log_level():
for item in self.criteria_items:
per_item_df = df
per_item_df = df.filter(**item.filter_args)
for criterion in item.validation:
failed_validation = per_item_df.validate(
**criterion.validation_args
)
if failed_validation is not None:
per_item_df = IamDataFrame(
concat([df.data, failed_validation]).drop_duplicates(
keep=False
)
)
criteria_msg = " Criteria: " + ", ".join(
[
f"{key}: {value}"
for key, value in criterion.criteria.items()
]
pd.concat(
[per_item_df.data, failed_validation]
).drop_duplicates(keep=False)
)
failed_validation["warning_level"] = (
criterion.warning_level.value
)
if criterion.warning_level == WarningEnum.error:
error = True
fail_list.append(criteria_msg)
fail_list.append(
textwrap.indent(str(failed_validation), prefix=" ")
" Criteria: " + str(item) + ", " + str(criterion)
)
fail_list.append(
textwrap.indent(failed_validation.to_string(), prefix=" ")
+ "\n"
)

fail_msg = "(file %s):\n" % get_relative_path(self.file)
if error:
fail_msg = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,5 @@
validation:
- warning_level: low
upper_bound: 2.5
lower_bound: 1
- upper_bound: 5
lower_bound: 1
- variable: Primary Energy|Coal
year: 2010
upper_bound: 5
lower_bound: 1
# default warning_level: error
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- variable: Primary Energy
year: 2010
validation:
- range: [ 1, 5 ]
Loading

0 comments on commit 3c0943d

Please sign in to comment.