From 3db08ac645a0c1299d1ae7a8365886431031b1e5 Mon Sep 17 00:00:00 2001 From: Tasko Olevski Date: Wed, 30 Oct 2024 10:50:39 -0400 Subject: [PATCH] feat: add data connectors (#478) Co-authored-by: Samuel Gaist Co-authored-by: Ralf Grubenmann Co-authored-by: Flora Thiebaut --- Makefile | 6 +- bases/renku_data_services/data_api/app.py | 3 + .../renku_data_services/app_config/config.py | 1 + .../renku_data_services/authn/keycloak.py | 2 +- .../renku_data_services/base_api/auth.py | 24 - components/renku_data_services/crc/db.py | 2 +- .../data_connectors/api.spec.yaml | 4 +- .../data_connectors/apispec.py | 12 +- .../renku_data_services/data_connectors/db.py | 35 +- .../data_connectors/models.py | 8 + .../data_connectors/orm.py | 1 + ..._expand_and_separate_environments_from_.py | 4 +- .../notebooks/api.spec.yaml | 6 +- .../notebooks/api/classes/data_service.py | 20 +- .../notebooks/api/classes/k8s_client.py | 11 +- .../notebooks/api/schemas/cloud_storage.py | 22 +- .../renku_data_services/notebooks/apispec.py | 10 +- .../notebooks/blueprints.py | 136 +++-- .../notebooks/config/dynamic.py | 10 +- .../renku_data_services/notebooks/core.py | 2 +- .../notebooks/cr_amalthea_session.py | 471 +++++++++++++++++- .../renku_data_services/notebooks/crs.py | 11 +- .../notebooks/util/kubernetes_.py | 22 +- .../renku_data_services/notebooks/utils.py | 97 ++++ .../renku_data_services/project/blueprints.py | 17 +- .../renku_data_services/storage/api.spec.yaml | 4 +- .../renku_data_services/storage/apispec.py | 12 +- .../data_api/test_notebooks.py | 7 +- .../data_api/test_projects.py | 4 +- .../data_api/test_sessions.py | 2 +- 30 files changed, 813 insertions(+), 153 deletions(-) create mode 100644 components/renku_data_services/notebooks/utils.py diff --git a/Makefile b/Makefile index 5006134f9..b86b973ff 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ .PHONY: schemas tests test_setup main_tests schemathesis_tests collect_coverage style_checks pre_commit_checks run download_avro check_avro avro_models update_avro k3d_cluster install_amaltheas all -AMALTHEA_JS_VERSION ?= 0.12.2 -AMALTHEA_SESSIONS_VERSION ?= 0.0.10-new-operator-chart +AMALTHEA_JS_VERSION ?= 0.13.0 +AMALTHEA_SESSIONS_VERSION ?= 0.13.0 codegen_params = --input-file-type openapi --output-model-type pydantic_v2.BaseModel --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --set-default-enum-member --openapi-scopes schemas paths parameters --set-default-enum-member --use-one-literal-as-default --use-default define test_apispec_up_to_date @@ -166,5 +166,5 @@ install_amaltheas: ## Installs both version of amalthea in the. 
NOTE: It uses t # TODO: Add the version variables from the top of the file here when the charts are fully published amalthea_schema: ## Updates generates pydantic classes from CRDs - curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/feat-add-cloud-storage/config/crd/bases/amalthea.dev_amaltheasessions.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_amalthea_session.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg + curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/main/config/crd/bases/amalthea.dev_amaltheasessions.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_amalthea_session.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg curl https://raw.githubusercontent.com/SwissDataScienceCenter/amalthea/main/controller/crds/jupyter_server.yaml | yq '.spec.versions[0].schema.openAPIV3Schema' | poetry run datamodel-codegen --input-file-type jsonschema --output-model-type pydantic_v2.BaseModel --output components/renku_data_services/notebooks/cr_jupyter_server.py --use-double-quotes --target-python-version 3.12 --collapse-root-models --field-constraints --strict-nullable --base-class renku_data_services.notebooks.cr_base.BaseCRD --allow-extra-fields --use-default-kwarg diff --git a/bases/renku_data_services/data_api/app.py b/bases/renku_data_services/data_api/app.py index 1251a6dd7..96af9f608 100644 --- a/bases/renku_data_services/data_api/app.py +++ b/bases/renku_data_services/data_api/app.py @@ -149,6 +149,9 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic: session_repo=config.session_repo, storage_repo=config.storage_repo, rp_repo=config.rp_repo, + data_connector_repo=config.data_connector_repo, + data_connector_project_link_repo=config.data_connector_to_project_link_repo, + data_connector_secret_repo=config.data_connector_secret_repo, internal_gitlab_authenticator=config.gitlab_authenticator, ) platform_config = PlatformConfigBP( diff --git a/components/renku_data_services/app_config/config.py b/components/renku_data_services/app_config/config.py index b7a2389d3..c3ccd3eb7 100644 --- a/components/renku_data_services/app_config/config.py +++ b/components/renku_data_services/app_config/config.py @@ -459,6 +459,7 @@ def data_connector_secret_repo(self) -> DataConnectorSecretRepository: data_connector_repo=self.data_connector_repo, user_repo=self.kc_user_repo, secret_service_public_key=self.secrets_service_public_key, + authz=self.authz, ) return self._data_connector_secret_repo diff --git a/components/renku_data_services/authn/keycloak.py b/components/renku_data_services/authn/keycloak.py index e1e5d3914..7e615ef2d 100644 --- a/components/renku_data_services/authn/keycloak.py +++ b/components/renku_data_services/authn/keycloak.py @@ -98,7 +98,7 @@ async def authenticate( user = base_models.AuthenticatedAPIUser( is_admin=is_admin, id=id, - access_token=access_token, + access_token=token, 
full_name=parsed.get("name"), first_name=parsed.get("given_name"), last_name=parsed.get("family_name"), diff --git a/components/renku_data_services/base_api/auth.py b/components/renku_data_services/base_api/auth.py index f468296f4..16b76b09d 100644 --- a/components/renku_data_services/base_api/auth.py +++ b/components/renku_data_services/base_api/auth.py @@ -71,30 +71,6 @@ async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwar return decorator -def validate_path_project_id( - f: Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]], -) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]: - """Decorator for a Sanic handler that validates the project_id path parameter.""" - _path_project_id_regex = re.compile(r"^[A-Za-z0-9]{26}$") - - @wraps(f) - async def decorated_function(request: Request, *args: _P.args, **kwargs: _P.kwargs) -> _T: - project_id = cast(str | None, kwargs.get("project_id")) - if not project_id: - raise errors.ProgrammingError( - message="Could not find 'project_id' in the keyword arguments for the handler in order to validate it." - ) - if not _path_project_id_regex.match(project_id): - raise errors.ValidationError( - message=f"The 'project_id' path parameter {project_id} does not match the required " - f"regex {_path_project_id_regex}" - ) - - return await f(request, *args, **kwargs) - - return decorated_function - - def validate_path_user_id( f: Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]], ) -> Callable[Concatenate[Request, _P], Coroutine[Any, Any, _T]]: diff --git a/components/renku_data_services/crc/db.py b/components/renku_data_services/crc/db.py index 68b716e4a..8b5536b1e 100644 --- a/components/renku_data_services/crc/db.py +++ b/components/renku_data_services/crc/db.py @@ -12,7 +12,7 @@ from functools import wraps from typing import Any, Concatenate, Optional, ParamSpec, TypeVar, cast -from sqlalchemy import NullPool, delete, select +from sqlalchemy import NullPool, delete, false, select, true from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from sqlalchemy.orm import selectinload from sqlalchemy.sql import Select, and_, not_, or_ diff --git a/components/renku_data_services/data_connectors/api.spec.yaml b/components/renku_data_services/data_connectors/api.spec.yaml index 7bba8604e..90b68377e 100644 --- a/components/renku_data_services/data_connectors/api.spec.yaml +++ b/components/renku_data_services/data_connectors/api.spec.yaml @@ -619,10 +619,10 @@ components: exclusive: type: boolean description: if true, only values from 'examples' can be used - datatype: + type: type: string description: data type of option value. RClone has more options but they map to the ones listed here. 
- enum: ["int", "bool", "string", "Time"] + enum: ["int", "bool", "string", "Time", "Duration", "MultiEncoder", "SizeSuffix", "SpaceSepList", "CommaSepList", "Tristate"] Ulid: description: ULID identifier type: string diff --git a/components/renku_data_services/data_connectors/apispec.py b/components/renku_data_services/data_connectors/apispec.py index 33bde1bbc..370ac0958 100644 --- a/components/renku_data_services/data_connectors/apispec.py +++ b/components/renku_data_services/data_connectors/apispec.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: api.spec.yaml -# timestamp: 2024-10-22T07:46:54+00:00 +# timestamp: 2024-10-28T20:03:14+00:00 from __future__ import annotations @@ -23,11 +23,17 @@ class Example(BaseAPISpec): ) -class Datatype(Enum): +class Type(Enum): int = "int" bool = "bool" string = "string" Time = "Time" + Duration = "Duration" + MultiEncoder = "MultiEncoder" + SizeSuffix = "SizeSuffix" + SpaceSepList = "SpaceSepList" + CommaSepList = "CommaSepList" + Tristate = "Tristate" class RCloneOption(BaseAPISpec): @@ -65,7 +71,7 @@ class RCloneOption(BaseAPISpec): exclusive: Optional[bool] = Field( None, description="if true, only values from 'examples' can be used" ) - datatype: Optional[Datatype] = Field( + type: Optional[Type] = Field( None, description="data type of option value. RClone has more options but they map to the ones listed here.", ) diff --git a/components/renku_data_services/data_connectors/db.py b/components/renku_data_services/data_connectors/db.py index a075baaf3..a2a344c8f 100644 --- a/components/renku_data_services/data_connectors/db.py +++ b/components/renku_data_services/data_connectors/db.py @@ -1,6 +1,6 @@ """Adapters for data connectors database classes.""" -from collections.abc import Callable +from collections.abc import AsyncIterator, Callable from typing import TypeVar from cryptography.hazmat.primitives.asymmetric import rsa @@ -477,11 +477,44 @@ def __init__( data_connector_repo: DataConnectorRepository, user_repo: UserRepo, secret_service_public_key: rsa.RSAPublicKey, + authz: Authz, ) -> None: self.session_maker = session_maker self.data_connector_repo = data_connector_repo self.user_repo = user_repo self.secret_service_public_key = secret_service_public_key + self.authz = authz + + async def get_data_connectors_with_secrets( + self, + user: base_models.APIUser, + project_id: ULID, + ) -> AsyncIterator[models.DataConnectorWithSecrets]: + """Get all data connectors and their secrets for a project.""" + if user.id is None: + raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.") + + can_read_project = await self.authz.has_permission(user, ResourceType.project, project_id, Scope.READ) + if not can_read_project: + raise errors.MissingResourceError( + message=f"The project ID with {project_id} does not exist or you dont have permission to access it" + ) + + data_connector_ids = await self.authz.resources_with_permission( + user, user.id, ResourceType.data_connector, Scope.READ + ) + + async with self.session_maker() as session: + stmt = select(schemas.DataConnectorORM).where( + schemas.DataConnectorORM.project_links.any( + schemas.DataConnectorToProjectLinkORM.project_id == project_id + ), + schemas.DataConnectorORM.id.in_(data_connector_ids), + ) + results = await session.stream_scalars(stmt) + async for dc in results: + secrets = await self.get_data_connector_secrets(user, dc.id) + yield models.DataConnectorWithSecrets(dc.dump(), secrets) async def get_data_connector_secrets( self, 
diff --git a/components/renku_data_services/data_connectors/models.py b/components/renku_data_services/data_connectors/models.py index e67082d75..498e664b4 100644 --- a/components/renku_data_services/data_connectors/models.py +++ b/components/renku_data_services/data_connectors/models.py @@ -150,3 +150,11 @@ class DataConnectorPermissions: write: bool delete: bool change_membership: bool + + +@dataclass +class DataConnectorWithSecrets: + """A data connector with its secrets.""" + + data_connector: DataConnector + secrets: list[DataConnectorSecret] = field(default_factory=list) diff --git a/components/renku_data_services/data_connectors/orm.py b/components/renku_data_services/data_connectors/orm.py index 447168d34..6340c73be 100644 --- a/components/renku_data_services/data_connectors/orm.py +++ b/components/renku_data_services/data_connectors/orm.py @@ -85,6 +85,7 @@ class DataConnectorORM(BaseORM): onupdate=func.now(), nullable=False, ) + project_links: Mapped[list["DataConnectorToProjectLinkORM"]] = relationship(init=False, viewonly=True) def dump(self) -> models.DataConnector: """Create a data connector model from the DataConnectorORM.""" diff --git a/components/renku_data_services/migrations/versions/584598f3b769_expand_and_separate_environments_from_.py b/components/renku_data_services/migrations/versions/584598f3b769_expand_and_separate_environments_from_.py index 828fe2963..b5db16c2f 100644 --- a/components/renku_data_services/migrations/versions/584598f3b769_expand_and_separate_environments_from_.py +++ b/components/renku_data_services/migrations/versions/584598f3b769_expand_and_separate_environments_from_.py @@ -1,7 +1,7 @@ """expand and separate environments from session launchers Revision ID: 584598f3b769 -Revises: 726d5d0e1f28 +Revises: cefb45b5d71e Create Date: 2024-08-12 14:25:24.292285 """ @@ -12,7 +12,7 @@ # revision identifiers, used by Alembic. 
revision = "584598f3b769" -down_revision = "726d5d0e1f28" +down_revision = "cefb45b5d71e" branch_labels = None depends_on = None diff --git a/components/renku_data_services/notebooks/api.spec.yaml b/components/renku_data_services/notebooks/api.spec.yaml index 519c23751..475124754 100644 --- a/components/renku_data_services/notebooks/api.spec.yaml +++ b/components/renku_data_services/notebooks/api.spec.yaml @@ -1015,7 +1015,6 @@ components: additionalProperties: true readonly: type: boolean - default: true source_path: type: string target_path: @@ -1023,11 +1022,8 @@ components: storage_id: allOf: - "$ref": "#/components/schemas/Ulid" - - description: The storage ID is used to know which storage config from the DB should be overriden + - description: If the storage_id is provided then this config must replace an existing storage config in the session required: - - configuration - - source_path - - target_path - storage_id ServerName: type: string diff --git a/components/renku_data_services/notebooks/api/classes/data_service.py b/components/renku_data_services/notebooks/api/classes/data_service.py index 4e7b6d44c..88bd5b744 100644 --- a/components/renku_data_services/notebooks/api/classes/data_service.py +++ b/components/renku_data_services/notebooks/api/classes/data_service.py @@ -58,8 +58,8 @@ async def get_storage_by_id( # TODO: remove project_id once authz on the data service works properly request_url = self.storage_url + f"/storage/{storage_id}?project_id={project_id}" logger.info(f"getting storage info by id: {request_url}") - async with httpx.AsyncClient() as client: - res = await client.get(request_url, headers=headers, timeout=10) + async with httpx.AsyncClient(timeout=10) as client: + res = await client.get(request_url, headers=headers) if res.status_code == 404: raise MissingResourceError(message=f"Couldn't find cloud storage with id {storage_id}") if res.status_code == 401: @@ -79,8 +79,8 @@ async def get_storage_by_id( async def validate_storage_configuration(self, configuration: dict[str, Any], source_path: str) -> None: """Validate the cloud storage configuration.""" - async with httpx.AsyncClient() as client: - res = await client.post(self.storage_url + "/storage_schema/validate", json=configuration, timeout=10) + async with httpx.AsyncClient(timeout=10) as client: + res = await client.post(self.storage_url + "/storage_schema/validate", json=configuration) if res.status_code == 422: raise InvalidCloudStorageConfiguration( message=f"The provided cloud storage configuration isn't valid: {res.json()}", @@ -92,8 +92,8 @@ async def validate_storage_configuration(self, configuration: dict[str, Any], so async def obscure_password_fields_for_storage(self, configuration: dict[str, Any]) -> dict[str, Any]: """Obscures password fields for use with rclone.""" - async with httpx.AsyncClient() as client: - res = await client.post(self.storage_url + "/storage_schema/obscure", json=configuration, timeout=10) + async with httpx.AsyncClient(timeout=10) as client: + res = await client.post(self.storage_url + "/storage_schema/obscure", json=configuration) if res.status_code != 200: raise InvalidCloudStorageConfiguration( @@ -300,8 +300,8 @@ async def get_oauth2_connections(self, user: APIUser | None = None) -> list[OAut return [] request_url = f"{self.service_url}/oauth2/connections" headers = {"Authorization": f"bearer {user.access_token}"} - async with httpx.AsyncClient() as client: - res = await client.get(request_url, headers=headers, timeout=10) + async with httpx.AsyncClient(timeout=10) 
as client: + res = await client.get(request_url, headers=headers) if res.status_code != 200: raise IntermittentError(message="The data service sent an unexpected response, please try again later") connections = res.json() @@ -311,8 +311,8 @@ async def get_oauth2_connections(self, user: APIUser | None = None) -> list[OAut async def get_oauth2_provider(self, provider_id: str) -> OAuth2Provider: """Get a specific provider.""" request_url = f"{self.service_url}/oauth2/providers/{provider_id}" - async with httpx.AsyncClient() as client: - res = await client.get(request_url, timeout=10) + async with httpx.AsyncClient(timeout=10) as client: + res = await client.get(request_url) if res.status_code != 200: raise IntermittentError(message="The data service sent an unexpected response, please try again later") provider = res.json() diff --git a/components/renku_data_services/notebooks/api/classes/k8s_client.py b/components/renku_data_services/notebooks/api/classes/k8s_client.py index ffbc5d6ca..5778c4ec1 100644 --- a/components/renku_data_services/notebooks/api/classes/k8s_client.py +++ b/components/renku_data_services/notebooks/api/classes/k8s_client.py @@ -23,7 +23,6 @@ PatchServerError, ) from renku_data_services.notebooks.errors.programming import ProgrammingError -from renku_data_services.notebooks.errors.user import MissingResourceError from renku_data_services.notebooks.util.kubernetes_ import find_env_var from renku_data_services.notebooks.util.retries import ( retry_with_exponential_backoff_async, @@ -351,7 +350,7 @@ class ServerCache(Generic[_SessionType]): def __init__(self, url: str, server_type: type[_SessionType]): self.url = url - self.client = httpx.AsyncClient() + self.client = httpx.AsyncClient(timeout=10) self.server_type: type[_SessionType] = server_type self.url_path_name = "servers" if server_type == AmaltheaSessionV1Alpha1: @@ -449,7 +448,7 @@ async def get_server_logs( server = await self.get_server(server_name, safe_username) if not server: raise errors.MissingResourceError( - message=f"Cannot find server {server_name} for user " f"{safe_username} to retrieve logs." + message=f"Cannot find server {server_name} for user {safe_username} to retrieve logs." ) pod_name = f"{server_name}-0" return await self.renku_ns_client.get_pod_logs(pod_name, max_log_lines) @@ -474,8 +473,8 @@ async def patch_server( """Patch a server.""" server = await self.get_server(server_name, safe_username) if not server: - raise MissingResourceError( - f"Cannot find server {server_name} for user " f"{safe_username} in order to patch it." + raise errors.MissingResourceError( + message=f"Cannot find server {server_name} for user {safe_username} in order to patch it." ) return await self.renku_ns_client.patch_server(server_name=server_name, patch=patch) @@ -491,7 +490,7 @@ async def delete_server(self, server_name: str, safe_username: str) -> None: server = await self.get_server(server_name, safe_username) if not server: raise errors.MissingResourceError( - message=f"Cannot find server {server_name} for user " f"{safe_username} in order to delete it." + message=f"Cannot find server {server_name} for user {safe_username} in order to delete it." 
) return await self.renku_ns_client.delete_server(server_name) diff --git a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py index 11ee5a5ca..902d363e4 100644 --- a/components/renku_data_services/notebooks/api/schemas/cloud_storage.py +++ b/components/renku_data_services/notebooks/api/schemas/cloud_storage.py @@ -3,7 +3,7 @@ from configparser import ConfigParser from io import StringIO from pathlib import PurePosixPath -from typing import Any, Final, Optional, Self +from typing import Any, Final, Optional, Protocol, Self from kubernetes import client from marshmallow import EXCLUDE, Schema, ValidationError, fields, validates_schema @@ -36,6 +36,15 @@ def validate_storage(self, data: dict, **kwargs: dict) -> None: raise ValidationError("'storage_id' cannot be used together with 'source_path' or 'target_path'") +class RCloneStorageRequestOverride(Protocol): + """A small dataclass for handling overrides to the data connector requests.""" + + source_path: str | None = None + target_path: str | None = None + configuration: dict[str, Any] | None = None + readonly: bool | None = None + + class RCloneStorage(ICloudStorageRequest): """RClone based storage.""" @@ -221,6 +230,17 @@ def _stringify(value: Any) -> str: parser.write(stringio) return stringio.getvalue() + def with_override(self, override: RCloneStorageRequestOverride) -> "RCloneStorage": + """Override certain fields on the storage.""" + return RCloneStorage( + source_path=override.source_path if override.source_path else self.source_path, + mount_folder=override.target_path if override.target_path else self.mount_folder, + readonly=override.readonly if override.readonly is not None else self.readonly, + configuration=override.configuration if override.configuration else self.configuration, + name=self.name, + config=self.config, + ) + class LaunchNotebookResponseCloudStorage(RCloneStorageRequest): """Notebook launch response with cloud storage attached.""" diff --git a/components/renku_data_services/notebooks/apispec.py b/components/renku_data_services/notebooks/apispec.py index 00d7a3948..860256726 100644 --- a/components/renku_data_services/notebooks/apispec.py +++ b/components/renku_data_services/notebooks/apispec.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: api.spec.yaml -# timestamp: 2024-10-07T22:25:48+00:00 +# timestamp: 2024-10-28T16:00:32+00:00 from __future__ import annotations @@ -259,10 +259,10 @@ class SessionLogsResponse(RootModel[Optional[Dict[str, str]]]): class SessionCloudStoragePost(BaseAPISpec): - configuration: Dict[str, Any] - readonly: bool = True - source_path: str - target_path: str + configuration: Optional[Dict[str, Any]] = None + readonly: Optional[bool] = None + source_path: Optional[str] = None + target_path: Optional[str] = None storage_id: str = Field( ..., description="ULID identifier", diff --git a/components/renku_data_services/notebooks/blueprints.py b/components/renku_data_services/notebooks/blueprints.py index a43243876..b2f27d083 100644 --- a/components/renku_data_services/notebooks/blueprints.py +++ b/components/renku_data_services/notebooks/blueprints.py @@ -3,9 +3,11 @@ import base64 import os from dataclasses import dataclass +from pathlib import PurePosixPath from typing import Any from urllib.parse import urljoin, urlparse +import httpx from kubernetes.client import V1ObjectMeta, V1Secret from sanic import Request, empty, exceptions, json from sanic.response import HTTPResponse, 
JSONResponse @@ -19,6 +21,13 @@ from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint from renku_data_services.base_models import AnonymousAPIUser, APIUser, AuthenticatedAPIUser, Authenticator from renku_data_services.crc.db import ResourcePoolRepository +from renku_data_services.crc.models import GpuKind +from renku_data_services.data_connectors.db import ( + DataConnectorProjectLinkRepository, + DataConnectorRepository, + DataConnectorSecretRepository, +) +from renku_data_services.data_connectors.models import DataConnectorSecret from renku_data_services.errors import errors from renku_data_services.notebooks import apispec, core from renku_data_services.notebooks.api.amalthea_patches import git_proxy, init_containers @@ -32,6 +41,7 @@ ) from renku_data_services.notebooks.config import NotebooksConfig from renku_data_services.notebooks.crs import ( + Affinity, AmaltheaSessionSpec, AmaltheaSessionV1Alpha1, Authentication, @@ -54,11 +64,17 @@ State, Storage, TlsSecret, + Toleration, ) from renku_data_services.notebooks.errors.intermittent import AnonymousUserPatchError from renku_data_services.notebooks.util.kubernetes_ import ( renku_2_make_server_name, ) +from renku_data_services.notebooks.utils import ( + merge_node_affinities, + node_affinity_from_resource_class, + tolerations_from_resource_class, +) from renku_data_services.project.db import ProjectRepository from renku_data_services.repositories.db import GitRepositoriesRepository from renku_data_services.session.db import SessionRepository @@ -241,6 +257,9 @@ class NotebooksNewBP(CustomBlueprint): session_repo: SessionRepository rp_repo: ResourcePoolRepository storage_repo: StorageRepository + data_connector_repo: DataConnectorRepository + data_connector_project_link_repo: DataConnectorProjectLinkRepository + data_connector_secret_repo: DataConnectorSecretRepository def start(self) -> BlueprintFactoryResponse: """Start a session with the new operator.""" @@ -257,7 +276,7 @@ async def _handler( launcher = await self.session_repo.get_launcher(user, ULID.from_str(body.launcher_id)) project = await self.project_repo.get_project(user=user, project_id=launcher.project_id) server_name = renku_2_make_server_name( - safe_username=user.id, project_id=str(launcher.project_id), launcher_id=body.launcher_id + user=user, project_id=str(launcher.project_id), launcher_id=body.launcher_id ) existing_session = await self.nb_config.k8s_v2_client.get_server(server_name, user.id) if existing_session is not None and existing_session.spec is not None: @@ -269,7 +288,9 @@ async def _handler( raise errors.ProgrammingError(message="The default resource class has to have an ID", quiet=True) resource_class_id = body.resource_class_id or default_resource_class.id await self.nb_config.crc_validator.validate_class_storage(user, resource_class_id, body.disk_storage) + resource_class = await self.rp_repo.get_resource_class(user, resource_class_id) work_dir = environment.working_directory + # TODO: Wait for pitch on users secrets to implement this # user_secrets: K8sUserSecrets | None = None # if body.user_secrets: # user_secrets = K8sUserSecrets( @@ -277,51 +298,51 @@ async def _handler( # user_secret_ids=body.user_secrets.user_secret_ids, # mount_path=body.user_secrets.mount_path, # ) - cloud_storages_db = await self.storage_repo.get_storage( - user=user, project_id=project.id, include_secrets=True - ) - cloud_storage: dict[str, RCloneStorage] = { - str(s.storage_id): RCloneStorage( - source_path=s.source_path, - 
mount_folder=(work_dir / s.target_path).as_posix(), - configuration=s.configuration.model_dump(mode="python"), - readonly=s.readonly, - config=self.nb_config, - name=s.name, - ) - for s in cloud_storages_db - } - cloud_storage_request: dict[str, RCloneStorage] = { - s.storage_id: RCloneStorage( - source_path=s.source_path, - mount_folder=(work_dir / s.target_path).as_posix(), - configuration=s.configuration, - readonly=s.readonly, + data_connectors_stream = self.data_connector_secret_repo.get_data_connectors_with_secrets(user, project.id) + dcs: dict[str, RCloneStorage] = {} + dcs_secrets: dict[str, list[DataConnectorSecret]] = {} + async for dc in data_connectors_stream: + dcs[str(dc.data_connector.id)] = RCloneStorage( + source_path=dc.data_connector.storage.source_path, + mount_folder=dc.data_connector.storage.target_path + if PurePosixPath(dc.data_connector.storage.target_path).is_absolute() + else (work_dir / dc.data_connector.storage.target_path).as_posix(), + configuration=dc.data_connector.storage.configuration, + readonly=dc.data_connector.storage.readonly, config=self.nb_config, - name=None, + name=dc.data_connector.name, ) - for s in body.cloudstorage or [] - } + if len(dc.secrets) > 0: + dcs_secrets[str(dc.data_connector.id)] = dc.secrets # NOTE: Check the cloud storage in the request body and if any match # then overwrite the projects cloud storages - # NOTE: Cloud storages in the session launch request body that are not form the DB will cause a 422 error - for csr_id, csr in cloud_storage_request.items(): - if csr_id not in cloud_storage: + # NOTE: Cloud storages in the session launch request body that are not from the DB will cause a 404 error + # NOTE: Overriding the configuration when a saved secret is there will cause a 422 error + cloud_storage_overrides = body.cloudstorage or [] + for csr in cloud_storage_overrides: + csr_id = csr.storage_id + if csr_id not in dcs: raise errors.MissingResourceError( message=f"You have requested a cloud storage with ID {csr_id} which does not exist " "or you dont have access to.", quiet=True, ) - cloud_storage[csr_id] = csr + if csr.target_path is not None and not PurePosixPath(csr.target_path).is_absolute(): + csr.target_path = (work_dir / csr.target_path).as_posix() + dcs[csr_id] = dcs[csr_id].with_override(csr) repositories = [Repository(url=i) for i in project.repositories] secrets_to_create: list[V1Secret] = [] # Generate the cloud starge secrets data_sources: list[DataSource] = [] - for ics, cs in enumerate(cloud_storage.values()): - secret_name = f"{server_name}-ds-{ics}" + for cs_id, cs in dcs.items(): + secret_name = f"{server_name}-ds-{cs_id.lower()}" secrets_to_create.append(cs.secret(secret_name, self.nb_config.k8s_client.preferred_namespace)) data_sources.append( - DataSource(mountPath=cs.mount_folder, secretRef=SecretRefWhole(name=secret_name, adopt=True)) + DataSource( + mountPath=cs.mount_folder, + secretRef=SecretRefWhole(name=secret_name, adopt=True), + accessMode="ReadOnlyMany" if cs.readonly else "ReadWriteOnce", + ) ) cert_init, cert_vols = init_containers.certificates_container(self.nb_config) session_init_containers = [InitContainer.model_validate(self.nb_config.k8s_v2_client.sanitize(cert_init))] @@ -365,6 +386,22 @@ async def _handler( "renku.io/launcher_id": body.launcher_id, "renku.io/resource_class_id": str(body.resource_class_id or default_resource_class.id), } + requests: dict[str, str | int] = { + "cpu": str(round(resource_class.cpu * 1000)) + "m", + "memory": resource_class.memory, + } + if 
resource_class.gpu > 0: + gpu_name = GpuKind.NVIDIA.value + "/gpu" + requests[gpu_name] = resource_class.gpu + tolerations = [ + Toleration.model_validate(toleration) for toleration in self.nb_config.sessions.tolerations + ] + tolerations_from_resource_class(resource_class) + affinity = Affinity.model_validate(self.nb_config.sessions.affinity) + rc_node_affinity = node_affinity_from_resource_class(resource_class) + if affinity.nodeAffinity: + affinity.nodeAffinity = merge_node_affinities(affinity.nodeAffinity, rc_node_affinity) + else: + affinity.nodeAffinity = rc_node_affinity manifest = AmaltheaSessionV1Alpha1( metadata=Metadata(name=server_name, annotations=annotations), spec=AmaltheaSessionSpec( @@ -382,7 +419,7 @@ async def _handler( workingDir=environment.working_directory.as_posix(), runAsUser=environment.uid, runAsGroup=environment.gid, - resources=Resources(claims=None, requests=None, limits=None), + resources=Resources(requests=requests), extraVolumeMounts=[], command=environment.command, args=environment.args, @@ -423,6 +460,8 @@ async def _handler( else [], ), dataSources=data_sources, + tolerations=tolerations, + affinity=affinity, ), ) parsed_proxy_url = urlparse(urljoin(base_server_url + "/", "oauth2")) @@ -462,6 +501,39 @@ async def _handler( for s in secrets_to_create: await self.nb_config.k8s_v2_client.delete_secret(s.metadata.name) raise errors.ProgrammingError(message="Could not start the amalthea session") + else: + owner_reference = { + "apiVersion": manifest.apiVersion, + "kind": manifest.kind, + "name": manifest.metadata.name, + "uid": manifest.metadata.uid, + } + secrets_url = self.nb_config.user_secrets.secrets_storage_service_url + "/api/secrets/kubernetes" + headers = {"Authorization": f"bearer {user.access_token}"} + try: + for s_id, secrets in dcs_secrets.items(): + if len(secrets) == 0: + continue + request_data = { + "name": f"{server_name}-ds-{s_id.lower()}-secrets", + "namespace": self.nb_config.k8s_v2_client.preferred_namespace, + "secret_ids": [str(secret.secret_id) for secret in secrets], + "owner_references": [owner_reference], + "key_mapping": {str(secret.secret_id): secret.name for secret in secrets}, + } + async with httpx.AsyncClient(timeout=10) as client: + await client.post(secrets_url, headers=headers, json=request_data) + res = await client.post(secrets_url, headers=headers, json=request_data) + if res.status_code >= 300 or res.status_code < 200: + raise errors.ProgrammingError( + message=f"The secret for data connector with {s_id} could not be " + f"successfully created, the status code was {res.status_code}." 
+ "Please contact a Renku administrator.", + detail=res.text, + ) + except Exception: + await self.nb_config.k8s_v2_client.delete_server(server_name, user.id) + raise return json(manifest.as_apispec().model_dump(mode="json", exclude_none=True), 201) diff --git a/components/renku_data_services/notebooks/config/dynamic.py b/components/renku_data_services/notebooks/config/dynamic.py index 0e02ab29e..81719dde9 100644 --- a/components/renku_data_services/notebooks/config/dynamic.py +++ b/components/renku_data_services/notebooks/config/dynamic.py @@ -405,13 +405,13 @@ def from_env(cls) -> Self: storage=_SessionStorageConfig.from_env(), containers=_SessionContainers.from_env(), ssh=_SessionSshConfig.from_env(), - default_image=os.environ.get("", "renku/singleuser:latest"), - enforce_cpu_limits=CPUEnforcement(os.environ.get("", "off")), + default_image=os.environ.get("NB_SESSIONS__DEFAULT_IMAGE", "renku/singleuser:latest"), + enforce_cpu_limits=CPUEnforcement(os.environ.get("NB_SESSIONS__ENFORCE_CPU_LIMITS", "off")), termination_warning_duration_seconds=_parse_value_as_int(os.environ.get("", 12 * 60 * 60)), image_default_workdir="/home/jovyan", - node_selector=yaml.safe_load(StringIO(os.environ.get("", "{}"))), - affinity=yaml.safe_load(StringIO(os.environ.get("", "{}"))), - tolerations=yaml.safe_load(StringIO(os.environ.get("", "[]"))), + node_selector=yaml.safe_load(StringIO(os.environ.get("NB_SESSIONS__NODE_SELECTOR", "{}"))), + affinity=yaml.safe_load(StringIO(os.environ.get("NB_SESSIONS__AFFINITY", "{}"))), + tolerations=yaml.safe_load(StringIO(os.environ.get("NB_SESSIONS__TOLERATIONS", "[]"))), ) @classmethod diff --git a/components/renku_data_services/notebooks/core.py b/components/renku_data_services/notebooks/core.py index 7ac4d8119..c4ece4736 100644 --- a/components/renku_data_services/notebooks/core.py +++ b/components/renku_data_services/notebooks/core.py @@ -550,7 +550,7 @@ async def launch_notebook( """Starts a server.""" server_name = renku_2_make_server_name( - safe_username=user.id, project_id=launch_request.project_id, launcher_id=launch_request.launcher_id + user=user, project_id=launch_request.project_id, launcher_id=launch_request.launcher_id ) return await launch_notebook_helper( nb_config=config, diff --git a/components/renku_data_services/notebooks/cr_amalthea_session.py b/components/renku_data_services/notebooks/cr_amalthea_session.py index a4c2e3fd9..df79c7db3 100644 --- a/components/renku_data_services/notebooks/cr_amalthea_session.py +++ b/components/renku_data_services/notebooks/cr_amalthea_session.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: -# timestamp: 2024-09-04T21:22:45+00:00 +# timestamp: 2024-10-24T01:41:50+00:00 from __future__ import annotations @@ -12,6 +12,404 @@ from renku_data_services.notebooks.cr_base import BaseCRD +class MatchExpression(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + key: str = Field(..., description="The label key that the selector applies to.") + operator: str = Field( + ..., + description="Represents a key's relationship to a set of values.\nValid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.", + ) + values: Optional[List[str]] = Field( + default=None, + description="An array of string values. If the operator is In or NotIn,\nthe values array must be non-empty. If the operator is Exists or DoesNotExist,\nthe values array must be empty. 
If the operator is Gt or Lt, the values\narray must have a single element, which will be interpreted as an integer.\nThis array is replaced during a strategic merge patch.", + ) + + +class MatchField(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + key: str = Field(..., description="The label key that the selector applies to.") + operator: str = Field( + ..., + description="Represents a key's relationship to a set of values.\nValid operators are In, NotIn, Exists, DoesNotExist. Gt, and Lt.", + ) + values: Optional[List[str]] = Field( + default=None, + description="An array of string values. If the operator is In or NotIn,\nthe values array must be non-empty. If the operator is Exists or DoesNotExist,\nthe values array must be empty. If the operator is Gt or Lt, the values\narray must have a single element, which will be interpreted as an integer.\nThis array is replaced during a strategic merge patch.", + ) + + +class Preference(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression]] = Field( + default=None, + description="A list of node selector requirements by node's labels.", + ) + matchFields: Optional[List[MatchField]] = Field( + default=None, + description="A list of node selector requirements by node's fields.", + ) + + +class PreferredDuringSchedulingIgnoredDuringExecutionItem(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + preference: Preference = Field( + ..., + description="A node selector term, associated with the corresponding weight.", + ) + weight: int = Field( + ..., + description="Weight associated with matching the corresponding nodeSelectorTerm, in the range 1-100.", + ) + + +class NodeSelectorTerm(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression]] = Field( + default=None, + description="A list of node selector requirements by node's labels.", + ) + matchFields: Optional[List[MatchField]] = Field( + default=None, + description="A list of node selector requirements by node's fields.", + ) + + +class RequiredDuringSchedulingIgnoredDuringExecution(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + nodeSelectorTerms: List[NodeSelectorTerm] = Field( + ..., description="Required. A list of node selector terms. The terms are ORed." + ) + + +class NodeAffinity(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + preferredDuringSchedulingIgnoredDuringExecution: Optional[ + List[PreferredDuringSchedulingIgnoredDuringExecutionItem] + ] = Field( + default=None, + description='The scheduler will prefer to schedule pods to nodes that satisfy\nthe affinity expressions specified by this field, but it may choose\na node that violates one or more of the expressions. 
The node that is\nmost preferred is the one with the greatest sum of weights, i.e.\nfor each node that meets all of the scheduling requirements (resource\nrequest, requiredDuringScheduling affinity expressions, etc.),\ncompute a sum by iterating through the elements of this field and adding\n"weight" to the sum if the node matches the corresponding matchExpressions; the\nnode(s) with the highest sum are the most preferred.', + ) + requiredDuringSchedulingIgnoredDuringExecution: Optional[ + RequiredDuringSchedulingIgnoredDuringExecution + ] = Field( + default=None, + description="If the affinity requirements specified by this field are not met at\nscheduling time, the pod will not be scheduled onto the node.\nIf the affinity requirements specified by this field cease to be met\nat some point during pod execution (e.g. due to an update), the system\nmay or may not try to eventually evict the pod from its node.", + ) + + +class MatchExpression2(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + key: str = Field( + ..., description="key is the label key that the selector applies to." + ) + operator: str = Field( + ..., + description="operator represents a key's relationship to a set of values.\nValid operators are In, NotIn, Exists and DoesNotExist.", + ) + values: Optional[List[str]] = Field( + default=None, + description="values is an array of string values. If the operator is In or NotIn,\nthe values array must be non-empty. If the operator is Exists or DoesNotExist,\nthe values array must be empty. This array is replaced during a strategic\nmerge patch.", + ) + + +class LabelSelector(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". The requirements are ANDed.', + ) + + +class NamespaceSelector(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". 
The requirements are ANDed.', + ) + + +class PodAffinityTerm(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + labelSelector: Optional[LabelSelector] = Field( + default=None, + description="A label query over a set of resources, in this case pods.", + ) + namespaceSelector: Optional[NamespaceSelector] = Field( + default=None, + description='A label query over the set of namespaces that the term applies to.\nThe term is applied to the union of the namespaces selected by this field\nand the ones listed in the namespaces field.\nnull selector and null or empty namespaces list means "this pod\'s namespace".\nAn empty selector ({}) matches all namespaces.', + ) + namespaces: Optional[List[str]] = Field( + default=None, + description='namespaces specifies a static list of namespace names that the term applies to.\nThe term is applied to the union of the namespaces listed in this field\nand the ones selected by namespaceSelector.\nnull or empty namespaces list and null namespaceSelector means "this pod\'s namespace".', + ) + topologyKey: str = Field( + ..., + description="This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching\nthe labelSelector in the specified namespaces, where co-located is defined as running on a node\nwhose value of the label with key topologyKey matches that of any node on which any of the\nselected pods is running.\nEmpty topologyKey is not allowed.", + ) + + +class PreferredDuringSchedulingIgnoredDuringExecutionItem1(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + podAffinityTerm: PodAffinityTerm = Field( + ..., + description="Required. A pod affinity term, associated with the corresponding weight.", + ) + weight: int = Field( + ..., + description="weight associated with matching the corresponding podAffinityTerm,\nin the range 1-100.", + ) + + +class LabelSelector1(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". The requirements are ANDed.', + ) + + +class NamespaceSelector1(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". 
The requirements are ANDed.', + ) + + +class RequiredDuringSchedulingIgnoredDuringExecutionItem(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + labelSelector: Optional[LabelSelector1] = Field( + default=None, + description="A label query over a set of resources, in this case pods.", + ) + namespaceSelector: Optional[NamespaceSelector1] = Field( + default=None, + description='A label query over the set of namespaces that the term applies to.\nThe term is applied to the union of the namespaces selected by this field\nand the ones listed in the namespaces field.\nnull selector and null or empty namespaces list means "this pod\'s namespace".\nAn empty selector ({}) matches all namespaces.', + ) + namespaces: Optional[List[str]] = Field( + default=None, + description='namespaces specifies a static list of namespace names that the term applies to.\nThe term is applied to the union of the namespaces listed in this field\nand the ones selected by namespaceSelector.\nnull or empty namespaces list and null namespaceSelector means "this pod\'s namespace".', + ) + topologyKey: str = Field( + ..., + description="This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching\nthe labelSelector in the specified namespaces, where co-located is defined as running on a node\nwhose value of the label with key topologyKey matches that of any node on which any of the\nselected pods is running.\nEmpty topologyKey is not allowed.", + ) + + +class PodAffinity(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + preferredDuringSchedulingIgnoredDuringExecution: Optional[ + List[PreferredDuringSchedulingIgnoredDuringExecutionItem1] + ] = Field( + default=None, + description='The scheduler will prefer to schedule pods to nodes that satisfy\nthe affinity expressions specified by this field, but it may choose\na node that violates one or more of the expressions. The node that is\nmost preferred is the one with the greatest sum of weights, i.e.\nfor each node that meets all of the scheduling requirements (resource\nrequest, requiredDuringScheduling affinity expressions, etc.),\ncompute a sum by iterating through the elements of this field and adding\n"weight" to the sum if the node has pods which matches the corresponding podAffinityTerm; the\nnode(s) with the highest sum are the most preferred.', + ) + requiredDuringSchedulingIgnoredDuringExecution: Optional[ + List[RequiredDuringSchedulingIgnoredDuringExecutionItem] + ] = Field( + default=None, + description="If the affinity requirements specified by this field are not met at\nscheduling time, the pod will not be scheduled onto the node.\nIf the affinity requirements specified by this field cease to be met\nat some point during pod execution (e.g. due to a pod label update), the\nsystem may or may not try to eventually evict the pod from its node.\nWhen there are multiple elements, the lists of nodes corresponding to each\npodAffinityTerm are intersected, i.e. all terms must be satisfied.", + ) + + +class LabelSelector2(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. 
A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". The requirements are ANDed.', + ) + + +class NamespaceSelector2(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". The requirements are ANDed.', + ) + + +class PodAffinityTerm1(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + labelSelector: Optional[LabelSelector2] = Field( + default=None, + description="A label query over a set of resources, in this case pods.", + ) + namespaceSelector: Optional[NamespaceSelector2] = Field( + default=None, + description='A label query over the set of namespaces that the term applies to.\nThe term is applied to the union of the namespaces selected by this field\nand the ones listed in the namespaces field.\nnull selector and null or empty namespaces list means "this pod\'s namespace".\nAn empty selector ({}) matches all namespaces.', + ) + namespaces: Optional[List[str]] = Field( + default=None, + description='namespaces specifies a static list of namespace names that the term applies to.\nThe term is applied to the union of the namespaces listed in this field\nand the ones selected by namespaceSelector.\nnull or empty namespaces list and null namespaceSelector means "this pod\'s namespace".', + ) + topologyKey: str = Field( + ..., + description="This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching\nthe labelSelector in the specified namespaces, where co-located is defined as running on a node\nwhose value of the label with key topologyKey matches that of any node on which any of the\nselected pods is running.\nEmpty topologyKey is not allowed.", + ) + + +class PreferredDuringSchedulingIgnoredDuringExecutionItem2(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + podAffinityTerm: PodAffinityTerm1 = Field( + ..., + description="Required. A pod affinity term, associated with the corresponding weight.", + ) + weight: int = Field( + ..., + description="weight associated with matching the corresponding podAffinityTerm,\nin the range 1-100.", + ) + + +class LabelSelector3(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". The requirements are ANDed.', + ) + + +class NamespaceSelector3(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + matchExpressions: Optional[List[MatchExpression2]] = Field( + default=None, + description="matchExpressions is a list of label selector requirements. 
The requirements are ANDed.", + ) + matchLabels: Optional[Dict[str, str]] = Field( + default=None, + description='matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels\nmap is equivalent to an element of matchExpressions, whose key field is "key", the\noperator is "In", and the values array contains only "value". The requirements are ANDed.', + ) + + +class RequiredDuringSchedulingIgnoredDuringExecutionItem1(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + labelSelector: Optional[LabelSelector3] = Field( + default=None, + description="A label query over a set of resources, in this case pods.", + ) + namespaceSelector: Optional[NamespaceSelector3] = Field( + default=None, + description='A label query over the set of namespaces that the term applies to.\nThe term is applied to the union of the namespaces selected by this field\nand the ones listed in the namespaces field.\nnull selector and null or empty namespaces list means "this pod\'s namespace".\nAn empty selector ({}) matches all namespaces.', + ) + namespaces: Optional[List[str]] = Field( + default=None, + description='namespaces specifies a static list of namespace names that the term applies to.\nThe term is applied to the union of the namespaces listed in this field\nand the ones selected by namespaceSelector.\nnull or empty namespaces list and null namespaceSelector means "this pod\'s namespace".', + ) + topologyKey: str = Field( + ..., + description="This pod should be co-located (affinity) or not co-located (anti-affinity) with the pods matching\nthe labelSelector in the specified namespaces, where co-located is defined as running on a node\nwhose value of the label with key topologyKey matches that of any node on which any of the\nselected pods is running.\nEmpty topologyKey is not allowed.", + ) + + +class PodAntiAffinity(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + preferredDuringSchedulingIgnoredDuringExecution: Optional[ + List[PreferredDuringSchedulingIgnoredDuringExecutionItem2] + ] = Field( + default=None, + description='The scheduler will prefer to schedule pods to nodes that satisfy\nthe anti-affinity expressions specified by this field, but it may choose\na node that violates one or more of the expressions. The node that is\nmost preferred is the one with the greatest sum of weights, i.e.\nfor each node that meets all of the scheduling requirements (resource\nrequest, requiredDuringScheduling anti-affinity expressions, etc.),\ncompute a sum by iterating through the elements of this field and adding\n"weight" to the sum if the node has pods which matches the corresponding podAffinityTerm; the\nnode(s) with the highest sum are the most preferred.', + ) + requiredDuringSchedulingIgnoredDuringExecution: Optional[ + List[RequiredDuringSchedulingIgnoredDuringExecutionItem1] + ] = Field( + default=None, + description="If the anti-affinity requirements specified by this field are not met at\nscheduling time, the pod will not be scheduled onto the node.\nIf the anti-affinity requirements specified by this field cease to be met\nat some point during pod execution (e.g. due to a pod label update), the\nsystem may or may not try to eventually evict the pod from its node.\nWhen there are multiple elements, the lists of nodes corresponding to each\npodAffinityTerm are intersected, i.e. 
all terms must be satisfied.", + ) + + +class Affinity(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + nodeAffinity: Optional[NodeAffinity] = Field( + default=None, + description="Describes node affinity scheduling rules for the pod.", + ) + podAffinity: Optional[PodAffinity] = Field( + default=None, + description="Describes pod affinity scheduling rules (e.g. co-locate this pod in the same node, zone, etc. as some other pod(s)).", + ) + podAntiAffinity: Optional[PodAntiAffinity] = Field( + default=None, + description="Describes pod anti-affinity scheduling rules (e.g. avoid putting this pod in the same node, zone, etc. as some other pod(s)).", + ) + + class ExtraVolumeMount(BaseCRD): model_config = ConfigDict( extra="allow", @@ -1299,28 +1697,11 @@ class Resources1(BaseCRD): ) -class MatchExpression(BaseCRD): - model_config = ConfigDict( - extra="allow", - ) - key: str = Field( - ..., description="key is the label key that the selector applies to." - ) - operator: str = Field( - ..., - description="operator represents a key's relationship to a set of values.\nValid operators are In, NotIn, Exists and DoesNotExist.", - ) - values: Optional[List[str]] = Field( - default=None, - description="values is an array of string values. If the operator is In or NotIn,\nthe values array must be non-empty. If the operator is Exists or DoesNotExist,\nthe values array must be empty. This array is replaced during a strategic\nmerge patch.", - ) - - class Selector(BaseCRD): model_config = ConfigDict( extra="allow", ) - matchExpressions: Optional[List[MatchExpression]] = Field( + matchExpressions: Optional[List[MatchExpression2]] = Field( default=None, description="matchExpressions is a list of label selector requirements. The requirements are ANDed.", ) @@ -2644,6 +3025,12 @@ class InitContainer(BaseCRD): ) +class ReconcileSrategy(Enum): + never = "never" + always = "always" + whenFailedOrHibernated = "whenFailedOrHibernated" + + class ValueFrom2(BaseCRD): model_config = ConfigDict( extra="allow", @@ -2740,7 +3127,7 @@ class Session(BaseCRD): ) runAsGroup: int = Field( default=1000, - description="The group is set on the session and this value is also set as the fsgroup for the whole pod and all session\ncontianers.", + description="The group is set on the session and this value is also set as the fsgroup for the whole pod and all session\ncontainers.", ge=0, ) runAsUser: int = Field(default=1000, ge=0) @@ -2758,10 +3145,40 @@ class Session(BaseCRD): ) +class Toleration(BaseCRD): + model_config = ConfigDict( + extra="allow", + ) + effect: Optional[str] = Field( + default=None, + description="Effect indicates the taint effect to match. Empty means match all taint effects.\nWhen specified, allowed values are NoSchedule, PreferNoSchedule and NoExecute.", + ) + key: Optional[str] = Field( + default=None, + description="Key is the taint key that the toleration applies to. Empty means match all taint keys.\nIf the key is empty, operator must be Exists; this combination means to match all values and all keys.", + ) + operator: Optional[str] = Field( + default=None, + description="Operator represents a key's relationship to the value.\nValid operators are Exists and Equal. 
Defaults to Equal.\nExists is equivalent to wildcard for value, so that a pod can\ntolerate all taints of a particular category.", + ) + tolerationSeconds: Optional[int] = Field( + default=None, + description="TolerationSeconds represents the period of time the toleration (which must be\nof effect NoExecute, otherwise this field is ignored) tolerates the taint. By default,\nit is not set, which means tolerate the taint forever (do not evict). Zero and\nnegative values will be treated as 0 (evict immediately) by the system.", + ) + value: Optional[str] = Field( + default=None, + description="Value is the taint value the toleration matches to.\nIf the operator is Exists, the value should be empty, otherwise just a regular string.", + ) + + class Spec(BaseCRD): model_config = ConfigDict( extra="allow", ) + affinity: Optional[Affinity] = Field( + default=None, + description="If specified, the pod's scheduling constraints\nPassed right through to the Statefulset used for the session.", + ) authentication: Optional[Authentication] = Field( default=None, description="Authentication configuration for the session" ) @@ -2782,7 +3199,7 @@ class Spec(BaseCRD): ) extraVolumes: Optional[List[ExtraVolume]] = Field( default=None, - description="Additional volumes to include in the statefulset for a session", + description="Additional volumes to include in the statefulset for a session\nVolumes used internally by amalthea are all prefixed with 'amalthea-' so as long as you\navoid that naming you will avoid conflicts with the volumes that amalthea generates.", ) hibernated: bool = Field( ..., @@ -2796,10 +3213,22 @@ class Spec(BaseCRD): default=None, description="Additional init containers to add to the session statefulset\nNOTE: The container names provided will be partially overwritten and randomized to avoid collisions", ) + nodeSelector: Optional[Dict[str, str]] = Field( + default=None, + description="Selector which must match a node's labels for the pod to be scheduled on that node.\nPassed right through to the Statefulset used for the session.", + ) + reconcileSrategy: ReconcileSrategy = Field( + default="always", + description="Indicates how Amalthea should reconcile the child resources for a session. 
This can be problematic because\nnewer versions of Amalthea may include new versions of the sidecars or other changes not reflected\nin the AmaltheaSession CRD, so simply updating Amalthea could cause existing sessions to restart\nbecause the sidecars will have a newer image or for other reasons because the code changed.\nHibernating the session and deleting it will always work as expected regardless of the strategy.\nThe status of the session and all hibernation or auto-cleanup functionality will always work as expected.\nA few values are possible:\n- never: Amalthea will never update any of the child resources and will ignore any changes to the CR\n- always: This is the expected method of operation for an operator, changes to the spec are always reconciled\n- whenHibernatedOrFailed: To avoid interrupting a running session, reconciliation of the child components\n are only done when the session has a Failed or Hibernated status", + ) session: Session = Field( ..., description="Specification for the main session container that the user will access and use", ) + tolerations: Optional[List[Toleration]] = Field( + default=None, + description="If specified, the pod's tolerations.\nPassed right through to the Statefulset used for the session.", + ) class Condition(BaseCRD): diff --git a/components/renku_data_services/notebooks/crs.py b/components/renku_data_services/notebooks/crs.py index 8d86ed9ad..8d56af8d9 100644 --- a/components/renku_data_services/notebooks/crs.py +++ b/components/renku_data_services/notebooks/crs.py @@ -2,16 +2,16 @@ from datetime import UTC, datetime from typing import Any, cast -from urllib.parse import urljoin, urlparse, urlunparse +from urllib.parse import urlunparse from kubernetes.utils import parse_duration, parse_quantity from pydantic import BaseModel, Field, field_validator -from sanic.log import logger from ulid import ULID from renku_data_services.errors import errors from renku_data_services.notebooks import apispec from renku_data_services.notebooks.cr_amalthea_session import ( + Affinity, Authentication, CodeRepository, Culling, @@ -21,12 +21,19 @@ ExtraVolumeMount, Ingress, InitContainer, + MatchExpression, + NodeAffinity, + NodeSelectorTerm, + Preference, + PreferredDuringSchedulingIgnoredDuringExecutionItem, + RequiredDuringSchedulingIgnoredDuringExecution, SecretRef, Session, State, Status, Storage, TlsSecret, + Toleration, ) from renku_data_services.notebooks.cr_amalthea_session import EnvItem2 as SessionEnvItem from renku_data_services.notebooks.cr_amalthea_session import Item4 as SecretAsVolumeItem diff --git a/components/renku_data_services/notebooks/util/kubernetes_.py b/components/renku_data_services/notebooks/util/kubernetes_.py index 7cf289c95..75c384242 100644 --- a/components/renku_data_services/notebooks/util/kubernetes_.py +++ b/components/renku_data_services/notebooks/util/kubernetes_.py @@ -18,6 +18,7 @@ from __future__ import annotations +import re from enum import StrEnum from hashlib import md5 from typing import Any, TypeAlias, cast @@ -25,6 +26,7 @@ import escapism from kubernetes.client import V1Container +from renku_data_services.base_models.core import AnonymousAPIUser, AuthenticatedAPIUser, Slug from renku_data_services.notebooks.crs import Patch, PatchType @@ -50,21 +52,24 @@ def renku_1_make_server_name(safe_username: str, namespace: str, project: str, b ) -def renku_2_make_server_name(safe_username: str, project_id: str, launcher_id: str) -> str: +def renku_2_make_server_name(user: AuthenticatedAPIUser | AnonymousAPIUser, 
project_id: str, launcher_id: str) -> str:
     """Form a unique server name for Renku 2.0 sessions.
 
     This is used in naming all the k8s resources created by amalthea.
     """
-    server_string_for_hashing = f"{safe_username}-{project_id}-{launcher_id}"
-    server_hash = md5(server_string_for_hashing.encode(), usedforsecurity=False).hexdigest().lower()
+    safe_username = Slug.from_user(user.email, user.first_name, user.last_name, user.id).value
+    safe_username = safe_username.lower()
+    safe_username = re.sub(r"[^a-z0-9-]", "-", safe_username)
     prefix = _make_server_name_prefix(safe_username)
+    server_string_for_hashing = f"{user.id}-{project_id}-{launcher_id}"
+    server_hash = md5(server_string_for_hashing.encode(), usedforsecurity=False).hexdigest().lower()
     # NOTE: A K8s object name can only contain lowercase alphanumeric characters, hyphens, or dots.
     # Must be no more than 63 characters because the name is used to create a k8s Service and Services
     # have more restrictions for their names because their names have to make a valid hostname.
     # NOTE: We use server name as a label value, so, server name must be less than 63 characters.
-    # !NOTE: For now we limit the server name to a max of 42 characters.
-    # NOTE: This is 12 + 9 + 21 = 42 characters
-    return f"{prefix[:12]}-renku-2-{server_hash[:21]}"
+    # !NOTE: For now we limit the server name to a max of 25 characters.
+    # NOTE: This is 12 + 1 + 12 = 25 characters
+    return f"{prefix[:12]}-{server_hash[:12]}"
 
 
 def find_env_var(container: V1Container, env_name: str) -> tuple[int, str] | None:
@@ -84,9 +89,8 @@ def find_env_var(container: V1Container, env_name: str) -> tuple[int, str] | Non
 
 
 def _make_server_name_prefix(safe_username: str) -> str:
-    safe_username_lowercase = safe_username.lower()
     prefix = ""
-    if not safe_username_lowercase[0].isalpha() or not safe_username_lowercase[0].isascii():
+    if not safe_username[0].isalpha() or not safe_username[0].isascii():
         # NOTE: Username starts with an invalid character. This has to be modified because a
         # k8s service object cannot start with anything other than a lowercase alphabet character.
         # NOTE: We do not have to worry about collisions with already existing servers from older
@@ -95,7 +99,7 @@ def _make_server_name_prefix(safe_username: str) -> str:
         # is for example 7User vs. n7User.
prefix = "n" - prefix = f"{prefix}{safe_username_lowercase}" + prefix = f"{prefix}{safe_username}" return prefix diff --git a/components/renku_data_services/notebooks/utils.py b/components/renku_data_services/notebooks/utils.py new file mode 100644 index 000000000..5c4d457cb --- /dev/null +++ b/components/renku_data_services/notebooks/utils.py @@ -0,0 +1,97 @@ +"""Utilities for notebooks.""" + +import renku_data_services.crc.models as crc_models +from renku_data_services.notebooks.crs import ( + MatchExpression, + NodeAffinity, + NodeSelectorTerm, + Preference, + PreferredDuringSchedulingIgnoredDuringExecutionItem, + RequiredDuringSchedulingIgnoredDuringExecution, + Toleration, +) + + +def merge_node_affinities( + node_affinity1: NodeAffinity, + node_affinity2: NodeAffinity, +) -> NodeAffinity: + """Merge two node affinities into a brand new object.""" + output = NodeAffinity() + if node_affinity1.preferredDuringSchedulingIgnoredDuringExecution: + output.preferredDuringSchedulingIgnoredDuringExecution = ( + node_affinity1.preferredDuringSchedulingIgnoredDuringExecution + ) + if node_affinity2.preferredDuringSchedulingIgnoredDuringExecution: + if output.preferredDuringSchedulingIgnoredDuringExecution: + output.preferredDuringSchedulingIgnoredDuringExecution.extend( + node_affinity2.preferredDuringSchedulingIgnoredDuringExecution + ) + else: + output.preferredDuringSchedulingIgnoredDuringExecution = ( + node_affinity2.preferredDuringSchedulingIgnoredDuringExecution + ) + if ( + node_affinity1.requiredDuringSchedulingIgnoredDuringExecution + and node_affinity1.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms + ): + output.requiredDuringSchedulingIgnoredDuringExecution = RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=node_affinity1.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms + ) + if ( + node_affinity2.requiredDuringSchedulingIgnoredDuringExecution + and node_affinity2.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms + ): + if output.requiredDuringSchedulingIgnoredDuringExecution: + output.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms.extend( + node_affinity2.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms + ) + else: + output.requiredDuringSchedulingIgnoredDuringExecution = RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=(node_affinity2.requiredDuringSchedulingIgnoredDuringExecution.nodeSelectorTerms) + ) + return output + + +def node_affinity_from_resource_class(resource_class: crc_models.ResourceClass) -> NodeAffinity: + """Generate an affinity from the affinities stored in a resource class.""" + output = NodeAffinity() + required_expr = [ + MatchExpression(key=affinity.key, operator="Exists") + for affinity in resource_class.node_affinities + if affinity.required_during_scheduling + ] + preferred_expr = [ + MatchExpression(key=affinity.key, operator="Exists") + for affinity in resource_class.node_affinities + if not affinity.required_during_scheduling + ] + if required_expr: + output.requiredDuringSchedulingIgnoredDuringExecution = RequiredDuringSchedulingIgnoredDuringExecution( + nodeSelectorTerms=[ + # NOTE: Node selector terms are ORed by kubernetes + NodeSelectorTerm( + # NOTE: matchExpression terms are ANDed by kubernetes + matchExpressions=required_expr, + ) + ] + ) + if preferred_expr: + output.preferredDuringSchedulingIgnoredDuringExecution = [ + PreferredDuringSchedulingIgnoredDuringExecutionItem( + weight=1, + preference=Preference( + # NOTE: 
matchExpression terms are ANDed by kubernetes + matchExpressions=preferred_expr, + ), + ) + ] + return output + + +def tolerations_from_resource_class(resource_class: crc_models.ResourceClass) -> list[Toleration]: + """Generate tolerations from the list of tolerations of a resource class.""" + output: list[Toleration] = [] + for tol in resource_class.tolerations: + output.append(Toleration(key=tol, operator="Exists")) + return output diff --git a/components/renku_data_services/project/blueprints.py b/components/renku_data_services/project/blueprints.py index abb5df59f..aee1508dd 100644 --- a/components/renku_data_services/project/blueprints.py +++ b/components/renku_data_services/project/blueprints.py @@ -13,7 +13,6 @@ from renku_data_services.base_api.auth import ( authenticate, only_authenticated, - validate_path_project_id, validate_path_user_id, ) from renku_data_services.base_api.blueprint import BlueprintFactoryResponse, CustomBlueprint @@ -81,12 +80,11 @@ def get_one(self) -> BlueprintFactoryResponse: """Get a specific project.""" @authenticate(self.authenticator) - @validate_path_project_id @extract_if_none_match async def _get_one( - _: Request, user: base_models.APIUser, project_id: str, etag: str | None + _: Request, user: base_models.APIUser, project_id: ULID, etag: str | None ) -> JSONResponse | HTTPResponse: - project = await self.project_repo.get_project(user=user, project_id=ULID.from_str(project_id)) + project = await self.project_repo.get_project(user=user, project_id=project_id) if project.etag is not None and project.etag == etag: return HTTPResponse(status=304) @@ -205,20 +203,17 @@ async def _delete_member( await self.project_member_repo.delete_members(user, project_id, [member_id]) return HTTPResponse(status=204) - return "/projects//members/", ["DELETE"], _delete_member + return "/projects//members/", ["DELETE"], _delete_member def get_permissions(self) -> BlueprintFactoryResponse: """Get the permissions of the current user on the project.""" @authenticate(self.authenticator) - @validate_path_project_id - async def _get_permissions(_: Request, user: base_models.APIUser, project_id: str) -> JSONResponse: - permissions = await self.project_repo.get_project_permissions( - user=user, project_id=ULID.from_str(project_id) - ) + async def _get_permissions(_: Request, user: base_models.APIUser, project_id: ULID) -> JSONResponse: + permissions = await self.project_repo.get_project_permissions(user=user, project_id=project_id) return validated_json(apispec.ProjectPermissions, permissions) - return "/projects//permissions", ["GET"], _get_permissions + return "/projects//permissions", ["GET"], _get_permissions @staticmethod def _dump_project(project: project_models.Project) -> dict[str, Any]: diff --git a/components/renku_data_services/storage/api.spec.yaml b/components/renku_data_services/storage/api.spec.yaml index 0ff7658a3..b02d3dffd 100644 --- a/components/renku_data_services/storage/api.spec.yaml +++ b/components/renku_data_services/storage/api.spec.yaml @@ -420,10 +420,10 @@ components: exclusive: type: boolean description: if true, only values from 'examples' can be used - datatype: + type: type: string description: data type of option value. RClone has more options but they map to the ones listed here. 
- enum: ["int", "bool", "string", "Time"] + enum: ["int", "bool", "string", "Time", "Duration", "MultiEncoder", "SizeSuffix", "SpaceSepList", "CommaSepList", "Tristate"] Ulid: description: ULID identifier type: string diff --git a/components/renku_data_services/storage/apispec.py b/components/renku_data_services/storage/apispec.py index 0580dda06..7b468e287 100644 --- a/components/renku_data_services/storage/apispec.py +++ b/components/renku_data_services/storage/apispec.py @@ -1,6 +1,6 @@ # generated by datamodel-codegen: # filename: api.spec.yaml -# timestamp: 2024-10-18T11:06:20+00:00 +# timestamp: 2024-10-28T17:26:56+00:00 from __future__ import annotations @@ -28,11 +28,17 @@ class Example(BaseAPISpec): ) -class Datatype(Enum): +class Type(Enum): int = "int" bool = "bool" string = "string" Time = "Time" + Duration = "Duration" + MultiEncoder = "MultiEncoder" + SizeSuffix = "SizeSuffix" + SpaceSepList = "SpaceSepList" + CommaSepList = "CommaSepList" + Tristate = "Tristate" class RCloneOption(BaseAPISpec): @@ -70,7 +76,7 @@ class RCloneOption(BaseAPISpec): exclusive: Optional[bool] = Field( None, description="if true, only values from 'examples' can be used" ) - datatype: Optional[Datatype] = Field( + type: Optional[Type] = Field( None, description="data type of option value. RClone has more options but they map to the ones listed here.", ) diff --git a/test/bases/renku_data_services/data_api/test_notebooks.py b/test/bases/renku_data_services/data_api/test_notebooks.py index 34c08b95c..9ad9768b3 100644 --- a/test/bases/renku_data_services/data_api/test_notebooks.py +++ b/test/bases/renku_data_services/data_api/test_notebooks.py @@ -3,11 +3,13 @@ import asyncio import os from collections.abc import AsyncIterator +from contextlib import suppress from unittest.mock import MagicMock from uuid import uuid4 import pytest import pytest_asyncio +from kr8s import NotFoundError from kr8s.asyncio.objects import Pod from sanic_testing.testing import SanicASGITestClient @@ -77,7 +79,10 @@ async def jupyter_server(renku_image: str, server_name: str, pod_name: str) -> A await pod.refresh() await pod.wait("condition=Ready") yield server - await server.delete("Foreground") + # NOTE: This is used also in tests that check if the server was properly stopped + # in this case the server will already gone when we try to delete it in the cleanup here. 
+ with suppress(NotFoundError): + await server.delete("Foreground") @pytest_asyncio.fixture() diff --git a/test/bases/renku_data_services/data_api/test_projects.py b/test/bases/renku_data_services/data_api/test_projects.py index 32da82357..ac15d6abc 100644 --- a/test/bases/renku_data_services/data_api/test_projects.py +++ b/test/bases/renku_data_services/data_api/test_projects.py @@ -1008,7 +1008,9 @@ async def test_project_slug_case( assert res.json.get("slug") == uppercase_slug etag = res.headers["ETag"] # Get it by the namespace - _, res = await sanic_client.get(f"/api/data/projects/{group['slug']}/{uppercase_slug}", headers=user_headers) + _, res = await sanic_client.get( + f"/api/data/namespaces/{group['slug']}/projects/{uppercase_slug}", headers=user_headers + ) assert res.status_code == 200 assert res.json.get("slug") == uppercase_slug # Patch the project diff --git a/test/bases/renku_data_services/data_api/test_sessions.py b/test/bases/renku_data_services/data_api/test_sessions.py index fdafd0e7a..465631e60 100644 --- a/test/bases/renku_data_services/data_api/test_sessions.py +++ b/test/bases/renku_data_services/data_api/test_sessions.py @@ -519,7 +519,7 @@ async def test_patch_session_launcher_environment( assert res.json["environment"]["container_image"] == "nginx:latest" assert res.json["environment"]["args"] == ["a", "b", "c"] - # Should be able to reset args by patching in None, pathcing a null field should do nothing + # Should be able to reset args by patching in None, patching a null field should do nothing patch_payload = { "environment": {"args": None, "command": None}, }
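
Reviewer note (not part of the patch): the reworked renku_2_make_server_name in notebooks/util/kubernetes_.py now hashes the user id, project id and launcher id and caps the name at 25 characters. Below is a minimal sketch of the resulting format, using made-up ids and ignoring the "n"-prefix handling for usernames that start with a non-letter character:

    from hashlib import md5

    def sketch_server_name(safe_username: str, user_id: str, project_id: str, launcher_id: str) -> str:
        # Mirrors the new scheme: 12-char username prefix + "-" + 12-char md5 prefix = at most 25 chars.
        server_hash = md5(f"{user_id}-{project_id}-{launcher_id}".encode(), usedforsecurity=False).hexdigest().lower()
        return f"{safe_username[:12]}-{server_hash[:12]}"

    # Prints "jane-doe-" followed by the first 12 hex characters of the md5 digest.
    print(sketch_server_name("jane-doe", "user-123", "project-ulid", "launcher-ulid"))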
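
Reviewer note (not part of the patch): a sketch of how the new helpers in notebooks/utils.py might be combined when assembling a session spec. The imported helpers and model classes come from this patch; the surrounding function name and variables are hypothetical, and an existing crc ResourceClass instance is assumed:

    import renku_data_services.crc.models as crc_models
    from renku_data_services.notebooks.crs import NodeAffinity, Toleration
    from renku_data_services.notebooks.utils import (
        merge_node_affinities,
        node_affinity_from_resource_class,
        tolerations_from_resource_class,
    )

    def scheduling_constraints(
        resource_class: crc_models.ResourceClass, spec_affinity: NodeAffinity | None
    ) -> tuple[NodeAffinity, list[Toleration]]:
        """Derive affinity and tolerations from a resource class and merge with any affinity already on the spec."""
        class_affinity = node_affinity_from_resource_class(resource_class)
        # merge_node_affinities returns a new object whose preferred terms and required
        # nodeSelectorTerms (ORed by Kubernetes) are the concatenation of both inputs.
        affinity = merge_node_affinities(spec_affinity, class_affinity) if spec_affinity else class_affinity
        return affinity, tolerations_from_resource_class(resource_class)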