Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Delete vfolder in bgtask #2739

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/2739.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Delete Virtual-folder in background task to enable tracking progress.
5 changes: 3 additions & 2 deletions src/ai/backend/client/cli/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,9 @@ def delete_trash(name):
"""
with Session() as session:
try:
session.VFolder(name).delete_trash()
print_done("Delete completed.")
response = session.VFolder(name).delete_trash()
task_ids = response["task_ids"]
print_info(f"Delete task started. (task_id: {task_ids[0]})")
except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)
Expand Down
5 changes: 3 additions & 2 deletions src/ai/backend/client/func/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,9 @@ async def delete_trash(self) -> Mapping[str, Any]:
rqst.set_json({
"id": self.id.hex,
})
async with rqst.fetch():
return {}
async with rqst.fetch() as resp:
data = await resp.json()
return data

@api_function
async def rename(self, new_name):
Expand Down
5 changes: 5 additions & 0 deletions src/ai/backend/common/bgtask.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from redis.asyncio.client import Pipeline

from . import redis_helper
from .defs import BackgroundTaskLogLevel as LogLevel
from .events import (
BgtaskCancelledEvent,
BgtaskDoneEvent,
Expand Down Expand Up @@ -70,6 +71,7 @@ async def update(
self,
increment: Union[int, float] = 0,
message: str | None = None,
log_level: LogLevel = LogLevel.INFO,
) -> None:
self.current_progress += increment
# keep the state as local variables because they might be changed
Expand All @@ -87,6 +89,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
"total": str(total),
"msg": message or "",
"last_update": str(time.time()),
"log_level": str(log_level),
},
)
await pipe.expire(tracker_key, MAX_BGTASK_ARCHIVE_PERIOD)
Expand All @@ -99,6 +102,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
message=message,
current_progress=current,
total_progress=total,
log_level=log_level,
),
)

Expand Down Expand Up @@ -157,6 +161,7 @@ async def push_bgtask_events(
case BgtaskUpdatedEvent():
body["current_progress"] = event.current_progress
body["total_progress"] = event.total_progress
body["log_level"] = event.log_level
await resp.send(json.dumps(body), event=event.name, retry=5)
case BgtaskDoneEvent():
if extra_data:
Expand Down
8 changes: 8 additions & 0 deletions src/ai/backend/common/defs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import enum
from typing import Final

# Redis database IDs depending on purposes
Expand All @@ -10,3 +11,10 @@


DEFAULT_FILE_IO_TIMEOUT: Final = 10


class BackgroundTaskLogLevel(enum.StrEnum):
INFO = enum.auto()
WARNING = enum.auto()
ERROR = enum.auto()
DEBUG = enum.auto()
4 changes: 4 additions & 0 deletions src/ai/backend/common/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from redis.asyncio import ConnectionPool

from . import msgpack, redis_helper
from .defs import BackgroundTaskLogLevel
from .logging import BraceStyleAdapter
from .types import (
AgentId,
Expand Down Expand Up @@ -559,13 +560,15 @@ class BgtaskUpdatedEvent(AbstractEvent):
current_progress: float = attrs.field()
total_progress: float = attrs.field()
message: Optional[str] = attrs.field(default=None)
log_level: BackgroundTaskLogLevel = attrs.field(default=BackgroundTaskLogLevel.INFO)

def serialize(self) -> tuple:
return (
str(self.task_id),
self.current_progress,
self.total_progress,
self.message,
str(self.log_level),
)

@classmethod
Expand All @@ -575,6 +578,7 @@ def deserialize(cls, value: tuple):
value[1],
value[2],
value[3],
BackgroundTaskLogLevel(value[4]),
)


Expand Down
34 changes: 16 additions & 18 deletions src/ai/backend/manager/api/vfolder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import aiohttp
import aiohttp_cors
import aiotools
import attrs
import sqlalchemy as sa
import trafaret as t
Expand Down Expand Up @@ -140,6 +139,12 @@ class SuccessResponseModel(BaseResponseModel):
success: bool = Field(default=True)


class BackgroundTaskResponseModel(BaseResponseModel):
task_ids: list[uuid.UUID] = Field(
description="List of background task id.",
)


async def check_vfolder_status(
folder_row: VFolderRow,
status: VFolderStatusSet,
Expand Down Expand Up @@ -2436,14 +2441,13 @@ class DeleteFromTrashRequestModel(BaseModel):

@auth_required
@pydantic_params_api_handler(DeleteFromTrashRequestModel)
async def delete_from_trash_bin(
async def delete_forever(
request: web.Request, params: DeleteFromTrashRequestModel
) -> web.Response:
) -> BackgroundTaskResponseModel:
"""
Delete `delete-pending` vfolders in storage proxy
"""
root_ctx: RootContext = request.app["_root.context"]
app_ctx: PrivateContext = request.app["folders.context"]
folder_id = params.vfolder_id
access_key = request["keypair"]["access_key"]
domain_name = request["user"]["domain_name"]
Expand Down Expand Up @@ -2488,13 +2492,13 @@ async def delete_from_trash_bin(

folder_host = entry["host"]
# fs-level deletion may fail or take longer time
await initiate_vfolder_deletion(
task_ids = await initiate_vfolder_deletion(
root_ctx.db,
[VFolderDeletionInfo(VFolderID.from_row(entry), folder_host)],
root_ctx.storage_manager,
app_ctx.storage_ptask_group,
root_ctx.background_task_manager,
)
return web.Response(status=204)
return BackgroundTaskResponseModel(status=201, task_ids=task_ids)


class PurgeRequestModel(BaseModel):
Expand Down Expand Up @@ -3463,22 +3467,15 @@ async def _delete_vfolder_related_rows() -> None:

@attrs.define(slots=True, auto_attribs=True, init=False)
class PrivateContext:
database_ptask_group: aiotools.PersistentTaskGroup
storage_ptask_group: aiotools.PersistentTaskGroup
pass


async def init(app: web.Application) -> None:
app_ctx: PrivateContext = app["folders.context"]
app_ctx.database_ptask_group = aiotools.PersistentTaskGroup()
app_ctx.storage_ptask_group = aiotools.PersistentTaskGroup(
exception_handler=storage_task_exception_handler
)
pass


async def shutdown(app: web.Application) -> None:
app_ctx: PrivateContext = app["folders.context"]
await app_ctx.database_ptask_group.shutdown()
await app_ctx.storage_ptask_group.shutdown()
pass


def create_app(default_cors_options):
Expand Down Expand Up @@ -3524,7 +3521,8 @@ def create_app(default_cors_options):
cors.add(add_route("POST", r"/{name}/clone", clone))
cors.add(add_route("POST", r"/purge", purge))
cors.add(add_route("POST", r"/restore-from-trash-bin", restore))
cors.add(add_route("POST", r"/delete-from-trash-bin", delete_from_trash_bin))
cors.add(add_route("POST", r"/delete-from-trash-bin", delete_forever))
cors.add(add_route("POST", r"/delete-forever", delete_forever))
cors.add(add_route("GET", r"/invitations/list-sent", list_sent_invitations))
cors.add(add_route("GET", r"/invitations/list_sent", list_sent_invitations)) # legacy underbar
cors.add(add_route("POST", r"/invitations/update/{inv_id}", update_invitation))
Expand Down
Loading
Loading