diff --git a/packages/aws-library/src/aws_library/ec2/_client.py b/packages/aws-library/src/aws_library/ec2/_client.py index a40cf794304..276423415a5 100644 --- a/packages/aws-library/src/aws_library/ec2/_client.py +++ b/packages/aws-library/src/aws_library/ec2/_client.py @@ -181,7 +181,8 @@ async def launch_instances( ) instance_ids = [i["InstanceId"] for i in instances["Instances"]] _logger.info( - "New instances launched: %s, waiting for them to start now...", + "%s New instances launched: %s, waiting for them to start now...", + len(instance_ids), instance_ids, ) diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py b/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py new file mode 100644 index 00000000000..2d6c278d92c --- /dev/null +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/autoscaling.py @@ -0,0 +1,73 @@ +from collections.abc import Callable + +import arrow +from aws_library.ec2 import EC2InstanceData +from models_library.generated_models.docker_rest_api import ( + Availability, + Node, + NodeState, +) +from pytest_mock import MockType +from simcore_service_autoscaling.models import AssociatedInstance, Cluster +from simcore_service_autoscaling.utils.utils_docker import ( + _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY, + _OSPARC_SERVICE_READY_LABEL_KEY, + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY, +) + + +def assert_cluster_state( + spied_cluster_analysis: MockType, *, expected_calls: int, expected_num_machines: int +) -> Cluster: + assert spied_cluster_analysis.call_count == expected_calls + + assert isinstance(spied_cluster_analysis.spy_return, Cluster) + assert ( + spied_cluster_analysis.spy_return.total_number_of_machines() + == expected_num_machines + ) + print("current cluster state:", spied_cluster_analysis.spy_return) + cluster = spied_cluster_analysis.spy_return + spied_cluster_analysis.reset_mock() + return cluster + + +def create_fake_association( + create_fake_node: Callable[..., Node], + drained_machine_id: str | None, + terminating_machine_id: str | None, +): + fake_node_to_instance_map = {} + + async def _fake_node_creator( + _nodes: list[Node], ec2_instances: list[EC2InstanceData] + ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]: + def _create_fake_node_with_labels(instance: EC2InstanceData) -> Node: + if instance not in fake_node_to_instance_map: + fake_node = create_fake_node() + assert fake_node.spec + fake_node.spec.availability = Availability.active + assert fake_node.status + fake_node.status.state = NodeState.ready + assert fake_node.spec.labels + fake_node.spec.labels |= { + _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY: arrow.utcnow().isoformat(), + _OSPARC_SERVICE_READY_LABEL_KEY: ( + "true" if instance.id != drained_machine_id else "false" + ), + } + if instance.id == terminating_machine_id: + fake_node.spec.labels |= { + _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY: arrow.utcnow().isoformat() + } + fake_node_to_instance_map[instance] = fake_node + return fake_node_to_instance_map[instance] + + associated_instances = [ + AssociatedInstance(node=_create_fake_node_with_labels(i), ec2_instance=i) + for i in ec2_instances + ] + + return associated_instances, [] + + return _fake_node_creator diff --git a/scripts/maintenance/computational-clusters/autoscaled_monitor/core.py b/scripts/maintenance/computational-clusters/autoscaled_monitor/core.py index c0c4ba7bed6..540b4581ab6 100755 --- a/scripts/maintenance/computational-clusters/autoscaled_monitor/core.py +++ b/scripts/maintenance/computational-clusters/autoscaled_monitor/core.py @@ -138,7 +138,6 @@ def _print_dynamic_instances( f"{utils.color_encode_with_state(instance.name, instance.ec2_instance)}", f"ID: {instance.ec2_instance.instance_id}", f"AMI: {instance.ec2_instance.image_id}", - f"AMI name: {instance.ec2_instance.image.name}", f"Type: {instance.ec2_instance.instance_type}", f"Up: {utils.timedelta_formatting(time_now - instance.ec2_instance.launch_time, color_code=True)}", f"ExtIP: {instance.ec2_instance.public_ip_address}", @@ -183,7 +182,6 @@ def _print_computational_clusters( f"Name: {cluster.primary.name}", f"ID: {cluster.primary.ec2_instance.id}", f"AMI: {cluster.primary.ec2_instance.image_id}", - f"AMI name: {cluster.primary.ec2_instance.image.name}", f"Type: {cluster.primary.ec2_instance.instance_type}", f"Up: {utils.timedelta_formatting(time_now - cluster.primary.ec2_instance.launch_time, color_code=True)}", f"ExtIP: {cluster.primary.ec2_instance.public_ip_address}", @@ -229,7 +227,6 @@ def _print_computational_clusters( f"Name: {worker.name}", f"ID: {worker.ec2_instance.id}", f"AMI: {worker.ec2_instance.image_id}", - f"AMI name: {worker.ec2_instance.image.name}", f"Type: {worker.ec2_instance.instance_type}", f"Up: {utils.timedelta_formatting(time_now - worker.ec2_instance.launch_time, color_code=True)}", f"ExtIP: {worker.ec2_instance.public_ip_address}", diff --git a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py index 8d5ff16dd9a..f86c555a253 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py +++ b/services/autoscaling/src/simcore_service_autoscaling/modules/auto_scaling_core.py @@ -16,7 +16,6 @@ Resources, ) from aws_library.ec2._errors import EC2TooManyInstancesError -from aws_library.ec2._models import AWSTagValue from fastapi import FastAPI from models_library.generated_models.docker_rest_api import Node, NodeState from servicelib.logging_utils import log_catch, log_context @@ -265,9 +264,7 @@ async def _make_pending_buffer_ec2s_join_cluster( await ec2_client.set_instances_tags( buffer_ec2_ready_for_command, tags={ - DOCKER_JOIN_COMMAND_EC2_TAG_KEY: AWSTagValue( - ssm_command.command_id - ), + DOCKER_JOIN_COMMAND_EC2_TAG_KEY: ssm_command.command_id, }, ) return cluster @@ -389,13 +386,18 @@ async def _activate_drained_nodes( if node.assigned_tasks ] - # activate these nodes now - await asyncio.gather( - *( - _activate_and_notify(app, auto_scaling_mode, node) - for node in nodes_to_activate + if not nodes_to_activate: + return cluster + + with log_context( + _logger, logging.INFO, f"activate {len(nodes_to_activate)} drained nodes" + ): + await asyncio.gather( + *( + _activate_and_notify(app, auto_scaling_mode, node) + for node in nodes_to_activate + ) ) - ) new_active_node_ids = {node.ec2_instance.id for node in nodes_to_activate} remaining_drained_nodes = [ node @@ -424,12 +426,17 @@ async def _start_buffer_instances( if not instances_to_start: return cluster # change the buffer machine to an active one - await get_ec2_client(app).set_instances_tags( - instances_to_start, - tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode), - ) + with log_context( + _logger, logging.INFO, f"start {len(instances_to_start)} buffer machines" + ): + await get_ec2_client(app).set_instances_tags( + instances_to_start, + tags=get_activated_buffer_ec2_tags(app, auto_scaling_mode), + ) - started_instances = await get_ec2_client(app).start_instances(instances_to_start) + started_instances = await get_ec2_client(app).start_instances( + instances_to_start + ) started_instance_ids = [i.id for i in started_instances] return dataclasses.replace( @@ -541,7 +548,8 @@ async def _assign_tasks_to_current_cluster( if unassigned_tasks: _logger.info( - "the current cluster should cope with %s tasks, %s are unnassigned/queued tasks and will need new EC2s", + "the current cluster should cope with %s tasks, %s are unnassigned/queued " + "tasks and need to wait or get new EC2s", len(tasks) - len(unassigned_tasks), len(unassigned_tasks), ) @@ -617,9 +625,10 @@ async def _find_needed_instances( _logger.exception("Unexpected error:") _logger.info( - "found following needed instances: %s", + "found following %s needed instances: %s", + len(needed_new_instance_types_for_tasks), [ - f"{i.instance_type.name=}:{i.instance_type.resources} with {len(i.assigned_tasks)} tasks" + f"{i.instance_type.name}:{i.instance_type.resources} takes {len(i.assigned_tasks)} task{'s' if len(i.assigned_tasks)>1 else ''}" for i in needed_new_instance_types_for_tasks ], ) @@ -811,39 +820,6 @@ async def _launch_instances( return new_pending_instances -async def _scale_up_cluster( - app: FastAPI, - cluster: Cluster, - unassigned_tasks: list, - auto_scaling_mode: BaseAutoscaling, - allowed_instance_types: list[EC2InstanceType], -) -> Cluster: - app_settings: ApplicationSettings = app.state.settings - assert app_settings.AUTOSCALING_EC2_ACCESS # nosec - assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - - # let's start these - if needed_ec2_instances := await _find_needed_instances( - app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode - ): - await auto_scaling_mode.log_message_from_tasks( - app, - unassigned_tasks, - "service is pending due to missing resources, scaling up cluster now...", - level=logging.INFO, - ) - new_pending_instances = await _launch_instances( - app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode - ) - cluster.pending_ec2s.extend( - [NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances] - ) - # NOTE: to check the logs of UserData in EC2 instance - # run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance - - return cluster - - async def _find_drainable_nodes( app: FastAPI, cluster: Cluster ) -> list[AssociatedInstance]: @@ -899,23 +875,25 @@ async def _deactivate_empty_nodes(app: FastAPI, cluster: Cluster) -> Cluster: if not active_empty_instances: return cluster - # drain this empty nodes - updated_nodes: list[Node] = await asyncio.gather( - *( - utils_docker.set_node_osparc_ready( - app_settings, - docker_client, - node.node, - ready=False, + with log_context( + _logger, logging.INFO, f"drain {len(active_empty_instances)} empty nodes" + ): + updated_nodes: list[Node] = await asyncio.gather( + *( + utils_docker.set_node_osparc_ready( + app_settings, + docker_client, + node.node, + ready=False, + ) + for node in active_empty_instances ) - for node in active_empty_instances - ) - ) - if updated_nodes: - _logger.info( - "following nodes were set to drain: '%s'", - f"{[node.description.hostname for node in updated_nodes if node.description]}", ) + if updated_nodes: + _logger.info( + "following nodes were set to drain: '%s'", + f"{[node.description.hostname for node in updated_nodes if node.description]}", + ) newly_drained_instances = [ AssociatedInstance(node=node, ec2_instance=instance.ec2_instance) for instance, node in zip(active_empty_instances, updated_nodes, strict=True) @@ -945,7 +923,7 @@ async def _find_terminateable_instances( for instance in cluster.drained_nodes: node_last_updated = utils_docker.get_node_last_readyness_update(instance.node) elapsed_time_since_drained = ( - datetime.datetime.now(datetime.timezone.utc) - node_last_updated + datetime.datetime.now(datetime.UTC) - node_last_updated ) _logger.debug("%s", f"{node_last_updated=}, {elapsed_time_since_drained=}") if ( @@ -985,6 +963,9 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster: get_docker_client(app), instance.node ) new_terminating_instances.append(instance) + new_terminating_instance_ids = [ + i.ec2_instance.id for i in new_terminating_instances + ] # instances that are in the termination process and already waited long enough are terminated. now = arrow.utcnow().datetime @@ -1016,12 +997,18 @@ async def _try_scale_down_cluster(app: FastAPI, cluster: Cluster) -> Cluster: still_drained_nodes = [ i for i in cluster.drained_nodes + if i.ec2_instance.id + not in (new_terminating_instance_ids + terminated_instance_ids) + ] + still_terminating_nodes = [ + i + for i in cluster.terminating_nodes if i.ec2_instance.id not in terminated_instance_ids ] return dataclasses.replace( cluster, drained_nodes=still_drained_nodes, - terminating_nodes=cluster.terminating_nodes + new_terminating_instances, + terminating_nodes=still_terminating_nodes + new_terminating_instances, terminated_instances=cluster.terminated_instances + [ NonAssociatedInstance(ec2_instance=i.ec2_instance) @@ -1043,7 +1030,7 @@ async def _notify_based_on_machine_type( app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_START_TIME ) launch_time_to_tasks: dict[datetime.datetime, list] = collections.defaultdict(list) - now = datetime.datetime.now(datetime.timezone.utc) + now = datetime.datetime.now(datetime.UTC) for instance in instances: launch_time_to_tasks[ instance.ec2_instance.launch_time @@ -1116,64 +1103,99 @@ async def _drain_retired_nodes( ) -async def _autoscale_cluster( +async def _scale_down_unused_cluster_instances( app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling, - allowed_instance_types: list[EC2InstanceType], ) -> Cluster: - # 1. check if we have pending tasks and resolve them by activating some drained nodes - unrunnable_tasks = await auto_scaling_mode.list_unrunnable_tasks(app) - _logger.info("found %s unrunnable tasks", len(unrunnable_tasks)) - # NOTE: this function predicts how dask will assign a task to a machine - queued_or_missing_instance_tasks, cluster = await _assign_tasks_to_current_cluster( - app, unrunnable_tasks, cluster, auto_scaling_mode - ) - # 2. try to activate drained nodes to cover some of the tasks - cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode) + await auto_scaling_mode.try_retire_nodes(app) + cluster = await _deactivate_empty_nodes(app, cluster) + return await _try_scale_down_cluster(app, cluster) - # 3. start buffer instances to cover the remaining tasks - cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode) - # 4. let's check if there are still pending tasks or if the reserve was used +async def _scale_up_cluster( + app: FastAPI, + cluster: Cluster, + auto_scaling_mode: BaseAutoscaling, + allowed_instance_types: list[EC2InstanceType], + unassigned_tasks: list, +) -> Cluster: app_settings = get_application_settings(app) assert app_settings.AUTOSCALING_EC2_INSTANCES # nosec - if queued_or_missing_instance_tasks or ( + if not unassigned_tasks and ( len(cluster.buffer_drained_nodes) - < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER + >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MACHINES_BUFFER ): - if ( - cluster.total_number_of_machines() - < app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES - ): - _logger.info( - "%s unrunnable tasks could not be assigned, slowly trying to scale up...", - len(queued_or_missing_instance_tasks), - ) - cluster = await _scale_up_cluster( - app, - cluster, - queued_or_missing_instance_tasks, - auto_scaling_mode, - allowed_instance_types, - ) + return cluster - elif ( - len(queued_or_missing_instance_tasks) == len(unrunnable_tasks) == 0 - and cluster.can_scale_down() + if ( + cluster.total_number_of_machines() + >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES ): _logger.info( - "there is %s waiting task, slowly and gracefully scaling down...", - len(queued_or_missing_instance_tasks), + "cluster already hit the maximum allowed amount of instances (%s), not scaling up. " + "%s tasks will wait until instances are free.", + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + len(unassigned_tasks), + ) + return cluster + + # now we scale up + assert app_settings.AUTOSCALING_EC2_ACCESS # nosec + + # let's start these + if needed_ec2_instances := await _find_needed_instances( + app, unassigned_tasks, allowed_instance_types, cluster, auto_scaling_mode + ): + await auto_scaling_mode.log_message_from_tasks( + app, + unassigned_tasks, + "service is pending due to missing resources, scaling up cluster now...", + level=logging.INFO, ) - # NOTE: we only scale down in case we did not just scale up. The swarm needs some time to adjust - await auto_scaling_mode.try_retire_nodes(app) - cluster = await _deactivate_empty_nodes(app, cluster) - cluster = await _try_scale_down_cluster(app, cluster) + new_pending_instances = await _launch_instances( + app, needed_ec2_instances, unassigned_tasks, auto_scaling_mode + ) + cluster.pending_ec2s.extend( + [NonAssociatedInstance(ec2_instance=i) for i in new_pending_instances] + ) + # NOTE: to check the logs of UserData in EC2 instance + # run: tail -f -n 1000 /var/log/cloud-init-output.log in the instance return cluster +async def _autoscale_cluster( + app: FastAPI, + cluster: Cluster, + auto_scaling_mode: BaseAutoscaling, + allowed_instance_types: list[EC2InstanceType], +) -> Cluster: + # 1. check if we have pending tasks + unnasigned_pending_tasks = await auto_scaling_mode.list_unrunnable_tasks(app) + _logger.info("found %s pending tasks", len(unnasigned_pending_tasks)) + # NOTE: this function predicts how the backend will assign tasks + still_pending_tasks, cluster = await _assign_tasks_to_current_cluster( + app, unnasigned_pending_tasks, cluster, auto_scaling_mode + ) + + # 2. activate available drained nodes to cover some of the tasks + cluster = await _activate_drained_nodes(app, cluster, auto_scaling_mode) + + # 3. start buffer instances to cover the remaining tasks + cluster = await _start_buffer_instances(app, cluster, auto_scaling_mode) + + # 4. scale down unused instances + cluster = await _scale_down_unused_cluster_instances( + app, cluster, auto_scaling_mode + ) + + # 5. scale up if necessary + return await _scale_up_cluster( + app, cluster, auto_scaling_mode, allowed_instance_types, still_pending_tasks + ) + + async def _notify_autoscaling_status( app: FastAPI, cluster: Cluster, auto_scaling_mode: BaseAutoscaling ) -> None: diff --git a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py index 4758c91a12f..65caa0f40b1 100644 --- a/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py +++ b/services/autoscaling/src/simcore_service_autoscaling/utils/utils_docker.py @@ -82,19 +82,26 @@ ).validate_python("io.simcore.osparc-node-termination-started") +def _get_node_creation_date(node: Node) -> datetime.datetime: + assert node.created_at # nosec + return arrow.get(node.created_at).datetime + + async def get_monitored_nodes( docker_client: AutoscalingDocker, node_labels: list[DockerLabelKey] ) -> list[Node]: node_label_filters = [f"{label}=true" for label in node_labels] + [ f"{label}" for label in _OSPARC_SERVICE_READY_LABEL_KEYS ] - return TypeAdapter(list[Node]).validate_python( + list_of_nodes = TypeAdapter(list[Node]).validate_python( await docker_client.nodes.list(filters={"node.label": node_label_filters}) ) + list_of_nodes.sort(key=_get_node_creation_date) + return list_of_nodes async def get_worker_nodes(docker_client: AutoscalingDocker) -> list[Node]: - return TypeAdapter(list[Node]).validate_python( + list_of_nodes = TypeAdapter(list[Node]).validate_python( await docker_client.nodes.list( filters={ "role": ["worker"], @@ -104,6 +111,8 @@ async def get_worker_nodes(docker_client: AutoscalingDocker) -> list[Node]: } ) ) + list_of_nodes.sort(key=_get_node_creation_date) + return list_of_nodes async def remove_nodes( diff --git a/services/autoscaling/tests/unit/conftest.py b/services/autoscaling/tests/unit/conftest.py index eccee9967b5..4a48f2776b6 100644 --- a/services/autoscaling/tests/unit/conftest.py +++ b/services/autoscaling/tests/unit/conftest.py @@ -11,7 +11,7 @@ from collections.abc import AsyncIterator, Awaitable, Callable, Iterator from copy import deepcopy from pathlib import Path -from typing import Any, Final, cast, get_args +from typing import Any, Final, TypeAlias, cast, get_args from unittest import mock import aiodocker @@ -46,6 +46,7 @@ TaskSpec, ) from pydantic import ByteSize, PositiveInt, TypeAdapter +from pytest_mock import MockType from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.host import get_localhost_ip from pytest_simcore.helpers.logging_tools import log_context @@ -69,6 +70,7 @@ Cluster, DaskTaskResources, ) +from simcore_service_autoscaling.modules import auto_scaling_core from simcore_service_autoscaling.modules.docker import AutoscalingDocker from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API from simcore_service_autoscaling.utils.utils_docker import ( @@ -176,7 +178,11 @@ def app_with_docker_join_drained( @pytest.fixture(scope="session") def fake_ssm_settings() -> SSMSettings: assert "json_schema_extra" in SSMSettings.model_config - return SSMSettings(**SSMSettings.model_config["json_schema_extra"]["examples"][0]) + assert isinstance(SSMSettings.model_config["json_schema_extra"], dict) + assert isinstance(SSMSettings.model_config["json_schema_extra"]["examples"], list) + return SSMSettings.model_validate( + SSMSettings.model_config["json_schema_extra"]["examples"][0] + ) @pytest.fixture @@ -220,6 +226,11 @@ def app_environment( delenvs_from_dict(monkeypatch, mock_env_devel_environment, raising=False) return setenvs_from_dict(monkeypatch, {**external_envfile_dict}) + assert "json_schema_extra" in EC2InstanceBootSpecific.model_config + assert isinstance(EC2InstanceBootSpecific.model_config["json_schema_extra"], dict) + assert isinstance( + EC2InstanceBootSpecific.model_config["json_schema_extra"]["examples"], list + ) envs = setenvs_from_dict( monkeypatch, { @@ -263,6 +274,11 @@ def mocked_ec2_instances_envs( aws_allowed_ec2_instance_type_names: list[InstanceTypeType], aws_instance_profile: str, ) -> EnvVarsDict: + assert "json_schema_extra" in EC2InstanceBootSpecific.model_config + assert isinstance(EC2InstanceBootSpecific.model_config["json_schema_extra"], dict) + assert isinstance( + EC2InstanceBootSpecific.model_config["json_schema_extra"]["examples"], list + ) envs = setenvs_from_dict( monkeypatch, { @@ -271,10 +287,13 @@ def mocked_ec2_instances_envs( "EC2_INSTANCES_SUBNET_ID": aws_subnet_id, "EC2_INSTANCES_ALLOWED_TYPES": json.dumps( { - ec2_type_name: random.choice( # noqa: S311 - EC2InstanceBootSpecific.model_config["json_schema_extra"][ - "examples" - ] + ec2_type_name: cast( + dict, + random.choice( # noqa: S311 + EC2InstanceBootSpecific.model_config["json_schema_extra"][ + "examples" + ] + ), ) | {"ami_id": aws_ami_id} for ec2_type_name in aws_allowed_ec2_instance_type_names @@ -491,22 +510,23 @@ def create_fake_node(faker: Faker) -> Callable[..., DockerNode]: def _creator(**node_overrides) -> DockerNode: default_config = { "ID": faker.uuid4(), - "Version": ObjectVersion(Index=faker.pyint()), - "CreatedAt": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), - "UpdatedAt": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), + "Version": ObjectVersion(index=faker.pyint()), + "CreatedAt": datetime.datetime.now(tz=datetime.UTC).isoformat(), + "UpdatedAt": datetime.datetime.now(tz=datetime.UTC).isoformat(), "Description": NodeDescription( - Hostname=faker.pystr(), - Resources=ResourceObject( - NanoCPUs=int(9 * 1e9), MemoryBytes=256 * 1024 * 1024 * 1024 + hostname=faker.pystr(), + resources=ResourceObject( + nano_cp_us=int(9 * 1e9), + memory_bytes=TypeAdapter(ByteSize).validate_python("256GiB"), ), ), "Spec": NodeSpec( - Name=None, - Labels=faker.pydict(allowed_types=(str,)), - Role=None, - Availability=Availability.drain, + name=None, + labels=faker.pydict(allowed_types=(str,)), + role=None, + availability=Availability.drain, ), - "Status": NodeStatus(State=NodeState.unknown, Message=None, Addr=None), + "Status": NodeStatus(state=NodeState.unknown, message=None, addr=None), } default_config.update(**node_overrides) return DockerNode(**default_config) @@ -529,7 +549,7 @@ def task_template() -> dict[str, Any]: _GIGA_NANO_CPU = 10**9 -NUM_CPUS = PositiveInt +NUM_CPUS: TypeAlias = PositiveInt @pytest.fixture @@ -704,6 +724,7 @@ async def _assert_wait_for_service_state( after=after_log(ctx.logger, logging.DEBUG), ) async def _() -> None: + assert service.id services = await async_docker_client.services.list( filters={"id": service.id} ) @@ -761,7 +782,9 @@ def aws_allowed_ec2_instance_type_names_env( @pytest.fixture def host_cpu_count() -> int: - return psutil.cpu_count() + cpus = psutil.cpu_count() + assert cpus is not None + return cpus @pytest.fixture @@ -853,9 +876,7 @@ async def _fake_set_node_availability( returned_node.spec.availability = ( Availability.active if available else Availability.drain ) - returned_node.updated_at = datetime.datetime.now( - tz=datetime.timezone.utc - ).isoformat() + returned_node.updated_at = datetime.datetime.now(tz=datetime.UTC).isoformat() return returned_node return mocker.patch( @@ -890,7 +911,7 @@ async def fake_tag_node( @pytest.fixture -def patch_ec2_client_launch_instancess_min_number_of_instances( +def patch_ec2_client_launch_instances_min_number_of_instances( mocker: MockerFixture, ) -> mock.Mock: """the moto library always returns min number of instances instead of max number of instances which makes @@ -954,7 +975,7 @@ def _creator( return AssociatedInstance( node=node, ec2_instance=fake_ec2_instance_data( - launch_time=datetime.datetime.now(datetime.timezone.utc) + launch_time=datetime.datetime.now(datetime.UTC) - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - datetime.timedelta( days=faker.pyint(min_value=0, max_value=100), @@ -1002,3 +1023,22 @@ def with_short_ec2_instances_max_start_time( "EC2_INSTANCES_MAX_START_TIME": f"{short_ec2_instance_max_start_time}", }, ) + + +@pytest.fixture +async def spied_cluster_analysis(mocker: MockerFixture) -> MockType: + return mocker.spy(auto_scaling_core, "_analyze_current_cluster") + + +@pytest.fixture +async def mocked_associate_ec2_instances_with_nodes(mocker: MockerFixture) -> mock.Mock: + async def _( + nodes: list[DockerNode], ec2_instances: list[EC2InstanceData] + ) -> tuple[list[AssociatedInstance], list[EC2InstanceData]]: + return [], ec2_instances + + return mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.associate_ec2_instances_with_nodes", + autospec=True, + side_effect=_, + ) diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py index f9e0e4c416d..6e7a0d7c828 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_computational.py @@ -11,10 +11,10 @@ import datetime import logging from collections import defaultdict -from collections.abc import Callable, Iterator +from collections.abc import Awaitable, Callable, Iterator from copy import deepcopy from dataclasses import dataclass -from typing import Any +from typing import Any, Final, cast from unittest import mock import arrow @@ -32,7 +32,11 @@ from models_library.generated_models.docker_rest_api import NodeState, NodeStatus from models_library.rabbitmq_messages import RabbitAutoscalingStatusMessage from pydantic import ByteSize, TypeAdapter -from pytest_mock import MockerFixture +from pytest_mock import MockerFixture, MockType +from pytest_simcore.helpers.autoscaling import ( + assert_cluster_state, + create_fake_association, +) from pytest_simcore.helpers.aws_ec2 import assert_autoscaled_computational_ec2_instances from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict, setenvs_from_dict from simcore_service_autoscaling.core.settings import ApplicationSettings @@ -43,7 +47,6 @@ ) from simcore_service_autoscaling.modules.dask import DaskTaskResources from simcore_service_autoscaling.modules.docker import get_docker_client -from simcore_service_autoscaling.modules.ec2 import SimcoreEC2API from simcore_service_autoscaling.utils.utils_docker import ( _OSPARC_NODE_EMPTY_DATETIME_LABEL_KEY, _OSPARC_NODE_TERMINATION_PROCESS_LABEL_KEY, @@ -182,6 +185,126 @@ def ec2_instance_custom_tags( } +@pytest.fixture +def create_dask_task_resources() -> ( + Callable[[InstanceTypeType | None, Resources], DaskTaskResources] +): + def _do( + ec2_instance_type: InstanceTypeType | None, task_resource: Resources + ) -> DaskTaskResources: + resources = _dask_task_resources_from_resources(task_resource) + if ec2_instance_type is not None: + resources[create_ec2_resource_constraint_key(ec2_instance_type)] = 1 + return resources + + return _do + + +@pytest.fixture +def mock_dask_get_worker_has_results_in_memory(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.dask.get_worker_still_has_results_in_memory", + return_value=0, + autospec=True, + ) + + +@pytest.fixture +def mock_dask_get_worker_used_resources(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.dask.get_worker_used_resources", + return_value=Resources.create_as_empty(), + autospec=True, + ) + + +@pytest.fixture +def mock_dask_is_worker_connected(mocker: MockerFixture) -> mock.Mock: + return mocker.patch( + "simcore_service_autoscaling.modules.dask.is_worker_connected", + return_value=True, + autospec=True, + ) + + +async def _create_task_with_resources( + ec2_client: EC2Client, + dask_task_imposed_ec2_type: InstanceTypeType | None, + task_resources: Resources | None, + create_dask_task_resources: Callable[ + [InstanceTypeType | None, Resources], DaskTaskResources + ], + create_dask_task: Callable[[DaskTaskResources], distributed.Future], +) -> distributed.Future: + if dask_task_imposed_ec2_type and not task_resources: + instance_types = await ec2_client.describe_instance_types( + InstanceTypes=[dask_task_imposed_ec2_type] + ) + assert instance_types + assert "InstanceTypes" in instance_types + assert instance_types["InstanceTypes"] + assert "MemoryInfo" in instance_types["InstanceTypes"][0] + assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] + task_resources = Resources( + cpus=1, + ram=TypeAdapter(ByteSize).validate_python( + f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", + ), + ) + + assert task_resources + dask_task_resources = create_dask_task_resources( + dask_task_imposed_ec2_type, task_resources + ) + dask_future = create_dask_task(dask_task_resources) + assert dask_future + return dask_future + + +@dataclass(kw_only=True) +class _ScaleUpParams: + imposed_instance_type: InstanceTypeType | None + task_resources: Resources | None + num_tasks: int + expected_instance_type: InstanceTypeType + expected_num_instances: int + + +_RESOURCE_TO_DASK_RESOURCE_MAP: Final[dict[str, str]] = {"CPUS": "CPU", "RAM": "RAM"} + + +def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources: + return { + _RESOURCE_TO_DASK_RESOURCE_MAP[res_key.upper()]: res_value + for res_key, res_value in resources.model_dump().items() + } + + +@pytest.fixture +async def create_tasks_batch( + ec2_client: EC2Client, + create_dask_task: Callable[[DaskTaskResources], distributed.Future], + create_dask_task_resources: Callable[ + [InstanceTypeType | None, Resources], DaskTaskResources + ], +) -> Callable[[_ScaleUpParams], Awaitable[list[distributed.Future]]]: + async def _(scale_up_params: _ScaleUpParams) -> list[distributed.Future]: + return await asyncio.gather( + *( + _create_task_with_resources( + ec2_client, + scale_up_params.imposed_instance_type, + scale_up_params.task_resources, + create_dask_task_resources, + create_dask_task, + ) + for _ in range(scale_up_params.num_tasks) + ) + ) + + return _ + + async def test_cluster_scaling_with_no_tasks_does_nothing( minimal_configuration: None, app_settings: ApplicationSettings, @@ -259,103 +382,52 @@ async def test_cluster_scaling_with_task_with_too_much_resources_starts_nothing( ) -@pytest.fixture -def create_dask_task_resources() -> Callable[..., DaskTaskResources]: - def _do( - ec2_instance_type: InstanceTypeType | None, ram: ByteSize - ) -> DaskTaskResources: - resources = DaskTaskResources( - { - "RAM": int(ram), - } - ) - if ec2_instance_type is not None: - resources[create_ec2_resource_constraint_key(ec2_instance_type)] = 1 - return resources - - return _do - - -@pytest.fixture -def mock_dask_get_worker_has_results_in_memory(mocker: MockerFixture) -> mock.Mock: - return mocker.patch( - "simcore_service_autoscaling.modules.dask.get_worker_still_has_results_in_memory", - return_value=0, - autospec=True, - ) - - -@pytest.fixture -def mock_dask_get_worker_used_resources(mocker: MockerFixture) -> mock.Mock: - return mocker.patch( - "simcore_service_autoscaling.modules.dask.get_worker_used_resources", - return_value=Resources.create_as_empty(), - autospec=True, - ) - - -@pytest.fixture -def mock_dask_is_worker_connected(mocker: MockerFixture) -> mock.Mock: - return mocker.patch( - "simcore_service_autoscaling.modules.dask.is_worker_connected", - return_value=True, - autospec=True, - ) - - -async def _create_task_with_resources( - ec2_client: EC2Client, - dask_task_imposed_ec2_type: InstanceTypeType | None, - dask_ram: ByteSize | None, - create_dask_task_resources: Callable[..., DaskTaskResources], - create_dask_task: Callable[[DaskTaskResources], distributed.Future], -) -> distributed.Future: - if dask_task_imposed_ec2_type and not dask_ram: - instance_types = await ec2_client.describe_instance_types( - InstanceTypes=[dask_task_imposed_ec2_type] - ) - assert instance_types - assert "InstanceTypes" in instance_types - assert instance_types["InstanceTypes"] - assert "MemoryInfo" in instance_types["InstanceTypes"][0] - assert "SizeInMiB" in instance_types["InstanceTypes"][0]["MemoryInfo"] - dask_ram = TypeAdapter(ByteSize).validate_python( - f"{instance_types['InstanceTypes'][0]['MemoryInfo']['SizeInMiB']}MiB", - ) - dask_task_resources = create_dask_task_resources( - dask_task_imposed_ec2_type, dask_ram - ) - dask_future = create_dask_task(dask_task_resources) - assert dask_future - return dask_future - - -@pytest.mark.acceptance_test() +@pytest.mark.acceptance_test @pytest.mark.parametrize( - "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type", + "scale_up_params", [ pytest.param( - None, - TypeAdapter(ByteSize).validate_python("128Gib"), - "r5n.4xlarge", + _ScaleUpParams( + imposed_instance_type=None, + task_resources=Resources( + cpus=1, ram=TypeAdapter(ByteSize).validate_python("128Gib") + ), + num_tasks=1, + expected_instance_type="r5n.4xlarge", + expected_num_instances=1, + ), id="No explicit instance defined", ), pytest.param( - "g4dn.2xlarge", - None, - "g4dn.2xlarge", + _ScaleUpParams( + imposed_instance_type="g4dn.2xlarge", + task_resources=None, + num_tasks=1, + expected_instance_type="g4dn.2xlarge", + expected_num_instances=1, + ), id="Explicitely ask for g4dn.2xlarge and use all the resources", ), pytest.param( - "r5n.8xlarge", - TypeAdapter(ByteSize).validate_python("116Gib"), - "r5n.8xlarge", + _ScaleUpParams( + imposed_instance_type="r5n.8xlarge", + task_resources=Resources( + cpus=1, ram=TypeAdapter(ByteSize).validate_python("116Gib") + ), + num_tasks=1, + expected_instance_type="r5n.8xlarge", + expected_num_instances=1, + ), id="Explicitely ask for r5n.8xlarge and set the resources", ), pytest.param( - "r5n.8xlarge", - None, - "r5n.8xlarge", + _ScaleUpParams( + imposed_instance_type="r5n.8xlarge", + task_resources=None, + num_tasks=1, + expected_instance_type="r5n.8xlarge", + expected_num_instances=1, + ), id="Explicitely ask for r5n.8xlarge and use all the resources", ), ], @@ -364,7 +436,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 minimal_configuration: None, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_dask_task: Callable[[DaskTaskResources], distributed.Future], + create_tasks_batch: Callable[[_ScaleUpParams], Awaitable[list[distributed.Future]]], ec2_client: EC2Client, mock_docker_tag_node: mock.Mock, fake_node: DockerNode, @@ -377,26 +449,17 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_dask_is_worker_connected: mock.Mock, mocker: MockerFixture, dask_spec_local_cluster: distributed.SpecCluster, - create_dask_task_resources: Callable[..., DaskTaskResources], - dask_task_imposed_ec2_type: InstanceTypeType | None, - dask_ram: ByteSize | None, - expected_ec2_type: InstanceTypeType, with_drain_nodes_labelled: bool, ec2_instance_custom_tags: dict[str, str], + scale_up_params: _ScaleUpParams, ): # we have nothing running now all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] # create a task that needs more power - dask_future = await _create_task_with_resources( - ec2_client, - dask_task_imposed_ec2_type, - dask_ram, - create_dask_task_resources, - create_dask_task, - ) - + dask_futures = await create_tasks_batch(scale_up_params) + assert dask_futures # this should trigger a scaling up as we have no nodes await auto_scale_cluster( app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() @@ -406,8 +469,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -443,8 +506,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 instances = await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -456,9 +519,9 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 mock_docker_find_node_with_name_returns_fake_node.assert_called_once() mock_docker_find_node_with_name_returns_fake_node.reset_mock() expected_docker_node_tags = { - DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: expected_ec2_type + DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY: scale_up_params.expected_instance_type } - assert mock_docker_tag_node.call_count == 2 + assert mock_docker_tag_node.call_count == 3 assert fake_node.spec assert fake_node.spec.labels fake_attached_node = deepcopy(fake_node) @@ -525,7 +588,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # now we have 1 monitored node that needs to be mocked fake_attached_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" fake_attached_node.status = NodeStatus( - State=NodeState.ready, Message=None, Addr=None + state=NodeState.ready, message=None, addr=None ) fake_attached_node.spec.availability = Availability.active assert fake_attached_node.description @@ -557,14 +620,15 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 assert mock_dask_get_worker_used_resources.call_count == 2 * num_useless_calls mock_dask_get_worker_used_resources.reset_mock() mock_docker_find_node_with_name_returns_fake_node.assert_not_called() - mock_docker_tag_node.assert_not_called() + assert mock_docker_tag_node.call_count == num_useless_calls + mock_docker_tag_node.reset_mock() mock_docker_set_node_availability.assert_not_called() # check the number of instances did not change and is still running await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -577,7 +641,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 # # 4. now scaling down, as we deleted all the tasks # - del dask_future + del dask_futures await auto_scale_cluster(app=initialized_app, auto_scaling_mode=auto_scaling_mode) mock_dask_is_worker_connected.assert_called_once() mock_dask_is_worker_connected.reset_mock() @@ -647,8 +711,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -658,7 +722,7 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 fake_attached_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "false" fake_attached_node.spec.labels[ _OSPARC_SERVICES_READY_DATETIME_LABEL_KEY - ] = datetime.datetime.now(tz=datetime.timezone.utc).isoformat() + ] = datetime.datetime.now(tz=datetime.UTC).isoformat() # the node will be not be terminated before the timeout triggers assert app_settings.AUTOSCALING_EC2_INSTANCES @@ -676,15 +740,15 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) # now changing the last update timepoint will trigger the node removal and shutdown the ec2 instance fake_attached_node.spec.labels[_OSPARC_SERVICES_READY_DATETIME_LABEL_KEY] = ( - datetime.datetime.now(tz=datetime.timezone.utc) + datetime.datetime.now(tz=datetime.UTC) - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION - datetime.timedelta(seconds=1) ).isoformat() @@ -694,8 +758,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -726,8 +790,8 @@ async def test_cluster_scaling_up_and_down( # noqa: PLR0915 await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="terminated", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -741,7 +805,9 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_allowed( app_settings: ApplicationSettings, initialized_app: FastAPI, create_dask_task: Callable[[DaskTaskResources], distributed.Future], - create_dask_task_resources: Callable[..., DaskTaskResources], + create_dask_task_resources: Callable[ + [InstanceTypeType | None, Resources], DaskTaskResources + ], ec2_client: EC2Client, faker: Faker, caplog: pytest.LogCaptureFixture, @@ -752,7 +818,8 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_allowed( # create a task that needs more power dask_task_resources = create_dask_task_resources( - faker.pystr(), TypeAdapter(ByteSize).validate_python("128GiB") + cast(InstanceTypeType, faker.pystr()), + Resources(cpus=1, ram=TypeAdapter(ByteSize).validate_python("128GiB")), ) dask_future = create_dask_task(dask_task_resources) assert dask_future @@ -777,7 +844,9 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso app_settings: ApplicationSettings, initialized_app: FastAPI, create_dask_task: Callable[[DaskTaskResources], distributed.Future], - create_dask_task_resources: Callable[..., DaskTaskResources], + create_dask_task_resources: Callable[ + [InstanceTypeType | None, Resources], DaskTaskResources + ], ec2_client: EC2Client, faker: Faker, caplog: pytest.LogCaptureFixture, @@ -788,7 +857,8 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso # create a task that needs more power dask_task_resources = create_dask_task_resources( - "t2.xlarge", TypeAdapter(ByteSize).validate_python("128GiB") + "t2.xlarge", + Resources(cpus=1, ram=TypeAdapter(ByteSize).validate_python("128GiB")), ) dask_future = create_dask_task(dask_task_resources) assert dask_future @@ -808,47 +878,12 @@ async def test_cluster_does_not_scale_up_if_defined_instance_is_not_fitting_reso assert "Unexpected error:" in error_messages[0] -@dataclass(frozen=True) -class _ScaleUpParams: - task_resources: Resources - num_tasks: int - expected_instance_type: str - expected_num_instances: int - - -def _dask_task_resources_from_resources(resources: Resources) -> DaskTaskResources: - return { - res_key.upper(): res_value - for res_key, res_value in resources.model_dump().items() - } - - -@pytest.fixture -def patch_ec2_client_launch_instancess_min_number_of_instances( - mocker: MockerFixture, -) -> mock.Mock: - """the moto library always returns min number of instances instead of max number of instances which makes - it difficult to test scaling to multiple of machines. this should help""" - original_fct = SimcoreEC2API.launch_instances - - async def _change_parameters(*args, **kwargs) -> list[EC2InstanceData]: - new_kwargs = kwargs | {"min_number_of_instances": kwargs["number_of_instances"]} - print(f"patching launch_instances with: {new_kwargs}") - return await original_fct(*args, **new_kwargs) - - return mocker.patch.object( - SimcoreEC2API, - "launch_instances", - autospec=True, - side_effect=_change_parameters, - ) - - @pytest.mark.parametrize( "scale_up_params", [ pytest.param( _ScaleUpParams( + imposed_instance_type=None, task_resources=Resources( cpus=5, ram=TypeAdapter(ByteSize).validate_python("36Gib") ), @@ -861,11 +896,11 @@ async def _change_parameters(*args, **kwargs) -> list[EC2InstanceData]: ], ) async def test_cluster_scaling_up_starts_multiple_instances( - patch_ec2_client_launch_instancess_min_number_of_instances: mock.Mock, + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_dask_task: Callable[[DaskTaskResources], distributed.Future], + create_tasks_batch: Callable[[_ScaleUpParams], Awaitable[list[distributed.Future]]], ec2_client: EC2Client, mock_docker_tag_node: mock.Mock, scale_up_params: _ScaleUpParams, @@ -880,16 +915,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( assert not all_instances["Reservations"] # create several tasks that needs more power - dask_futures = await asyncio.gather( - *( - asyncio.get_event_loop().run_in_executor( - None, - create_dask_task, - _dask_task_resources_from_resources(scale_up_params.task_resources), - ) - for _ in range(scale_up_params.num_tasks) - ) - ) + dask_futures = await create_tasks_batch(scale_up_params) assert dask_futures # run the code @@ -902,7 +928,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( ec2_client, expected_num_reservations=1, expected_num_instances=scale_up_params.expected_num_instances, - expected_instance_type="g3.4xlarge", + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -922,15 +948,29 @@ async def test_cluster_scaling_up_starts_multiple_instances( mock_rabbitmq_post_message.reset_mock() +@pytest.mark.parametrize( + "scale_up_params", + [ + pytest.param( + _ScaleUpParams( + imposed_instance_type="r5n.8xlarge", + task_resources=None, + num_tasks=1, + expected_instance_type="r5n.8xlarge", + expected_num_instances=1, + ), + id="Impose r5n.8xlarge without resources", + ), + ], +) async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and_not_more( - patch_ec2_client_launch_instancess_min_number_of_instances: mock.Mock, + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_dask_task: Callable[[DaskTaskResources], distributed.Future], + create_tasks_batch: Callable[[_ScaleUpParams], Awaitable[list[distributed.Future]]], ec2_client: EC2Client, dask_spec_local_cluster: distributed.SpecCluster, - create_dask_task_resources: Callable[..., DaskTaskResources], mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, mock_docker_find_node_with_name_returns_fake_node: mock.Mock, @@ -939,29 +979,23 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and mock_dask_get_worker_has_results_in_memory: mock.Mock, mock_dask_get_worker_used_resources: mock.Mock, ec2_instance_custom_tags: dict[str, str], + scale_up_params: _ScaleUpParams, ): - ec2_instance_type = "r5n.8xlarge" - # we have nothing running now all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] assert app_settings.AUTOSCALING_EC2_INSTANCES assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES > 0 - num_tasks = 3 * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + # override the number of tasks + scale_up_params.num_tasks = ( + 3 * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ) + scale_up_params.expected_num_instances = ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ) # create the tasks - task_futures = await asyncio.gather( - *( - _create_task_with_resources( - ec2_client, - ec2_instance_type, - None, - create_dask_task_resources, - create_dask_task, - ) - for _ in range(num_tasks) - ) - ) + task_futures = await create_tasks_batch(scale_up_params) assert all(task_futures) # this should trigger a scaling up as we have no nodes @@ -971,8 +1005,8 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, - expected_instance_type=ec2_instance_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -990,7 +1024,7 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and initialized_app, dask_spec_local_cluster.scheduler_address, instances_running=0, - instances_pending=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + instances_pending=scale_up_params.expected_num_instances, ) mock_rabbitmq_post_message.reset_mock() @@ -1003,22 +1037,24 @@ async def test_cluster_scaling_up_more_than_allowed_max_starts_max_instances_and await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, - expected_instance_type=ec2_instance_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_starts_max_instances_and_not_more( - patch_ec2_client_launch_instancess_min_number_of_instances: mock.Mock, + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, app_settings: ApplicationSettings, initialized_app: FastAPI, create_dask_task: Callable[[DaskTaskResources], distributed.Future], ec2_client: EC2Client, dask_spec_local_cluster: distributed.SpecCluster, - create_dask_task_resources: Callable[..., DaskTaskResources], + create_dask_task_resources: Callable[ + [InstanceTypeType | None, Resources], DaskTaskResources + ], mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, mock_docker_find_node_with_name_returns_fake_node: mock.Mock, @@ -1106,12 +1142,18 @@ async def test_cluster_scaling_up_more_than_allowed_with_multiple_types_max_star @pytest.mark.parametrize( - "dask_task_imposed_ec2_type, dask_ram, expected_ec2_type", + "scale_up_params", [ pytest.param( - None, - TypeAdapter(ByteSize).validate_python("128Gib"), - "r5n.4xlarge", + _ScaleUpParams( + imposed_instance_type=None, + task_resources=Resources( + cpus=1, ram=TypeAdapter(ByteSize).validate_python("128Gib") + ), + num_tasks=1, + expected_instance_type="r5n.4xlarge", + expected_num_instances=1, + ), id="No explicit instance defined", ), ], @@ -1121,18 +1163,15 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( minimal_configuration: None, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_dask_task: Callable[[DaskTaskResources], distributed.Future], + create_tasks_batch: Callable[[_ScaleUpParams], Awaitable[list[distributed.Future]]], ec2_client: EC2Client, - dask_task_imposed_ec2_type: InstanceTypeType | None, - dask_ram: ByteSize | None, - create_dask_task_resources: Callable[..., DaskTaskResources], dask_spec_local_cluster: distributed.SpecCluster, - expected_ec2_type: InstanceTypeType, mock_find_node_with_name_returns_none: mock.Mock, mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, short_ec2_instance_max_start_time: datetime.timedelta, ec2_instance_custom_tags: dict[str, str], + scale_up_params: _ScaleUpParams, ): assert app_settings.AUTOSCALING_EC2_INSTANCES assert ( @@ -1143,14 +1182,9 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] # create a task that needs more power - dask_future = await _create_task_with_resources( - ec2_client, - dask_task_imposed_ec2_type, - dask_ram, - create_dask_task_resources, - create_dask_task, - ) - assert dask_future + dask_futures = await create_tasks_batch(scale_up_params) + assert dask_futures + # this should trigger a scaling up as we have no nodes await auto_scale_cluster( app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() @@ -1160,8 +1194,8 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( instances = await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -1203,8 +1237,8 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( instances = await assert_autoscaled_computational_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), ) @@ -1216,7 +1250,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( initialized_app, dask_spec_local_cluster.scheduler_address, instances_running=0, - instances_pending=1, + instances_pending=scale_up_params.expected_num_instances, ) mock_rabbitmq_post_message.reset_mock() assert instances @@ -1247,7 +1281,10 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( all_instances = await ec2_client.describe_instances() assert len(all_instances["Reservations"]) == 2 assert "Instances" in all_instances["Reservations"][0] - assert len(all_instances["Reservations"][0]["Instances"]) == 1 + assert ( + len(all_instances["Reservations"][0]["Instances"]) + == scale_up_params.expected_num_instances + ) assert "State" in all_instances["Reservations"][0]["Instances"][0] assert "Name" in all_instances["Reservations"][0]["Instances"][0]["State"] assert ( @@ -1256,9 +1293,304 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( ) assert "Instances" in all_instances["Reservations"][1] - assert len(all_instances["Reservations"][1]["Instances"]) == 1 + assert ( + len(all_instances["Reservations"][1]["Instances"]) + == scale_up_params.expected_num_instances + ) assert "State" in all_instances["Reservations"][1]["Instances"][0] assert "Name" in all_instances["Reservations"][1]["Instances"][0]["State"] assert ( all_instances["Reservations"][1]["Instances"][0]["State"]["Name"] == "running" ) + + +@pytest.mark.parametrize( + "with_docker_join_drained", ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], indirect=True +) +@pytest.mark.parametrize( + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) +@pytest.mark.parametrize( + "scale_up_params1, scale_up_params2", + [ + pytest.param( + _ScaleUpParams( + imposed_instance_type="g3.4xlarge", # 1 GPU, 16 CPUs, 122GiB + task_resources=Resources( + cpus=16, ram=TypeAdapter(ByteSize).validate_python("30Gib") + ), + num_tasks=12, + expected_instance_type="g3.4xlarge", # 1 GPU, 16 CPUs, 122GiB + expected_num_instances=10, + ), + _ScaleUpParams( + imposed_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB + task_resources=Resources( + cpus=32, ram=TypeAdapter(ByteSize).validate_python("20480MB") + ), + num_tasks=7, + expected_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB + expected_num_instances=7, + ), + id="A batch of services requiring g3.4xlarge and a batch requiring g4dn.8xlarge", + ), + ], +) +async def test_cluster_adapts_machines_on_the_fly( + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, + minimal_configuration: None, + ec2_client: EC2Client, + initialized_app: FastAPI, + app_settings: ApplicationSettings, + create_tasks_batch: Callable[[_ScaleUpParams], Awaitable[list[distributed.Future]]], + ec2_instance_custom_tags: dict[str, str], + scale_up_params1: _ScaleUpParams, + scale_up_params2: _ScaleUpParams, + mocked_associate_ec2_instances_with_nodes: mock.Mock, + mock_docker_set_node_availability: mock.Mock, + mock_dask_is_worker_connected: mock.Mock, + create_fake_node: Callable[..., DockerNode], + mock_docker_tag_node: mock.Mock, + spied_cluster_analysis: MockType, + mocker: MockerFixture, +): + # pre-requisites + assert app_settings.AUTOSCALING_EC2_INSTANCES + assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES > 0 + assert ( + scale_up_params1.num_tasks + >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ), "this test requires to run a first batch of more services than the maximum number of instances allowed" + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # + # 1. create the first batch of services requiring the initial machines + first_batch_tasks = await create_tasks_batch(scale_up_params1) + assert first_batch_tasks + + # it will only scale once and do nothing else + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + await assert_autoscaled_computational_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=scale_up_params1.expected_num_instances, + expected_instance_type=scale_up_params1.expected_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + ) + + assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=0, + ) + mocked_associate_ec2_instances_with_nodes.assert_called_once_with([], []) + mocked_associate_ec2_instances_with_nodes.reset_mock() + mocked_associate_ec2_instances_with_nodes.side_effect = create_fake_association( + create_fake_node, None, None + ) + mock_docker_tag_node.assert_not_called() + mock_dask_is_worker_connected.assert_not_called() + + # + # 2. now the machines are associated + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + mocked_associate_ec2_instances_with_nodes.assert_called_once() + mock_docker_tag_node.assert_called() + assert ( + mock_docker_tag_node.call_count + == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ) + assert analyzed_cluster.active_nodes + + # + # 3. now we start the second batch of services requiring a different type of machines + second_batch_tasks = await create_tasks_batch(scale_up_params2) + assert second_batch_tasks + + # scaling will do nothing since we have hit the maximum number of machines + for _ in range(3): + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + await assert_autoscaled_computational_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=scale_up_params1.expected_num_instances, + expected_instance_type=scale_up_params1.expected_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=3, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + + # + # 4.now we simulate that some of the services in the 1st batch have completed and that we are 1 below the max + # a machine should switch off and another type should be started (just pop the future out of scope) + for _ in range( + scale_up_params1.num_tasks + - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + + 1 + ): + first_batch_tasks.pop() + + # first call to auto_scale_cluster will mark 1 node as empty + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_found_empty", + autospec=True, + ) as mock_docker_set_node_found_empty: + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + # the last machine is found empty + mock_docker_set_node_found_empty.assert_called_with( + mock.ANY, + analyzed_cluster.active_nodes[-1].node, + empty=True, + ) + + # now we mock the get_node_found_empty so the next call will actually drain the machine + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_empty_since", + autospec=True, + return_value=arrow.utcnow().datetime + - 1.5 + * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING, + ) as mocked_get_node_empty_since: + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + mocked_get_node_empty_since.assert_called_once() + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + # now scaling again should find the drained machine + drained_machine_instance_id = analyzed_cluster.active_nodes[-1].ec2_instance.id + mocked_associate_ec2_instances_with_nodes.side_effect = create_fake_association( + create_fake_node, drained_machine_instance_id, None + ) + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert analyzed_cluster.drained_nodes + + # this will initiate termination now + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_last_readyness_update", + autospec=True, + return_value=arrow.utcnow().datetime + - 1.5 + * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION, + ): + mock_docker_tag_node.reset_mock() + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + mock_docker_tag_node.assert_called_with( + mock.ANY, + analyzed_cluster.drained_nodes[-1].node, + tags=mock.ANY, + available=False, + ) + + # scaling again should find the terminating machine + mocked_associate_ec2_instances_with_nodes.side_effect = create_fake_association( + create_fake_node, drained_machine_instance_id, drained_machine_instance_id + ) + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + assert analyzed_cluster.terminating_nodes + + # now this will terminate it and straight away start a new machine type + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_termination_started_since", + autospec=True, + return_value=arrow.utcnow().datetime + - 1.5 + * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION, + ): + mocked_docker_remove_node = mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes", + return_value=None, + autospec=True, + ) + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=ComputationalAutoscaling() + ) + mocked_docker_remove_node.assert_called_once() + + # now let's check what we have + all_instances = await ec2_client.describe_instances() + assert len(all_instances["Reservations"]) == 2, "there should be 2 Reservations" + reservation1 = all_instances["Reservations"][0] + assert "Instances" in reservation1 + assert len(reservation1["Instances"]) == ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ), f"expected {app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES} EC2 instances, found {len(reservation1['Instances'])}" + for instance in reservation1["Instances"]: + assert "InstanceType" in instance + assert instance["InstanceType"] == scale_up_params1.expected_instance_type + assert "InstanceId" in instance + assert "State" in instance + assert "Name" in instance["State"] + if instance["InstanceId"] == drained_machine_instance_id: + assert instance["State"]["Name"] == "terminated" + else: + assert instance["State"]["Name"] == "running" + + reservation2 = all_instances["Reservations"][1] + assert "Instances" in reservation2 + assert ( + len(reservation2["Instances"]) == 1 + ), f"expected 1 EC2 instances, found {len(reservation2['Instances'])}" + for instance in reservation2["Instances"]: + assert "InstanceType" in instance + assert instance["InstanceType"] == scale_up_params2.expected_instance_type diff --git a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py index 461baee21fa..4aa6c302fca 100644 --- a/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py +++ b/services/autoscaling/tests/unit/test_modules_auto_scaling_dynamic.py @@ -9,6 +9,7 @@ import asyncio import datetime import logging +import random from collections.abc import AsyncIterator, Awaitable, Callable, Iterator, Sequence from copy import deepcopy from dataclasses import dataclass @@ -38,12 +39,15 @@ from pydantic import ByteSize, TypeAdapter from pytest_mock import MockType from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.autoscaling import ( + assert_cluster_state, + create_fake_association, +) from pytest_simcore.helpers.aws_ec2 import assert_autoscaled_dynamic_ec2_instances from pytest_simcore.helpers.logging_tools import log_context from pytest_simcore.helpers.monkeypatch_envs import EnvVarsDict from simcore_service_autoscaling.core.settings import ApplicationSettings from simcore_service_autoscaling.models import AssociatedInstance, Cluster -from simcore_service_autoscaling.modules import auto_scaling_core from simcore_service_autoscaling.modules.auto_scaling_core import ( _activate_drained_nodes, _find_terminateable_instances, @@ -217,24 +221,6 @@ def _assert_rabbit_autoscaling_message_sent( assert mock_rabbitmq_post_message.call_args == mock.call(app, expected_message) -async def test_cluster_scaling_with_no_services_does_nothing( - minimal_configuration: None, - app_settings: ApplicationSettings, - initialized_app: FastAPI, - mock_launch_instances: mock.Mock, - mock_terminate_instances: mock.Mock, - mock_rabbitmq_post_message: mock.Mock, -): - await auto_scale_cluster( - app=initialized_app, auto_scaling_mode=DynamicAutoscaling() - ) - mock_launch_instances.assert_not_called() - mock_terminate_instances.assert_not_called() - _assert_rabbit_autoscaling_message_sent( - mock_rabbitmq_post_message, app_settings, initialized_app - ) - - @pytest.fixture def instance_type_filters( ec2_instance_custom_tags: dict[str, str], @@ -254,13 +240,72 @@ def instance_type_filters( ] +@dataclass(frozen=True) +class _ScaleUpParams: + imposed_instance_type: InstanceTypeType | None + service_resources: Resources + num_services: int + expected_instance_type: InstanceTypeType + expected_num_instances: int + + @pytest.fixture -async def spied_cluster_analysis(mocker: MockerFixture) -> MockType: - return mocker.spy(auto_scaling_core, "_analyze_current_cluster") +async def create_services_batch( + create_service: Callable[ + [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] + ], + task_template: dict[str, Any], + create_task_reservations: Callable[[int, int], dict[str, Any]], + service_monitored_labels: dict[DockerLabelKey, str], + osparc_docker_label_keys: StandardSimcoreDockerLabels, +) -> Callable[[_ScaleUpParams], Awaitable[list[Service]]]: + async def _(scale_up_params: _ScaleUpParams) -> list[Service]: + return await asyncio.gather( + *( + create_service( + task_template + | create_task_reservations( + int(scale_up_params.service_resources.cpus), + scale_up_params.service_resources.ram, + ), + service_monitored_labels + | osparc_docker_label_keys.to_simcore_runtime_docker_labels(), + "pending", + ( + [ + f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" + ] + if scale_up_params.imposed_instance_type + else [] + ), + ) + for _ in range(scale_up_params.num_services) + ) + ) + + return _ + + +async def test_cluster_scaling_with_no_services_does_nothing( + minimal_configuration: None, + app_settings: ApplicationSettings, + initialized_app: FastAPI, + mock_launch_instances: mock.Mock, + mock_terminate_instances: mock.Mock, + mock_rabbitmq_post_message: mock.Mock, +): + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + mock_launch_instances.assert_not_called() + mock_terminate_instances.assert_not_called() + _assert_rabbit_autoscaling_message_sent( + mock_rabbitmq_post_message, app_settings, initialized_app + ) async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expected_machines( - patch_ec2_client_launch_instancess_min_number_of_instances: mock.Mock, + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, mock_machines_buffer: int, app_settings: ApplicationSettings, @@ -362,28 +407,34 @@ async def test_cluster_scaling_with_no_services_and_machine_buffer_starts_expect ) +@pytest.mark.parametrize( + "scale_up_params", + [ + pytest.param( + _ScaleUpParams( + imposed_instance_type=None, + service_resources=Resources( + cpus=4, ram=TypeAdapter(ByteSize).validate_python("128000Gib") + ), + num_services=1, + expected_instance_type="r5n.4xlarge", + expected_num_instances=1, + ), + id="No explicit instance defined", + ), + ], +) async def test_cluster_scaling_with_service_asking_for_too_much_resources_starts_nothing( minimal_configuration: None, - service_monitored_labels: dict[DockerLabelKey, str], app_settings: ApplicationSettings, initialized_app: FastAPI, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], mock_launch_instances: mock.Mock, mock_terminate_instances: mock.Mock, mock_rabbitmq_post_message: mock.Mock, + scale_up_params: _ScaleUpParams, ): - task_template_with_too_many_resource = task_template | create_task_reservations( - 1000, 0 - ) - await create_service( - task_template_with_too_many_resource, - service_monitored_labels, - "pending", - ) + await create_services_batch(scale_up_params) await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscaling() @@ -395,38 +446,11 @@ async def test_cluster_scaling_with_service_asking_for_too_much_resources_starts ) -@dataclass(frozen=True) -class _ScaleUpParams: - imposed_instance_type: str | None - service_resources: Resources - num_services: int - expected_instance_type: InstanceTypeType - expected_num_instances: int - - -def _assert_cluster_state( - spied_cluster_analysis: MockType, *, expected_calls: int, expected_num_machines: int -) -> None: - assert spied_cluster_analysis.call_count > 0 - - assert isinstance(spied_cluster_analysis.spy_return, Cluster) - assert ( - spied_cluster_analysis.spy_return.total_number_of_machines() - == expected_num_machines - ) - - async def _test_cluster_scaling_up_and_down( # noqa: PLR0915 *, - service_monitored_labels: dict[DockerLabelKey, str], - osparc_docker_label_keys: StandardSimcoreDockerLabels, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], ec2_client: EC2Client, mock_docker_tag_node: mock.Mock, fake_node: Node, @@ -452,34 +476,13 @@ async def _test_cluster_scaling_up_and_down( # noqa: PLR0915 ), "This test is not made to work with more than 1 expected instance. so please adapt if needed" # create the service(s) - created_docker_services = await asyncio.gather( - *( - create_service( - task_template - | create_task_reservations( - int(scale_up_params.service_resources.cpus), - scale_up_params.service_resources.ram, - ), - service_monitored_labels - | osparc_docker_label_keys.to_simcore_runtime_docker_labels(), - "pending", - ( - [ - f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" - ] - if scale_up_params.imposed_instance_type - else [] - ), - ) - for _ in range(scale_up_params.num_services) - ) - ) + created_docker_services = await create_services_batch(scale_up_params) # this should trigger a scaling up as we have no nodes await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscaling() ) - _assert_cluster_state( + assert_cluster_state( spied_cluster_analysis, expected_calls=1, expected_num_machines=0 ) @@ -528,7 +531,7 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]: await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscaling() ) - _assert_cluster_state( + assert_cluster_state( spied_cluster_analysis, expected_calls=1, expected_num_machines=1 ) @@ -556,7 +559,7 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]: mock_find_node_with_name_returns_fake_node.assert_called_once() mock_find_node_with_name_returns_fake_node.reset_mock() - assert mock_docker_tag_node.call_count == 2 + assert mock_docker_tag_node.call_count == 3 assert fake_node.spec assert fake_node.spec.labels # check attach call @@ -653,7 +656,7 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]: # now we have 1 monitored node that needs to be mocked fake_attached_node.spec.labels[_OSPARC_SERVICE_READY_LABEL_KEY] = "true" fake_attached_node.status = NodeStatus( - State=NodeState.ready, Message=None, Addr=None + state=NodeState.ready, message=None, addr=None ) fake_attached_node.spec.availability = Availability.active fake_attached_node.description.hostname = internal_dns_name @@ -676,7 +679,8 @@ async def _assert_wait_for_ec2_instances_running() -> list[InstanceTypeDef]: assert mock_compute_node_used_resources.call_count == num_useless_calls * 2 mock_compute_node_used_resources.reset_mock() mock_find_node_with_name_returns_fake_node.assert_not_called() - mock_docker_tag_node.assert_not_called() + assert mock_docker_tag_node.call_count == num_useless_calls + mock_docker_tag_node.reset_mock() mock_docker_set_node_availability.assert_not_called() # check the number of instances did not change and is still running instances = await assert_autoscaled_dynamic_ec2_instances( @@ -899,7 +903,7 @@ async def _assert_wait_for_ec2_instances_terminated() -> None: await _assert_wait_for_ec2_instances_terminated() -@pytest.mark.acceptance_test() +@pytest.mark.acceptance_test @pytest.mark.parametrize( "scale_up_params", [ @@ -943,15 +947,9 @@ async def _assert_wait_for_ec2_instances_terminated() -> None: ) async def test_cluster_scaling_up_and_down( minimal_configuration: None, - service_monitored_labels: dict[DockerLabelKey, str], - osparc_docker_label_keys: StandardSimcoreDockerLabels, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], ec2_client: EC2Client, mock_docker_tag_node: mock.Mock, fake_node: Node, @@ -968,13 +966,9 @@ async def test_cluster_scaling_up_and_down( spied_cluster_analysis: MockType, ): await _test_cluster_scaling_up_and_down( - service_monitored_labels=service_monitored_labels, - osparc_docker_label_keys=osparc_docker_label_keys, app_settings=app_settings, initialized_app=initialized_app, - create_service=create_service, - task_template=task_template, - create_task_reservations=create_task_reservations, + create_services_batch=create_services_batch, ec2_client=ec2_client, mock_docker_tag_node=mock_docker_tag_node, fake_node=fake_node, @@ -1021,15 +1015,9 @@ async def test_cluster_scaling_up_and_down_against_aws( disable_buffers_pool_background_task: None, mocked_redis_server: None, external_envfile_dict: EnvVarsDict, - service_monitored_labels: dict[DockerLabelKey, str], - osparc_docker_label_keys: StandardSimcoreDockerLabels, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], ec2_client: EC2Client, mock_docker_tag_node: mock.Mock, fake_node: Node, @@ -1054,13 +1042,9 @@ async def test_cluster_scaling_up_and_down_against_aws( f" The passed external ENV allows for {list(external_ec2_instances_allowed_types)}" ) await _test_cluster_scaling_up_and_down( - service_monitored_labels=service_monitored_labels, - osparc_docker_label_keys=osparc_docker_label_keys, app_settings=app_settings, initialized_app=initialized_app, - create_service=create_service, - task_template=task_template, - create_task_reservations=create_task_reservations, + create_services_batch=create_services_batch, ec2_client=ec2_client, mock_docker_tag_node=mock_docker_tag_node, fake_node=fake_node, @@ -1109,17 +1093,11 @@ async def test_cluster_scaling_up_and_down_against_aws( ], ) async def test_cluster_scaling_up_starts_multiple_instances( - patch_ec2_client_launch_instancess_min_number_of_instances: mock.Mock, + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, minimal_configuration: None, - service_monitored_labels: dict[DockerLabelKey, str], - osparc_docker_label_keys: StandardSimcoreDockerLabels, app_settings: ApplicationSettings, initialized_app: FastAPI, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], ec2_client: EC2Client, mock_docker_tag_node: mock.Mock, scale_up_params: _ScaleUpParams, @@ -1134,28 +1112,7 @@ async def test_cluster_scaling_up_starts_multiple_instances( assert not all_instances["Reservations"] # create several tasks that needs more power - await asyncio.gather( - *( - create_service( - task_template - | create_task_reservations( - int(scale_up_params.service_resources.cpus), - scale_up_params.service_resources.ram, - ), - service_monitored_labels - | osparc_docker_label_keys.to_simcore_runtime_docker_labels(), - "pending", - ( - [ - f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={scale_up_params.imposed_instance_type}" - ] - if scale_up_params.imposed_instance_type - else [] - ), - ) - for _ in range(scale_up_params.num_services) - ) - ) + await create_services_batch(scale_up_params) # run the code await auto_scale_cluster( @@ -1188,12 +1145,316 @@ async def test_cluster_scaling_up_starts_multiple_instances( @pytest.mark.parametrize( - "docker_service_imposed_ec2_type, docker_service_ram, expected_ec2_type", + "with_docker_join_drained", ["with_AUTOSCALING_DOCKER_JOIN_DRAINED"], indirect=True +) +@pytest.mark.parametrize( + "with_drain_nodes_labelled", + ["with_AUTOSCALING_DRAIN_NODES_WITH_LABELS"], + indirect=True, +) +@pytest.mark.parametrize( + "scale_up_params1, scale_up_params2", + [ + pytest.param( + _ScaleUpParams( + imposed_instance_type="g3.4xlarge", # 1 GPU, 16 CPUs, 122GiB + service_resources=Resources( + cpus=16, ram=TypeAdapter(ByteSize).validate_python("30Gib") + ), + num_services=12, + expected_instance_type="g3.4xlarge", # 1 GPU, 16 CPUs, 122GiB + expected_num_instances=10, + ), + _ScaleUpParams( + imposed_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB + service_resources=Resources( + cpus=32, ram=TypeAdapter(ByteSize).validate_python("20480MB") + ), + num_services=7, + expected_instance_type="g4dn.8xlarge", # 32CPUs, 128GiB + expected_num_instances=7, + ), + id="A batch of services requiring g3.4xlarge and a batch requiring g4dn.8xlarge", + ), + ], +) +async def test_cluster_adapts_machines_on_the_fly( # noqa: PLR0915 + patch_ec2_client_launch_instances_min_number_of_instances: mock.Mock, + minimal_configuration: None, + ec2_client: EC2Client, + initialized_app: FastAPI, + app_settings: ApplicationSettings, + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], + ec2_instance_custom_tags: dict[str, str], + instance_type_filters: Sequence[FilterTypeDef], + async_docker_client: aiodocker.Docker, + scale_up_params1: _ScaleUpParams, + scale_up_params2: _ScaleUpParams, + mocked_associate_ec2_instances_with_nodes: mock.Mock, + create_fake_node: Callable[..., Node], + mock_docker_tag_node: mock.Mock, + mock_compute_node_used_resources: mock.Mock, + spied_cluster_analysis: MockType, + mocker: MockerFixture, +): + # pre-requisites + assert app_settings.AUTOSCALING_EC2_INSTANCES + assert app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES > 0 + assert ( + scale_up_params1.num_services + >= app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ), "this test requires to run a first batch of more services than the maximum number of instances allowed" + # we have nothing running now + all_instances = await ec2_client.describe_instances() + assert not all_instances["Reservations"] + + # + # 1. create the first batch of services requiring the initial machines + first_batch_services = await create_services_batch(scale_up_params1) + + # it will only scale once and do nothing else + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=scale_up_params1.expected_num_instances, + expected_instance_type=scale_up_params1.expected_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=0, + ) + mocked_associate_ec2_instances_with_nodes.assert_called_once_with([], []) + mocked_associate_ec2_instances_with_nodes.reset_mock() + mocked_associate_ec2_instances_with_nodes.side_effect = create_fake_association( + create_fake_node, None, None + ) + + # + # 2. now the machines are associated + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + mocked_associate_ec2_instances_with_nodes.assert_called_once() + mock_docker_tag_node.assert_called() + assert ( + mock_docker_tag_node.call_count + == app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ) + assert analyzed_cluster.active_nodes + + # + # 3. now we start the second batch of services requiring a different type of machines + await create_services_batch(scale_up_params2) + + # scaling will do nothing since we have hit the maximum number of machines + for _ in range(3): + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + await assert_autoscaled_dynamic_ec2_instances( + ec2_client, + expected_num_reservations=1, + expected_num_instances=scale_up_params1.expected_num_instances, + expected_instance_type=scale_up_params1.expected_instance_type, + expected_instance_state="running", + expected_additional_tag_keys=list(ec2_instance_custom_tags), + instance_filters=instance_type_filters, + ) + + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=3, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + + # + # 4.now we simulate that some of the services in the 1st batch have completed and that we are 1 below the max + # a machine should switch off and another type should be started + completed_services_to_stop = random.sample( + first_batch_services, + scale_up_params1.num_services + - app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + + 1, + ) + await asyncio.gather( + *( + async_docker_client.services.delete(s.id) + for s in completed_services_to_stop + if s.id + ) + ) + + # first call to auto_scale_cluster will mark 1 node as empty + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.set_node_found_empty", + autospec=True, + ) as mock_docker_set_node_found_empty: + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + # the last machine is found empty + mock_docker_set_node_found_empty.assert_called_with( + mock.ANY, + analyzed_cluster.active_nodes[-1].node, + empty=True, + ) + + # now we mock the get_node_found_empty so the next call will actually drain the machine + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_empty_since", + autospec=True, + return_value=arrow.utcnow().datetime + - 1.5 + * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_DRAINING, + ) as mocked_get_node_empty_since: + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + mocked_get_node_empty_since.assert_called_once() + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + # now scaling again should find the drained machine + drained_machine_instance_id = analyzed_cluster.active_nodes[-1].ec2_instance.id + mocked_associate_ec2_instances_with_nodes.side_effect = create_fake_association( + create_fake_node, drained_machine_instance_id, None + ) + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert analyzed_cluster.drained_nodes + + # this will initiate termination now + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_last_readyness_update", + autospec=True, + return_value=arrow.utcnow().datetime + - 1.5 + * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION, + ): + mock_docker_tag_node.reset_mock() + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + mock_docker_tag_node.assert_called_with( + mock.ANY, + analyzed_cluster.drained_nodes[-1].node, + tags=mock.ANY, + available=False, + ) + + # scaling again should find the terminating machine + mocked_associate_ec2_instances_with_nodes.side_effect = create_fake_association( + create_fake_node, drained_machine_instance_id, drained_machine_instance_id + ) + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + analyzed_cluster = assert_cluster_state( + spied_cluster_analysis, + expected_calls=1, + expected_num_machines=app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES, + ) + assert analyzed_cluster.active_nodes + assert not analyzed_cluster.drained_nodes + assert analyzed_cluster.terminating_nodes + + # now this will terminate it and straight away start a new machine type + with mock.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.get_node_termination_started_since", + autospec=True, + return_value=arrow.utcnow().datetime + - 1.5 + * app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_TIME_BEFORE_TERMINATION, + ): + mocked_docker_remove_node = mocker.patch( + "simcore_service_autoscaling.modules.auto_scaling_core.utils_docker.remove_nodes", + return_value=None, + autospec=True, + ) + await auto_scale_cluster( + app=initialized_app, auto_scaling_mode=DynamicAutoscaling() + ) + mocked_docker_remove_node.assert_called_once() + + # now let's check what we have + all_instances = await ec2_client.describe_instances() + assert len(all_instances["Reservations"]) == 2, "there should be 2 Reservations" + reservation1 = all_instances["Reservations"][0] + assert "Instances" in reservation1 + assert len(reservation1["Instances"]) == ( + app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES + ), f"expected {app_settings.AUTOSCALING_EC2_INSTANCES.EC2_INSTANCES_MAX_INSTANCES} EC2 instances, found {len(reservation1['Instances'])}" + for instance in reservation1["Instances"]: + assert "InstanceType" in instance + assert instance["InstanceType"] == scale_up_params1.expected_instance_type + assert "InstanceId" in instance + assert "State" in instance + assert "Name" in instance["State"] + if instance["InstanceId"] == drained_machine_instance_id: + assert instance["State"]["Name"] == "terminated" + else: + assert instance["State"]["Name"] == "running" + + reservation2 = all_instances["Reservations"][1] + assert "Instances" in reservation2 + assert ( + len(reservation2["Instances"]) == 1 + ), f"expected 1 EC2 instances, found {len(reservation2['Instances'])}" + for instance in reservation2["Instances"]: + assert "InstanceType" in instance + assert instance["InstanceType"] == scale_up_params2.expected_instance_type + + +@pytest.mark.parametrize( + "scale_up_params", [ pytest.param( - None, - TypeAdapter(ByteSize).validate_python("128Gib"), - "r5n.4xlarge", + _ScaleUpParams( + imposed_instance_type=None, + service_resources=Resources( + cpus=4, ram=TypeAdapter(ByteSize).validate_python("128Gib") + ), + num_services=1, + expected_instance_type="r5n.4xlarge", + expected_num_instances=1, + ), id="No explicit instance defined", ), ], @@ -1201,24 +1462,17 @@ async def test_cluster_scaling_up_starts_multiple_instances( async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( with_short_ec2_instances_max_start_time: EnvVarsDict, minimal_configuration: None, - service_monitored_labels: dict[DockerLabelKey, str], app_settings: ApplicationSettings, initialized_app: FastAPI, - create_service: Callable[ - [dict[str, Any], dict[DockerLabelKey, str], str, list[str]], Awaitable[Service] - ], - task_template: dict[str, Any], - create_task_reservations: Callable[[int, int], dict[str, Any]], + create_services_batch: Callable[[_ScaleUpParams], Awaitable[list[Service]]], ec2_client: EC2Client, - docker_service_imposed_ec2_type: InstanceTypeType | None, - docker_service_ram: ByteSize, - expected_ec2_type: InstanceTypeType, mock_find_node_with_name_returns_none: mock.Mock, mock_docker_tag_node: mock.Mock, mock_rabbitmq_post_message: mock.Mock, short_ec2_instance_max_start_time: datetime.timedelta, ec2_instance_custom_tags: dict[str, str], instance_type_filters: Sequence[FilterTypeDef], + scale_up_params: _ScaleUpParams, ): assert app_settings.AUTOSCALING_EC2_INSTANCES assert ( @@ -1228,19 +1482,8 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( # we have nothing running now all_instances = await ec2_client.describe_instances() assert not all_instances["Reservations"] - # create a service - await create_service( - task_template | create_task_reservations(4, docker_service_ram), - service_monitored_labels, - "pending", - ( - [ - f"node.labels.{DOCKER_TASK_EC2_INSTANCE_TYPE_PLACEMENT_CONSTRAINT_KEY}=={ docker_service_imposed_ec2_type}" - ] - if docker_service_imposed_ec2_type - else [] - ), - ) + await create_services_batch(scale_up_params) + # this should trigger a scaling up as we have no nodes await auto_scale_cluster( app=initialized_app, auto_scaling_mode=DynamicAutoscaling() @@ -1250,8 +1493,8 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( instances = await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), instance_filters=instance_type_filters, @@ -1266,7 +1509,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( app_settings, initialized_app, instances_running=0, - instances_pending=1, + instances_pending=scale_up_params.expected_num_instances, ) mock_rabbitmq_post_message.reset_mock() @@ -1293,8 +1536,8 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( instances = await assert_autoscaled_dynamic_ec2_instances( ec2_client, expected_num_reservations=1, - expected_num_instances=1, - expected_instance_type=expected_ec2_type, + expected_num_instances=scale_up_params.expected_num_instances, + expected_instance_type=scale_up_params.expected_instance_type, expected_instance_state="running", expected_additional_tag_keys=list(ec2_instance_custom_tags), instance_filters=instance_type_filters, @@ -1306,7 +1549,7 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( app_settings, initialized_app, instances_running=0, - instances_pending=1, + instances_pending=scale_up_params.expected_num_instances, ) mock_rabbitmq_post_message.reset_mock() assert instances @@ -1337,7 +1580,10 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( all_instances = await ec2_client.describe_instances() assert len(all_instances["Reservations"]) == 2 assert "Instances" in all_instances["Reservations"][0] - assert len(all_instances["Reservations"][0]["Instances"]) == 1 + assert ( + len(all_instances["Reservations"][0]["Instances"]) + == scale_up_params.expected_num_instances + ) assert "State" in all_instances["Reservations"][0]["Instances"][0] assert "Name" in all_instances["Reservations"][0]["Instances"][0]["State"] assert ( @@ -1346,7 +1592,10 @@ async def test_long_pending_ec2_is_detected_as_broken_terminated_and_restarted( ) assert "Instances" in all_instances["Reservations"][1] - assert len(all_instances["Reservations"][1]["Instances"]) == 1 + assert ( + len(all_instances["Reservations"][1]["Instances"]) + == scale_up_params.expected_num_instances + ) assert "State" in all_instances["Reservations"][1]["Instances"][0] assert "Name" in all_instances["Reservations"][1]["Instances"][0]["State"] assert ( diff --git a/services/autoscaling/tests/unit/test_utils_docker.py b/services/autoscaling/tests/unit/test_utils_docker.py index 3f9677112bb..90f214ee530 100644 --- a/services/autoscaling/tests/unit/test_utils_docker.py +++ b/services/autoscaling/tests/unit/test_utils_docker.py @@ -169,6 +169,25 @@ async def test_get_monitored_nodes_with_valid_label( ) +async def test_get_monitored_nodes_are_sorted_according_to_creation_date( + mocker: MockerFixture, + autoscaling_docker: AutoscalingDocker, + create_fake_node: Callable[..., Node], + faker: Faker, +): + fake_nodes = [ + create_fake_node(CreatedAt=faker.date_time(tzinfo=datetime.UTC).isoformat()) + for _ in range(10) + ] + mocked_aiodocker = mocker.patch.object(autoscaling_docker, "nodes", autospec=True) + mocked_aiodocker.list.return_value = fake_nodes + monitored_nodes = await get_monitored_nodes(autoscaling_docker, node_labels=[]) + assert len(monitored_nodes) == len(fake_nodes) + sorted_fake_nodes = sorted(fake_nodes, key=lambda node: arrow.get(node.created_at)) + assert monitored_nodes == sorted_fake_nodes + assert monitored_nodes[0].created_at < monitored_nodes[1].created_at + + async def test_worker_nodes( autoscaling_docker: AutoscalingDocker, host_node: Node,