From e75e81697d2085a24a234bc27f3d1c8c97bff504 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 5 Dec 2024 11:45:01 +0100 Subject: [PATCH 1/4] rename --- .../director/src/simcore_service_director/producer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/services/director/src/simcore_service_director/producer.py b/services/director/src/simcore_service_director/producer.py index 907e7a8e04e..81d1accf23d 100644 --- a/services/director/src/simcore_service_director/producer.py +++ b/services/director/src/simcore_service_director/producer.py @@ -1078,7 +1078,7 @@ async def _get_node_details( async def get_services_details( - app: FastAPI, user_id: str | None, study_id: str | None + app: FastAPI, user_id: str | None, project_id: str | None ) -> list[dict]: app_settings = get_application_settings(app) async with docker_utils.docker_client() as client: # pylint: disable=not-async-context-manager @@ -1091,9 +1091,10 @@ async def get_services_details( filters.append( f"{_to_simcore_runtime_docker_label_key('user_id')}=" + user_id ) - if study_id: + if project_id: filters.append( - f"{_to_simcore_runtime_docker_label_key('project_id')}=" + study_id + f"{_to_simcore_runtime_docker_label_key('project_id')}=" + + project_id ) list_running_services = await client.services.list( filters={"label": filters} @@ -1104,7 +1105,7 @@ async def get_services_details( for service in list_running_services ] except aiodocker.DockerError as err: - msg = f"Error while accessing container for {user_id=}, {study_id=}" + msg = f"Error while accessing container for {user_id=}, {project_id=}" raise GenericDockerError(err=msg) from err From 3bd5424680f3e0ce8d6d34f766a3ee2404e032a4 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 5 Dec 2024 13:40:45 +0100 Subject: [PATCH 2/4] rerouted retrieve via dynamic-scheduler --- .../dynamic_scheduler/services.py | 25 +++++++++- services/docker-compose.yml | 1 + .../api/rpc/_services.py | 15 +++++- .../core/settings.py | 14 ++++++ .../services/director_v2/_public_client.py | 21 ++++++++- .../services/director_v2/_thin_client.py | 17 ++++++- .../services/scheduler_interface.py | 21 ++++++++- .../director_v2/_core_dynamic_services.py | 47 ------------------- .../director_v2/api.py | 4 -- .../dynamic_scheduler/api.py | 21 ++++++++- .../dynamic_scheduler/settings.py | 10 ++++ .../projects/_nodes_handlers.py | 4 +- .../projects/projects_api.py | 16 ++++++- 13 files changed, 155 insertions(+), 61 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py index 3dcc9ed502f..dd97ef26359 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py @@ -1,7 +1,10 @@ import logging from typing import Final -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler import DYNAMIC_SCHEDULER_RPC_NAMESPACE from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, @@ -10,6 +13,7 @@ from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_basic_types import RPCMethodName +from models_library.services_types import ServicePortKey from pydantic import NonNegativeInt, TypeAdapter from servicelib.logging_utils import log_decorator from servicelib.rabbitmq import RabbitMQRPCClient @@ -73,3 +77,22 @@ async def stop_dynamic_service( timeout_s=timeout_s, ) assert result is None # nosec + + +@log_decorator(_logger, level=logging.DEBUG) +async def retrieve_data_on_ports( + rabbitmq_rpc_client: RabbitMQRPCClient, + *, + node_id: NodeID, + port_keys: list[ServicePortKey], + timeout_s: NonNegativeInt, +) -> RetrieveDataOutEnveloped: + result = await rabbitmq_rpc_client.request( + DYNAMIC_SCHEDULER_RPC_NAMESPACE, + _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_data_on_ports"), + node_id=node_id, + port_keys=port_keys, + timeout_s=timeout_s, + ) + assert isinstance(result, RetrieveDataOutEnveloped) # nosec + return result diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 265d29e56ed..766c117244d 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -568,6 +568,7 @@ services: DYNAMIC_SCHEDULER_PROFILING: ${DYNAMIC_SCHEDULER_PROFILING} DYNAMIC_SCHEDULER_TRACING: ${DYNAMIC_SCHEDULER_TRACING} DYNAMIC_SCHEDULER_UI_STORAGE_SECRET: ${DYNAMIC_SCHEDULER_UI_STORAGE_SECRET} + DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT: ${DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT} TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT: ${TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT} TRACING_OPENTELEMETRY_COLLECTOR_PORT: ${TRACING_OPENTELEMETRY_COLLECTOR_PORT} static-webserver: diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index 76a8e30ceec..6ae5b3503a1 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -1,11 +1,15 @@ from fastapi import FastAPI -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, ) from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from servicelib.rabbitmq import RPCRouter from servicelib.rabbitmq.rpc_interfaces.dynamic_scheduler.errors import ( ServiceWaitingForManualInterventionError, @@ -45,3 +49,12 @@ async def stop_dynamic_service( return await scheduler_interface.stop_dynamic_service( app, dynamic_service_stop=dynamic_service_stop ) + + +@router.expose() +async def retrieve_data_on_ports( + app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] +) -> RetrieveDataOutEnveloped: + return await scheduler_interface.retrieve_data_on_ports( + app, node_id=node_id, port_keys=port_keys + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py index 9f046943344..0315cdc187f 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/core/settings.py @@ -62,12 +62,26 @@ class _BaseApplicationSettings(BaseApplicationSettings, MixinLoggingSettings): DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT: datetime.timedelta = Field( default=datetime.timedelta(minutes=60), + validation_alias=AliasChoices( + "DYNAMIC_SIDECAR_API_SAVE_RESTORE_STATE_TIMEOUT", + "DYNAMIC_SCHEDULER_STOP_SERVICE_TIMEOUT", + ), description=( "Time to wait before timing out when stopping a dynamic service. " "Since services require data to be stopped, this operation is timed out after 1 hour" ), ) + DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field( + default=datetime.timedelta(minutes=60), + description=( + "When dynamic services upload and download data from storage, " + "sometimes very big payloads are involved. In order to handle " + "such payloads it is required to have long timeouts which " + "allow the service to finish the operation." + ), + ) + DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: bool = Field( default=False, description=( diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py index 5ee4ae3bcac..2cfbb5886f7 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py @@ -2,12 +2,16 @@ from typing import Any from fastapi import FastAPI, status -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, ) from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from pydantic import TypeAdapter from servicelib.fastapi.app_state import SingletonInAppStateMixin from servicelib.fastapi.http_client import AttachLifespanMixin, HasClientSetupInterface @@ -73,7 +77,7 @@ async def stop_dynamic_service( node_id: NodeID, simcore_user_agent: str, save_state: bool, - timeout: datetime.timedelta + timeout: datetime.timedelta # noqa: ASYNC109 ) -> None: try: await self.thin_client.delete_dynamic_service( @@ -98,6 +102,19 @@ async def stop_dynamic_service( raise + async def retrieve_data_on_ports( + self, + *, + node_id: NodeID, + port_keys: list[ServicePortKey], + timeout: datetime.timedelta # noqa: ASYNC109 + ) -> RetrieveDataOutEnveloped: + response = await self.thin_client.dynamic_service_retrieve( + node_id=node_id, port_keys=port_keys, timeout=timeout + ) + dict_response: dict[str, Any] = response.json() + return TypeAdapter(RetrieveDataOutEnveloped).validate_python(dict_response) + def setup_director_v2(app: FastAPI) -> None: public_client = DirectorV2Client(app) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py index 19d93b3a6f1..6e30fa40aea 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py @@ -9,6 +9,7 @@ ) from models_library.projects_nodes_io import NodeID from models_library.services_resources import ServiceResourcesDictHelpers +from models_library.services_types import ServicePortKey from servicelib.common_headers import ( X_DYNAMIC_SIDECAR_REQUEST_DNS, X_DYNAMIC_SIDECAR_REQUEST_SCHEME, @@ -88,7 +89,7 @@ async def delete_dynamic_service( node_id: NodeID, simcore_user_agent: str, save_state: bool, - timeout: datetime.timedelta, + timeout: datetime.timedelta, # noqa: ASYNC109 ) -> Response: @retry_on_errors(total_retry_timeout_overwrite=timeout.total_seconds()) @expect_status(status.HTTP_204_NO_CONTENT) @@ -108,3 +109,17 @@ async def _( ) return await _(self) + + async def dynamic_service_retrieve( + self, + *, + node_id: NodeID, + port_keys: list[ServicePortKey], + timeout: datetime.timedelta, # noqa: ASYNC109 + ) -> Response: + post_data = {"port_keys": port_keys} + return await self.client.post( + f"/dynamic_services/{node_id}:retrieve", + content=json_dumps(post_data), + timeout=timeout.total_seconds(), + ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py index 1d4bcdd112b..fae7ea29c1b 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py @@ -1,11 +1,15 @@ from fastapi import FastAPI -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, ) from models_library.api_schemas_webserver.projects_nodes import NodeGet, NodeGetIdle from models_library.projects_nodes_io import NodeID +from models_library.services_types import ServicePortKey from ..core.settings import ApplicationSettings from .director_v2 import DirectorV2Client @@ -58,3 +62,18 @@ async def stop_dynamic_service( ) await set_request_as_stopped(app, dynamic_service_stop) + + +async def retrieve_data_on_ports( + app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] +) -> RetrieveDataOutEnveloped: + settings: ApplicationSettings = app.state.settings + if settings.DYNAMIC_SCHEDULER_USE_INTERNAL_SCHEDULER: + raise NotImplementedError + + director_v2_client = DirectorV2Client.get_from_app_state(app) + return await director_v2_client.retrieve_data_on_ports( + node_id=node_id, + port_keys=port_keys, + timeout=settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT, + ) diff --git a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py index 21793b79376..3c6920505db 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/_core_dynamic_services.py @@ -9,14 +9,12 @@ from aiohttp import web from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet from models_library.projects import ProjectID -from models_library.services import ServicePortKey from pydantic import BaseModel, NonNegativeInt, TypeAdapter from pydantic.types import PositiveInt from servicelib.logging_utils import log_decorator from yarl import URL from ._core_base import DataType, request_director_v2 -from .exceptions import DirectorServiceError from .settings import DirectorV2Settings, get_plugin_settings _log = logging.getLogger(__name__) @@ -52,51 +50,6 @@ async def list_dynamic_services( return TypeAdapter(list[DynamicServiceGet]).validate_python(services) -# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191 -@log_decorator(logger=_log) -async def retrieve( - app: web.Application, service_uuid: str, port_keys: list[ServicePortKey] -) -> DataType: - """Pulls data from connections to the dynamic service inputs""" - settings: DirectorV2Settings = get_plugin_settings(app) - result = await request_director_v2( - app, - "POST", - url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve", - data={"port_keys": port_keys}, - timeout=settings.get_service_retrieve_timeout(), - ) - assert isinstance(result, dict) # nosec - return result - - -# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191 -# notice that this function is identical to retrieve except that it does NOT raises -@log_decorator(logger=_log) -async def request_retrieve_dyn_service( - app: web.Application, service_uuid: str, port_keys: list[str] -) -> None: - settings: DirectorV2Settings = get_plugin_settings(app) - body = {"port_keys": port_keys} - - try: - await request_director_v2( - app, - "POST", - url=settings.base_url / f"dynamic_services/{service_uuid}:retrieve", - data=body, - timeout=settings.get_service_retrieve_timeout(), - ) - except DirectorServiceError as exc: - _log.warning( - "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s:%s]", - service_uuid, - port_keys, - exc.status, - exc.reason, - ) - - @log_decorator(logger=_log) async def restart_dynamic_service(app: web.Application, node_uuid: str) -> None: """User restart the dynamic dynamic service started in the node_uuid diff --git a/services/web/server/src/simcore_service_webserver/director_v2/api.py b/services/web/server/src/simcore_service_webserver/director_v2/api.py index 2de6b49e4a2..86c353330ee 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2/api.py +++ b/services/web/server/src/simcore_service_webserver/director_v2/api.py @@ -19,9 +19,7 @@ from ._core_dynamic_services import ( get_project_inactivity, list_dynamic_services, - request_retrieve_dyn_service, restart_dynamic_service, - retrieve, update_dynamic_service_networks_in_project, ) from ._core_utils import is_healthy @@ -40,9 +38,7 @@ "is_healthy", "is_pipeline_running", "list_dynamic_services", - "request_retrieve_dyn_service", "restart_dynamic_service", - "retrieve", "set_project_run_policy", "stop_pipeline", "update_dynamic_service_networks_in_project", diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index be02b28bf73..b54a74ec0a1 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -3,7 +3,10 @@ from functools import partial from aiohttp import web -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, @@ -18,6 +21,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.rabbitmq_messages import ProgressRabbitMessageProject, ProgressType +from models_library.services import ServicePortKey from pydantic.types import PositiveInt from servicelib.progress_bar import ProgressBarData from servicelib.rabbitmq import RabbitMQClient, RPCServerError @@ -133,3 +137,18 @@ async def stop_dynamic_services_in_project( ] await logged_gather(*services_to_stop) + + +# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191 +async def retrieve( + app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey] +) -> RetrieveDataOutEnveloped: + settings: DynamicSchedulerSettings = get_plugin_settings(app) + return await services.retrieve_data_on_ports( + get_rabbitmq_rpc_client(app), + node_id=node_id, + port_keys=port_keys, + timeout_s=int( + settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT.total_seconds() + ), + ) diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py index 91dac1317b6..9d34fa378cc 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/settings.py @@ -26,6 +26,16 @@ class DynamicSchedulerSettings(BaseCustomSettings, MixinServiceSettings): ), ) + DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT: datetime.timedelta = Field( + datetime.timedelta(hours=1), + description=( + "When dynamic services upload and download data from storage, " + "sometimes very big payloads are involved. In order to handle " + "such payloads it is required to have long timeouts which " + "allow the service to finish the operation." + ), + ) + def get_plugin_settings(app: web.Application) -> DynamicSchedulerSettings: settings = app[APP_SETTINGS_KEY].WEBSERVER_DYNAMIC_SCHEDULER diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index b1088b67873..fcb24e9549c 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -280,8 +280,8 @@ async def retrieve_node(request: web.Request) -> web.Response: retrieve = await parse_request_body_as(NodeRetrieve, request) return web.json_response( - await director_v2_api.retrieve( - request.app, f"{path_params.node_id}", retrieve.port_keys + await dynamic_scheduler_api.retrieve( + request.app, path_params.node_id, retrieve.port_keys ), dumps=json_dumps, ) diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index cf9445985c6..7522546d7d8 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -1125,6 +1125,20 @@ async def is_node_id_present_in_any_project_workbench( return await db.node_id_exists(node_id) +async def _safe_retrieve( + app: web.Application, node_id: NodeID, port_keys: list[str] +) -> None: + try: + await dynamic_scheduler_api.retrieve(app, node_id, port_keys) + except RPCServerError as exc: + log.warning( + "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s]", + node_id, + port_keys, + exc, + ) + + async def _trigger_connected_service_retrieve( app: web.Application, project: dict, updated_node_uuid: str, changed_keys: list[str] ) -> None: @@ -1165,7 +1179,7 @@ async def _trigger_connected_service_retrieve( # call /retrieve on the nodes update_tasks = [ - director_v2_api.request_retrieve_dyn_service(app, node, keys) + _safe_retrieve(app, NodeID(node), keys) for node, keys in nodes_keys_to_update.items() ] await logged_gather(*update_tasks) From 6eb9b2efefad6259cf1e88e812bce311694fc37a Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Thu, 5 Dec 2024 13:47:52 +0100 Subject: [PATCH 3/4] rename endpoint --- .../rabbitmq/rpc_interfaces/dynamic_scheduler/services.py | 4 ++-- .../simcore_service_dynamic_scheduler/api/rpc/_services.py | 4 ++-- .../services/director_v2/_public_client.py | 2 +- .../services/scheduler_interface.py | 4 ++-- .../src/simcore_service_webserver/dynamic_scheduler/api.py | 5 ++--- .../simcore_service_webserver/projects/_nodes_handlers.py | 2 +- .../src/simcore_service_webserver/projects/projects_api.py | 2 +- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py index dd97ef26359..5bd204eb048 100644 --- a/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py +++ b/packages/service-library/src/servicelib/rabbitmq/rpc_interfaces/dynamic_scheduler/services.py @@ -80,7 +80,7 @@ async def stop_dynamic_service( @log_decorator(_logger, level=logging.DEBUG) -async def retrieve_data_on_ports( +async def retrieve_inputs( rabbitmq_rpc_client: RabbitMQRPCClient, *, node_id: NodeID, @@ -89,7 +89,7 @@ async def retrieve_data_on_ports( ) -> RetrieveDataOutEnveloped: result = await rabbitmq_rpc_client.request( DYNAMIC_SCHEDULER_RPC_NAMESPACE, - _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_data_on_ports"), + _RPC_METHOD_NAME_ADAPTER.validate_python("retrieve_inputs"), node_id=node_id, port_keys=port_keys, timeout_s=timeout_s, diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py index 6ae5b3503a1..374d543fe12 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/api/rpc/_services.py @@ -52,9 +52,9 @@ async def stop_dynamic_service( @router.expose() -async def retrieve_data_on_ports( +async def retrieve_inputs( app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] ) -> RetrieveDataOutEnveloped: - return await scheduler_interface.retrieve_data_on_ports( + return await scheduler_interface.retrieve_inputs( app, node_id=node_id, port_keys=port_keys ) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py index 2cfbb5886f7..32f515dca22 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_public_client.py @@ -102,7 +102,7 @@ async def stop_dynamic_service( raise - async def retrieve_data_on_ports( + async def retrieve_inputs( self, *, node_id: NodeID, diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py index fae7ea29c1b..632f0520de3 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/scheduler_interface.py @@ -64,7 +64,7 @@ async def stop_dynamic_service( await set_request_as_stopped(app, dynamic_service_stop) -async def retrieve_data_on_ports( +async def retrieve_inputs( app: FastAPI, *, node_id: NodeID, port_keys: list[ServicePortKey] ) -> RetrieveDataOutEnveloped: settings: ApplicationSettings = app.state.settings @@ -72,7 +72,7 @@ async def retrieve_data_on_ports( raise NotImplementedError director_v2_client = DirectorV2Client.get_from_app_state(app) - return await director_v2_client.retrieve_data_on_ports( + return await director_v2_client.retrieve_inputs( node_id=node_id, port_keys=port_keys, timeout=settings.DYNAMIC_SCHEDULER_SERVICE_UPLOAD_DOWNLOAD_TIMEOUT, diff --git a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py index b54a74ec0a1..ae8363ca617 100644 --- a/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py +++ b/services/web/server/src/simcore_service_webserver/dynamic_scheduler/api.py @@ -139,12 +139,11 @@ async def stop_dynamic_services_in_project( await logged_gather(*services_to_stop) -# NOTE: ANE https://github.com/ITISFoundation/osparc-simcore/issues/3191 -async def retrieve( +async def retrieve_inputs( app: web.Application, node_id: NodeID, port_keys: list[ServicePortKey] ) -> RetrieveDataOutEnveloped: settings: DynamicSchedulerSettings = get_plugin_settings(app) - return await services.retrieve_data_on_ports( + return await services.retrieve_inputs( get_rabbitmq_rpc_client(app), node_id=node_id, port_keys=port_keys, diff --git a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py index fcb24e9549c..171da006924 100644 --- a/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/_nodes_handlers.py @@ -280,7 +280,7 @@ async def retrieve_node(request: web.Request) -> web.Response: retrieve = await parse_request_body_as(NodeRetrieve, request) return web.json_response( - await dynamic_scheduler_api.retrieve( + await dynamic_scheduler_api.retrieve_inputs( request.app, path_params.node_id, retrieve.port_keys ), dumps=json_dumps, diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index 7522546d7d8..5d974aee9d2 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -1129,7 +1129,7 @@ async def _safe_retrieve( app: web.Application, node_id: NodeID, port_keys: list[str] ) -> None: try: - await dynamic_scheduler_api.retrieve(app, node_id, port_keys) + await dynamic_scheduler_api.retrieve_inputs(app, node_id, port_keys) except RPCServerError as exc: log.warning( "Unable to call :retrieve endpoint on service %s, keys: [%s]: error: [%s]", From bf5376683fa2c05ae07930e8ea721bfb9a23d0f1 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Wed, 11 Dec 2024 09:58:01 +0100 Subject: [PATCH 4/4] added missing test --- .../services/director_v2/_thin_client.py | 2 - .../unit/api_rpc/test_api_rpc__services.py | 52 ++++++++++++++++--- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py index 0103a43c6cc..4b24242ff97 100644 --- a/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py +++ b/services/dynamic-scheduler/src/simcore_service_dynamic_scheduler/services/director_v2/_thin_client.py @@ -86,8 +86,6 @@ async def post_dynamic_service( follow_redirects=True, ) - @retry_on_errors() - @expect_status(status.HTTP_200_OK) async def delete_dynamic_service( self, *, diff --git a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py index 7c1665065ae..6d2ec4dac62 100644 --- a/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py +++ b/services/dynamic-scheduler/tests/unit/api_rpc/test_api_rpc__services.py @@ -9,7 +9,10 @@ from faker import Faker from fastapi import FastAPI, status from fastapi.encoders import jsonable_encoder -from models_library.api_schemas_directorv2.dynamic_services import DynamicServiceGet +from models_library.api_schemas_directorv2.dynamic_services import ( + DynamicServiceGet, + RetrieveDataOutEnveloped, +) from models_library.api_schemas_dynamic_scheduler.dynamic_services import ( DynamicServiceStart, DynamicServiceStop, @@ -55,14 +58,14 @@ def node_not_found(faker: Faker) -> NodeID: @pytest.fixture def service_status_new_style() -> DynamicServiceGet: return TypeAdapter(DynamicServiceGet).validate_python( - DynamicServiceGet.model_config["json_schema_extra"]["examples"][1] + DynamicServiceGet.model_json_schema()["examples"][1] ) @pytest.fixture def service_status_legacy() -> NodeGet: return TypeAdapter(NodeGet).validate_python( - NodeGet.model_config["json_schema_extra"]["examples"][1] + NodeGet.model_json_schema()["examples"][1] ) @@ -112,9 +115,7 @@ def mock_director_v2_service_state( mock.get("/dynamic_services").respond( status.HTTP_200_OK, text=json.dumps( - jsonable_encoder( - DynamicServiceGet.model_config["json_schema_extra"]["examples"] - ) + jsonable_encoder(DynamicServiceGet.model_json_schema()["examples"]) ), ) @@ -193,7 +194,7 @@ async def test_list_tracked_dynamic_services(rpc_client: RabbitMQRPCClient): assert len(results) == 2 assert results == [ TypeAdapter(DynamicServiceGet).validate_python(x) - for x in DynamicServiceGet.model_config["json_schema_extra"]["examples"] + for x in DynamicServiceGet.model_json_schema()["examples"] ] @@ -223,7 +224,7 @@ async def test_get_state( def dynamic_service_start() -> DynamicServiceStart: # one for legacy and one for new style? return TypeAdapter(DynamicServiceStart).validate_python( - DynamicServiceStart.model_config["json_schema_extra"]["example"] + DynamicServiceStart.model_json_schema()["example"] ) @@ -490,3 +491,38 @@ async def test_stop_dynamic_service_serializes_generic_errors( ), timeout_s=5, ) + + +@pytest.fixture +def mock_director_v2_service_retrieve_inputs(node_id: NodeID) -> Iterator[None]: + with respx.mock( + base_url="http://director-v2:8000/v2", + assert_all_called=False, + assert_all_mocked=True, # IMPORTANT: KEEP always True! + ) as mock: + request_ok = mock.post(f"/dynamic_services/{node_id}:retrieve") + + request_ok.respond( + status.HTTP_200_OK, + text=TypeAdapter(RetrieveDataOutEnveloped) + .validate_python( + RetrieveDataOutEnveloped.model_json_schema()["examples"][0] + ) + .model_dump_json(), + ) + + yield None + + +async def test_retrieve_inputs( + mock_director_v2_service_retrieve_inputs: None, + rpc_client: RabbitMQRPCClient, + node_id: NodeID, +): + results = await services.retrieve_inputs( + rpc_client, node_id=node_id, port_keys=[], timeout_s=10 + ) + assert ( + results.model_dump(mode="python") + == RetrieveDataOutEnveloped.model_json_schema()["examples"][0] + )