diff --git a/changes/2374.fix.md b/changes/2374.fix.md
new file mode 100644
index 0000000000..f8915974ac
--- /dev/null
+++ b/changes/2374.fix.md
@@ -0,0 +1 @@
+Fix the `purge_user` mutation by deleting records that reference the user before deleting the user record itself.
diff --git a/src/ai/backend/client/cli/admin/user.py b/src/ai/backend/client/cli/admin/user.py
index 530a2b28b9..7d2e5a5b22 100644
--- a/src/ai/backend/client/cli/admin/user.py
+++ b/src/ai/backend/client/cli/admin/user.py
@@ -548,5 +548,6 @@ def purge(ctx: CLIContext, email, purge_shared_vfolders):
         data,
         extra_info={
             "email": email,
+            "bgtask_id": data.get("bgtask_id"),
         },
     )
diff --git a/src/ai/backend/client/func/user.py b/src/ai/backend/client/func/user.py
index ce401df26b..ede0cb219e 100644
--- a/src/ai/backend/client/func/user.py
+++ b/src/ai/backend/client/func/user.py
@@ -407,7 +407,7 @@ async def purge(cls, email: str, purge_shared_vfolders=False):
             """\
             mutation($email: String!, $input: PurgeUserInput!) {
                 purge_user(email: $email, props: $input) {
-                    ok msg
+                    ok msg bgtask_id
                 }
             }
             """
diff --git a/src/ai/backend/manager/api/schema.graphql b/src/ai/backend/manager/api/schema.graphql
index d09f07d037..5a7ed38b65 100644
--- a/src/ai/backend/manager/api/schema.graphql
+++ b/src/ai/backend/manager/api/schema.graphql
@@ -1367,6 +1367,9 @@ This action cannot be undone.
 type PurgeUser {
   ok: Boolean
   msg: String
+
+  """Added in 24.03.7. ID of the background task deleting the vfolders owned by the user."""
+  bgtask_id: UUID
 }
 
 input PurgeUserInput {
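With the schema change above, `purge_user` can now hand back a background task ID when vfolder deletion continues asynchronously. A minimal sketch of how a caller might consume it through the Python client SDK touched earlier in this diff; the wrapper function name and the printed message are illustrative, not part of this change:

```python
from ai.backend.client.session import AsyncSession


async def purge_and_report(email: str) -> None:
    async with AsyncSession() as api_session:
        # purge() now also returns `bgtask_id`; it may be None when there were
        # no remaining vfolders and the purge completed synchronously.
        data = await api_session.User.purge(email)
        if data.get("bgtask_id") is not None:
            print(f"vfolder cleanup continues in background task {data['bgtask_id']}")
```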
diff --git a/src/ai/backend/manager/models/keypair.py b/src/ai/backend/manager/models/keypair.py
index 8f41ae9531..df307c0a12 100644
--- a/src/ai/backend/manager/models/keypair.py
+++ b/src/ai/backend/manager/models/keypair.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import base64
+import logging
 import secrets
 import uuid
 from datetime import datetime
@@ -15,10 +16,12 @@ from graphene.types.datetime import DateTime as GQLDateTime
 from sqlalchemy.engine.row import Row
 from sqlalchemy.ext.asyncio import AsyncConnection as SAConnection
+from sqlalchemy.ext.asyncio import AsyncSession as SASession
 from sqlalchemy.orm import relationship
 from sqlalchemy.sql.expression import false
 
 from ai.backend.common import msgpack, redis_helper
+from ai.backend.common.logging import BraceStyleAdapter
 from ai.backend.common.types import AccessKey, SecretKey
 
 if TYPE_CHECKING:
@@ -43,6 +46,9 @@ from .user import ModifyUserInput, UserRole
 from .utils import agg_to_array
 
+log = BraceStyleAdapter(logging.getLogger(__spec__.name))  # type: ignore[name-defined]
+
+
 __all__: Sequence[str] = (
     "keypairs",
     "KeyPairRow",
@@ -718,3 +724,70 @@ def verify_dotfile_name(dotfile: str) -> bool:
     if dotfile in RESERVED_DOTFILES:
         return False
     return True
+
+
+async def delete_kernels_by_access_key(
+    db_session: SASession,
+    access_key: AccessKey,
+) -> int:
+    """
+    Delete all kernels owned by the keypair.
+
+    :param db_session: SQLAlchemy session
+    :param access_key: access key of the keypair whose kernels are deleted
+    :return: number of deleted rows
+    """
+    from . import KernelRow
+
+    result = await db_session.execute(
+        sa.delete(KernelRow).where(KernelRow.access_key == access_key),
+    )
+    if result.rowcount > 0:
+        log.info("deleted {0} keypair's kernels (ak:{1})", result.rowcount, access_key)
+    return result.rowcount
+
+
+async def delete_sessions_by_access_key(
+    db_session: SASession,
+    access_key: AccessKey,
+) -> int:
+    """
+    Delete all sessions owned by the keypair.
+
+    :param db_session: SQLAlchemy session
+    :param access_key: access key of the keypair whose sessions are deleted
+    :return: number of deleted rows
+    """
+    from .session import SessionRow
+
+    result = await db_session.execute(
+        sa.delete(SessionRow).where(SessionRow.access_key == access_key)
+    )
+    if result.rowcount > 0:
+        log.info("deleted {0} keypair's sessions (ak:{1})", result.rowcount, access_key)
+    return result.rowcount
+
+
+async def access_key_has_active_sessions(
+    db_session: SASession,
+    access_key: AccessKey,
+) -> bool:
+    """
+    Check whether the keypair has any active sessions.
+
+    :param db_session: SQLAlchemy session
+    :param access_key: access key of the keypair to check
+
+    :return: True if the access key has some active sessions.
+    """
+    from . import AGENT_RESOURCE_OCCUPYING_KERNEL_STATUSES, KernelRow
+
+    active_kernel_count = await db_session.scalar(
+        sa.select(sa.func.count())
+        .select_from(KernelRow)
+        .where(
+            (KernelRow.access_key == access_key)
+            & (KernelRow.status.in_(AGENT_RESOURCE_OCCUPYING_KERNEL_STATUSES)),
+        ),
+    )
+    return active_kernel_count > 0
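A minimal sketch of how the three helpers above are meant to compose, assuming an `ExtendedAsyncSAEngine` instance is available; the wrapper function and its error message are illustrative only (the real call sites are in `user.py` below):

```python
from ai.backend.common.types import AccessKey
from ai.backend.manager.models.keypair import (
    access_key_has_active_sessions,
    delete_kernels_by_access_key,
    delete_sessions_by_access_key,
)
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine


async def drop_keypair_workloads(db: ExtendedAsyncSAEngine, access_key: AccessKey) -> None:
    async with db.begin_session() as db_session:
        # Refuse to touch anything while the keypair still has running sessions.
        if await access_key_has_active_sessions(db_session, access_key):
            raise RuntimeError(f"keypair {access_key} still has active sessions")
        # Kernel rows reference session rows, so drop kernels first, then sessions.
        await delete_kernels_by_access_key(db_session, access_key)
        await delete_sessions_by_access_key(db_session, access_key)
```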
diff --git a/src/ai/backend/manager/models/user.py b/src/ai/backend/manager/models/user.py
index bd03a99975..77c3aaf094 100644
--- a/src/ai/backend/manager/models/user.py
+++ b/src/ai/backend/manager/models/user.py
@@ -5,7 +5,6 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Sequence, cast
 from uuid import UUID, uuid4
 
-import aiotools
 import bcrypt
 import graphene
 import sqlalchemy as sa
@@ -23,10 +22,10 @@ from sqlalchemy.types import VARCHAR, TypeDecorator
 
 from ai.backend.common import redis_helper
+from ai.backend.common.bgtask import ProgressReporter
 from ai.backend.common.logging import BraceStyleAdapter
-from ai.backend.common.types import RedisConnectionInfo, VFolderID
+from ai.backend.common.types import RedisConnectionInfo
 
-from ..api.exceptions import VFolderOperationFailed
 from ..defs import DEFAULT_KEYPAIR_RATE_LIMIT, DEFAULT_KEYPAIR_RESOURCE_POLICY_NAME
 from .base import (
     Base,
@@ -48,11 +47,11 @@ from .gql_relay import AsyncNode, Connection, ConnectionResolverResult
 from .minilang.ordering import OrderSpecItem, QueryOrderParser
 from .minilang.queryfilter import FieldSpecItem, QueryFilterParser, enum_field_getter
-from .storage import StorageSessionManager
-from .utils import ExtendedAsyncSAEngine
+from .utils import execute_with_txn_retry
 
 if TYPE_CHECKING:
     from .gql import GraphQueryContext
+    from .vfolder import VFolderRow
 
 log = BraceStyleAdapter(logging.getLogger(__spec__.name))  # type: ignore[name-defined]
 
@@ -963,6 +962,10 @@ class Arguments:
 
     ok = graphene.Boolean()
     msg = graphene.String()
+    bgtask_id = graphene.UUID(
+        required=False,
+        description="Added in 24.03.7. ID of the background task deleting the vfolders owned by the user.",
+    )
 
     @classmethod
     async def mutate(
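The rewritten mutation below routes its DB work through `execute_with_txn_retry`, newly imported above. A minimal sketch of that call pattern, assuming only the shape visible in this diff (a coroutine taking an `SASession`, a session factory, and an open connection); the counting query is a placeholder:

```python
import sqlalchemy as sa
from sqlalchemy.ext.asyncio import AsyncSession as SASession

from ai.backend.manager.models.user import users
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine, execute_with_txn_retry


async def count_users_with_retry(db: ExtendedAsyncSAEngine) -> int:
    async def _count(db_session: SASession) -> int:
        # This body may run more than once if the transaction is retried,
        # so it should not have side effects outside the database.
        return await db_session.scalar(sa.select(sa.func.count()).select_from(users))

    async with db.connect() as db_conn:
        return await execute_with_txn_retry(_count, db.begin_session, db_conn)
```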
@@ -972,38 +975,121 @@ async def mutate(
         email: str,
         props: PurgeUserInput,
     ) -> PurgeUser:
+        from .keypair import (
+            KeyPairRow,
+            access_key_has_active_sessions,
+            delete_kernels_by_access_key,
+            delete_sessions_by_access_key,
+        )
+        from .vfolder import VFolderDeletionInfo, delete_vfolders
+
         graph_ctx: GraphQueryContext = info.context
+        delete_query = sa.delete(users).where(users.c.email == email)
 
-        async def _pre_func(conn: SAConnection) -> None:
-            user_uuid = await conn.scalar(
-                sa.select([users.c.uuid]).select_from(users).where(users.c.email == email),
-            )
-            log.info("Purging all records of the user {0}...", email)
+        async with graph_ctx.db.connect() as db_conn:
+            async with graph_ctx.db.begin_session(db_conn) as db_session:
+                conn = await db_session.connection()
+                user_uuid = await conn.scalar(
+                    sa.select(users.c.uuid).select_from(users).where(users.c.email == email),
+                )
+                # `keypairs.user_id` is email
+                # Check `src.ai.backend.manager.models.keypair.CreateKeyPair.prepare_new_keypair()`
+                keypair_rows = (
+                    await SASession(conn).scalars(
+                        sa.select(KeyPairRow).where(KeyPairRow.user_id == email)
+                    )
+                ).all()
+                keypair_rows = cast(list[KeyPairRow], keypair_rows)
 
-            if await cls.user_vfolder_mounted_to_active_kernels(conn, user_uuid):
-                raise RuntimeError(
-                    "Some of user's virtual folders are mounted to active kernels. "
-                    "Terminate those kernels first.",
+            async def _pre_purge(db_session: SASession) -> list[VFolderRow]:
+                conn = await db_session.connection()
+                update_status_query = (
+                    sa.update(users)
+                    .values(status=UserStatus.DELETED, status_info="admin-requested")
+                    .where(users.c.email == email)
                 )
-            if await cls.user_has_active_kernels(conn, user_uuid):
-                raise RuntimeError("User has some active kernels. Terminate them first.")
-
-            if not props.purge_shared_vfolders:
-                await cls.migrate_shared_vfolders(
-                    conn,
-                    deleted_user_uuid=user_uuid,
-                    target_user_uuid=graph_ctx.user["uuid"],
-                    target_user_email=graph_ctx.user["email"],
+                await conn.execute(update_status_query)
+
+                log.info("Purging all records of the user {0}...", email)
+
+                if await cls.user_vfolder_mounted_to_active_kernels(conn, user_uuid):
+                    raise RuntimeError(
+                        "Some of user's virtual folders are mounted to active kernels. "
+                        "Terminate those kernels first.",
+                    )
+                if await cls.user_has_active_kernels(conn, user_uuid):
+                    raise RuntimeError("User has some active kernels. Terminate them first.")
+                for row in keypair_rows:
+                    if await access_key_has_active_sessions(db_session, row.access_key):
+                        raise RuntimeError(
+                            f"One of the keypairs owned by the user has active sessions. Terminate them first. (ak:{row.access_key})"
+                        )
+
+                if not props.purge_shared_vfolders:
+                    await cls.migrate_shared_vfolders(
+                        conn,
+                        deleted_user_uuid=user_uuid,
+                        target_user_uuid=graph_ctx.user["uuid"],
+                        target_user_email=graph_ctx.user["email"],
+                    )
+                await cls.delete_error_logs(conn, user_uuid)
+                await cls.delete_endpoint(conn, user_uuid)
+                await cls.delete_kernels(conn, user_uuid)
+                for row in keypair_rows:
+                    await delete_kernels_by_access_key(db_session, row.access_key)
+                await cls.delete_sessions(conn, user_uuid)
+                for row in keypair_rows:
+                    await delete_sessions_by_access_key(db_session, row.access_key)
+                await cls.delete_keypairs(conn, graph_ctx.redis_stat, user_uuid)
+
+                alive_vfolders = await cls.get_vfolders_not_deleted(db_session, user_uuid)
+                if not alive_vfolders:
+                    await cls.delete_vfolders(conn, user_uuid)
+                return alive_vfolders
+
+            alive_vfolders = await execute_with_txn_retry(
+                _pre_purge, graph_ctx.db.begin_session, db_conn
+            )
+
+            if alive_vfolders:
+                # Run bgtask
+                bgtask_manager = graph_ctx.background_task_manager
+
+                async def _delete_vfolder_content_and_record(reporter: ProgressReporter | None) -> None:
+                    await delete_vfolders(
+                        [VFolderDeletionInfo(row.vfid, row.host) for row in alive_vfolders],
+                        storage_manager=graph_ctx.storage_manager,
+                        db=graph_ctx.db,
+                        reporter=reporter,
+                    )
-            await cls.delete_error_logs(conn, user_uuid)
-            await cls.delete_endpoint(conn, user_uuid)
-            await cls.delete_kernels(conn, user_uuid)
-            await cls.delete_sessions(conn, user_uuid)
-            await cls.delete_vfolders(graph_ctx.db, user_uuid, graph_ctx.storage_manager)
-            await cls.delete_keypairs(conn, graph_ctx.redis_stat, user_uuid)
-        delete_query = sa.delete(users).where(users.c.email == email)
-        return await simple_db_mutate(cls, graph_ctx, delete_query, pre_func=_pre_func)
+
+                    async def _delete_records(db_session: SASession) -> None:
+                        conn = await db_session.connection()
+                        alive_vfolders = await cls.get_vfolders_not_deleted(db_session, user_uuid)
+                        if not alive_vfolders:
+                            await cls.delete_vfolders(conn, user_uuid)
+                            await db_session.execute(delete_query)
+                        else:
+                            # TODO: Show an explicit error that represents failure of vfolder deletion
+                            # Option 1: Remove the foreign key constraint between users and vfolders and delete the user record even though vfolder deletion fails.
+                            # Option 2: Do not delete the user record.
+                            log.info(
+                                "Failed to delete some vfolders. Delete them manually and try again."
+                            )
+
+                    async with graph_ctx.db.connect() as db_conn:
+                        await execute_with_txn_retry(
+                            _delete_records, graph_ctx.db.begin_session, db_conn
+                        )
+
+                task_id = await bgtask_manager.start(_delete_vfolder_content_and_record)
+                return cls(
+                    True, "Purge is ongoing. It will finish after all vfolders are deleted.", task_id
+                )
+
+            else:
+                return await simple_db_mutate(cls, graph_ctx, delete_query)
 
     @classmethod
     async def migrate_shared_vfolders(
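The hunk above touches the background-task framework in just two places: `background_task_manager.start()` and `reporter.update()`. A standalone sketch of that pattern under the same assumptions (the task body, item list, and `BackgroundTaskManager` type hint are illustrative):

```python
from ai.backend.common.bgtask import BackgroundTaskManager, ProgressReporter


async def _cleanup(reporter: ProgressReporter) -> None:
    items = ["a", "b", "c"]  # placeholder work items
    for item in items:
        # ... one unit of work per item ...
        await reporter.update(1, message=f"processed {item}")


async def launch(bgtask_manager: BackgroundTaskManager) -> None:
    # start() schedules the coroutine and returns the task ID that clients can
    # use to follow progress events; purge_user now returns this ID as bgtask_id.
    task_id = await bgtask_manager.start(_cleanup)
    print("started background task", task_id)
```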
@@ -1094,46 +1180,29 @@ async def migrate_shared_vfolders(
     @classmethod
     async def delete_vfolders(
         cls,
-        engine: ExtendedAsyncSAEngine,
+        conn: SAConnection,
         user_uuid: UUID,
-        storage_manager: StorageSessionManager,
     ) -> int:
         """
-        Delete user's all virtual folders as well as their physical data.
+        Delete the DB records of all the user's virtual folders.
 
         :param conn: DB connection
         :param user_uuid: user's UUID to delete virtual folders
         :return: number of deleted rows
         """
-        from . import VFolderDeletionInfo, initiate_vfolder_deletion, vfolder_permissions, vfolders
+        from . import (
+            VFolderRow,
+            vfolder_permissions,
+        )
 
-        async with engine.begin_session() as conn:
-            await conn.execute(
-                vfolder_permissions.delete().where(vfolder_permissions.c.user == user_uuid),
-            )
-            result = await conn.execute(
-                sa.select([vfolders.c.id, vfolders.c.host, vfolders.c.quota_scope_id])
-                .select_from(vfolders)
-                .where(vfolders.c.user == user_uuid),
-            )
-            target_vfs = result.fetchall()
-
-            storage_ptask_group = aiotools.PersistentTaskGroup()
-            try:
-                await initiate_vfolder_deletion(
-                    engine,
-                    [VFolderDeletionInfo(VFolderID.from_row(vf), vf["host"]) for vf in target_vfs],
-                    storage_manager,
-                    storage_ptask_group,
-                )
-            except VFolderOperationFailed as e:
-                log.error("error on deleting vfolder filesystem directory: {0}", e.extra_msg)
-                raise
-            deleted_count = len(target_vfs)
-            if deleted_count > 0:
-                log.info("deleted {0} user's virtual folders ({1})", deleted_count, user_uuid)
-            return deleted_count
+        await conn.execute(
+            vfolder_permissions.delete().where(vfolder_permissions.c.user == user_uuid),
+        )
+        result = await conn.execute(sa.delete(VFolderRow).where(VFolderRow.user == user_uuid))
+        if result.rowcount > 0:
+            log.info("deleted {0} user's vfolders ({1})", result.rowcount, user_uuid)
+        return result.rowcount
 
     @classmethod
     async def user_vfolder_mounted_to_active_kernels(
@@ -1197,6 +1266,27 @@ async def user_has_active_kernels(
         )
         return active_kernel_count > 0
 
+    @classmethod
+    async def get_vfolders_not_deleted(
+        cls,
+        db_session: SASession,
+        user_uuid: UUID,
+    ) -> list[VFolderRow]:
+        """
+        :param db_session: DB session
+        :param user_uuid: user's UUID
+
+        :return: vfolders owned by the user that have not been deleted yet.
+        """
+        from .vfolder import VFolderOperationStatus, VFolderRow
+
+        vfolder_query = sa.select(VFolderRow).where(
+            (VFolderRow.user == user_uuid)
+            & (VFolderRow.status != VFolderOperationStatus.DELETE_COMPLETE)
+        )
+        vfolder_rows = (await db_session.scalars(vfolder_query)).all()
+        return vfolder_rows
+
     @classmethod
     async def delete_endpoint(
         cls,
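A small usage sketch for the new `get_vfolders_not_deleted` helper added above; the read-only session helper and the printed fields are assumptions for illustration:

```python
from uuid import UUID

from ai.backend.manager.models.user import PurgeUser
from ai.backend.manager.models.utils import ExtendedAsyncSAEngine


async def list_alive_vfolders(db: ExtendedAsyncSAEngine, user_uuid: UUID) -> None:
    async with db.begin_readonly_session() as db_session:
        rows = await PurgeUser.get_vfolders_not_deleted(db_session, user_uuid)
        for row in rows:
            # Each row is a VFolderRow whose status is not DELETE_COMPLETE yet.
            print(row.id, row.host, row.status)
```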
+ """ + folders_to_be_deleted: list[VFolderDeletionInfo] = [] + folders_failed_to_delete: list[tuple[VFolderDeletionInfo, str]] = [] + + for vfolder_info in requested_vfolders: + folder_id, host_name = vfolder_info + proxy_name, volume_name = storage_manager.split_host(host_name) + try: + async with storage_manager.request( + proxy_name, + "POST", + "folder/delete", + json={ + "volume": volume_name, + "vfid": str(folder_id), + }, + ) as (_, resp): + pass + except (VFolderOperationFailed, InvalidAPIParameters) as e: + if e.status == 404: + folders_to_be_deleted.append(vfolder_info) + progress_msg = f"VFolder not found in storage, transit status to `DELETE_COMPLETE` (id: {folder_id})" + else: + err_str = repr(e) + folders_failed_to_delete.append((vfolder_info, err_str)) + progress_msg = f"Delete failed (id: {folder_id}, storage response status: {e.status}, e: {err_str})" + except Exception as e: + err_str = repr(e) + folders_failed_to_delete.append((vfolder_info, err_str)) + progress_msg = f"Delete failed (id: {folder_id}, e: {err_str})" + else: + folders_to_be_deleted.append(vfolder_info) + progress_msg = f"Delete succeeded (id: {folder_id})" + if reporter is not None: + await reporter.update(1, message=progress_msg) + vfolder_ids_to_delete = tuple(vf_id.folder_id for vf_id, _ in folders_to_be_deleted) + log.debug("Successfully deleted vfolders {}", [str(x) for x in vfolder_ids_to_delete]) + + if vfolder_ids_to_delete: + await update_vfolder_status( + db, + vfolder_ids_to_delete, + VFolderOperationStatus.DELETE_COMPLETE, + do_log=False, + ) + if folders_failed_to_delete: + await update_vfolder_status( + db, + [vfid.vfolder_id.folder_id for vfid, _ in folders_failed_to_delete], + VFolderOperationStatus.DELETE_ERROR, + do_log=False, + ) + + +async def delete_vfolders( + requested_vfolders: Iterable[VFolderDeletionInfo], + *, + storage_manager: StorageSessionManager, + db: ExtendedAsyncSAEngine, + reporter: ProgressReporter | None = None, +) -> None: + """ + Spawn vfolder deletion tasks grouped by a name of storage proxies. + """ + + def _keyfunc(item: VFolderDeletionInfo) -> str: + proxy_name, _ = storage_manager.split_host(item.host) + return proxy_name + + async with aiotools.TaskGroup() as tg: + for _, vfolder_iterator in itertools.groupby( + sorted(requested_vfolders, key=_keyfunc), key=_keyfunc + ): + tg.create_task( + _delete_vfolders_one_by_one( + list(vfolder_iterator), + storage_manager=storage_manager, + db=db, + reporter=reporter, + ) + ) + + class VirtualFolderNode(graphene.ObjectType): class Meta: interfaces = (AsyncNode,)