Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
Signed-off-by: Judy Ng <[email protected]>
  • Loading branch information
judysng committed Mar 22, 2024
1 parent 89f69b6 commit 5fb53d8
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 97 deletions.
2 changes: 1 addition & 1 deletion tests/integration-tests/configs/scaling_stress_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ test-suites:
performance_tests:
test_scaling.py::test_static_scaling_stress_test:
dimensions:
- regions: [ "eu-west-1" ]
- regions: [ "us-east-1" ]
instances: [ "c5.large" ]
oss: [ "alinux2" ]
schedulers: [ "slurm" ]
211 changes: 120 additions & 91 deletions tests/integration-tests/tests/performance_tests/test_scaling.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _get_scaling_time(capacity_time_series: list, timestamps: list, scaling_targ
return scaling_target_time, int((scaling_target_time - start_time).total_seconds())
except ValueError as e:
logging.error("Cluster did not scale up to %d nodes", scaling_target)
raise e
raise Exception("Cluster could not scale up to target nodes within the max monitoring time") from e


@pytest.mark.usefixtures("scheduler")
Expand Down Expand Up @@ -115,50 +115,56 @@ def test_static_scaling_stress_test(
head_node_instance_type = scaling_test_config.get("HeadNodeInstanceType")
scaling_targets = scaling_test_config.get("ScalingTargets")

for scaling_target in scaling_targets:
# Creating cluster with intended head node instance type and scaling parameters
cluster_config = pcluster_config_reader(
# Prevent nodes being set down before we start monitoring the scale down metrics
scaledown_idletime=max_monitoring_time_in_mins,
min_cluster_size=scaling_target,
max_cluster_size=scaling_target,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
)

# Create cluster and get creation start/end time
cluster = clusters_factory(cluster_config)
cluster_start_time = _datetime_to_minute(cluster.create_start_time)
cluster_end_time = _datetime_to_minute(cluster.create_end_time)
cluster_create_time = int((cluster_end_time - cluster_start_time).total_seconds())
# Creating cluster with intended head node instance type and scaling parameters
cluster_config = pcluster_config_reader(
# Prevent nodes being set down before we start monitoring the scale down metrics
scaledown_idletime=max_monitoring_time_in_mins,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
min_cluster_size=0,
max_cluster_size=1,
output_file="downscale-pcluster.config.yaml"
)
cluster = clusters_factory(cluster_config)
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = scheduler_commands_factory(remote_command_executor)

# Run a job and get the time it takes for the job to start running
remote_command_executor = RemoteCommandExecutor(cluster)
scheduler_commands = scheduler_commands_factory(remote_command_executor)
with soft_assertions():
for scaling_target in scaling_targets:
upscale_cluster_config = pcluster_config_reader(
# Prevent nodes being set down before we start monitoring the scale down metrics
scaledown_idletime=max_monitoring_time_in_mins,
head_node_instance_type=head_node_instance_type,
shared_headnode_storage_type=shared_headnode_storage_type,
scaling_strategy=scaling_strategy,
min_cluster_size=scaling_target,
max_cluster_size=scaling_target,
output_file=f"{scaling_target}-upscale-pcluster.config.yaml"
)
_scale_up_and_down(
cluster,
head_node_instance_type,
instance,
max_monitoring_time_in_mins,
os,
region,
remote_command_executor,
request,
scaling_target,
scaling_strategy,
scheduler_commands,
test_datadir,
is_static=True,
upscale_cluster_config=upscale_cluster_config,
downscale_cluster_config=cluster_config,
)

scaling_job = {
"command": f"srun sleep 10",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)
start_time = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
scheduler_commands.wait_job_running(job_id)
end_time = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
scheduler_commands.cancel_job(job_id)
job_start_time = int((end_time - start_time).total_seconds())

scaling_results = {
"Region": region,
"OS": os,
"ComputeNode": instance,
"HeadNode": head_node_instance_type,
"ScalingTarget": scaling_target,
"ScalingStrategy": scaling_strategy,
"ClusterCreateTime": cluster_create_time,
"JobStartTime": job_start_time
}
logging.info(f"Scaling Results: {scaling_results}")
# Make sure the RunInstances Resource Token Bucket is full before starting another scaling up
# ref https://docs.aws.amazon.com/AWSEC2/latest/APIReference/throttling.html
if scaling_target != scaling_targets[-1]:
logging.info("Waiting for the RunInstances Resource Token Bucket to refill")
time.sleep(300)

@pytest.mark.usefixtures("scheduler")
@pytest.mark.parametrize("scaling_strategy", ["all-or-nothing", "best-effort"])
Expand Down Expand Up @@ -269,19 +275,28 @@ def _scale_up_and_down(
scaling_strategy,
scheduler_commands,
test_datadir,
is_static=False,
upscale_cluster_config=None,
downscale_cluster_config=None,
):
# Reset underlying ssh connection to prevent socket closed error
remote_command_executor.reset_connection()
# Make sure partitions are active
cluster.start(wait_running=True)

# Submit a simple job to trigger the launch all compute nodes
scaling_job = {
# Keep job running until we explicitly cancel it and start monitoring scale down
"command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)
# Scale up cluster
if is_static:
# Update the cluster with target number of static nodes
cluster.update(str(upscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
else:
# Submit a simple job to trigger the launch all compute nodes
scaling_job = {
# Keep job running until we explicitly cancel it and start monitoring scale down
"command": f"srun sleep {minutes(max_monitoring_time_in_mins) // 1000}",
"nodes": scaling_target,
}
job_id = scheduler_commands.submit_command_and_assert_job_accepted(scaling_job)

# Set start time at minute granularity (to simplify calculation and visualising on CloudWatch)
start_time = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
# Monitor the cluster during scale up
Expand All @@ -298,9 +313,23 @@ def _scale_up_and_down(
scaling_target_time, scale_up_time_scheduler = _get_scaling_time(
compute_nodes_time_series_up, timestamps, scaling_target, start_time
)
# Cancel the running job and scale down the cluster using the update-compute-fleet command
scheduler_commands.cancel_job(job_id)
cluster.stop()

# Scale down cluster
if is_static:
# Check that a simple job succeeds
scaling_job = {
"command": "srun sleep 10",
"nodes": scaling_target,
}
scheduler_commands.submit_command_and_assert_job_succeeded(scaling_job)

# Scale down the cluster
cluster.update(str(downscale_cluster_config), force_update="true", wait=False, raise_on_error=False)
else:
# Cancel the running job and scale down the cluster using the update-compute-fleet command
scheduler_commands.cancel_job(job_id)
cluster.stop()

# Monitor the cluster during scale down
scale_down_start_timestamp = _datetime_to_minute(datetime.datetime.now(tz=datetime.timezone.utc))
ec2_capacity_time_series_down, compute_nodes_time_series_down, timestamps, end_time = get_scaling_metrics(
Expand Down Expand Up @@ -337,41 +366,41 @@ def _scale_up_and_down(
scaling_target_time=_datetime_to_minute(scaling_target_time),
)

# Verify that there was no EC2 over-scaling
assert_that(max(ec2_capacity_time_series_up)).is_equal_to(scaling_target)
# Verify that there was no Slurm nodes over-scaling
assert_that(max(compute_nodes_time_series_up)).is_equal_to(scaling_target)
# Verify all Slurm nodes were removed on scale down
assert_that(compute_nodes_time_series_down[-1]).is_equal_to(0)

with open(str(test_datadir / "results" / "baseline.json"), encoding="utf-8") as baseline_file:
baseline_dict = json.loads(baseline_file.read())
try:
baseline_scale_up_time_ec2 = int(
baseline_dict.get(instance).get(str(scaling_target)).get(scaling_strategy).get("scale_up_time_ec2")
)
baseline_scale_up_time_scheduler = int(
baseline_dict.get(instance).get(str(scaling_target)).get(scaling_strategy).get("scale_up_time_scheduler")
)
baseline_scale_down_time = int(
baseline_dict.get(instance).get(str(scaling_target)).get(scaling_strategy).get("scale_down_time")
)

# Verify scale up time for EC2
assert_that(scale_up_time_ec2, f"Scaling target {scaling_target} EC2 scale up time").is_less_than_or_equal_to(
baseline_scale_up_time_ec2
)
# Verify scale up time for scheduler (EC2 + bootstrap)
assert_that(
scale_up_time_scheduler, f"Scaling target {scaling_target} scheduler scale up time"
).is_less_than_or_equal_to(baseline_scale_up_time_scheduler)
# Verify scale down time
assert_that(scale_down_time, f"Scaling target {scaling_target} scale down time").is_less_than_or_equal_to(
baseline_scale_down_time
)
except AttributeError:
logging.warning(
f"Baseline for ComputeNode ({instance}), ScalingTarget ({scaling_target}), "
f"ScalingStrategy ({scaling_strategy}) not found. "
f"You need to build it in {str(test_datadir / 'results' / 'baseline.json')}"
)
# # Verify that there was no EC2 over-scaling
# assert_that(max(ec2_capacity_time_series_up)).is_equal_to(scaling_target)
# # Verify that there was no Slurm nodes over-scaling
# assert_that(max(compute_nodes_time_series_up)).is_equal_to(scaling_target)
# # Verify all Slurm nodes were removed on scale down
# assert_that(compute_nodes_time_series_down[-1]).is_equal_to(0)
#
# with open(str(test_datadir / "results" / "baseline.json"), encoding="utf-8") as baseline_file:
# baseline_dict = json.loads(baseline_file.read())
# try:
# baseline_scale_up_time_ec2 = int(
# baseline_dict.get(instance).get(str(scaling_target)).get(scaling_strategy).get("scale_up_time_ec2")
# )
# baseline_scale_up_time_scheduler = int(
# baseline_dict.get(instance).get(str(scaling_target)).get(scaling_strategy).get("scale_up_time_scheduler")
# )
# baseline_scale_down_time = int(
# baseline_dict.get(instance).get(str(scaling_target)).get(scaling_strategy).get("scale_down_time")
# )
#
# # Verify scale up time for EC2
# assert_that(scale_up_time_ec2, f"Scaling target {scaling_target} EC2 scale up time").is_less_than_or_equal_to(
# baseline_scale_up_time_ec2
# )
# # Verify scale up time for scheduler (EC2 + bootstrap)
# assert_that(
# scale_up_time_scheduler, f"Scaling target {scaling_target} scheduler scale up time"
# ).is_less_than_or_equal_to(baseline_scale_up_time_scheduler)
# # Verify scale down time
# assert_that(scale_down_time, f"Scaling target {scaling_target} scale down time").is_less_than_or_equal_to(
# baseline_scale_down_time
# )
# except AttributeError:
# logging.warning(
# f"Baseline for ComputeNode ({instance}), ScalingTarget ({scaling_target}), "
# f"ScalingStrategy ({scaling_strategy}) not found. "
# f"You need to build it in {str(test_datadir / 'results' / 'baseline.json')}"
# )
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,9 @@ Scheduling:
- Name: queue-0
ComputeResources:
- Name: compute-resource-0
Instances:
- InstanceType: {{ instance }}
InstanceType: {{ instance }}
MinCount: {{ min_cluster_size }}
MaxCount: {{ max_cluster_size }}
Networking:
SubnetIds:
- {{ private_subnet_id }}
DevSettings:
Timeouts:
HeadNodeBootstrapTimeout: 3600

0 comments on commit 5fb53d8

Please sign in to comment.