Add static node stress test
Signed-off-by: Judy Ng <[email protected]>
judysng committed Mar 25, 2024
1 parent 4e80c57 commit cfc598e
Showing 7 changed files with 238 additions and 41 deletions.
14 changes: 10 additions & 4 deletions tests/integration-tests/configs/scaling_stress_test.yaml
@@ -2,7 +2,13 @@ test-suites:
performance_tests:
test_scaling.py::test_scaling_stress_test:
dimensions:
- regions: ["us-east-1"]
instances: ["c5.large"]
oss: ["alinux2"]
schedulers: ["slurm"]
- regions: [ "us-east-1" ]
instances: [ "c5.large" ]
oss: [ "alinux2" ]
schedulers: [ "slurm" ]
test_scaling.py::test_static_scaling_stress_test:
dimensions:
- regions: [ "us-east-1" ]
instances: [ "c5.large" ]
oss: [ "alinux2" ]
schedulers: [ "slurm" ]
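
Both test entries above read their runtime parameters from a config file passed through the scaling_test_config option, which the tests fetch with request.config.getoption("scaling_test_config"). As a rough, hypothetical sketch only (the real option is registered by the integration-test framework's own conftest and argument parser, which are not part of this diff), such an option could be wired up like this:

def pytest_addoption(parser):
    # Hypothetical registration; the actual framework may use a different flag
    # name, default value, or destination.
    parser.addoption(
        "--scaling-test-config",
        dest="scaling_test_config",
        default=None,
        help="Path to the YAML file describing the scaling stress test parameters",
    )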
25 changes: 25 additions & 0 deletions tests/integration-tests/tests/common/scaling_common.py
@@ -11,10 +11,14 @@
import datetime
import json
import logging
import os
import pathlib
import time

import boto3
import yaml
from pykwalify.core import Core

from framework.metrics_publisher import Metric, MetricsPublisher
from remote_command_executor import RemoteCommandExecutor
from retrying import RetryError, retry
@@ -24,6 +28,27 @@
SCALING_COMMON_DATADIR = pathlib.Path(__file__).parent / "scaling"


def validate_and_get_scaling_test_config(scaling_test_config_file):
"""Get and validate the scaling test parameters"""
scaling_test_schema = str(SCALING_COMMON_DATADIR / "scaling_test_config_schema.yaml")
logging.info("Parsing scaling test config file: %s", scaling_test_config_file)
with open(scaling_test_config_file) as file:
scaling_test_config = yaml.safe_load(file)
logging.info(scaling_test_config)

# Validate scaling test config file against defined schema
logging.info("Validating config file against the schema")
try:
c = Core(source_data=scaling_test_config, schema_files=[scaling_test_schema])
c.validate(raise_exception=True)
except Exception as e:
logging.error("Failed when validating schema: %s", e)
logging.info("Dumping rendered template:\n%s", yaml.dump(scaling_test_config, default_flow_style=False))
raise

return scaling_test_config
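
A minimal usage sketch of the new helper, assuming a config file that carries the keys the tests below read (MaxMonitoringTimeInMins, SharedHeadNodeStorageType, HeadNodeInstanceType, ScalingTargets); the schema in scaling_test_config_schema.yaml is not shown in this diff, so the exact required fields may differ:

from tests.common.scaling_common import validate_and_get_scaling_test_config

# "/path/to/scaling_test_config.yaml" is a placeholder; in the tests this path
# comes from the scaling_test_config option.
scaling_test_config = validate_and_get_scaling_test_config("/path/to/scaling_test_config.yaml")
max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins")
scaling_targets = scaling_test_config.get("ScalingTargets")  # e.g. [1000, 2000, 3000, 4000]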


def retry_if_scaling_target_not_reached(
ec2_capacity_time_series,
compute_nodes_time_series,
162 changes: 125 additions & 37 deletions tests/integration-tests/tests/performance_tests/test_scaling.py
@@ -4,16 +4,14 @@
import time

import pytest
import yaml
from assertpy import assert_that, soft_assertions
from benchmarks.common.metrics_reporter import produce_benchmark_metrics_report
from pykwalify.core import Core
from remote_command_executor import RemoteCommandExecutor
from time_utils import minutes
from utils import disable_protected_mode

from tests.common.assertions import assert_no_msg_in_logs
from tests.common.scaling_common import get_scaling_metrics
from tests.common.scaling_common import get_scaling_metrics, validate_and_get_scaling_test_config


@pytest.mark.parametrize(
@@ -80,9 +78,96 @@ def _get_scaling_time(capacity_time_series: list, timestamps: list, scaling_targ
return scaling_target_time, int((scaling_target_time - start_time).total_seconds())
except ValueError as e:
logging.error("Cluster did not scale up to %d nodes", scaling_target)
raise e
raise Exception("Cluster could not scale up to target nodes within the max monitoring time") from e
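
Only the error-handling tail of _get_scaling_time appears in this hunk. A hedged sketch of how the scaling time is likely derived from the monitoring series, consistent with the return statement and the ValueError handling shown above (the actual body lives outside this diff):

def _get_scaling_time_sketch(capacity_time_series, timestamps, scaling_target, start_time):
    # list.index raises ValueError when the target capacity never appears in the
    # series, which the caller above turns into a clearer exception.
    scaling_target_index = capacity_time_series.index(scaling_target)
    scaling_target_time = timestamps[scaling_target_index]
    return scaling_target_time, int((scaling_target_time - start_time).total_seconds())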


@pytest.mark.usefixtures("scheduler")
@pytest.mark.parametrize("scaling_strategy", ["all-or-nothing", "best-effort"])
def test_static_scaling_stress_test(
test_datadir,
instance,
os,
region,
request,
pcluster_config_reader,
scheduler_commands_factory,
clusters_factory,
scaling_strategy,
):
"""
The test scales up a cluster with a large number of static nodes, as opposed to scaling
up and down with dynamic nodes.
It records how long it takes to create or update the cluster with the target number of
static nodes, and how long a submitted job takes to start running, then compares both
times against the baselines.
Unlike the dynamic tests, this test does not upload metrics: the static nodes launch as
part of cluster creation/update, so their count cannot be monitored from here. It only
checks that the measured times stay within the baselines.
"""
# Get the scaling parameters
scaling_test_config_file = request.config.getoption("scaling_test_config")
scaling_test_config = validate_and_get_scaling_test_config(scaling_test_config_file)
max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins")
shared_headnode_storage_type = scaling_test_config.get("SharedHeadNodeStorageType")
head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType")
scaling_targets = scaling_test_config.get("ScalingTargets")

# Creating cluster with intended head node instance type and scaling parameters
cluster_config = pcluster_config_reader(
# Prevent nodes from being shut down before we start monitoring the scale-down metrics
scaledown_idletime=max_monitoring_time_in_mins,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
min_cluster_size=0,
max_cluster_size=1,
output_file="downscale-pcluster.config.yaml"
)
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = scheduler_commands_factory(remote_command_executor)

# Disable protected mode since bootstrap errors are likely to occur given the large cluster sizes
disable_protected_mode(remote_command_executor)

with soft_assertions():
for scaling_target in scaling_targets:
upscale_cluster_config = pcluster_config_reader(
# Prevent nodes from being shut down before we start monitoring the scale-down metrics
scaledown_idletime=max_monitoring_time_in_mins,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
min_cluster_size=scaling_target,
max_cluster_size=scaling_target,
output_file=f"{scaling_target}-upscale-pcluster.config.yaml"
)
_scale_up_and_down(
cluster,
head_node_instance_type,
instance,
max_monitoring_time_in_mins,
os,
region,
remote_command_executor,
request,
scaling_target,
scaling_strategy,
scheduler_commands,
test_datadir,
is_static=True,
upscale_cluster_config=upscale_cluster_config,
downscale_cluster_config=cluster_config,
)

# Make sure the RunInstances Resource Token Bucket is full before starting another scaling up
# ref https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html
if scaling_target != scaling_targets[-1]:
logging.info("Waiting for the RunInstances Resource Token Bucket to refill")
time.sleep(300)
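
The loop over scaling targets is wrapped in assertpy's soft_assertions, presumably so that a baseline miss for one target does not stop the remaining targets from being exercised; all failures are reported together when the block exits. A small standalone illustration of that behaviour (not part of the commit):

from assertpy import assert_that, soft_assertions

# Failing assertions inside the block are collected and reported together when
# the context manager exits, instead of the first one aborting the block.
with soft_assertions():
    assert_that(700).is_less_than_or_equal_to(600)    # fails, but execution continues
    assert_that(250).is_less_than_or_equal_to(300)    # passes
    assert_that(1300).is_less_than_or_equal_to(1200)  # fails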

@pytest.mark.usefixtures("scheduler")
@pytest.mark.parametrize("scaling_strategy", ["all-or-nothing", "best-effort"])
def test_scaling_stress_test(
@@ -111,7 +196,8 @@ def test_scaling_stress_test(
- A Metrics Image showing the scale up and scale down using a linear graph with annotations
"""
# Get the scaling parameters
scaling_test_config = _validate_and_get_scaling_test_config(test_datadir, request)
scaling_test_config_file = request.config.getoption("scaling_test_config")
scaling_test_config = validate_and_get_scaling_test_config(scaling_test_config_file)
max_monitoring_time_in_mins = scaling_test_config.get("MaxMonitoringTimeInMins")
shared_headnode_storage_type = scaling_test_config.get("SharedHeadNodeStorageType")
head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType")
@@ -157,28 +243,6 @@ def test_scaling_stress_test(
time.sleep(300)


def _validate_and_get_scaling_test_config(test_datadir, request):
"""Get and validate the scaling test parameters"""
scaling_test_config_file = request.config.getoption("scaling_test_config")
scaling_test_schema = str(test_datadir / "scaling_test_config_schema.yaml")
logging.info("Parsing scaling test config file: %s", scaling_test_config_file)
with open(scaling_test_config_file) as file:
scaling_test_config = yaml.safe_load(file)
logging.info(scaling_test_config)

# Validate scaling test config file against defined schema
logging.info("Validating config file against the schema")
try:
c = Core(source_data=scaling_test_config, schema_files=[scaling_test_schema])
c.validate(raise_exception=True)
except Exception as e:
logging.error("Failed when validating schema: %s", e)
logging.info("Dumping rendered template:\n%s", yaml.dump(scaling_test_config, default_flow_style=False))
raise

return scaling_test_config


def _scale_up_and_down(
cluster,
head_node_instance_type,
@@ -192,19 +256,28 @@ def _scale_up_and_down(
scaling_strategy,
scheduler_commands,
test_datadir,
is_static=False,
upscale_cluster_config=None,
downscale_cluster_config=None,
):
# Reset underlying ssh connection to prevent socket closed error
remote_command_executor.reset_connection()
# Make sure partitions are active
cluster.start(wait_running=True)

# Submit a simple job to trigger the launch of all compute nodes
scaling_job = {
# Keep job running until we explicitly cancel it and start monitoring scale down
"command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)
# Scale up cluster
if is_static:
# Update the cluster with target number of static nodes
cluster.update(str(upscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
else:
# Submit a simple job to trigger the launch of all compute nodes
scaling_job = {
# Keep job running until we explicitly cancel it and start monitoring scale down
"command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)

# Set start time at minute granularity (to simplify calculation and visualising on CloudWatch)
start_time = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
# Monitor the cluster during scale up
@@ -216,14 +289,29 @@
publish_metrics=True,
target_cluster_size=scaling_target,
)

# Extract scale up duration and timestamp from the monitoring metrics collected above
_, scale_up_time_ec2 = _get_scaling_time(ec2_capacity_time_series_up, timestamps, scaling_target, start_time)
scaling_target_time, scale_up_time_scheduler = _get_scaling_time(
compute_nodes_time_series_up, timestamps, scaling_target, start_time
)
# Cancel the running job and scale down the cluster using the update-compute-fleet command
scheduler_commands.cancel_job(job_id)
cluster.stop()

# Scale down cluster
if is_static:
# Check that a simple job succeeds
scaling_job = {
"command": "srun sleep 10",
"nodes": scaling_target,
}
scheduler_commands.submit_command_and_assert_job_succeeded(scaling_job)

# Scale down the cluster
cluster.update(str(downscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
else:
# Cancel the running job and scale down the cluster using the update-compute-fleet command
scheduler_commands.cancel_job(job_id)
cluster.stop()

# Monitor the cluster during scale down
scale_down_start_timestamp = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
ec2_capacity_time_series_down, compute_nodes_time_series_down, timestamps, end_time = get_scaling_metrics(
@@ -0,0 +1,26 @@
Image:
Os: {{ os }}
HeadNode:
{% if shared_headnode_storage_type %}
SharedStorageType: {{ shared_headnode_storage_type }}
{% endif %}
InstanceType: {{ head_node_instance_type }}
Networking:
SubnetId: {{ public_subnet_id }}
Ssh:
KeyName: {{ key_name }}
Scheduling:
Scheduler: {{ scheduler }}
ScalingStrategy: {{ scaling_strategy }}
SlurmSettings:
ScaledownIdletime: {{ scaledown_idletime }}
SlurmQueues:
- Name: queue-0
ComputeResources:
- Name: compute-resource-0
InstanceType: {{ instance }}
MinCount: {{ min_cluster_size }}
MaxCount: {{ max_cluster_size }}
Networking:
SubnetIds:
- {{ private_subnet_id }}
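
The template above is rendered by the pcluster_config_reader fixture with the keyword arguments passed in test_static_scaling_stress_test. As a rough illustration only (all values below are placeholders, and the fixture's real rendering logic belongs to the test framework, not this diff), the substitution could be reproduced with Jinja2 like this:

from jinja2 import Template

# Placeholder values chosen purely for illustration.
with open("pcluster.config.yaml") as f:
    rendered_config = Template(f.read()).render(
        os="alinux2",
        scheduler="slurm",
        scaling_strategy="all-or-nothing",
        scaledown_idletime=85,
        head_node_instance_type="c5.24xlarge",
        shared_headnode_storage_type="Ebs",
        public_subnet_id="subnet-aaaaaaaa",
        key_name="my-key",
        instance="c5.large",
        min_cluster_size=1000,
        max_cluster_size=1000,
        private_subnet_id="subnet-bbbbbbbb",
    )
print(rendered_config)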
@@ -0,0 +1,52 @@
{
"c5.large": {
"1000": {
"best-effort": {
"scale_up_time_ec2": 360,
"scale_up_time_scheduler": 600,
"scale_down_time": 300
},
"all-or-nothing": {
"scale_up_time_ec2": 360,
"scale_up_time_scheduler": 600,
"scale_down_time": 300
}
},
"2000": {
"best-effort": {
"scale_up_time_ec2": 420,
"scale_up_time_scheduler": 720,
"scale_down_time": 360
},
"all-or-nothing": {
"scale_up_time_ec2": 420,
"scale_up_time_scheduler": 720,
"scale_down_time": 360
}
},
"3000": {
"best-effort": {
"scale_up_time_ec2": 720,
"scale_up_time_scheduler": 1020,
"scale_down_time": 480
},
"all-or-nothing": {
"scale_up_time_ec2": 720,
"scale_up_time_scheduler": 1020,
"scale_down_time": 480
}
},
"4000": {
"best-effort": {
"scale_up_time_ec2": 900,
"scale_up_time_scheduler": 1200,
"scale_down_time": 540
},
"all-or-nothing": {
"scale_up_time_ec2": 900,
"scale_up_time_scheduler": 1200,
"scale_down_time": 540
}
}
}
}
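
A hedged sketch of how these baselines might be consumed: look up the expected durations by instance type, scaling target, and scaling strategy, then assert that the measured times stay at or below them. The file path and the measured values are placeholders; the actual assertion code is part of the tests above and is not shown in full in this diff.

import json

with open("baselines.json") as f:  # placeholder path to the file above
    baselines = json.load(f)

# Placeholder measurements for a c5.large cluster scaled to 1000 nodes.
measured = {"scale_up_time_ec2": 300, "scale_up_time_scheduler": 550, "scale_down_time": 250}
expected = baselines["c5.large"]["1000"]["all-or-nothing"]
for metric, value in measured.items():
    assert value <= expected[metric], f"{metric} exceeded baseline: {value} > {expected[metric]}"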
