diff --git a/tests/integration-tests/configs/scaling_stress_test.yaml b/tests/integration-tests/configs/scaling_stress_test.yaml
index 48509191d8..546dd2a84d 100644
--- a/tests/integration-tests/configs/scaling_stress_test.yaml
+++ b/tests/integration-tests/configs/scaling_stress_test.yaml
@@ -2,7 +2,13 @@ test-suites:
   performance_tests:
     test_scaling.py::test_scaling_stress_test:
       dimensions:
-        - regions: ["us-east-1"]
-          instances: ["c5.large"]
-          oss: ["alinux2"]
-          schedulers: ["slurm"]
+        - regions: [ "us-east-1" ]
+          instances: [ "c5.large" ]
+          oss: [ "alinux2" ]
+          schedulers: [ "slurm" ]
+    test_scaling.py::test_static_scaling_stress_test:
+      dimensions:
+        - regions: [ "us-east-1" ]
+          instances: [ "c5.large" ]
+          oss: [ "alinux2" ]
+          schedulers: [ "slurm" ]
diff --git a/tests/integration-tests/tests/performance_tests/test_scaling/test_scaling_stress_test/scaling_test_config.yaml b/tests/integration-tests/tests/common/scaling/scaling_test_config.yaml
similarity index 100%
rename from tests/integration-tests/tests/performance_tests/test_scaling/test_scaling_stress_test/scaling_test_config.yaml
rename to tests/integration-tests/tests/common/scaling/scaling_test_config.yaml
diff --git a/tests/integration-tests/tests/performance_tests/test_scaling/test_scaling_stress_test/scaling_test_config_schema.yaml b/tests/integration-tests/tests/common/scaling/scaling_test_config_schema.yaml
similarity index 100%
rename from tests/integration-tests/tests/performance_tests/test_scaling/test_scaling_stress_test/scaling_test_config_schema.yaml
rename to tests/integration-tests/tests/common/scaling/scaling_test_config_schema.yaml
diff --git a/tests/integration-tests/tests/common/scaling_common.py b/tests/integration-tests/tests/common/scaling_common.py
index 4440ff2b8f..df751d6cea 100644
--- a/tests/integration-tests/tests/common/scaling_common.py
+++ b/tests/integration-tests/tests/common/scaling_common.py
@@ -11,11 +11,14 @@
 import datetime
 import json
 import logging
+import os
 import pathlib
 import time
 
 import boto3
+import yaml
 from framework.metrics_publisher import Metric, MetricsPublisher
+from pykwalify.core import Core
 from remote_command_executor import RemoteCommandExecutor
 from retrying import RetryError, retry
 from time_utils import seconds
@@ -24,6 +27,27 @@
 SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling"
 
 
+def validate_and_get_scaling_test_config(scaling_test_config_file):
+    """Get and validate the scaling test parameters"""
+    scaling_test_schema = str(SCALING_COMMON_DATADIR / "scaling_test_config_schema.yaml")
+    logging.info("Parsing scaling test config file: %s", scaling_test_config_file)
+    with open(scaling_test_config_file) as file:
+        scaling_test_config = yaml.safe_load(file)
+    logging.info(scaling_test_config)
+
+    # Validate scaling test config file against defined schema
+    logging.info("Validating config file against the schema")
+    try:
+        c = Core(source_data=scaling_test_config, schema_files=[scaling_test_schema])
+        c.validate(raise_exception=True)
+    except Exception as e:
+        logging.error("Failed when validating schema: %s", e)
+        logging.info("Dumping rendered template:\n%s", yaml.dump(scaling_test_config, default_flow_style=False))
+        raise
+
+    return scaling_test_config
+
+
 def retry_if_scaling_target_not_reached(
     ec2_capacity_time_series,
     compute_nodes_time_series,
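The renamed scaling_test_config.yaml and scaling_test_config_schema.yaml are not shown in this diff. As a rough sketch of what the relocated helper consumes: the key names below are the ones the tests read, the example values are invented, and the pykwalify schema shape is an assumption rather than the real schema file.

```python
# Hypothetical sketch: exercising the same pykwalify call pattern as
# validate_and_get_scaling_test_config. Key names come from the tests below;
# values and schema shape are assumptions, not the real data files.
import tempfile

import yaml
from pykwalify.core import Core

example_config = {
    "MaxMonitoringTimeInMins": 20,
    "SharedHeadNodeStorageType": "Ebs",
    "HeadNodeInstanceType": "c5.24xlarge",
    "ScalingTargets": [1000, 2000, 3000, 4000],
}

# An assumed schema accepting the four keys above.
example_schema = {
    "type": "map",
    "mapping": {
        "MaxMonitoringTimeInMins": {"type": "int", "required": True},
        "SharedHeadNodeStorageType": {"type": "str"},
        "HeadNodeInstanceType": {"type": "str", "required": True},
        "ScalingTargets": {"type": "seq", "sequence": [{"type": "int"}], "required": True},
    },
}

with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as schema_file:
    yaml.safe_dump(example_schema, schema_file)

# Same call pattern as in scaling_common.py above.
Core(source_data=example_config, schema_files=[schema_file.name]).validate(raise_exception=True)
```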
diff --git a/tests/integration-tests/tests/performance_tests/test_scaling.py b/tests/integration-tests/tests/performance_tests/test_scaling.py
index cade1b0e96..848aab34d9 100644
--- a/tests/integration-tests/tests/performance_tests/test_scaling.py
+++ b/tests/integration-tests/tests/performance_tests/test_scaling.py
@@ -4,16 +4,14 @@
 import time
 
 import pytest
-import yaml
 from assertpy import assert_that, soft_assertions
 from benchmarks.common.metrics_reporter import produce_benchmark_metrics_report
-from pykwalify.core import Core
 from remote_command_executor import RemoteCommandExecutor
 from time_utils import minutes
 from utils import disable_protected_mode
 
 from tests.common.assertions import assert_no_msg_in_logs
-from tests.common.scaling_common import get_scaling_metrics
+from tests.common.scaling_common import get_scaling_metrics, validate_and_get_scaling_test_config
 
 
 @pytest.mark.parametrize(
@@ -80,7 +78,7 @@ def _get_scaling_time(capacity_time_series: list, timestamps: list, scaling_targ
         return scaling_target_time, int((scaling_target_time - start_time).total_seconds())
     except ValueError as e:
         logging.error("Cluster did not scale up to %d nodes", scaling_target)
-        raise e
+        raise Exception("Cluster could not scale up to target nodes within the max monitoring time") from e
 
 
 @pytest.mark.usefixtures("scheduler")
@@ -111,7 +109,8 @@ def test_scaling_stress_test(
     - A Metrics Image showing the scale up and scale down using a linear graph with annotations
     """
     # Get the scaling parameters
-    scaling_test_config = _validate_and_get_scaling_test_config(test_datadir, request)
+    scaling_test_config_file = request.config.getoption("scaling_test_config")
+    scaling_test_config = validate_and_get_scaling_test_config(scaling_test_config_file)
     max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins")
     shared_headnode_storage_type = scaling_test_config.get("SharedHeadNodeStorageType")
     head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType")
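One behavioral change worth noting above: _get_scaling_time now wraps the ValueError in a new Exception with "raise ... from e", so the original error stays attached as __cause__ while the message becomes actionable. A minimal illustration of the pattern:

```python
# Minimal illustration of the "raise ... from e" pattern used in _get_scaling_time.
# find_target stands in for looking up scaling_target in a capacity time series.
def find_target(series, target):
    try:
        return series.index(target)
    except ValueError as e:
        raise Exception("Cluster could not scale up to target nodes within the max monitoring time") from e


try:
    find_target([0, 10, 50], 100)  # target never reached
except Exception as exc:
    assert isinstance(exc.__cause__, ValueError)  # original error is chained, traceback preserved
```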
+ """ + # Get the scaling parameters scaling_test_config_file = request.config.getoption("scaling_test_config") - scaling_test_schema = str(test_datadir / "scaling_test_config_schema.yaml") - logging.info("Parsing scaling test config file: %s", scaling_test_config_file) - with open(scaling_test_config_file) as file: - scaling_test_config = yaml.safe_load(file) - logging.info(scaling_test_config) + scaling_test_config = validate_and_get_scaling_test_config(test_datadir, scaling_test_config_file) + max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins") + shared_headnode_storage_type = scaling_test_config.get("SharedHeadNodeStorageType") + head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType") + scaling_targets = scaling_test_config.get("ScalingTargets") - # Validate scaling test config file against defined schema - logging.info("Validating config file against the schema") - try: - c = Core(source_data=scaling_test_config, schema_files=[scaling_test_schema]) - c.validate(raise_exception=True) - except Exception as e: - logging.error("Failed when validating schema: %s", e) - logging.info("Dumping rendered template:\n%s", yaml.dump(scaling_test_config, default_flow_style=False)) - raise + # Creating cluster with intended head node instance type and scaling parameters + cluster_config = pcluster_config_reader( + # Prevent nodes being set down before we start monitoring the scale down metrics + scaledown_idletime=max_monitoring_time_in_mins, + head_node_instance_type=head_node_instance_type, + shared_headnode_storage_type=shared_headnode_storage_type, + scaling_strategy=scaling_strategy, + min_cluster_size=0, + max_cluster_size=1, + output_file="downscale-pcluster.config.yaml", + ) + cluster = clusters_factory(cluster_config) + remote_command_executor = RemoteCommandExecutor(cluster) + scheduler_commands = scheduler_commands_factory(remote_command_executor) - return scaling_test_config + # Disable protected mode since bootstrap errors are likely to occur given the large cluster sizes + disable_protected_mode(remote_command_executor) + + with soft_assertions(): + for scaling_target in scaling_targets: + upscale_cluster_config = pcluster_config_reader( + # Prevent nodes being set down before we start monitoring the scale down metrics + scaledown_idletime=max_monitoring_time_in_mins, + head_node_instance_type=head_node_instance_type, + shared_headnode_storage_type=shared_headnode_storage_type, + scaling_strategy=scaling_strategy, + min_cluster_size=scaling_target, + max_cluster_size=scaling_target, + output_file=f"{scaling_target}-upscale-pcluster.config.yaml", + ) + _scale_up_and_down( + cluster, + head_node_instance_type, + instance, + max_monitoring_time_in_mins, + os, + region, + remote_command_executor, + request, + scaling_target, + scaling_strategy, + scheduler_commands, + test_datadir, + is_static=True, + upscale_cluster_config=upscale_cluster_config, + downscale_cluster_config=cluster_config, + ) + + # Make sure the RunInstances Resource Token Bucket is full before starting another scaling up + # ref https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html + if scaling_target != scaling_targets[-1]: + logging.info("Waiting for the RunInstances Resource Token Bucket to refill") + time.sleep(300) def _scale_up_and_down( @@ -192,19 +250,28 @@ def _scale_up_and_down( scaling_strategy, scheduler_commands, test_datadir, + is_static=False, + upscale_cluster_config=None, + downscale_cluster_config=None, ): # Reset underlying ssh connection 
@@ -192,19 +250,28 @@ def _scale_up_and_down(
     scaling_strategy,
     scheduler_commands,
     test_datadir,
+    is_static=False,
+    upscale_cluster_config=None,
+    downscale_cluster_config=None,
 ):
     # Reset underlying ssh connection to prevent socket closed error
     remote_command_executor.reset_connection()
     # Make sure partitions are active
     cluster.start(wait_running=True)
 
-    # Submit a simple job to trigger the launch all compute nodes
-    scaling_job = {
-        # Keep job running until we explicitly cancel it and start monitoring scale down
-        "command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
-        "nodes": scaling_target,
-    }
-    job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)
+    # Scale up the cluster
+    if is_static:
+        # Update the cluster with the target number of static nodes
+        cluster.update(str(upscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
+    else:
+        # Submit a simple job to trigger the launch of all compute nodes
+        scaling_job = {
+            # Keep the job running until we explicitly cancel it and start monitoring scale down
+            "command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
+            "nodes": scaling_target,
+        }
+        job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)
+
     # Set start time at minute granularity (to simplify calculation and visualising on CloudWatch)
     start_time = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
     # Monitor the cluster during scale up
@@ -216,14 +283,29 @@
         publish_metrics=True,
         target_cluster_size=scaling_target,
     )
+
     # Extract scale up duration and timestamp from the monitoring metrics collected above
     _, scale_up_time_ec2 = _get_scaling_time(ec2_capacity_time_series_up, timestamps, scaling_target, start_time)
     scaling_target_time, scale_up_time_scheduler = _get_scaling_time(
         compute_nodes_time_series_up, timestamps, scaling_target, start_time
     )
-    # Cancel the running job and scale down the cluster using the update-compute-fleet command
-    scheduler_commands.cancel_job(job_id)
-    cluster.stop()
+
+    # Scale down the cluster
+    if is_static:
+        # Check that a simple job succeeds
+        scaling_job = {
+            "command": "srun sleep 10",
+            "nodes": scaling_target,
+        }
+        scheduler_commands.submit_command_and_assert_job_succeeded(scaling_job)
+
+        # Scale down the cluster by updating it back to the initial (MinCount 0) config
+        cluster.update(str(downscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
+    else:
+        # Cancel the running job and scale down the cluster using the update-compute-fleet command
+        scheduler_commands.cancel_job(job_id)
+        cluster.stop()
+
     # Monitor the cluster during scale down
     scale_down_start_timestamp = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
     ec2_capacity_time_series_down, compute_nodes_time_series_down, timestamps, end_time = get_scaling_metrics(
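The new pcluster.config.yaml template below is rendered by the pcluster_config_reader fixture with the keyword arguments seen above: the static test forces a scale-up by setting MinCount == MaxCount == scaling_target, and scales down by reverting to MinCount 0 / MaxCount 1. A sketch of that parameter-to-YAML mapping, using plain jinja2 directly (an assumption for illustration, not the fixture's actual internals):

```python
# Hypothetical sketch: how min/max_cluster_size map into the queue definition.
# pcluster_config_reader is a pytest fixture; rendering with jinja2 here is an
# illustration only.
from jinja2 import Template

queue_template = Template(
    "ComputeResources:\n"
    "  - Name: compute-resource-0\n"
    "    InstanceType: {{ instance }}\n"
    "    MinCount: {{ min_cluster_size }}\n"
    "    MaxCount: {{ max_cluster_size }}\n"
)

# Upscale config: MinCount == MaxCount == scaling_target launches all static nodes.
print(queue_template.render(instance="c5.large", min_cluster_size=1000, max_cluster_size=1000))
# Downscale config: reverting to MinCount 0 / MaxCount 1 lets the static nodes terminate.
print(queue_template.render(instance="c5.large", min_cluster_size=0, max_cluster_size=1))
```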
diff --git a/tests/integration-tests/tests/performance_tests/test_scaling/test_static_scaling_stress_test/pcluster.config.yaml b/tests/integration-tests/tests/performance_tests/test_scaling/test_static_scaling_stress_test/pcluster.config.yaml
new file mode 100644
index 0000000000..0151991381
--- /dev/null
+++ b/tests/integration-tests/tests/performance_tests/test_scaling/test_static_scaling_stress_test/pcluster.config.yaml
@@ -0,0 +1,26 @@
+Image:
+  Os: {{ os }}
+HeadNode:
+  {% if shared_headnode_storage_type %}
+  SharedStorageType: {{ shared_headnode_storage_type }}
+  {% endif %}
+  InstanceType: {{ head_node_instance_type }}
+  Networking:
+    SubnetId: {{ public_subnet_id }}
+  Ssh:
+    KeyName: {{ key_name }}
+Scheduling:
+  Scheduler: {{ scheduler }}
+  ScalingStrategy: {{ scaling_strategy }}
+  SlurmSettings:
+    ScaledownIdletime: {{ scaledown_idletime }}
+  SlurmQueues:
+    - Name: queue-0
+      ComputeResources:
+        - Name: compute-resource-0
+          InstanceType: {{ instance }}
+          MinCount: {{ min_cluster_size }}
+          MaxCount: {{ max_cluster_size }}
+      Networking:
+        SubnetIds:
+          - {{ private_subnet_id }}
diff --git a/tests/integration-tests/tests/performance_tests/test_scaling/test_static_scaling_stress_test/results/baseline.json b/tests/integration-tests/tests/performance_tests/test_scaling/test_static_scaling_stress_test/results/baseline.json
new file mode 100644
index 0000000000..32c08e0657
--- /dev/null
+++ b/tests/integration-tests/tests/performance_tests/test_scaling/test_static_scaling_stress_test/results/baseline.json
@@ -0,0 +1,52 @@
+{
+  "c5.large": {
+    "1000": {
+      "best-effort": {
+        "scale_up_time_ec2": 360,
+        "scale_up_time_scheduler": 600,
+        "scale_down_time": 300
+      },
+      "all-or-nothing": {
+        "scale_up_time_ec2": 360,
+        "scale_up_time_scheduler": 600,
+        "scale_down_time": 300
+      }
+    },
+    "2000": {
+      "best-effort": {
+        "scale_up_time_ec2": 420,
+        "scale_up_time_scheduler": 720,
+        "scale_down_time": 360
+      },
+      "all-or-nothing": {
+        "scale_up_time_ec2": 420,
+        "scale_up_time_scheduler": 720,
+        "scale_down_time": 360
+      }
+    },
+    "3000": {
+      "best-effort": {
+        "scale_up_time_ec2": 720,
+        "scale_up_time_scheduler": 1020,
+        "scale_down_time": 480
+      },
+      "all-or-nothing": {
+        "scale_up_time_ec2": 720,
+        "scale_up_time_scheduler": 1020,
+        "scale_down_time": 480
+      }
+    },
+    "4000": {
+      "best-effort": {
+        "scale_up_time_ec2": 900,
+        "scale_up_time_scheduler": 1200,
+        "scale_down_time": 540
+      },
+      "all-or-nothing": {
+        "scale_up_time_ec2": 900,
+        "scale_up_time_scheduler": 1200,
+        "scale_down_time": 540
+      }
+    }
+  }
+}
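How these baseline times are consumed is not shown in this diff; the values are keyed by instance type, scaling target, and scaling strategy, with durations in seconds. A hypothetical sketch of reading baseline.json and flagging regressions against measured scale times (the comparison logic itself is an assumption; the real assertions live in the performance test helpers):

```python
# Hypothetical sketch: compare measured scaling times against baseline.json.
# The measured values here are invented example numbers.
import json
from pathlib import Path

baselines = json.loads(Path("results/baseline.json").read_text())
expected = baselines["c5.large"]["2000"]["best-effort"]

measured = {  # example values, in seconds
    "scale_up_time_ec2": 410,
    "scale_up_time_scheduler": 700,
    "scale_down_time": 310,
}

for metric, value in measured.items():
    # Flag regressions where the measured time exceeds the baseline.
    if value > expected[metric]:
        print(f"{metric} regressed: {value}s > baseline {expected[metric]}s")
```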