Skip to content

Commit

Permalink
feat: Delete vfolder in bgtask
Browse files Browse the repository at this point in the history
  • Loading branch information
fregataa committed Aug 20, 2024
1 parent eb2eba0 commit 97c8ec4
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 96 deletions.
5 changes: 3 additions & 2 deletions src/ai/backend/client/cli/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ def delete_trash(name):
"""
with Session() as session:
try:
session.VFolder(name).delete_trash()
print_done("Delete completed.")
response = session.VFolder(name).delete_trash()
task_ids = response["task_ids"]
print_info(f"Delete task started. (task_id: {task_ids[0]})")
except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)
Expand Down
5 changes: 3 additions & 2 deletions src/ai/backend/client/func/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ async def delete_trash(self) -> Mapping[str, Any]:
rqst.set_json({
"id": self.id.hex,
})
async with rqst.fetch():
return {}
async with rqst.fetch() as resp:
data = await resp.json()
return data

@api_function
async def rename(self, new_name):
Expand Down
34 changes: 16 additions & 18 deletions src/ai/backend/manager/api/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import aiohttp
import aiohttp_cors
import aiotools
import attrs
import sqlalchemy as sa
import trafaret as t
Expand Down Expand Up @@ -140,6 +139,12 @@ class SuccessResponseModel(BaseResponseModel):
success: bool = Field(default=True)


class BackgroundTaskResponseModel(BaseResponseModel):
task_ids: list[uuid.UUID] = Field(
description="List of background task id.",
)


async def check_vfolder_status(
folder_row: VFolderRow,
status: VFolderStatusSet,
Expand Down Expand Up @@ -2436,14 +2441,13 @@ class DeleteFromTrashRequestModel(BaseModel):

@auth_required
@pydantic_params_api_handler(DeleteFromTrashRequestModel)
async def delete_from_trash_bin(
async def delete_forever(
request: web.Request, params: DeleteFromTrashRequestModel
) -> web.Response:
) -> BackgroundTaskResponseModel:
"""
Delete `delete-pending` vfolders in storage proxy
"""
root_ctx: RootContext = request.app["_root.context"]
app_ctx: PrivateContext = request.app["folders.context"]
folder_id = params.vfolder_id
access_key = request["keypair"]["access_key"]
domain_name = request["user"]["domain_name"]
Expand Down Expand Up @@ -2488,13 +2492,13 @@ async def delete_from_trash_bin(

folder_host = entry["host"]
# fs-level deletion may fail or take longer time
await initiate_vfolder_deletion(
task_ids = await initiate_vfolder_deletion(
root_ctx.db,
[VFolderDeletionInfo(VFolderID.from_row(entry), folder_host)],
root_ctx.storage_manager,
app_ctx.storage_ptask_group,
root_ctx.background_task_manager,
)
return web.Response(status=204)
return BackgroundTaskResponseModel(status=201, task_ids=task_ids)


class PurgeRequestModel(BaseModel):
Expand Down Expand Up @@ -3463,22 +3467,15 @@ async def _delete_vfolder_related_rows() -> None:

@attrs.define(slots=True, auto_attribs=True, init=False)
class PrivateContext:
database_ptask_group: aiotools.PersistentTaskGroup
storage_ptask_group: aiotools.PersistentTaskGroup
pass


async def init(app: web.Application) -> None:
app_ctx: PrivateContext = app["folders.context"]
app_ctx.database_ptask_group = aiotools.PersistentTaskGroup()
app_ctx.storage_ptask_group = aiotools.PersistentTaskGroup(
exception_handler=storage_task_exception_handler
)
pass


async def shutdown(app: web.Application) -> None:
app_ctx: PrivateContext = app["folders.context"]
await app_ctx.database_ptask_group.shutdown()
await app_ctx.storage_ptask_group.shutdown()
pass


def create_app(default_cors_options):
Expand Down Expand Up @@ -3524,7 +3521,8 @@ def create_app(default_cors_options):
cors.add(add_route("POST", r"/{name}/clone", clone))
cors.add(add_route("POST", r"/purge", purge))
cors.add(add_route("POST", r"/restore-from-trash-bin", restore))
cors.add(add_route("POST", r"/delete-from-trash-bin", delete_from_trash_bin))
cors.add(add_route("POST", r"/delete-from-trash-bin", delete_forever))
cors.add(add_route("POST", r"/delete-forever", delete_forever))
cors.add(add_route("GET", r"/invitations/list-sent", list_sent_invitations))
cors.add(add_route("GET", r"/invitations/list_sent", list_sent_invitations)) # legacy underbar
cors.add(add_route("POST", r"/invitations/update/{inv_id}", update_invitation))
Expand Down
193 changes: 120 additions & 73 deletions src/ai/backend/manager/models/vfolder.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations

import enum
import itertools
import logging
import os.path
import uuid
from collections.abc import Container, Mapping
from collections.abc import (
Container,
Mapping,
Sequence,
)
from dataclasses import dataclass
from datetime import datetime
from pathlib import PurePosixPath
Expand All @@ -15,13 +20,11 @@
List,
NamedTuple,
Optional,
Sequence,
TypeAlias,
cast,
)

import aiohttp
import aiotools
import graphene
import sqlalchemy as sa
import trafaret as t
Expand All @@ -36,6 +39,7 @@
from sqlalchemy.orm import load_only, relationship, selectinload

from ai.backend.common.bgtask import ProgressReporter
from ai.backend.common.defs import BackgroundTaskLogLevel as LogLevel
from ai.backend.common.logging import BraceStyleAdapter
from ai.backend.common.types import (
MountPermission,
Expand Down Expand Up @@ -99,7 +103,7 @@
from .storage import PermissionContext as StorageHostPermissionContext
from .storage import PermissionContextBuilder as StorageHostPermissionContextBuilder
from .user import UserRole, UserRow
from .utils import ExtendedAsyncSAEngine, execute_with_retry, sql_json_merge
from .utils import ExtendedAsyncSAEngine, execute_with_retry, execute_with_txn_retry, sql_json_merge

if TYPE_CHECKING:
from ..api.context import BackgroundTaskManager
Expand Down Expand Up @@ -1012,9 +1016,9 @@ async def update_vfolder_status(

now = datetime.now(tzutc())

if update_status == VFolderOperationStatus.DELETE_PENDING:
select_stmt = sa.select(VFolderRow).where(VFolderRow.id.in_(vfolder_ids))
async with engine.begin_readonly_session() as db_session:
async def _check_mounted_and_update(db_session: SASession) -> None:
if update_status == VFolderOperationStatus.DELETE_PENDING:
select_stmt = sa.select(VFolderRow).where(VFolderRow.id.in_(vfolder_ids))
for vf_row in await db_session.scalars(select_stmt):
vf_row = cast(VFolderRow, vf_row)
mount_sessions = await get_sessions_by_mounted_folder(
Expand All @@ -1026,26 +1030,25 @@ async def update_vfolder_status(
f"Cannot delete the vfolder. The vfolder(id: {vf_row.id}) is mounted on sessions(ids: {session_ids})"
)

async def _update() -> None:
async with engine.begin_session() as db_session:
query = (
sa.update(vfolders)
.values(
status=update_status,
status_changed=now,
status_history=sql_json_merge(
vfolders.c.status_history,
(),
{
update_status.name: now.isoformat(),
},
),
)
.where(cond)
query = (
sa.update(VFolderRow)
.values(
status=update_status,
status_changed=now,
status_history=sql_json_merge(
VFolderRow.status_history,
(),
{
update_status.name: now.isoformat(),
},
),
)
await db_session.execute(query)
.where(cond)
)
await db_session.execute(query)

await execute_with_retry(_update)
async with engine.connect() as db_conn:
await execute_with_txn_retry(_check_mounted_and_update, engine.begin_session, db_conn)
if do_log:
log.debug(
"Successfully updated status of VFolder(s) {} to {}",
Expand Down Expand Up @@ -1193,68 +1196,43 @@ async def initiate_vfolder_deletion(
db_engine: ExtendedAsyncSAEngine,
requested_vfolders: Sequence[VFolderDeletionInfo],
storage_manager: StorageSessionManager,
storage_ptask_group: aiotools.PersistentTaskGroup,
) -> int:
background_task_manager: BackgroundTaskManager,
) -> list[uuid.UUID]:
"""Purges VFolder content from storage host."""
task_ids: list[uuid.UUID] = []

vfolder_info_len = len(requested_vfolders)
vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in requested_vfolders)
vfolders.c.id.in_(vfolder_ids)
if vfolder_info_len == 0:
return 0
return task_ids
elif vfolder_info_len == 1:
vfolders.c.id == vfolder_ids[0]
await update_vfolder_status(
db_engine, vfolder_ids, VFolderOperationStatus.DELETE_ONGOING, do_log=False
)

row_deletion_infos: list[VFolderDeletionInfo] = []
failed_deletion: list[tuple[VFolderDeletionInfo, str]] = []

async def _delete():
for vfolder_info in requested_vfolders:
folder_id, host_name = vfolder_info
proxy_name, volume_name = storage_manager.split_host(host_name)
try:
async with storage_manager.request(
proxy_name,
"POST",
"folder/delete",
json={
"volume": volume_name,
"vfid": str(folder_id),
},
) as (_, resp):
pass
except (VFolderOperationFailed, InvalidAPIParameters) as e:
if e.status == 404:
row_deletion_infos.append(vfolder_info)
else:
failed_deletion.append((vfolder_info, repr(e)))
except Exception as e:
failed_deletion.append((vfolder_info, repr(e)))
else:
row_deletion_infos.append(vfolder_info)
if row_deletion_infos:
vfolder_ids = tuple(vf_id.folder_id for vf_id, _ in row_deletion_infos)

await update_vfolder_status(
db_engine, vfolder_ids, VFolderOperationStatus.DELETE_COMPLETE, do_log=False
)
log.debug("Successfully removed vfolders {}", [str(x) for x in vfolder_ids])
if failed_deletion:
await update_vfolder_status(
db_engine,
[vfid.vfolder_id.folder_id for vfid, _ in failed_deletion],
VFolderOperationStatus.DELETE_ERROR,
do_log=False,
def _keyfunc(item: VFolderDeletionInfo) -> str:
proxy_name, _ = storage_manager.split_host(item.host)
return proxy_name

for _, vfolder_iterator in itertools.groupby(
sorted(requested_vfolders, key=_keyfunc), key=_keyfunc
):
infos = [*vfolder_iterator]

async def _deletion_task(reporter: ProgressReporter):
await delete_vfolders(
infos,
storage_manager=storage_manager,
db=db_engine,
reporter=reporter,
)
extra_data = {str(vfid.vfolder_id): err_msg for vfid, err_msg in failed_deletion}
raise VFolderOperationFailed(extra_data=extra_data)

storage_ptask_group.create_task(_delete(), name="delete_vfolders")
log.debug("Started purging vfolders {}", [str(x) for x in vfolder_ids])
task_ids.append(await background_task_manager.start(_deletion_task))
log.debug("Started deleting vfolders {}", [str(x) for x in vfolder_ids])

return vfolder_info_len
return task_ids


async def ensure_quota_scope_accessible_by_user(
Expand Down Expand Up @@ -1324,6 +1302,75 @@ async def get_sessions_by_mounted_folder(
return tuple([session.id for session in session_rows])


async def delete_vfolders(
requested_vfolders: Sequence[VFolderDeletionInfo],
*,
storage_manager: StorageSessionManager,
db: ExtendedAsyncSAEngine,
reporter: ProgressReporter | None = None,
) -> None:
"""
Request multiple vfolder deletion one by one.
"""
folders_to_be_deleted: list[VFolderDeletionInfo] = []
folders_failed_to_delete: list[tuple[VFolderDeletionInfo, str]] = []
for vfolder_info in requested_vfolders:
folder_id, host_name = vfolder_info
proxy_name, volume_name = storage_manager.split_host(host_name)
log_level = LogLevel.INFO
try:
async with storage_manager.request(
proxy_name,
"POST",
"folder/delete",
json={
"volume": volume_name,
"vfid": str(folder_id),
},
) as (_, resp):
pass
except (VFolderOperationFailed, InvalidAPIParameters) as e:
if e.status == 404:
folders_to_be_deleted.append(vfolder_info)
progress_msg = f"Folder not found in storage, transit status to `DELETE_COMPLETE` (id:{folder_id})"
else:
err_str = repr(e)
folders_failed_to_delete.append((vfolder_info, err_str))
progress_msg = f"Delete failed (id:{folder_id}, storage response status:{e.status}, e:{err_str})"
log_level = LogLevel.ERROR
except Exception as e:
err_str = repr(e)
folders_failed_to_delete.append((vfolder_info, err_str))
progress_msg = (
f"Delete failed due to unknown error (id:{folder_id}, storage, e:{err_str})"
)
log_level = LogLevel.ERROR
else:
folders_to_be_deleted.append(vfolder_info)
progress_msg = f"Deleted successfully (id: {folder_id})"
if reporter is not None:
await reporter.update(1, message=progress_msg, log_level=log_level)

if folders_to_be_deleted:
_vf_ids = [vf_id.folder_id for vf_id, _ in folders_to_be_deleted]
log.debug("Successfully deleted vfolders {}", _vf_ids)
await update_vfolder_status(
db,
_vf_ids,
VFolderOperationStatus.DELETE_COMPLETE,
do_log=False,
)
if folders_failed_to_delete:
_vf_ids = [vf_id.vfolder_id.folder_id for vf_id, _ in folders_failed_to_delete]
log.warning("Failed to delete vfolders {}", _vf_ids)
await update_vfolder_status(
db,
_vf_ids,
VFolderOperationStatus.DELETE_ERROR,
do_log=False,
)


class VirtualFolder(graphene.ObjectType):
class Meta:
interfaces = (Item,)
Expand Down
2 changes: 1 addition & 1 deletion src/ai/backend/storage/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ class Params(TypedDict):
),
content_type="application/json",
)
return web.Response(status=204)
return web.Response(status=204)


async def clone_vfolder(request: web.Request) -> web.Response:
Expand Down

0 comments on commit 97c8ec4

Please sign in to comment.