Add static node stress test
Signed-off-by: Judy Ng <[email protected]>
judysng committed Mar 25, 2024
1 parent 2c693cd commit d1f6007
Showing 7 changed files with 225 additions and 36 deletions.
14 changes: 10 additions & 4 deletions tests/integration-tests/configs/scaling_stress_test.yaml
@@ -2,7 +2,13 @@ test-suites:
  performance_tests:
    test_scaling.py::test_scaling_stress_test:
      dimensions:
        - regions: ["us-east-1"]
          instances: ["c5.large"]
          oss: ["alinux2"]
          schedulers: ["slurm"]
        - regions: [ "us-east-1" ]
          instances: [ "c5.large" ]
          oss: [ "alinux2" ]
          schedulers: [ "slurm" ]
    test_scaling.py::test_static_scaling_stress_test:
      dimensions:
        - regions: [ "us-east-1" ]
          instances: [ "c5.large" ]
          oss: [ "alinux2" ]
          schedulers: [ "slurm" ]
23 changes: 23 additions & 0 deletions tests/integration-tests/tests/common/scaling_common.py
@@ -15,7 +15,9 @@
import time

import boto3
import yaml
from framework.metrics_publisher import Metric, MetricsPublisher
from pykwalify.core import Core
from remote_command_executor import RemoteCommandExecutor
from retrying import RetryError, retry
from time_utils import seconds
@@ -24,6 +26,27 @@
SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling"


def validate_and_get_scaling_test_config(scaling_test_config_file):
"""Get and validate the scaling test parameters"""
scaling_test_schema = str(SCALING_COMMON_DATADIR / "scaling_test_config_schema.yaml")
logging.info("Parsing scaling test config file: %s", scaling_test_config_file)
with open(scaling_test_config_file) as file:
scaling_test_config = yaml.safe_load(file)
logging.info(scaling_test_config)

# Validate scaling test config file against defined schema
logging.info("Validating config file against the schema")
try:
c = Core(source_data=scaling_test_config, schema_files=[scaling_test_schema])
c.validate(raise_exception=True)
except Exception as e:
logging.error("Failed when validating schema: %s", e)
logging.info("Dumping rendered template:\n%s", yaml.dump(scaling_test_config, default_flow_style=False))
raise

return scaling_test_config


def retry_if_scaling_target_not_reached(
ec2_capacity_time_series,
compute_nodes_time_series,
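For context, a minimal sketch of how the new helper could be driven with a stand-alone config file. The top-level keys mirror the ones the tests read (MaxMonitoringTimeInMins, SharedHeadNodeStorageType, HeadNodeInstanceType, ScalingTargets); the concrete values and the temporary-file handling are illustrative assumptions, not part of this commit.

import tempfile

import yaml

from tests.common.scaling_common import validate_and_get_scaling_test_config

# Keys match the fields read by test_scaling.py; values below are assumed examples.
example_config = {
    "MaxMonitoringTimeInMins": 20,
    "SharedHeadNodeStorageType": "Ebs",
    "HeadNodeInstanceType": "c5.24xlarge",
    "ScalingTargets": [1000, 2000],
}

# Write the example config to a temporary YAML file.
with tempfile.NamedTemporaryFile("w", suffix=".yaml", delete=False) as config_file:
    yaml.safe_dump(example_config, config_file)

# Returns the parsed dict, or raises if the file does not satisfy
# scaling_test_config_schema.yaml (validated via pykwalify).
scaling_test_config = validate_and_get_scaling_test_config(config_file.name)
print(scaling_test_config["ScalingTargets"])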
146 changes: 114 additions & 32 deletions tests/integration-tests/tests/performance_tests/test_scaling.py
Expand Up @@ -4,16 +4,14 @@
import time

import pytest
import yaml
from assertpy import assert_that, soft_assertions
from benchmarks.common.metrics_reporter import produce_benchmark_metrics_report
from pykwalify.core import Core
from remote_command_executor import RemoteCommandExecutor
from time_utils import minutes
from utils import disable_protected_mode

from tests.common.assertions import assert_no_msg_in_logs
from tests.common.scaling_common import get_scaling_metrics
from tests.common.scaling_common import get_scaling_metrics, validate_and_get_scaling_test_config


@pytest.mark.parametrize(
@@ -80,7 +78,7 @@ def _get_scaling_time(capacity_time_series: list, timestamps: list, scaling_targ
return scaling_target_time, int((scaling_target_time - start_time).total_seconds())
except ValueError as e:
logging.error("Cluster did not scale up to %d nodes", scaling_target)
raise e
raise Exception("Cluster could not scale up to target nodes within the max monitoring time") from e


@pytest.mark.usefixtures("scheduler")
@@ -111,7 +109,8 @@ def test_scaling_stress_test(
- A Metrics Image showing the scale up and scale down using a linear graph with annotations
"""
# Get the scaling parameters
scaling_test_config = _validate_and_get_scaling_test_config(test_datadir, request)
scaling_test_config_file = request.config.getoption("scaling_test_config")
scaling_test_config = validate_and_get_scaling_test_config(scaling_test_config_file)
max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins")
shared_headnode_storage_type = scaling_test_config.get("SharedHeadNodeStorageType")
head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType")
@@ -157,26 +156,85 @@
time.sleep(300)


def _validate_and_get_scaling_test_config(test_datadir, request):
"""Get and validate the scaling test parameters"""
@pytest.mark.usefixtures("scheduler")
@pytest.mark.parametrize("scaling_strategy", ["all-or-nothing", "best-effort"])
def test_static_scaling_stress_test(
test_datadir,
instance,
os,
region,
request,
pcluster_config_reader,
scheduler_commands_factory,
clusters_factory,
scaling_strategy,
):
"""
The test scales a cluster up and down using a large number of static nodes, as opposed to
dynamic nodes, by updating the cluster to use the target number of static nodes.
This test produces the same metrics and outputs as the dynamic scaling stress test.
"""
# Get the scaling parameters
scaling_test_config_file = request.config.getoption("scaling_test_config")
scaling_test_schema = str(test_datadir / "scaling_test_config_schema.yaml")
logging.info("Parsing scaling test config file: %s", scaling_test_config_file)
with open(scaling_test_config_file) as file:
scaling_test_config = yaml.safe_load(file)
logging.info(scaling_test_config)
scaling_test_config = validate_and_get_scaling_test_config(scaling_test_config_file)
max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins")
shared_headnode_storage_type = scaling_test_config.get("SharedHeadNodeStorageType")
head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType")
scaling_targets = scaling_test_config.get("ScalingTargets")

# Validate scaling test config file against defined schema
logging.info("Validating config file against the schema")
try:
c = Core(source_data=scaling_test_config, schema_files=[scaling_test_schema])
c.validate(raise_exception=True)
except Exception as e:
logging.error("Failed when validating schema: %s", e)
logging.info("Dumping rendered template:\n%s", yaml.dump(scaling_test_config, default_flow_style=False))
raise
# Creating cluster with intended head node instance type and scaling parameters
cluster_config = pcluster_config_reader(
# Prevent nodes from being scaled down before we start monitoring the scale down metrics
scaledown_idletime=max_monitoring_time_in_mins,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
min_cluster_size=0,
max_cluster_size=1,
output_file="downscale-pcluster.config.yaml",
)
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = scheduler_commands_factory(remote_command_executor)

return scaling_test_config
# Disable protected mode since bootstrap errors are likely to occur given the large cluster sizes
disable_protected_mode(remote_command_executor)

with soft_assertions():
for scaling_target in scaling_targets:
upscale_cluster_config = pcluster_config_reader(
# Prevent nodes from being scaled down before we start monitoring the scale down metrics
scaledown_idletime=max_monitoring_time_in_mins,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
min_cluster_size=scaling_target,
max_cluster_size=scaling_target,
output_file=f"{scaling_target}-upscale-pcluster.config.yaml",
)
_scale_up_and_down(
cluster,
head_node_instance_type,
instance,
max_monitoring_time_in_mins,
os,
region,
remote_command_executor,
request,
scaling_target,
scaling_strategy,
scheduler_commands,
test_datadir,
is_static=True,
upscale_cluster_config=upscale_cluster_config,
downscale_cluster_config=cluster_config,
)

# Make sure the RunInstances Resource Token Bucket is full before starting another scale-up
# ref https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html
if scaling_target != scaling_targets[-1]:
logging.info("Waiting for the RunInstances Resource Token Bucket to refill")
time.sleep(300)


def _scale_up_and_down(
@@ -192,19 +250,28 @@ def _scale_up_and_down(
scaling_strategy,
scheduler_commands,
test_datadir,
is_static=False,
upscale_cluster_config=None,
downscale_cluster_config=None,
):
# Reset underlying ssh connection to prevent socket closed error
remote_command_executor.reset_connection()
# Make sure partitions are active
cluster.start(wait_running=True)

# Submit a simple job to trigger the launch of all compute nodes
scaling_job = {
# Keep job running until we explicitly cancel it and start monitoring scale down
"command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)
# Scale up cluster
if is_static:
# Update the cluster with target number of static nodes
cluster.update(str(upscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
else:
# Submit a simple job to trigger the launch of all compute nodes
scaling_job = {
# Keep job running until we explicitly cancel it and start monitoring scale down
"command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)

# Set start time at minute granularity (to simplify calculation and visualising on CloudWatch)
start_time = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
# Monitor the cluster during scale up
@@ -216,14 +283,29 @@
publish_metrics=True,
target_cluster_size=scaling_target,
)

# Extract scale up duration and timestamp from the monitoring metrics collected above
_, scale_up_time_ec2 = _get_scaling_time(ec2_capacity_time_series_up, timestamps, scaling_target, start_time)
scaling_target_time, scale_up_time_scheduler = _get_scaling_time(
compute_nodes_time_series_up, timestamps, scaling_target, start_time
)
# Cancel the running job and scale down the cluster using the update-compute-fleet command
scheduler_commands.cancel_job(job_id)
cluster.stop()

# Scale down cluster
if is_static:
# Check that a simple job succeeds
scaling_job = {
"command": "srun sleep 10",
"nodes": scaling_target,
}
scheduler_commands.submit_command_and_assert_job_succeeded(scaling_job)

# Scale down the cluster
cluster.update(str(downscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
else:
# Cancel the running job and scale down the cluster using the update-compute-fleet command
scheduler_commands.cancel_job(job_id)
cluster.stop()

# Monitor the cluster during scale down
scale_down_start_timestamp = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
ec2_capacity_time_series_down, compute_nodes_time_series_down, timestamps, end_time = get_scaling_metrics(
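As a rough illustration of what _get_scaling_time derives from the monitoring series above (the first timestamp at which the capacity series reaches the scaling target, converted to seconds elapsed since the start time), here is a simplified, hypothetical restatement with made-up sample data; it is not the committed implementation.

import datetime

def scaling_time_seconds(capacity_time_series, timestamps, scaling_target, start_time):
    # Index of the first sample at which the cluster reached the target size;
    # raises StopIteration if the target was never reached within the series.
    reached_index = next(
        index for index, capacity in enumerate(capacity_time_series) if capacity >= scaling_target
    )
    scaling_target_time = timestamps[reached_index]
    return int((scaling_target_time - start_time).total_seconds())

# Assumed samples taken one minute apart while scaling to 1000 nodes.
start = datetime.datetime(2024, 3, 25, 12, 0, tzinfo=datetime.timezone.utc)
timestamps = [start + datetime.timedelta(minutes=m) for m in range(4)]
print(scaling_time_seconds([0, 400, 800, 1000], timestamps, 1000, start))  # prints 180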
@@ -0,0 +1,26 @@
Image:
  Os: {{ os }}
HeadNode:
{% if shared_headnode_storage_type %}
  SharedStorageType: {{ shared_headnode_storage_type }}
{% endif %}
  InstanceType: {{ head_node_instance_type }}
  Networking:
    SubnetId: {{ public_subnet_id }}
  Ssh:
    KeyName: {{ key_name }}
Scheduling:
  Scheduler: {{ scheduler }}
  ScalingStrategy: {{ scaling_strategy }}
  SlurmSettings:
    ScaledownIdletime: {{ scaledown_idletime }}
  SlurmQueues:
    - Name: queue-0
      ComputeResources:
        - Name: compute-resource-0
          InstanceType: {{ instance }}
          MinCount: {{ min_cluster_size }}
          MaxCount: {{ max_cluster_size }}
      Networking:
        SubnetIds:
          - {{ private_subnet_id }}
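The template above is rendered by the pcluster_config_reader fixture with the parameters passed from the tests; for the static test, min_cluster_size and max_cluster_size are both set to the current scaling target. The sketch below shows roughly what that rendering amounts to using plain Jinja2; the template path, subnet IDs, key name, and other literal values are assumptions.

from pathlib import Path

from jinja2 import Template

# Assumed parameter values; in the tests these come from the scaling test config,
# pytest options, and the test networking setup.
params = {
    "os": "alinux2",
    "shared_headnode_storage_type": "Ebs",
    "head_node_instance_type": "c5.24xlarge",
    "public_subnet_id": "subnet-aaaa1111",
    "key_name": "my-key",
    "scheduler": "slurm",
    "scaling_strategy": "all-or-nothing",
    "scaledown_idletime": 20,
    "instance": "c5.large",
    "min_cluster_size": 1000,  # static test: MinCount == MaxCount == scaling target
    "max_cluster_size": 1000,
    "private_subnet_id": "subnet-bbbb2222",
}

# "pcluster.config.yaml" is an assumed path for the template shown above.
template_text = Path("pcluster.config.yaml").read_text()
rendered = Template(template_text).render(**params)
Path("1000-upscale-pcluster.config.yaml").write_text(rendered)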
@@ -0,0 +1,52 @@
{
  "c5.large": {
    "1000": {
      "best-effort": {
        "scale_up_time_ec2": 360,
        "scale_up_time_scheduler": 600,
        "scale_down_time": 300
      },
      "all-or-nothing": {
        "scale_up_time_ec2": 360,
        "scale_up_time_scheduler": 600,
        "scale_down_time": 300
      }
    },
    "2000": {
      "best-effort": {
        "scale_up_time_ec2": 420,
        "scale_up_time_scheduler": 720,
        "scale_down_time": 360
      },
      "all-or-nothing": {
        "scale_up_time_ec2": 420,
        "scale_up_time_scheduler": 720,
        "scale_down_time": 360
      }
    },
    "3000": {
      "best-effort": {
        "scale_up_time_ec2": 720,
        "scale_up_time_scheduler": 1020,
        "scale_down_time": 480
      },
      "all-or-nothing": {
        "scale_up_time_ec2": 720,
        "scale_up_time_scheduler": 1020,
        "scale_down_time": 480
      }
    },
    "4000": {
      "best-effort": {
        "scale_up_time_ec2": 900,
        "scale_up_time_scheduler": 1200,
        "scale_down_time": 540
      },
      "all-or-nothing": {
        "scale_up_time_ec2": 900,
        "scale_up_time_scheduler": 1200,
        "scale_down_time": 540
      }
    }
  }
}
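The baselines above map instance type to scaling target to scaling strategy, with expected upper bounds (presumably in seconds) for the EC2 scale-up, scheduler scale-up, and scale-down durations. A hypothetical example of checking measured durations against them follows; the baselines file name, the measured values, and the assertion style are assumptions, since the helper that consumes this file is not part of this diff.

import json

from assertpy import assert_that, soft_assertions

with open("scaling_test_baselines.json") as baselines_file:  # file name assumed
    baselines = json.load(baselines_file)

# Assumed measured durations, in seconds, for a 1000-node best-effort run on c5.large.
measured = {"scale_up_time_ec2": 350, "scale_up_time_scheduler": 590, "scale_down_time": 290}
baseline = baselines["c5.large"]["1000"]["best-effort"]

with soft_assertions():
    for metric, measured_value in measured.items():
        # Each measured duration should not exceed its baseline threshold.
        assert_that(measured_value, metric).is_less_than_or_equal_to(baseline[metric])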
