diff --git a/src/ai/backend/client/cli/vfolder.py b/src/ai/backend/client/cli/vfolder.py index bf0e65d22fe..43f8168aecd 100644 --- a/src/ai/backend/client/cli/vfolder.py +++ b/src/ai/backend/client/cli/vfolder.py @@ -193,8 +193,9 @@ def delete_trash(name): """ with Session() as session: try: - session.VFolder(name).delete_trash() - print_done("Delete completed.") + response = session.VFolder(name).delete_trash() + task_ids = response["task_ids"] + print_info(f"Delete task started. (task_id: {task_ids[0]})") except Exception as e: print_error(e) sys.exit(ExitCode.FAILURE) diff --git a/src/ai/backend/client/func/vfolder.py b/src/ai/backend/client/func/vfolder.py index 2fc79f73829..ce183ca18fb 100644 --- a/src/ai/backend/client/func/vfolder.py +++ b/src/ai/backend/client/func/vfolder.py @@ -292,8 +292,9 @@ async def delete_trash(self) -> Mapping[str, Any]: rqst.set_json({ "id": self.id.hex, }) - async with rqst.fetch(): - return {} + async with rqst.fetch() as resp: + data = await resp.json() + return data @api_function async def rename(self, new_name): diff --git a/src/ai/backend/manager/api/vfolder.py b/src/ai/backend/manager/api/vfolder.py index b1a857d65e7..80e3be76cfe 100644 --- a/src/ai/backend/manager/api/vfolder.py +++ b/src/ai/backend/manager/api/vfolder.py @@ -30,7 +30,6 @@ import aiohttp import aiohttp_cors -import aiotools import attrs import sqlalchemy as sa import trafaret as t @@ -140,6 +139,12 @@ class SuccessResponseModel(BaseResponseModel): success: bool = Field(default=True) +class BackgroundTaskResponseModel(BaseResponseModel): + task_ids: list[uuid.UUID] = Field( + description="List of background task id.", + ) + + async def check_vfolder_status( folder_row: VFolderRow, status: VFolderStatusSet, @@ -2436,14 +2441,13 @@ class DeleteFromTrashRequestModel(BaseModel): @auth_required @pydantic_params_api_handler(DeleteFromTrashRequestModel) -async def delete_from_trash_bin( +async def delete_forever( request: web.Request, params: DeleteFromTrashRequestModel -) -> web.Response: +) -> BackgroundTaskResponseModel: """ Delete `delete-pending` vfolders in storage proxy """ root_ctx: RootContext = request.app["_root.context"] - app_ctx: PrivateContext = request.app["folders.context"] folder_id = params.vfolder_id access_key = request["keypair"]["access_key"] domain_name = request["user"]["domain_name"] @@ -2488,13 +2492,13 @@ async def delete_from_trash_bin( folder_host = entry["host"] # fs-level deletion may fail or take longer time - await initiate_vfolder_deletion( + task_ids = await initiate_vfolder_deletion( root_ctx.db, [VFolderDeletionInfo(VFolderID.from_row(entry), folder_host)], root_ctx.storage_manager, - app_ctx.storage_ptask_group, + root_ctx.background_task_manager, ) - return web.Response(status=204) + return BackgroundTaskResponseModel(status=201, task_ids=task_ids) class PurgeRequestModel(BaseModel): @@ -3463,22 +3467,15 @@ async def _delete_vfolder_related_rows() -> None: @attrs.define(slots=True, auto_attribs=True, init=False) class PrivateContext: - database_ptask_group: aiotools.PersistentTaskGroup - storage_ptask_group: aiotools.PersistentTaskGroup + pass async def init(app: web.Application) -> None: - app_ctx: PrivateContext = app["folders.context"] - app_ctx.database_ptask_group = aiotools.PersistentTaskGroup() - app_ctx.storage_ptask_group = aiotools.PersistentTaskGroup( - exception_handler=storage_task_exception_handler - ) + pass async def shutdown(app: web.Application) -> None: - app_ctx: PrivateContext = app["folders.context"] - await app_ctx.database_ptask_group.shutdown() - await app_ctx.storage_ptask_group.shutdown() + pass def create_app(default_cors_options): @@ -3524,7 +3521,8 @@ def create_app(default_cors_options): cors.add(add_route("POST", r"/{name}/clone", clone)) cors.add(add_route("POST", r"/purge", purge)) cors.add(add_route("POST", r"/restore-from-trash-bin", restore)) - cors.add(add_route("POST", r"/delete-from-trash-bin", delete_from_trash_bin)) + cors.add(add_route("POST", r"/delete-from-trash-bin", delete_forever)) + cors.add(add_route("POST", r"/delete-forever", delete_forever)) cors.add(add_route("GET", r"/invitations/list-sent", list_sent_invitations)) cors.add(add_route("GET", r"/invitations/list_sent", list_sent_invitations)) # legacy underbar cors.add(add_route("POST", r"/invitations/update/{inv_id}", update_invitation)) diff --git a/src/ai/backend/manager/models/vfolder.py b/src/ai/backend/manager/models/vfolder.py index ebd0f4ae69a..551cb21e8aa 100644 --- a/src/ai/backend/manager/models/vfolder.py +++ b/src/ai/backend/manager/models/vfolder.py @@ -1,10 +1,15 @@ from __future__ import annotations import enum +import itertools import logging import os.path import uuid -from collections.abc import Container, Mapping +from collections.abc import ( + Container, + Mapping, + Sequence, +) from dataclasses import dataclass from datetime import datetime from pathlib import PurePosixPath @@ -15,13 +20,11 @@ List, NamedTuple, Optional, - Sequence, TypeAlias, cast, ) import aiohttp -import aiotools import graphene import sqlalchemy as sa import trafaret as t @@ -36,6 +39,7 @@ from sqlalchemy.orm import load_only, relationship, selectinload from ai.backend.common.bgtask import ProgressReporter +from ai.backend.common.defs import BackgroundTaskLogLevel as LogLevel from ai.backend.common.logging import BraceStyleAdapter from ai.backend.common.types import ( MountPermission, @@ -99,7 +103,7 @@ from .storage import PermissionContext as StorageHostPermissionContext from .storage import PermissionContextBuilder as StorageHostPermissionContextBuilder from .user import UserRole, UserRow -from .utils import ExtendedAsyncSAEngine, execute_with_retry, sql_json_merge +from .utils import ExtendedAsyncSAEngine, execute_with_retry, execute_with_txn_retry, sql_json_merge if TYPE_CHECKING: from ..api.context import BackgroundTaskManager @@ -1012,9 +1016,9 @@ async def update_vfolder_status( now = datetime.now(tzutc()) - if update_status == VFolderOperationStatus.DELETE_PENDING: - select_stmt = sa.select(VFolderRow).where(VFolderRow.id.in_(vfolder_ids)) - async with engine.begin_readonly_session() as db_session: + async def _check_mounted_and_update(db_session: SASession) -> None: + if update_status == VFolderOperationStatus.DELETE_PENDING: + select_stmt = sa.select(VFolderRow).where(VFolderRow.id.in_(vfolder_ids)) for vf_row in await db_session.scalars(select_stmt): vf_row = cast(VFolderRow, vf_row) mount_sessions = await get_sessions_by_mounted_folder( @@ -1026,26 +1030,25 @@ async def update_vfolder_status( f"Cannot delete the vfolder. The vfolder(id: {vf_row.id}) is mounted on sessions(ids: {session_ids})" ) - async def _update() -> None: - async with engine.begin_session() as db_session: - query = ( - sa.update(vfolders) - .values( - status=update_status, - status_changed=now, - status_history=sql_json_merge( - vfolders.c.status_history, - (), - { - update_status.name: now.isoformat(), - }, - ), - ) - .where(cond) + query = ( + sa.update(VFolderRow) + .values( + status=update_status, + status_changed=now, + status_history=sql_json_merge( + VFolderRow.status_history, + (), + { + update_status.name: now.isoformat(), + }, + ), ) - await db_session.execute(query) + .where(cond) + ) + await db_session.execute(query) - await execute_with_retry(_update) + async with engine.connect() as db_conn: + await execute_with_txn_retry(_check_mounted_and_update, engine.begin_session, db_conn) if do_log: log.debug( "Successfully updated status of VFolder(s) {} to {}", @@ -1193,68 +1196,43 @@ async def initiate_vfolder_deletion( db_engine: ExtendedAsyncSAEngine, requested_vfolders: Sequence[VFolderDeletionInfo], storage_manager: StorageSessionManager, - storage_ptask_group: aiotools.PersistentTaskGroup, -) -> int: + background_task_manager: BackgroundTaskManager, +) -> list[uuid.UUID]: """Purges VFolder content from storage host.""" + task_ids: list[uuid.UUID] = [] + vfolder_info_len = len(requested_vfolders) vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in requested_vfolders) vfolders.c.id.in_(vfolder_ids) if vfolder_info_len == 0: - return 0 + return task_ids elif vfolder_info_len == 1: vfolders.c.id == vfolder_ids[0] await update_vfolder_status( db_engine, vfolder_ids, VFolderOperationStatus.DELETE_ONGOING, do_log=False ) - row_deletion_infos: list[VFolderDeletionInfo] = [] - failed_deletion: list[tuple[VFolderDeletionInfo, str]] = [] - - async def _delete(): - for vfolder_info in requested_vfolders: - folder_id, host_name = vfolder_info - proxy_name, volume_name = storage_manager.split_host(host_name) - try: - async with storage_manager.request( - proxy_name, - "POST", - "folder/delete", - json={ - "volume": volume_name, - "vfid": str(folder_id), - }, - ) as (_, resp): - pass - except (VFolderOperationFailed, InvalidAPIParameters) as e: - if e.status == 404: - row_deletion_infos.append(vfolder_info) - else: - failed_deletion.append((vfolder_info, repr(e))) - except Exception as e: - failed_deletion.append((vfolder_info, repr(e))) - else: - row_deletion_infos.append(vfolder_info) - if row_deletion_infos: - vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in row_deletion_infos) - - await update_vfolder_status( - db_engine, vfolder_ids, VFolderOperationStatus.DELETE_COMPLETE, do_log=False - ) - log.debug("Successfully removed vfolders {}", [str(x) for x in vfolder_ids]) - if failed_deletion: - await update_vfolder_status( - db_engine, - [vfid.vfolder_id.folder_id for vfid, _ in failed_deletion], - VFolderOperationStatus.DELETE_ERROR, - do_log=False, + def _keyfunc(item: VFolderDeletionInfo) -> str: + proxy_name, _ = storage_manager.split_host(item.host) + return proxy_name + + for _, vfolder_iterator in itertools.groupby( + sorted(requested_vfolders, key=_keyfunc), key=_keyfunc + ): + infos = [*vfolder_iterator] + + async def _deletion_task(reporter: ProgressReporter): + await delete_vfolders( + infos, + storage_manager=storage_manager, + db=db_engine, + reporter=reporter, ) - extra_data = {str(vfid.vfolder_id): err_msg for vfid, err_msg in failed_deletion} - raise VFolderOperationFailed(extra_data=extra_data) - storage_ptask_group.create_task(_delete(), name="delete_vfolders") - log.debug("Started purging vfolders {}", [str(x) for x in vfolder_ids]) + task_ids.append(await background_task_manager.start(_deletion_task)) + log.debug("Started deleting vfolders {}", [str(x) for x in vfolder_ids]) - return vfolder_info_len + return task_ids async def ensure_quota_scope_accessible_by_user( @@ -1324,6 +1302,75 @@ async def get_sessions_by_mounted_folder( return tuple([session.id for session in session_rows]) +async def delete_vfolders( + requested_vfolders: Sequence[VFolderDeletionInfo], + *, + storage_manager: StorageSessionManager, + db: ExtendedAsyncSAEngine, + reporter: ProgressReporter | None = None, +) -> None: + """ + Request multiple vfolder deletion one by one. + """ + folders_to_be_deleted: list[VFolderDeletionInfo] = [] + folders_failed_to_delete: list[tuple[VFolderDeletionInfo, str]] = [] + for vfolder_info in requested_vfolders: + folder_id, host_name = vfolder_info + proxy_name, volume_name = storage_manager.split_host(host_name) + log_level = LogLevel.INFO + try: + async with storage_manager.request( + proxy_name, + "POST", + "folder/delete", + json={ + "volume": volume_name, + "vfid": str(folder_id), + }, + ) as (_, resp): + pass + except (VFolderOperationFailed, InvalidAPIParameters) as e: + if e.status == 404: + folders_to_be_deleted.append(vfolder_info) + progress_msg = f"Folder not found in storage, transit status to `DELETE_COMPLETE` (id:{folder_id})" + else: + err_str = repr(e) + folders_failed_to_delete.append((vfolder_info, err_str)) + progress_msg = f"Delete failed (id:{folder_id}, storage response status:{e.status}, e:{err_str})" + log_level = LogLevel.ERROR + except Exception as e: + err_str = repr(e) + folders_failed_to_delete.append((vfolder_info, err_str)) + progress_msg = ( + f"Delete failed due to unknown error (id:{folder_id}, storage, e:{err_str})" + ) + log_level = LogLevel.ERROR + else: + folders_to_be_deleted.append(vfolder_info) + progress_msg = f"Deleted successfully (id: {folder_id})" + if reporter is not None: + await reporter.update(1, message=progress_msg, log_level=log_level) + + if folders_to_be_deleted: + _vf_ids = [vf_id.folder_id for vf_id, _ in folders_to_be_deleted] + log.debug("Successfully deleted vfolders {}", _vf_ids) + await update_vfolder_status( + db, + _vf_ids, + VFolderOperationStatus.DELETE_COMPLETE, + do_log=False, + ) + if folders_failed_to_delete: + _vf_ids = [vf_id.vfolder_id.folder_id for vf_id, _ in folders_failed_to_delete] + log.warning("Failed to delete vfolders {}", _vf_ids) + await update_vfolder_status( + db, + _vf_ids, + VFolderOperationStatus.DELETE_ERROR, + do_log=False, + ) + + class VirtualFolder(graphene.ObjectType): class Meta: interfaces = (Item,) diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index b0fd4e3b3e5..c1e8c642dd2 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -401,7 +401,7 @@ class Params(TypedDict): ), content_type="application/json", ) - return web.Response(status=204) + return web.Response(status=204) async def clone_vfolder(request: web.Request) -> web.Response: