diff --git a/anta/catalog.py b/anta/catalog.py index 7ed4bc718..feb7318ee 100644 --- a/anta/catalog.py +++ b/anta/catalog.py @@ -285,7 +285,6 @@ def __init__( self.tag_to_tests: defaultdict[str | None, set[AntaTestDefinition]] = defaultdict(set) self.tests_without_tags: set[AntaTestDefinition] = set() self.indexes_built: bool = False - self.final_tests_count: int = 0 @property def filename(self) -> Path | None: diff --git a/anta/cli/nrfu/__init__.py b/anta/cli/nrfu/__init__.py index 85a667127..a85277102 100644 --- a/anta/cli/nrfu/__init__.py +++ b/anta/cli/nrfu/__init__.py @@ -103,21 +103,12 @@ def parse_args(self, ctx: click.Context, args: list[str]) -> list[str]: is_flag=True, default=False, ) -@click.option( - "--max-concurrency", - help="Maximum number of tests to run concurrently.", - type=int, - show_envvar=True, - default=50000, - show_default=True, -) # pylint: disable=too-many-arguments def nrfu( ctx: click.Context, inventory: AntaInventory, tags: set[str] | None, catalog: AntaCatalog, - max_concurrency: int, device: tuple[str], test: tuple[str], hide: tuple[str], @@ -145,7 +136,6 @@ def nrfu( ctx.obj["device"] = device ctx.obj["test"] = test ctx.obj["dry_run"] = dry_run - ctx.obj["max_concurrency"] = max_concurrency # Invoke `anta nrfu table` if no command is passed if not ctx.invoked_subcommand: diff --git a/anta/cli/nrfu/utils.py b/anta/cli/nrfu/utils.py index 5a0ff4df9..cfc2e1ed1 100644 --- a/anta/cli/nrfu/utils.py +++ b/anta/cli/nrfu/utils.py @@ -43,7 +43,6 @@ def run_tests(ctx: click.Context) -> None: device = nrfu_ctx_params["device"] or None test = nrfu_ctx_params["test"] or None dry_run = nrfu_ctx_params["dry_run"] - max_concurrency = nrfu_ctx_params["max_concurrency"] catalog = ctx.obj["catalog"] inventory = ctx.obj["inventory"] @@ -59,7 +58,6 @@ def run_tests(ctx: click.Context) -> None: devices=set(device) if device else None, tests=set(test) if test else None, dry_run=dry_run, - max_concurrency=max_concurrency, ) ) if dry_run: diff --git a/anta/runner.py b/anta/runner.py index 890fe83ad..0c3eaf921 100644 --- a/anta/runner.py +++ b/anta/runner.py @@ -30,6 +30,30 @@ logger = logging.getLogger(__name__) DEFAULT_NOFILE = 16384 +"""Default number of open file descriptors for the ANTA process.""" +DEFAULT_MAX_CONCURRENCY = 10000 +"""Default maximum number of tests to run concurrently.""" +DEFAULT_MAX_CONNECTIONS = 100 +"""Default underlying HTTPX client maximum number of connections per device.""" + + +def adjust_max_concurrency() -> int: + """Adjust the maximum number of tests (coroutines) to run concurrently. + + The limit is set to the value of the ANTA_MAX_CONCURRENCY environment variable. + + If the `ANTA_MAX_CONCURRENCY` environment variable is not set or is invalid, `DEFAULT_MAX_CONCURRENCY` is used. + + Returns + ------- + The maximum number of tests to run concurrently. + """ + try: + max_concurrency = int(os.environ.get("ANTA_MAX_CONCURRENCY", DEFAULT_MAX_CONCURRENCY)) + except ValueError as exception: + logger.warning("The ANTA_MAX_CONCURRENCY environment variable value is invalid: %s\nDefault to %s.", exc_to_str(exception), DEFAULT_MAX_CONCURRENCY) + max_concurrency = DEFAULT_MAX_CONCURRENCY + return max_concurrency def adjust_rlimit_nofile() -> tuple[int, int]: @@ -41,7 +65,7 @@ def adjust_rlimit_nofile() -> tuple[int, int]: Returns ------- - tuple[int, int]: The new soft and hard limits for open file descriptors. + The new soft and hard limits for open file descriptors. """ try: nofile = int(os.environ.get("ANTA_NOFILE", DEFAULT_NOFILE)) @@ -76,21 +100,21 @@ def log_cache_statistics(devices: list[AntaDevice]) -> None: logger.info("Caching is not enabled on %s", device.name) -async def run_tests(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResult], None], limit: int) -> AsyncGenerator[TestResult, None]: +async def run(tests_generator: AsyncGenerator[Coroutine[Any, Any, TestResult], None], limit: int) -> AsyncGenerator[TestResult, None]: """Run tests with a concurrency limit. This function takes an asynchronous generator of test coroutines and runs them with a limit on the number of concurrent tests. It yields test results as each test completes. - Args: - ---- + Parameters + ---------- tests_generator: An asynchronous generator that yields test coroutines. limit: The maximum number of concurrent tests to run. Yields ------ - TestResult: The result of each completed test. + The result of each completed test. """ # NOTE: The `aiter` built-in function is not available in Python 3.9 aws = tests_generator.__aiter__() # pylint: disable=unnecessary-dunder-call @@ -138,7 +162,7 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic Returns ------- - AntaInventory | None: The filtered inventory or None if there are no devices to run tests on. + The filtered AntaInventory or None if there are no devices to run tests on. """ if len(inventory) == 0: logger.info("The inventory is empty, exiting") @@ -163,10 +187,10 @@ async def setup_inventory(inventory: AntaInventory, tags: set[str] | None, devic return selected_inventory -def prepare_tests( +def setup_tests( inventory: AntaInventory, catalog: AntaCatalog, tests: set[str] | None, tags: set[str] | None -) -> defaultdict[AntaDevice, set[AntaTestDefinition]] | None: - """Prepare the tests to run. +) -> tuple[int, defaultdict[AntaDevice, set[AntaTestDefinition]] | None]: + """Set up the tests for the ANTA run. Parameters ---------- @@ -177,15 +201,17 @@ def prepare_tests( Returns ------- - A mapping of devices to the tests to run or None if there are no tests to run. + The total number of tests and a mapping of devices to the tests to run or None if there are no tests to run. """ # Build indexes for the catalog. If `tests` is set, filter the indexes based on these tests catalog.build_indexes(filtered_tests=tests) + total_tests = 0 + # Using a set to avoid inserting duplicate tests device_to_tests: defaultdict[AntaDevice, set[AntaTestDefinition]] = defaultdict(set) - # Create AntaTestRunner tuples from the tags + # Create the mapping of devices to the tests to run for device in inventory.devices: if tags: # If there are CLI tags, only execute tests with matching tags @@ -197,19 +223,19 @@ def prepare_tests( # Then add the tests with matching tags from device tags device_to_tests[device].update(catalog.get_tests_by_tags(device.tags)) - catalog.final_tests_count += len(device_to_tests[device]) + total_tests += len(device_to_tests[device]) - if catalog.final_tests_count == 0: + if total_tests == 0: msg = ( f"There are no tests{f' matching the tags {tags} ' if tags else ' '}to run in the current test catalog and device inventory, please verify your inputs." ) logger.warning(msg) - return None + return total_tests, None - return device_to_tests + return total_tests, device_to_tests -async def generate_tests(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]: +async def test_generator(selected_tests: defaultdict[AntaDevice, set[AntaTestDefinition]]) -> AsyncGenerator[Coroutine[Any, Any, TestResult], None]: """Generate the coroutines for the ANTA run. It creates an async generator of coroutines which are created by the `test` method of the AntaTest instances. Each coroutine is a test to run. @@ -220,7 +246,7 @@ async def generate_tests(selected_tests: defaultdict[AntaDevice, set[AntaTestDef Yields ------ - Coroutine[Any, Any, TestResult]: The coroutine (test) to run. + The coroutine (test) to run. """ for device, test_definitions in selected_tests.items(): for test in test_definitions: @@ -246,7 +272,6 @@ async def main( # noqa: PLR0913 manager: ResultManager, inventory: AntaInventory, catalog: AntaCatalog, - max_concurrency: int = 50000, devices: set[str] | None = None, tests: set[str] | None = None, tags: set[str] | None = None, @@ -265,7 +290,6 @@ async def main( # noqa: PLR0913 manager: ResultManager object to populate with the test results. inventory: AntaInventory object that includes the device(s). catalog: AntaCatalog object that includes the list of tests. - max_concurrency: Maximum number of tests to run concurrently. Default is 50000. devices: Devices on which to run tests. None means all devices. These may come from the `--device / -d` CLI option in NRFU. tests: Tests to run against devices. None means all tests. These may come from the `--test / -t` CLI option in NRFU. tags: Tags to filter devices from the inventory. These may come from the `--tags` CLI option in NRFU. @@ -275,6 +299,9 @@ async def main( # noqa: PLR0913 # Adjust the maximum number of open file descriptors for the ANTA process limits = adjust_rlimit_nofile() + # Adjust the maximum number of tests to run concurrently + max_concurrency = adjust_max_concurrency() + if not catalog.tests: logger.info("The list of tests is empty, exiting") return @@ -286,43 +313,45 @@ async def main( # noqa: PLR0913 return with Catchtime(logger=logger, message="Preparing the tests"): - selected_tests = prepare_tests(selected_inventory, catalog, tests, tags) - if selected_tests is None: + total_tests, selected_tests = setup_tests(selected_inventory, catalog, tests, tags) + if total_tests == 0 or selected_tests is None: return + generator = test_generator(selected_tests) + run_info = ( "------------------------------------ ANTA NRFU Run Information -------------------------------------\n" f"Number of devices: {len(inventory)} ({len(selected_inventory)} established)\n" - f"Total number of selected tests: {catalog.final_tests_count}\n" + f"Total number of selected tests: {total_tests}\n" f"Maximum number of tests to run concurrently: {max_concurrency}\n" + f"Maximum number of connections per device: {DEFAULT_MAX_CONNECTIONS}\n" f"Maximum number of open file descriptors for the current ANTA process: {limits[0]}\n" "----------------------------------------------------------------------------------------------------" ) logger.info(run_info) - if catalog.final_tests_count > max_concurrency: + total_potential_connections = len(selected_inventory) * DEFAULT_MAX_CONNECTIONS + + if total_tests > max_concurrency: logger.warning( "The total number of tests is higher than the maximum number of tests to run concurrently.\n" "ANTA will be throttled to run at the maximum number of tests to run concurrently to ensure system stability.\n" "Please consult the ANTA FAQ." ) - if max_concurrency > limits[0]: + if total_potential_connections > limits[0]: logger.warning( - "The maximum number of tests to run concurrently is higher than the open file descriptors limit for this ANTA process.\n" + "The total potential connections to devices is higher than the open file descriptors limit for this ANTA process.\n" "Errors may occur while running the tests.\n" "Please consult the ANTA FAQ." ) - tests_generator = generate_tests(selected_tests) - total_tests = catalog.final_tests_count - # Cleanup no longer needed objects before running the tests del inventory, catalog, selected_tests if dry_run: logger.info("Dry-run mode, exiting before running the tests.") - async for test in tests_generator: + async for test in generator: test.close() return @@ -330,7 +359,7 @@ async def main( # noqa: PLR0913 AntaTest.nrfu_task = AntaTest.progress.add_task("Running NRFU Tests...", total=total_tests) with Catchtime(logger=logger, message="Running ANTA tests"): - async for result in run_tests(tests_generator, limit=max_concurrency): + async for result in run(generator, limit=max_concurrency): manager.add(result) log_cache_statistics(selected_inventory.devices)