Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
ClausHolbechArista committed Jul 3, 2024
1 parent ffe6195 commit d815ba2
Show file tree
Hide file tree
Showing 20 changed files with 5,401 additions and 2,547 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ repos:
language: python
files: python-avd/pyavd/[a-z_]+/schema
pass_filenames: false
additional_dependencies: ['deepmerge>=1.1.0', 'PyYAML>=6.0.0', 'pydantic>=2.3.0', 'jsonschema>=4.10.3', 'referencing>=0.35.0']
additional_dependencies: ['deepmerge>=1.1.0', 'PyYAML>=6.0.0', 'pydantic>=2.3.0', 'jsonschema>=4.10.3', 'referencing>=0.35.0', 'isort']

- id: templates
name: Precompile eos_cli_config_gen Jinja2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6697,7 +6697,7 @@
"algorithm": {
"type": "string",
"default": "first_id",
"description": "This variable defines the Multi-chassis Link Aggregation (MLAG) algorithm used.\nEach MLAG link will have a /31* subnet with each subnet allocated from the relevant MLAG pool via a calculated offset.\nThe offset is calculated using one of the following algorithms:\n - first_id: `(mlag_primary_id - 1) * 2` where `mlag_primary_id` is the ID of the first node defined under the node_group.\n This allocation method will skip every other /31* subnet making it less space efficient than `odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group to have a node with an odd ID and a node with an even ID.\n - same_subnet: the offset will always be zero.\n This allocation method will cause every MLAG link to be addressed with the same /31* subnet.\n\\* - The prefix length is configurable with a default of /31.",
"description": "This variable defines the Multi-chassis Link Aggregation (MLAG) algorithm used.\nEach MLAG link will have a /31\u00b9 subnet with each subnet allocated from the relevant MLAG pool via a calculated offset.\nThe offset is calculated using one of the following algorithms:\n - first_id: `(mlag_primary_id - 1) * 2` where `mlag_primary_id` is the ID of the first node defined under the node_group.\n This allocation method will skip every other /31\u00b9 subnet making it less space efficient than `odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group to have a node with an odd ID and a node with an even ID.\n - same_subnet: the offset will always be zero.\n This allocation method will cause every MLAG link to be addressed with the same /31\u00b9 subnet.\n\u00b9 The prefix length is configurable with a default of /31.",
"enum": [
"first_id",
"odd_id",
Expand Down
12 changes: 6 additions & 6 deletions python-avd/pyavd/_eos_designs/schema/eos_designs.schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1363,17 +1363,17 @@ keys:
type: str
default: first_id
description: "This variable defines the Multi-chassis Link Aggregation
(MLAG) algorithm used.\nEach MLAG link will have a /31* subnet with
(MLAG) algorithm used.\nEach MLAG link will have a /31\xB9 subnet with
each subnet allocated from the relevant MLAG pool via a calculated offset.\nThe
offset is calculated using one of the following algorithms:\n - first_id:
`(mlag_primary_id - 1) * 2` where `mlag_primary_id` is the ID of the
first node defined under the node_group.\n This allocation method
will skip every other /31* subnet making it less space efficient than
`odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group to
have a node with an odd ID and a node with an even ID.\n - same_subnet:
will skip every other /31\xB9 subnet making it less space efficient
than `odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group
to have a node with an odd ID and a node with an even ID.\n - same_subnet:
the offset will always be zero.\n This allocation method will cause
every MLAG link to be addressed with the same /31* subnet.\n\\* - The
prefix length is configurable with a default of /31."
every MLAG link to be addressed with the same /31\xB9 subnet.\n\xB9
The prefix length is configurable with a default of /31."
valid_values:
- first_id
- odd_id
Expand Down
2,741 changes: 1,476 additions & 1,265 deletions python-avd/pyavd/_schema/eos_cli_config_gen.py

Large diffs are not rendered by default.

4,211 changes: 3,281 additions & 930 deletions python-avd/pyavd/_schema/eos_designs.py

Large diffs are not rendered by default.

172 changes: 123 additions & 49 deletions python-avd/pyavd/_schema/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
# that can be found in the LICENSE file.
from __future__ import annotations

from typing import Any, TYPE_CHECKING, get_type_hints, get_origin, get_args, Annotated
from collections import ChainMap
from typing import Any, TYPE_CHECKING, Optional, Union, get_type_hints, get_origin, get_args, Annotated, Generator
from types import NoneType, UnionType
from warnings import warn

from .._utils import get_all
from .types import InvalidKey
from .validator import validator

if TYPE_CHECKING:
from .eos_designs import EosDesigns
Expand All @@ -19,62 +21,135 @@


class AvdBase:

_allow_other_keys: bool = False

def __repr__(self) -> str:
# cls_name = super().__repr__().split(maxsplit=1)[0]
cls_name = self.__class__.__name__
attrs = [f"{key}={repr(getattr(self, key, None))}" for key in self._get_type_hints() if not key.startswith("_") and key[0].islower()]
attrs = [f"{key}={repr(getattr(self, key, None))}" for key in self.keys()]
return f"<{cls_name}({', '.join(attrs)})>"

def validate(self, data: Any) -> bool:
pass

def _set_with_coercion(self, attr: str, value: Any):
type_hints = self._get_type_hints()[attr]
if isinstance(value, type_hints):
return setattr(self, attr, value)
base_class = self._get_attr_base_class(attr)
print(base_class)
if issubclass(base_class, AvdBase) and isinstance(value, dict):
return setattr(self, attr, base_class(**value))
if base_class is int and isinstance(value, (str, bool)):
value = int(value)
return setattr(self, attr, value)
if base_class is str and isinstance(value, (int, bool, float)):
value = str(value)
return setattr(self, attr, value)
raise TypeError(f"Invalid type '{type(value)}. Expected '{base_class}'")

@classmethod
def _get_type_hints(cls) -> type:
return get_type_hints(cls, include_extras=True)
def keys(cls) -> Generator[str, None, None]:
return (key for key in cls._get_type_hints() if not key.startswith("_") and key[0].islower())

def items(self) -> Generator[tuple[str, Any], None, None]:
return ((key, getattr(self, key, None)) for key in self.keys())

@classmethod
def _get_attr_base_class(cls, attr: str) -> type:
type_hints = cls._get_type_hints()[attr]
type_hints = cls._strip_union_none_from_type_hints(type_hints)
origin = get_origin(type_hints)
if origin is Annotated:
return get_args(type_hints)[0]

return type_hints

@staticmethod
def _strip_union_none_from_type_hints(type_hints: type) -> type:
origin = get_origin(type_hints)
if origin is UnionType:
args = tuple(arg for arg in get_args(type_hints) if arg is not NoneType)
if len(args) > 1:
raise TypeError("Unable to remove union since this field has more non-NonType than one")
return args[0]
return type_hints
def validate(cls, data: dict, path: list | None = None) -> bool: # TODO: Rename all methods to something that cannot collide with schema keys
if warnings := validator(data, cls, path or []):
for warning in warnings:
warn(warning)
return False
return True

# if not isinstance(data, (dict, ChainMap)):
# raise TypeError("Invalid data type '{type(data)}'. Expected dict.")

# valid = True
# if path is None:
# path = []

# base_type_hints = cls._get_type_hints(include_extras=False)
# for key, value in data.items():
# if not isinstance(key, str) or key.startswith("_") or key not in base_type_hints:
# if not cls._allow_other_keys:
# warn(InvalidKey(str([*path, key]), path=[*path, key]))
# valid = False
# continue

# type_hints = base_type_hints[key]
# base_class = cls._get_attr_base_class(key)

# if value is None and cls._is_optional(type_hints):
# # No need to try further validation if the value is None.
# continue

# try:
# if issubclass(base_class, AvdBase):
# avd_class: AvdBase = cls._strip_union_none_from_type_hints(type_hints)
# if not isinstance(value, (avd_class, dict, ChainMap)):
# warn(f"{key}: Invalid type '{type(value)}'. Expected '{avd_class}' or dict or ChainMap. Optional: {cls._is_optional(type_hints)}")
# valid = False
# # No need to try further validation if the type is wrong.
# continue
# # Perform validation of the value on the base_class
# valid = avd_class.validate(value, path=[*path, key]) and valid

# elif not (isinstance(value, base_class) or (base_class in ACCEPTED_COERCION_MAP and isinstance(value, ACCEPTED_COERCION_MAP[base_class]))):
# warn(f"{key}: Invalid type '{type(value)}'. Expected '{base_class}'")
# valid = False
# # No need to try further validation if the type is wrong.
# continue
# except TypeError as e:
# raise TypeError(f"type_hints: {type_hints}, base_class: {base_class}", e)

# return valid

# @staticmethod
# def _is_optional(type_hints: type) -> bool:
# origin = get_origin(type_hints)
# if origin is Optional:
# return True

# if origin is UnionType or origin is Union:
# return NoneType in get_args(type_hints)

# return False

# def _set_with_coercion(self, attr: str, value: Any):
# type_hints = self._get_type_hints(include_extras=False)[attr]
# if isinstance(value, type_hints):
# return setattr(self, attr, value)

# base_class = self._get_attr_base_class(attr)
# print(base_class)

# if issubclass(base_class, AvdBase) and isinstance(value, dict):
# avd_class: AvdBase = self._strip_union_none_from_type_hints(type_hints)
# return setattr(self, attr, avd_class(**value))

# if base_class in ACCEPTED_COERCION_MAP and isinstance(value, ACCEPTED_COERCION_MAP[base_class]):
# value = base_class(value)
# return setattr(self, attr, value)

# raise TypeError(f"Invalid type '{type(value)}. Expected '{base_class}'")

@classmethod
def _get_attr_annotations(cls, attr: str) -> tuple[type]:
type_hints = cls._get_type_hints()[attr]
type_hints = cls._strip_union_none_from_type_hints(type_hints)
origin = get_origin(type_hints)
if origin is Annotated:
return get_args(type_hints)[1:]
def _get_type_hints(cls, include_extras: bool = True) -> dict[str, type]:
return get_type_hints(cls, include_extras=include_extras)

# @classmethod
# def _get_attr_base_class(cls, attr: str) -> type:
# org_type_hints = cls._get_type_hints(include_extras=False)[attr]
# type_hints = cls._strip_union_none_from_type_hints(org_type_hints)
# origin = get_origin(type_hints)
# if origin is None:
# return type_hints
# return origin

# @staticmethod
# def _strip_union_none_from_type_hints(type_hints: type) -> type:
# while True:
# origin = get_origin(type_hints)
# if origin is UnionType or origin is Union:
# args = tuple(arg for arg in get_args(type_hints) if arg is not NoneType)
# if len(args) > 1:
# raise TypeError("Unable to remove union since this field has more non-NonType than one")
# type_hints = args[0]
# continue

# if origin is Optional:
# args = get_args(type_hints)
# if len(args) > 1:
# raise TypeError("Unable to remove Optional since this field has types inside than one")
# type_hints = args[0]
# continue

# break
# return type_hints


class AvdDictBaseModel(AvdBase):
Expand All @@ -86,7 +161,6 @@ class AvdDictBaseModel(AvdBase):
"""

_custom_data: dict[str, Any] | None = None
_allow_other_keys: bool = False

def __init__(self, **kwargs):
"""
Expand Down Expand Up @@ -172,7 +246,7 @@ def _extract_dynamic_keys(self: EosDesigns, kwargs: dict[str, Any]):
model_key: str = dynamic_key_map["model_key"]
model_key_list = []

dynamic_keys = get_all(self, dynamic_keys_path, required=True)
dynamic_keys = get_all(self, dynamic_keys_path)
for dynamic_key in dynamic_keys:
# dynamic_key is one key like "l3leaf".
if kwargs.get(dynamic_key) is None:
Expand Down
115 changes: 55 additions & 60 deletions python-avd/pyavd/_schema/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,79 @@
# that can be found in the LICENSE file.
from __future__ import annotations

from typing import Iterator
from types import GenericAlias
from typing import Any, ClassVar

from annotated_types import BaseMetadata, GroupedMetadata
from pydantic import BeforeValidator

class ValidationOption:
is_validation_type: ClassVar[bool] = True

class ConvertTypes:
def __class_getitem__(cls, item):
return GenericAlias(cls, item)


class Format:
def __class_getitem__(cls, item):
return GenericAlias(cls, item)
class Format(ValidationOption):
pass


class MaxLen:
def __class_getitem__(cls, item):
return GenericAlias(cls, item)
class MaxLen(ValidationOption):
pass


class MinLen:
def __class_getitem__(cls, item):
return GenericAlias(cls, item)
class MinLen(ValidationOption):
pass


class Pattern:
def __class_getitem__(cls, item):
return GenericAlias(cls, item)
class Pattern(ValidationOption):
pass


class ToLower:
is_validation_type: ClassVar[bool] = True


class ValidValues(ValidationOption):
pass


class InvalidKey(UserWarning):
def __init__(self, msg, path: list[str | int] | None = None):
class ValidationWarning(UserWarning):
msg: str
"""Message"""
path: list[str | int]
"""Data path to key that raised this warning"""


class InvalidKey(ValidationWarning):
def __init__(self, key: str, path: list[str | int] | None = None):
self.path = path
self.msg = f"Got invalid key '{key}'"
return super().__init__(self.msg)


class InvalidType(ValidationWarning):
def __init__(self, key: str, invalid_type: type, valid_type: type, path: list[str | int] | None = None):
self.path = path
self.msg = f"Got invalid type for '{key}'. Expected '{valid_type}', got '{invalid_type}'"
return super().__init__(self.msg)


class InvalidValue(ValidationWarning):
def __init__(self, key: str, value: Any, valid_values: tuple, path: list[str | int] | None = None):
self.path = path
self.msg = f"Got invalid value for '{key}'. Expected '{valid_values}', got '{value}'"
return super().__init__(self.msg)


class InvalidLength(ValidationWarning):
def __init__(self, key: str, length: int, valid_length: str, path: list[str | int] | None = None):
self.path = path

self.msg = f"Got invalid length for '{key}'. Expected {valid_length}, got '{length}'"
return super().__init__(self.msg)


class InvalidPattern(ValidationWarning):
def __init__(self, key: str, value: Any, pattern: str, path: list[str | int] | None = None):
self.path = path
msg = f"Got invalid key '{msg}'"
return super().__init__(msg)


class StrConvert(GroupedMetadata):
def __init__(
self,
convert_types: tuple[type] | None = None,
to_lower: bool = False,
):
self.convert_types = convert_types
self.to_lower = to_lower

def __iter__(self) -> Iterator[BaseMetadata]:
if self.to_lower:
yield BeforeValidator(lambda v: v.lower() if isinstance(v, str) else v)
if self.convert_types is not None:
yield BeforeValidator(lambda v: str(v) if isinstance(v, self.convert_types) else v)


class IntConvert(GroupedMetadata):
def __init__(
self,
convert_types: tuple[type] | None = None,
):
self.convert_types = convert_types

def __iter__(self) -> Iterator[BaseMetadata]:
if self.convert_types is not None:
yield BeforeValidator(lambda v: int(v) if isinstance(v, self.convert_types) else v)


class BoolConvert(GroupedMetadata):
def __init__(
self,
convert_types: tuple[type] | None = None,
):
self.convert_types = convert_types

def __iter__(self) -> Iterator[BaseMetadata]:
if self.convert_types is not None:
yield BeforeValidator(lambda v: bool(v) if isinstance(v, self.convert_types) else v)
self.msg = f"Got invalid value for '{key}'. Expected a value matching the pattern '{pattern}', got '{value}'"
return super().__init__(self.msg)
Loading

0 comments on commit d815ba2

Please sign in to comment.