diff --git a/changes/1884.feature.md b/changes/1884.feature.md new file mode 100644 index 0000000000..f3ab0ad9e9 --- /dev/null +++ b/changes/1884.feature.md @@ -0,0 +1,2 @@ +* Enable tracking of VFolder deletion tasks. +* Prevent concurrent tree-deletion using semaphore & file-lock. diff --git a/configs/storage-proxy/sample.toml b/configs/storage-proxy/sample.toml index 0646908617..4428574881 100644 --- a/configs/storage-proxy/sample.toml +++ b/configs/storage-proxy/sample.toml @@ -24,6 +24,10 @@ event-loop = "uvloop" # Settings it zero means no limit. scandir-limit = 1000 +# The maximum number of directories (trees) that can be deleted concurrently by one storage-proxy process. +# Should be at least 1. +directory-delete-concurrency = 20 + # The maximum allowed size of a single upload session. max-upload-size = "100g" diff --git a/src/ai/backend/client/cli/vfolder.py b/src/ai/backend/client/cli/vfolder.py index bf0e65d22f..4d8892397c 100644 --- a/src/ai/backend/client/cli/vfolder.py +++ b/src/ai/backend/client/cli/vfolder.py @@ -148,10 +148,10 @@ def create(name, host, group, host_path, usage_mode, permission, cloneable): sys.exit(ExitCode.FAILURE) -@vfolder.command() +@vfolder.command(aliases=["trash", "move-to-trash"]) @click.argument("name", type=str) def delete(name): - """Delete the given virtual folder. The virtual folder will be under `delete-pending` status, which means trash-bin. + """Move the given virtual folder to trash-bin. The virtual folder will be under `delete-pending` status. This operation can be retracted by calling `restore()`. @@ -160,7 +160,7 @@ def delete(name): """ with Session() as session: try: - session.VFolder(name).delete() + session.VFolder(name).move_to_trash() print_done("Deleted.") except Exception as e: print_error(e) @@ -186,15 +186,15 @@ def purge(name): @vfolder.command() @click.argument("name", type=str) def delete_trash(name): - """Delete the given virtual folder's real data. 
The virtual folder should be under `delete-pending` status, which means trash-bin. + """This command is deprecated, use `delete_forever`. Delete the given virtual folder's real data. The virtual folder should be under `delete-pending` status, which means trash-bin. This operation is irreversible! NAME: Name of a virtual folder. """ with Session() as session: try: - session.VFolder(name).delete_trash() - print_done("Delete completed.") + session.VFolder(name).delete_forever() + print_done("Delete task started.") except Exception as e: print_error(e) sys.exit(ExitCode.FAILURE) @@ -202,24 +202,25 @@ def delete_trash(name): @vfolder.command() @click.argument("name", type=str) -def recover(name): - """Restore the given virtual folder from deleted status, Deprecated since 24.03.1; use `restore` +def delete_forever(name): + """Delete the given virtual folder's real data. The virtual folder should be under `delete-pending` status, which means trash-bin. + This operation is irreversible! NAME: Name of a virtual folder. """ with Session() as session: try: - session.VFolder(name).restore() - print_done("Restored.") + session.VFolder(name).delete_forever() + print_done("Delete task started.") except Exception as e: print_error(e) sys.exit(ExitCode.FAILURE) -@vfolder.command() +@vfolder.command(aliases=["recover"]) @click.argument("name", type=str) def restore(name): - """Restore the given virtual folder from deleted status, from trash bin. + """Restore the given virtual folder from `delete-pending` status(trash-bin) to `ready`. NAME: Name of a virtual folder. 
""" diff --git a/src/ai/backend/client/func/vfolder.py b/src/ai/backend/client/func/vfolder.py index 2d750c4310..6201b31598 100644 --- a/src/ai/backend/client/func/vfolder.py +++ b/src/ai/backend/client/func/vfolder.py @@ -246,12 +246,25 @@ async def info(self): async with rqst.fetch() as resp: return await resp.json() - @api_function - async def delete(self): - rqst = Request("DELETE", "/folders/{0}".format(self.name)) + async def _trash(self) -> Mapping[str, Any]: + if self.id is None: + vfolder_id = await self._get_id_by_name() + self.id = vfolder_id + rqst = Request("DELETE", "/folders") + rqst.set_json({ + "id": self.id.hex, + }) async with rqst.fetch(): return {} + @api_function + async def delete(self): + return await self._trash() + + @api_function + async def move_to_trash(self): + return await self._trash() + @api_function async def purge(self) -> Mapping[str, Any]: if self.id is None: @@ -268,7 +281,7 @@ async def _restore(self) -> Mapping[str, Any]: if self.id is None: vfolder_id = await self._get_id_by_name() self.id = vfolder_id - rqst = Request("POST", "/folders/restore-from-trash-bin") + rqst = Request("POST", "/folders/restore") rqst.set_json({ "id": self.id.hex, }) @@ -283,18 +296,28 @@ async def recover(self): async def restore(self): return await self._restore() - @api_function - async def delete_trash(self) -> Mapping[str, Any]: + async def _delete_forever(self) -> Mapping[str, Any]: if self.id is None: vfolder_id = await self._get_id_by_name() self.id = vfolder_id - rqst = Request("POST", "/folders/delete-from-trash-bin") + rqst = Request("POST", "/folders/delete-forever") rqst.set_json({ "id": self.id.hex, }) async with rqst.fetch(): return {} + @api_function + async def delete_trash(self) -> Mapping[str, Any]: + """ + Deprecated, use `delete_forever()`. 
+ """ + return await self._delete_forever() + + @api_function + async def delete_forever(self) -> Mapping[str, Any]: + return await self._delete_forever() + @api_function async def rename(self, new_name): rqst = Request("POST", "/folders/{0}/rename".format(self.name)) diff --git a/src/ai/backend/common/bgtask.py b/src/ai/backend/common/bgtask.py index 58aa282206..934046247d 100644 --- a/src/ai/backend/common/bgtask.py +++ b/src/ai/backend/common/bgtask.py @@ -27,6 +27,7 @@ from redis.asyncio.client import Pipeline from . import redis_helper +from .defs import BackgroundTaskLogLevel as LogLevel from .events import ( BgtaskCancelledEvent, BgtaskDoneEvent, @@ -70,6 +71,7 @@ async def update( self, increment: Union[int, float] = 0, message: str | None = None, + log_level: LogLevel = LogLevel.INFO, ) -> None: self.current_progress += increment # keep the state as local variables because they might be changed @@ -86,6 +88,7 @@ async def _pipe_builder(r: Redis) -> Pipeline: "current": str(current), "total": str(total), "msg": message or "", + "log_level": str(log_level), "last_update": str(time.time()), }, ) @@ -99,6 +102,7 @@ async def _pipe_builder(r: Redis) -> Pipeline: message=message, current_progress=current, total_progress=total, + log_level=log_level, ), ) @@ -154,10 +158,16 @@ async def push_bgtask_events( "message": event.message, } match event: - case BgtaskUpdatedEvent(): - body["current_progress"] = event.current_progress - body["total_progress"] = event.total_progress - await resp.send(json.dumps(body), event=event.name, retry=5) + case BgtaskUpdatedEvent( + name=name, + current_progress=current_progress, + total_progress=total_progress, + log_level=log_level, + ): + body["current_progress"] = current_progress + body["total_progress"] = total_progress + body["log_level"] = str(log_level) + await resp.send(json.dumps(body), event=name, retry=5) case BgtaskDoneEvent(): if extra_data: body.update(extra_data) @@ -232,6 +242,7 @@ async def start( self, func: 
BackgroundTask, name: str | None = None, + total_progress: int = 0, **kwargs, ) -> uuid.UUID: task_id = uuid.uuid4() @@ -246,7 +257,7 @@ async def _pipe_builder(r: Redis) -> Pipeline: mapping={ "status": "started", "current": "0", - "total": "0", + "total": total_progress, "msg": "", "started_at": now, "last_update": now, @@ -257,7 +268,9 @@ async def _pipe_builder(r: Redis) -> Pipeline: await redis_helper.execute(redis_producer, _pipe_builder) - task = asyncio.create_task(self._wrapper_task(func, task_id, name, **kwargs)) + task = asyncio.create_task( + self._wrapper_task(func, task_id, name, total_progress, **kwargs) + ) self.ongoing_tasks.add(task) return task_id @@ -266,24 +279,30 @@ async def _wrapper_task( func: BackgroundTask, task_id: uuid.UUID, task_name: str | None, + task_total_progress: int = 0, **kwargs, ) -> None: task_status: TaskStatus = "bgtask_started" - reporter = ProgressReporter(self.event_producer, task_id) + reporter = ProgressReporter( + self.event_producer, task_id, total_progress=task_total_progress + ) message = "" event_cls: Type[BgtaskDoneEvent] | Type[BgtaskCancelledEvent] | Type[BgtaskFailedEvent] = ( BgtaskDoneEvent ) + log_level = LogLevel.INFO try: message = await func(reporter, **kwargs) or "" task_status = "bgtask_done" except asyncio.CancelledError: task_status = "bgtask_cancelled" event_cls = BgtaskCancelledEvent + log_level = LogLevel.WARNING except Exception as e: task_status = "bgtask_failed" event_cls = BgtaskFailedEvent message = repr(e) + log_level = LogLevel.ERROR log.exception("Task {} ({}): unhandled error", task_id, task_name) finally: redis_producer = self.event_producer.redis_client @@ -297,6 +316,7 @@ async def _pipe_builder(r: Redis): "status": task_status.removeprefix("bgtask_"), "msg": message, "last_update": str(time.time()), + "log_level": str(log_level), }, ) await pipe.expire(tracker_key, MAX_BGTASK_ARCHIVE_PERIOD) diff --git a/src/ai/backend/common/defs.py b/src/ai/backend/common/defs.py index 
9dadda4c90..3925af45ac 100644 --- a/src/ai/backend/common/defs.py +++ b/src/ai/backend/common/defs.py @@ -1,3 +1,4 @@ +from enum import StrEnum, auto from typing import Final # Redis database IDs depending on purposes @@ -10,3 +11,10 @@ DEFAULT_FILE_IO_TIMEOUT: Final = 10 + + +class BackgroundTaskLogLevel(StrEnum): + INFO = auto() + WARNING = auto() + ERROR = auto() + DEBUG = auto() diff --git a/src/ai/backend/common/events.py b/src/ai/backend/common/events.py index c6ed9fe053..3ff9e82ef5 100644 --- a/src/ai/backend/common/events.py +++ b/src/ai/backend/common/events.py @@ -34,6 +34,7 @@ from redis.asyncio import ConnectionPool from . import msgpack, redis_helper +from .defs import BackgroundTaskLogLevel from .logging import BraceStyleAdapter from .types import ( AgentId, @@ -555,6 +556,7 @@ class BgtaskUpdatedEvent(AbstractEvent): current_progress: float = attrs.field() total_progress: float = attrs.field() message: Optional[str] = attrs.field(default=None) + log_level: BackgroundTaskLogLevel = attrs.field(default=BackgroundTaskLogLevel.INFO) def serialize(self) -> tuple: return ( @@ -562,6 +564,7 @@ def serialize(self) -> tuple: self.current_progress, self.total_progress, self.message, + str(self.log_level), ) @classmethod @@ -571,6 +574,7 @@ def deserialize(cls, value: tuple): value[1], value[2], value[3], + BackgroundTaskLogLevel(value[4]), ) diff --git a/src/ai/backend/manager/api/vfolder.py b/src/ai/backend/manager/api/vfolder.py index 4a1dc34b76..0eaaa59d11 100644 --- a/src/ai/backend/manager/api/vfolder.py +++ b/src/ai/backend/manager/api/vfolder.py @@ -29,7 +29,6 @@ import aiohttp import aiohttp_cors -import aiotools import attrs import sqlalchemy as sa import trafaret as t @@ -139,6 +138,12 @@ class SuccessResponseModel(BaseResponseModel): success: bool = Field(default=True) +class BackgroundTaskResponseModel(BaseResponseModel): + task_id: uuid.UUID | None = Field( + description="ID of background task. 'null' means there is no background task spawned." 
+ ) + + async def check_vfolder_status( folder_row: VFolderRow, status: VFolderStatusSet, @@ -2415,14 +2420,13 @@ class DeleteFromTrashRequestModel(BaseModel): @auth_required @pydantic_params_api_handler(DeleteFromTrashRequestModel) -async def delete_from_trash_bin( +async def delete_forever( request: web.Request, params: DeleteFromTrashRequestModel -) -> web.Response: +) -> BackgroundTaskResponseModel: """ Delete `delete-pending` vfolders in storage proxy """ root_ctx: RootContext = request.app["_root.context"] - app_ctx: PrivateContext = request.app["folders.context"] folder_id = params.vfolder_id access_key = request["keypair"]["access_key"] domain_name = request["user"]["domain_name"] @@ -2467,13 +2471,13 @@ async def delete_from_trash_bin( folder_host = entry["host"] # fs-level deletion may fail or take longer time - await initiate_vfolder_deletion( + task_id = await initiate_vfolder_deletion( root_ctx.db, [VFolderDeletionInfo(VFolderID.from_row(entry), folder_host)], root_ctx.storage_manager, - app_ctx.storage_ptask_group, + root_ctx.background_task_manager, ) - return web.Response(status=204) + return BackgroundTaskResponseModel(task_id=task_id) class PurgeRequestModel(BaseModel): @@ -3460,22 +3464,15 @@ async def _delete_vfolder_related_rows() -> None: @attrs.define(slots=True, auto_attribs=True, init=False) class PrivateContext: - database_ptask_group: aiotools.PersistentTaskGroup - storage_ptask_group: aiotools.PersistentTaskGroup + pass async def init(app: web.Application) -> None: - app_ctx: PrivateContext = app["folders.context"] - app_ctx.database_ptask_group = aiotools.PersistentTaskGroup() - app_ctx.storage_ptask_group = aiotools.PersistentTaskGroup( - exception_handler=storage_task_exception_handler - ) + pass async def shutdown(app: web.Application) -> None: - app_ctx: PrivateContext = app["folders.context"] - await app_ctx.database_ptask_group.shutdown() - await app_ctx.storage_ptask_group.shutdown() + pass def create_app(default_cors_options): 
@@ -3519,7 +3516,9 @@ def create_app(default_cors_options): cors.add(add_route("POST", r"/{name}/clone", clone)) cors.add(add_route("POST", r"/purge", purge)) cors.add(add_route("POST", r"/restore-from-trash-bin", restore)) - cors.add(add_route("POST", r"/delete-from-trash-bin", delete_from_trash_bin)) + cors.add(add_route("POST", r"/restore", restore)) + cors.add(add_route("POST", r"/delete-from-trash-bin", delete_forever)) + cors.add(add_route("POST", r"/delete-forever", delete_forever)) cors.add(add_route("GET", r"/invitations/list-sent", list_sent_invitations)) cors.add(add_route("GET", r"/invitations/list_sent", list_sent_invitations)) # legacy underbar cors.add(add_route("POST", r"/invitations/update/{inv_id}", update_invitation)) diff --git a/src/ai/backend/manager/models/group.py b/src/ai/backend/manager/models/group.py index 553007c04c..70a993e8b1 100644 --- a/src/ai/backend/manager/models/group.py +++ b/src/ai/backend/manager/models/group.py @@ -17,7 +17,6 @@ overload, ) -import aiotools import graphene import sqlalchemy as sa import trafaret as t @@ -65,6 +64,8 @@ from .utils import ExtendedAsyncSAEngine, execute_with_retry if TYPE_CHECKING: + from ai.backend.common.bgtask import BackgroundTaskManager + from .gql import GraphQueryContext from .scaling_group import ScalingGroup @@ -665,7 +666,9 @@ async def _pre_func(conn: SAConnection) -> None: raise RuntimeError( "Group has some active session. 
Terminate them first to proceed removal.", ) - await cls.delete_vfolders(graph_ctx.db, gid, graph_ctx.storage_manager) + await cls.delete_vfolders( + graph_ctx.db, gid, graph_ctx.storage_manager, graph_ctx.background_task_manager + ) await cls.delete_kernels(conn, gid) delete_query = sa.delete(groups).where(groups.c.id == gid) @@ -677,6 +680,7 @@ async def delete_vfolders( engine: ExtendedAsyncSAEngine, group_id: uuid.UUID, storage_manager: StorageSessionManager, + background_task_manager: BackgroundTaskManager, ) -> int: """ Delete group's all virtual folders as well as their physical data. @@ -699,13 +703,12 @@ async def delete_vfolders( delete_query = sa.delete(vfolders).where(vfolders.c.group == group_id) result = await db_conn.execute(delete_query) - storage_ptask_group = aiotools.PersistentTaskGroup() try: await initiate_vfolder_deletion( engine, [VFolderDeletionInfo(VFolderID.from_row(vf), vf["host"]) for vf in target_vfs], storage_manager, - storage_ptask_group, + background_task_manager, ) except VFolderOperationFailed as e: log.error("error on deleting vfolder filesystem directory: {0}", e.extra_msg) diff --git a/src/ai/backend/manager/models/user.py b/src/ai/backend/manager/models/user.py index bd03a99975..f981d43a9e 100644 --- a/src/ai/backend/manager/models/user.py +++ b/src/ai/backend/manager/models/user.py @@ -5,7 +5,6 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, Mapping, Optional, Sequence, cast from uuid import UUID, uuid4 -import aiotools import bcrypt import graphene import sqlalchemy as sa @@ -52,6 +51,8 @@ from .utils import ExtendedAsyncSAEngine if TYPE_CHECKING: + from ai.backend.common.bgtask import BackgroundTaskManager + from .gql import GraphQueryContext log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined] @@ -999,7 +1000,12 @@ async def _pre_func(conn: SAConnection) -> None: await cls.delete_endpoint(conn, user_uuid) await cls.delete_kernels(conn, user_uuid) await cls.delete_sessions(conn, 
user_uuid) - await cls.delete_vfolders(graph_ctx.db, user_uuid, graph_ctx.storage_manager) + await cls.delete_vfolders( + graph_ctx.db, + user_uuid, + graph_ctx.storage_manager, + graph_ctx.background_task_manager, + ) await cls.delete_keypairs(conn, graph_ctx.redis_stat, user_uuid) delete_query = sa.delete(users).where(users.c.email == email) @@ -1097,6 +1103,7 @@ async def delete_vfolders( engine: ExtendedAsyncSAEngine, user_uuid: UUID, storage_manager: StorageSessionManager, + background_task_manager: BackgroundTaskManager, ) -> int: """ Delete user's all virtual folders as well as their physical data. @@ -1119,13 +1126,12 @@ async def delete_vfolders( ) target_vfs = result.fetchall() - storage_ptask_group = aiotools.PersistentTaskGroup() try: await initiate_vfolder_deletion( engine, [VFolderDeletionInfo(VFolderID.from_row(vf), vf["host"]) for vf in target_vfs], storage_manager, - storage_ptask_group, + background_task_manager, ) except VFolderOperationFailed as e: log.error("error on deleting vfolder filesystem directory: {0}", e.extra_msg) diff --git a/src/ai/backend/manager/models/vfolder.py b/src/ai/backend/manager/models/vfolder.py index d809ce25e3..74028ffb57 100644 --- a/src/ai/backend/manager/models/vfolder.py +++ b/src/ai/backend/manager/models/vfolder.py @@ -1,6 +1,7 @@ from __future__ import annotations import enum +import itertools import logging import os.path import uuid @@ -27,6 +28,7 @@ from ai.backend.common.bgtask import ProgressReporter from ai.backend.common.config import model_definition_iv +from ai.backend.common.defs import BackgroundTaskLogLevel as LogLevel from ai.backend.common.logging import BraceStyleAdapter from ai.backend.common.types import ( MountPermission, @@ -1153,68 +1155,29 @@ async def initiate_vfolder_deletion( db_engine: ExtendedAsyncSAEngine, requested_vfolders: Sequence[VFolderDeletionInfo], storage_manager: StorageSessionManager, - storage_ptask_group: aiotools.PersistentTaskGroup, -) -> int: + background_task_manager: 
BackgroundTaskManager, +) -> uuid.UUID | None: """Purges VFolder content from storage host.""" vfolder_info_len = len(requested_vfolders) vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in requested_vfolders) vfolders.c.id.in_(vfolder_ids) if vfolder_info_len == 0: - return 0 + return None elif vfolder_info_len == 1: vfolders.c.id == vfolder_ids[0] await update_vfolder_status( db_engine, vfolder_ids, VFolderOperationStatus.DELETE_ONGOING, do_log=False ) - row_deletion_infos: list[VFolderDeletionInfo] = [] - failed_deletion: list[tuple[VFolderDeletionInfo, str]] = [] - - async def _delete(): - for vfolder_info in requested_vfolders: - folder_id, host_name = vfolder_info - proxy_name, volume_name = storage_manager.split_host(host_name) - try: - async with storage_manager.request( - proxy_name, - "POST", - "folder/delete", - json={ - "volume": volume_name, - "vfid": str(folder_id), - }, - ) as (_, resp): - pass - except (VFolderOperationFailed, InvalidAPIParameters) as e: - if e.status == 404: - row_deletion_infos.append(vfolder_info) - else: - failed_deletion.append((vfolder_info, repr(e))) - except Exception as e: - failed_deletion.append((vfolder_info, repr(e))) - else: - row_deletion_infos.append(vfolder_info) - if row_deletion_infos: - vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in row_deletion_infos) - - await update_vfolder_status( - db_engine, vfolder_ids, VFolderOperationStatus.DELETE_COMPLETE, do_log=False - ) - log.debug("Successfully removed vfolders {}", [str(x) for x in vfolder_ids]) - if failed_deletion: - await update_vfolder_status( - db_engine, - [vfid.vfolder_id for vfid, _ in failed_deletion], - VFolderOperationStatus.DELETE_ERROR, - do_log=False, - ) - extra_data = {str(vfid.vfolder_id): err_msg for vfid, err_msg in failed_deletion} - raise VFolderOperationFailed(extra_data=extra_data) + async def _delete_task(reporter: ProgressReporter) -> None: + await delete_vfolders( + requested_vfolders, db=db_engine, storage_manager=storage_manager, 
reporter=reporter + ) - storage_ptask_group.create_task(_delete(), name="delete_vfolders") - log.debug("Started purging vfolders {}", [str(x) for x in vfolder_ids]) + task_id = await background_task_manager.start(_delete_task, total_progress=vfolder_info_len) + log.debug("Started deleting vfolders {}", [str(x) for x in vfolder_ids]) - return vfolder_info_len + return task_id async def ensure_quota_scope_accessible_by_user( @@ -1756,6 +1719,100 @@ class Meta: items = graphene.List(VirtualFolder, required=True) +async def _delete_vfolders( + requested_vfolders: Sequence[VFolderDeletionInfo], + *, + storage_manager: StorageSessionManager, + db: ExtendedAsyncSAEngine, + reporter: ProgressReporter | None, +) -> None: + """ + Request multiple vfolder deletion one by one. + """ + folders_to_be_deleted: list[VFolderDeletionInfo] = [] + folders_failed_to_delete: list[tuple[VFolderDeletionInfo, str]] = [] + for vfolder_info in requested_vfolders: + folder_id, host_name = vfolder_info + proxy_name, volume_name = storage_manager.split_host(host_name) + log_level = LogLevel.INFO + try: + async with storage_manager.request( + proxy_name, + "POST", + "folder/delete", + json={ + "volume": volume_name, + "vfid": str(folder_id), + }, + ) as (_, resp): + pass + except (VFolderOperationFailed, InvalidAPIParameters) as e: + if e.status == 404: + folders_to_be_deleted.append(vfolder_info) + progress_msg = f"VFolder not found in storage, transit status to `DELETE_COMPLETE` (id: {folder_id})" + else: + err_str = repr(e) + folders_failed_to_delete.append((vfolder_info, err_str)) + progress_msg = f"Delete failed (id: {folder_id}, storage response status: {e.status}, e: {err_str})" + log_level = LogLevel.ERROR + except Exception as e: + err_str = repr(e) + folders_failed_to_delete.append((vfolder_info, err_str)) + progress_msg = f"Delete failed (id: {folder_id}, e: {err_str})" + log_level = LogLevel.ERROR + else: + folders_to_be_deleted.append(vfolder_info) + progress_msg = f"Delete 
succeeded (id: {folder_id})" + if reporter is not None: + await reporter.update(1, message=progress_msg, log_level=log_level) + vfolder_ids_to_delete = tuple(vf_id.folder_id for vf_id, _ in folders_to_be_deleted) + log.debug("Successfully deleted vfolders {}", [str(x) for x in vfolder_ids_to_delete]) + + if folders_to_be_deleted: + await update_vfolder_status( + db, + vfolder_ids_to_delete, + VFolderOperationStatus.DELETE_COMPLETE, + do_log=False, + ) + if folders_failed_to_delete: + await update_vfolder_status( + db, + [vfid.vfolder_id.folder_id for vfid, _ in folders_failed_to_delete], + VFolderOperationStatus.DELETE_ERROR, + do_log=False, + ) + + +async def delete_vfolders( + requested_vfolders: Sequence[VFolderDeletionInfo], + *, + storage_manager: StorageSessionManager, + db: ExtendedAsyncSAEngine, + reporter: ProgressReporter | None = None, +) -> None: + """ + Spawn vfolder deletion tasks grouped by a name of storage proxies. + """ + + def _keyfunc(item: VFolderDeletionInfo) -> str: + proxy_name, _ = storage_manager.split_host(item.host) + return proxy_name + + async with aiotools.TaskGroup() as tg: + for _, vfolder_iterator in itertools.groupby( + sorted(requested_vfolders, key=_keyfunc), key=_keyfunc + ): + tg.create_task( + _delete_vfolders( + list(vfolder_iterator), + storage_manager=storage_manager, + db=db, + reporter=reporter, + ) + ) + + class VirtualFolderNode(graphene.ObjectType): class Meta: interfaces = (AsyncNode,) diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index 0cbd9dfc5f..197cc5f8cf 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -46,6 +46,7 @@ ExternalError, InvalidQuotaConfig, InvalidSubpathError, + LockTimeout, QuotaScopeAlreadyExists, QuotaScopeNotFoundError, StorageProxyError, @@ -384,7 +385,18 @@ class Params(TypedDict): await log_manager_api_entry(log, "delete_vfolder", params) ctx: RootContext = request.app["ctx"] async with 
ctx.get_volume(params["volume"]) as volume: - await volume.delete_vfolder(params["vfid"]) + try: + await volume.delete_vfolder(params["vfid"]) + except LockTimeout: + raise web.HTTPBadRequest( + body=json.dumps( + { + "msg": "VFolder is already being deleted", + "vfid": str(params["vfid"]), + }, + ), + content_type="application/json", + ) return web.Response(status=204) diff --git a/src/ai/backend/storage/cephfs/__init__.py b/src/ai/backend/storage/cephfs/__init__.py index b942fd7ec6..60f3458b2f 100644 --- a/src/ai/backend/storage/cephfs/__init__.py +++ b/src/ai/backend/storage/cephfs/__init__.py @@ -129,6 +129,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return CephFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_capabilities(self) -> FrozenSet[str]: diff --git a/src/ai/backend/storage/config.py b/src/ai/backend/storage/config.py index 3c869b466e..a66efc38cc 100644 --- a/src/ai/backend/storage/config.py +++ b/src/ai/backend/storage/config.py @@ -47,6 +47,7 @@ ), t.Key("event-loop", default="asyncio"): t.Enum("asyncio", "uvloop"), t.Key("scandir-limit", default=1000): t.Int[0:], + t.Key("directory-delete-concurrency", default=20): t.Int[1:], t.Key("max-upload-size", default="100g"): tx.BinarySize, t.Key("secret"): t.String, # used to generate JWT tokens t.Key("session-expire"): tx.TimeDuration, diff --git a/src/ai/backend/storage/exception.py b/src/ai/backend/storage/exception.py index 2874b1fb94..e883c78d9b 100644 --- a/src/ai/backend/storage/exception.py +++ b/src/ai/backend/storage/exception.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import Any, Optional @@ -57,6 +58,10 @@ class WatcherClientError(RuntimeError): pass +class LockTimeout(asyncio.TimeoutError): + pass + + class InvalidAPIParameters(web.HTTPBadRequest): def __init__( self, diff --git a/src/ai/backend/storage/gpfs/__init__.py 
b/src/ai/backend/storage/gpfs/__init__.py index 96ce7e9025..b7bb7678ca 100644 --- a/src/ai/backend/storage/gpfs/__init__.py +++ b/src/ai/backend/storage/gpfs/__init__.py @@ -85,10 +85,11 @@ def __init__( self, mount_path: Path, scandir_limit: int, + delete_concurrency: int, api_client: GPFSAPIClient, fs: str, ) -> None: - super().__init__(mount_path, scandir_limit) + super().__init__(mount_path, scandir_limit, delete_concurrency) self.api_client = api_client self.fs = fs @@ -155,6 +156,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return GPFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], self.api_client, self.fs, ) diff --git a/src/ai/backend/storage/netapp/__init__.py b/src/ai/backend/storage/netapp/__init__.py index 9fe81a8efd..741c4703bd 100644 --- a/src/ai/backend/storage/netapp/__init__.py +++ b/src/ai/backend/storage/netapp/__init__.py @@ -449,6 +449,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return BaseFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def init(self) -> None: diff --git a/src/ai/backend/storage/purestorage/__init__.py b/src/ai/backend/storage/purestorage/__init__.py index 0e64056b4a..1a13315c57 100644 --- a/src/ai/backend/storage/purestorage/__init__.py +++ b/src/ai/backend/storage/purestorage/__init__.py @@ -162,6 +162,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return RapidFileToolsFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def init(self) -> None: diff --git a/src/ai/backend/storage/vfs/__init__.py b/src/ai/backend/storage/vfs/__init__.py index 12bd614c7a..09315c326c 100644 --- a/src/ai/backend/storage/vfs/__init__.py +++ b/src/ai/backend/storage/vfs/__init__.py @@ -16,6 
+16,7 @@ import janus import trafaret as t +from ai.backend.common.lock import FileLock from ai.backend.common.logging import BraceStyleAdapter from ai.backend.common.types import BinarySize, HardwareMetadata, QuotaScopeID @@ -24,6 +25,7 @@ ExecutionError, InvalidAPIParameters, InvalidQuotaScopeError, + LockTimeout, NotEmptyError, QuotaScopeNotFoundError, ) @@ -189,9 +191,10 @@ async def delete_quota_scope( class BaseFSOpModel(AbstractFSOpModel): - def __init__(self, mount_path: Path, scandir_limit: int) -> None: + def __init__(self, mount_path: Path, scandir_limit: int, delete_concurrency: int = 20) -> None: self.mount_path = mount_path self.scandir_limit = scandir_limit + self.delete_sema = asyncio.Semaphore(delete_concurrency) async def copy_tree( self, @@ -224,11 +227,17 @@ async def delete_tree( self, path: Path, ) -> None: - loop = asyncio.get_running_loop() - try: - await loop.run_in_executor(None, functools.partial(shutil.rmtree, path)) - except FileNotFoundError: - pass + async with self.delete_sema: + lock_path = path.parent / f"{path.name}.lock" + try: + async with FileLock(lock_path, remove_when_unlock=True): + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor(None, functools.partial(shutil.rmtree, path)) + except FileNotFoundError: + pass + except asyncio.TimeoutError: + raise LockTimeout def scan_tree( self, @@ -371,6 +380,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return BaseFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_capabilities(self) -> FrozenSet[str]: diff --git a/src/ai/backend/test/cli_integration/user/test_vfolder.py b/src/ai/backend/test/cli_integration/user/test_vfolder.py index bad3cc4cee..d05cb39a22 100644 --- a/src/ai/backend/test/cli_integration/user/test_vfolder.py +++ b/src/ai/backend/test/cli_integration/user/test_vfolder.py @@ -222,6 +222,27 @@ def 
test_delete_vfolder(run_user: ClientRunnerFunc): assert "Deleted" in p.before.decode(), "Test folder 3 not deleted successfully." +@pytest.mark.dependency(depends=["test_delete_vfolder"]) +def test_delete_forever_vfolder(run_user: ClientRunnerFunc): + """ + Test delete-forever vfolder function. + !! Make sure you execute this test after test_delete_vfolder !! + Otherwise, it will raise an error. + """ + print("[ Delete-forever vfolder ]") + with closing(run_user(["vfolder", "delete-forever", "test_folder2"])) as p: + p.expect(EOF) + assert ( + "Delete task started" in p.before.decode() + ), "Test folder 2 not deleted-completely successfully." + + with closing(run_user(["vfolder", "delete-forever", "test_folder3"])) as p: + p.expect(EOF) + assert ( + "Delete task started" in p.before.decode() + ), "Test folder 3 not deleted-completely successfully." + + def test_delete_vfolder_the_same_vfolder_name( run_user: ClientRunnerFunc, run_user2: ClientRunnerFunc ): @@ -247,7 +268,7 @@ def test_delete_vfolder_the_same_vfolder_name( p.expect(EOF) assert ( "Deleted" in p.before.decode() - ), "Test folder created by user2 not deleted successfully." + ), "Test folder created by user not deleted successfully." 
def test_list_vfolder(run_user: ClientRunnerFunc): diff --git a/tests/client/test_vfolder.py b/tests/client/test_vfolder.py index 5324bd8f0f..1230af467c 100644 --- a/tests/client/test_vfolder.py +++ b/tests/client/test_vfolder.py @@ -1,3 +1,4 @@ +import uuid from unittest import mock import pytest @@ -74,7 +75,13 @@ def test_list_vfolders(): def test_delete_vfolder(): with Session() as session, aioresponses() as m: vfolder_name = "fake-vfolder-name" - m.delete(build_url(session.config, "/folders/{}".format(vfolder_name)), status=204) + vfolder_id = uuid.uuid4() + m.get( + build_url(session.config, "/folders/_/id"), + status=200, + payload={"id": str(vfolder_id)}, + ) + m.delete(build_url(session.config, "/folders"), status=204) resp = session.VFolder(vfolder_name).delete() assert resp == {} diff --git a/tests/storage-proxy/conftest.py b/tests/storage-proxy/conftest.py index a8acae6f52..7e9035775d 100644 --- a/tests/storage-proxy/conftest.py +++ b/tests/storage-proxy/conftest.py @@ -118,6 +118,7 @@ async def volume( { "storage-proxy": { "scandir-limit": 1000, + "directory-delete-concurrency": 20, }, }, volume_path,