Skip to content

Commit

Permalink
Remove CLI option
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon committed Aug 28, 2024
1 parent ccb02dc commit 4879120
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 43 deletions.
1 change: 0 additions & 1 deletion anta/catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ def __init__(
self.tag_to_tests: defaultdict[str | None, set[AntaTestDefinition]] = defaultdict(set)
self.tests_without_tags: set[AntaTestDefinition] = set()
self.indexes_built: bool = False
self.final_tests_count: int = 0

@property
def filename(self) -> Path | None:
Expand Down
10 changes: 0 additions & 10 deletions anta/cli/nrfu/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,12 @@ def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]:
is_flag=True,
default=False,
)
@click.option(
"--max-concurrency",
help="Maximum number of tests to run concurrently.",
type=int,
show_envvar=True,
default=50000,
show_default=True,
)
# pylint: disable=too-many-arguments
def nrfu(
ctx: click.Context,
inventory: AntaInventory,
tags: set[str] | None,
catalog: AntaCatalog,
max_concurrency: int,
device: tuple[str],
test: tuple[str],
hide: tuple[str],
Expand Down Expand Up @@ -145,7 +136,6 @@ def nrfu(
ctx.obj["device"] = device
ctx.obj["test"] = test
ctx.obj["dry_run"] = dry_run
ctx.obj["max_concurrency"] = max_concurrency

# Invoke `anta nrfu table` if no command is passed
if not ctx.invoked_subcommand:
Expand Down
2 changes: 0 additions & 2 deletions anta/cli/nrfu/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def run_tests(ctx: click.Context) -> None:
device = nrfu_ctx_params["device"] or None
test = nrfu_ctx_params["test"] or None
dry_run = nrfu_ctx_params["dry_run"]
max_concurrency = nrfu_ctx_params["max_concurrency"]

catalog = ctx.obj["catalog"]
inventory = ctx.obj["inventory"]
Expand All @@ -59,7 +58,6 @@ def run_tests(ctx: click.Context) -> None:
devices=set(device) if device else None,
tests=set(test) if test else None,
dry_run=dry_run,
max_concurrency=max_concurrency,
)
)
if dry_run:
Expand Down
89 changes: 59 additions & 30 deletions anta/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,30 @@
logger = logging.getLogger(__name__)

DEFAULT_NOFILE = 16384
"""Default number of open file descriptors for the ANTA process."""
DEFAULT_MAX_CONCURRENCY = 10000
"""Default maximum number of tests to run concurrently."""
DEFAULT_MAX_CONNECTIONS = 100
"""Default underlying HTTPX client maximum number of connections per device."""


def adjust_max_concurrency() -> int:
"""Adjust the maximum number of tests (coroutines) to run concurrently.
The limit is set to the value of the ANTA_MAX_CONCURRENCY environment variable.
If the `ANTA_MAX_CONCURRENCY` environment variable is not set or is invalid, `DEFAULT_MAX_CONCURRENCY` is used.
Returns
-------
The maximum number of tests to run concurrently.
"""
try:
max_concurrency = int(os.environ.get("ANTA_MAX_CONCURRENCY", DEFAULT_MAX_CONCURRENCY))
except ValueError as exception:
logger.warning("The ANTA_MAX_CONCURRENCY environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_MAX_CONCURRENCY)
max_concurrency = DEFAULT_MAX_CONCURRENCY
return max_concurrency


def adjust_rlimit_nofile() -> tuple[int, int]:
Expand All @@ -41,7 +65,7 @@ def adjust_rlimit_nofile() -> tuple[int, int]:
Returns
-------
tuple[int, int]: The new soft and hard limits for open file descriptors.
The new soft and hard limits for open file descriptors.
"""
try:
nofile = int(os.environ.get("ANTA_NOFILE", DEFAULT_NOFILE))
Expand Down Expand Up @@ -76,21 +100,21 @@ def log_cache_statistics(devices: list[AntaDevice]) -> None:
logger.info("Caching is not enabled on %s", device.name)


async def run_tests(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResult], None], limit: int) -> AsyncGenerator[TestResult, None]:
async def run(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResult], None], limit: int) -> AsyncGenerator[TestResult, None]:
"""Run tests with a concurrency limit.
This function takes an asynchronous generator of test coroutines and runs them
with a limit on the number of concurrent tests. It yields test results as each
test completes.
Args:
----
Parameters
----------
tests_generator: An asynchronous generator that yields test coroutines.
limit: The maximum number of concurrent tests to run.
Yields
------
TestResult: The result of each completed test.
The result of each completed test.
"""
# NOTE: The `aiter` built-in function is not available in Python 3.9
aws = tests_generator.__aiter__() # pylint: disable=unnecessary-dunder-call
Expand Down Expand Up @@ -138,7 +162,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic
Returns
-------
AntaInventory | None: The filtered inventory or None if there are no devices to run tests on.
The filtered AntaInventory or None if there are no devices to run tests on.
"""
if len(inventory) == 0:
logger.info("The inventory is empty, exiting")
Expand All @@ -163,10 +187,10 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic
return selected_inventory


def prepare_tests(
def setup_tests(
inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None
) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None:
"""Prepare the tests to run.
) -> tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None]:
"""Set up the tests for the ANTA run.
Parameters
----------
Expand All @@ -177,15 +201,17 @@ def prepare_tests(
Returns
-------
A mapping of devices to the tests to run or None if there are no tests to run.
The total number of tests and a mapping of devices to the tests to run or None if there are no tests to run.
"""
# Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests
catalog.build_indexes(filtered_tests=tests)

total_tests = 0

# Using a set to avoid inserting duplicate tests
device_to_tests: defaultdict[AntaDevice, set[AntaTestDefinition]] = defaultdict(set)

# Create AntaTestRunner tuples from the tags
# Create the mapping of devices to the tests to run
for device in inventory.devices:
if tags:
# If there are CLI tags, only execute tests with matching tags
Expand All @@ -197,19 +223,19 @@ def prepare_tests(
# Then add the tests with matching tags from device tags
device_to_tests[device].update(catalog.get_tests_by_tags(device.tags))

catalog.final_tests_count += len(device_to_tests[device])
total_tests += len(device_to_tests[device])

if catalog.final_tests_count == 0:
if total_tests == 0:
msg = (
f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs."
)
logger.warning(msg)
return None
return total_tests, None

return device_to_tests
return total_tests, device_to_tests


async def generate_tests(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]:
async def test_generator(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]:
"""Generate the coroutines for the ANTA run.
It creates an async generator of coroutines which are created by the `test` method of the AntaTest instances. Each coroutine is a test to run.
Expand All @@ -220,7 +246,7 @@ async def generate_tests(selected_tests: defaultdict[AntaDevice, set[AntaTestDef
Yields
------
Coroutine[Any, Any, TestResult]: The coroutine (test) to run.
The coroutine (test) to run.
"""
for device, test_definitions in selected_tests.items():
for test in test_definitions:
Expand All @@ -246,7 +272,6 @@ async def main( # noqa: PLR0913
manager: ResultManager,
inventory: AntaInventory,
catalog: AntaCatalog,
max_concurrency: int = 50000,
devices: set[str] | None = None,
tests: set[str] | None = None,
tags: set[str] | None = None,
Expand All @@ -265,7 +290,6 @@ async def main( # noqa: PLR0913
manager: ResultManager object to populate with the test results.
inventory: AntaInventory object that includes the device(s).
catalog: AntaCatalog object that includes the list of tests.
max_concurrency: Maximum number of tests to run concurrently. Default is 50000.
devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU.
tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU.
tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU.
Expand All @@ -275,6 +299,9 @@ async def main( # noqa: PLR0913
# Adjust the maximum number of open file descriptors for the ANTA process
limits = adjust_rlimit_nofile()

# Adjust the maximum number of tests to run concurrently
max_concurrency = adjust_max_concurrency()

if not catalog.tests:
logger.info("The list of tests is empty, exiting")
return
Expand All @@ -286,51 +313,53 @@ async def main( # noqa: PLR0913
return

with Catchtime(logger=logger, message="Preparing the tests"):
selected_tests = prepare_tests(selected_inventory, catalog, tests, tags)
if selected_tests is None:
total_tests, selected_tests = setup_tests(selected_inventory, catalog, tests, tags)
if total_tests == 0 or selected_tests is None:
return

generator = test_generator(selected_tests)

run_info = (
"------------------------------------ ANTA NRFU Run Information -------------------------------------\n"
f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n"
f"Total number of selected tests: {catalog.final_tests_count}\n"
f"Total number of selected tests: {total_tests}\n"
f"Maximum number of tests to run concurrently: {max_concurrency}\n"
f"Maximum number of connections per device: {DEFAULT_MAX_CONNECTIONS}\n"
f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n"
"----------------------------------------------------------------------------------------------------"
)

logger.info(run_info)

if catalog.final_tests_count > max_concurrency:
total_potential_connections = len(selected_inventory) * DEFAULT_MAX_CONNECTIONS

if total_tests > max_concurrency:
logger.warning(
"The total number of tests is higher than the maximum number of tests to run concurrently.\n"
"ANTA will be throttled to run at the maximum number of tests to run concurrently to ensure system stability.\n"
"Please consult the ANTA FAQ."
)
if max_concurrency > limits[0]:
if total_potential_connections > limits[0]:
logger.warning(
"The maximum number of tests to run concurrently is higher than the open file descriptors limit for this ANTA process.\n"
"The total potential connections to devices is higher than the open file descriptors limit for this ANTA process.\n"
"Errors may occur while running the tests.\n"
"Please consult the ANTA FAQ."
)

tests_generator = generate_tests(selected_tests)
total_tests = catalog.final_tests_count

# Cleanup no longer needed objects before running the tests
del inventory, catalog, selected_tests

if dry_run:
logger.info("Dry-run mode, exiting before running the tests.")
async for test in tests_generator:
async for test in generator:
test.close()
return

if AntaTest.progress is not None:
AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=total_tests)

with Catchtime(logger=logger, message="Running ANTA tests"):
async for result in run_tests(tests_generator, limit=max_concurrency):
async for result in run(generator, limit=max_concurrency):
manager.add(result)

log_cache_statistics(selected_inventory.devices)

0 comments on commit 4879120

Please sign in to comment.