Skip to content

Commit

Permalink
feat(anta.models): propagate error in AntaCommand object
Browse files Browse the repository at this point in the history
  • Loading branch information
mtache committed Jul 5, 2023
1 parent 8058801 commit 908bcc1
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 30 deletions.
10 changes: 6 additions & 4 deletions anta/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from anta.models import AntaCommand
from anta.result_manager.models import TestResult
from anta.tools.misc import exc_to_str

# TODO - should probably use mypy Awaitable in some places rather than this everywhere - @gmuloc
F = TypeVar("F", bound=Callable[..., Any])
Expand Down Expand Up @@ -86,12 +87,13 @@ async def wrapper(*args: Any, **kwargs: Dict[str, Any]) -> TestResult:

await anta_test.device.collect(command=command)

command_output = cast(Dict[str, Any], command.output)

if "vrfs" not in command_output:
if command.failed is not None:
anta_test.result.is_error(f'{command.command}: {exc_to_str(command.failed)}')
return anta_test.result
if "vrfs" not in command.json_output:
anta_test.result.is_skipped(f"no BGP configuration for {family} on this device")
return anta_test.result
if len(bgp_vrfs := command_output["vrfs"]) == 0 or len(bgp_vrfs["default"]["peers"]) == 0:
if len(bgp_vrfs := command.json_output["vrfs"]) == 0 or len(bgp_vrfs["default"]["peers"]) == 0:
# No VRF
anta_test.result.is_skipped(f"no {family} peer on this device")
return anta_test.result
Expand Down
27 changes: 13 additions & 14 deletions anta/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import asyncio
import logging
import httpx
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, Iterator, List, Literal, Optional, Tuple, Union
Expand Down Expand Up @@ -166,11 +167,6 @@ class AsyncEOSDevice(AntaDevice):
tags: List of tags for this device
"""

# Hardware model definition in show version
HW_MODEL_KEY: str = "modelName"
# Maximum concurrent SSH connections opened with EOS
MAX_SSH_CONNECTIONS: int = 10

def __init__( # pylint: disable=R0913
self,
host: str,
Expand Down Expand Up @@ -269,16 +265,15 @@ async def collect(self, command: AntaCommand) -> None:
logger.debug(f"{self.name}: {command}")

except EapiCommandError as e:
# TODO @mtache - propagate the exception in some AntaCommand attribute
logger.error(f"Command '{command.command}' failed on {self.name}: {e.errmsg}")
logger.debug(command)
command.failed = e
except (HTTPError, ConnectError) as e:
# TODO @mtache - propagate the exception in some AntaCommand attribute
logger.error(f"Cannot connect to device {self.name}: {exc_to_str(e)}")
command.failed = e
except Exception as e: # pylint: disable=broad-exception-caught
# TODO @mtache - propagate the exception in some AntaCommand attribute
logger.critical(f"Exception raised while collecting command '{command.command}' on device {self.name} - {exc_to_str(e)}")
logger.debug(tb_to_str(e))
command.failed = e
logger.debug(command)

async def refresh(self) -> None:
Expand All @@ -290,20 +285,24 @@ async def refresh(self) -> None:
- established: When a command execution succeeds
- hw_model: The hardware model of the device
"""
# Refresh command
COMMAND: str = 'show version'
# Hardware model definition in show version
HW_MODEL_KEY: str = "modelName"
logger.debug(f"Refreshing device {self.name}")
self.is_online = await self._session.check_connection()
if self.is_online:
try:
response = await self._session.cli(command="show version")
response = await self._session.cli(command=COMMAND)
except EapiCommandError as e:
logger.warning(f"Cannot get hardware information from device {self.name}: {e.errmsg}")
except (HTTPError, ConnectError) as e:
logger.warning(f"Cannot get hardware information from device {self.name}: {type(e).__name__}{'' if not str(e) else f' ({str(e)})'}")
logger.warning(f"Cannot get hardware information from device {self.name}: {exc_to_str(e)}")
else:
if self.HW_MODEL_KEY in response:
self.hw_model = response[self.HW_MODEL_KEY]
if HW_MODEL_KEY in response:
self.hw_model = response[HW_MODEL_KEY]
else:
logger.warning(f"Cannot get hardware information from device {self.name}: cannot parse 'show version'")
logger.warning(f"Cannot get hardware information from device {self.name}: cannot parse '{COMMAND}'")
else:
logger.warning(f"Could not connect to device {self.name}: cannot open eAPI port")
self.established = bool(self.is_online and self.hw_model)
Expand Down
38 changes: 26 additions & 12 deletions anta/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,18 @@ class AntaCommand(BaseModel):
ofmt: eAPI output - json or text - default is json
output: collected output either dict for json or str for text
template: AntaTemplate object used to render this command
failed: If the command execution fails, the Exception object is stored in this field
"""
class Config:
# This is required if we want to keep an Exception object in the failed field
arbitrary_types_allowed = True

command: str
version: Union[int, Literal['latest']] = 'latest'
ofmt: Literal['json', 'text'] = 'json'
output: Optional[Union[Dict[str, Any], str]]
template: Optional[AntaTemplate]
failed: Optional[Exception] = None

@property
def json_output(self) -> Dict[str, Any]:
Expand Down Expand Up @@ -177,6 +182,14 @@ def all_data_collected(self) -> bool:
"""returns True if output is populated for every command"""
return all(command.output is not None for command in self.instance_commands)

def get_failed_commands(self) -> List[AntaCommand]:
"""returns a list of all the commands that have a populated failed field"""
errors = []
for command in self.instance_commands:
if command.failed is not None:
errors.append(command)
return errors

def __init_subclass__(cls) -> None:
"""
Verify that the mandatory class attributes are defined
Expand Down Expand Up @@ -214,14 +227,14 @@ async def wrapper(
**kwargs: dict[str, Any],
) -> TestResult:
"""
Wraps the test function and implement (in this order):
1. Instantiate the command outputs if `eos_data` is provided
2. Collect missing command outputs from the device
3. Run the test function
4. Catches and set the result if the test function raises an exception
Returns:
TestResult: self.result, populated with the correct exit status
Wraps the test function and implement (in this order):
1. Instantiate the command outputs if `eos_data` is provided
2. Collect missing command outputs from the device
3. Run the test function
4. Catches and set the result if the test function raises an exception
Returns:
TestResult: self.result, populated with the correct exit status
"""
if self.result.result != "unset":
return self.result
Expand All @@ -230,18 +243,19 @@ async def wrapper(

# Data
if eos_data is not None:
logger.debug("Test initialized with input data")
self.save_commands_data(eos_data)
logger.debug(f"Test {self.name} initialized with input data {eos_data}")

# No test data is present, try to collect
# If some data is missing, try to collect
if not self.all_data_collected():
await self.collect()
if self.result.result != "unset":
return self.result

try:
if not self.all_data_collected():
raise ValueError("Some command output is missing")
if cmds := self.get_failed_commands():
self.result.is_error('\n'.join([f'{cmd.command}: {exc_to_str(cmd.failed)}' for cmd in cmds]))
return self.result
logger.debug(f"Test {self.name} on device {self.device.name}: running test")
function(self, **kwargs)
except Exception as e: # pylint: disable=broad-exception-caught
Expand Down

0 comments on commit 908bcc1

Please sign in to comment.