feat: improve reliability & usability of VFolder removal API #1884

Closed
wants to merge 85 commits into from
85 commits
0dd2c65
feature: improve VFolder purge using background task
fregataa Feb 4, 2024
7d9667c
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 5, 2024
cb3a525
set reporter nullable and set total_progress
fregataa Feb 5, 2024
ee7e0aa
pass total_progress arg to bgtask mgr
fregataa Feb 5, 2024
219791a
add log_type field to ProgressReporter
fregataa Feb 5, 2024
1d47b57
fix UUID json decode error and refactor log type
fregataa Feb 6, 2024
f4b67a9
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 19, 2024
aa61704
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 23, 2024
3c5700c
apply filelock to remove tree
fregataa Feb 23, 2024
8eecfde
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 23, 2024
1a7da50
add news fragment
fregataa Feb 24, 2024
1b82f67
use asyncio.Semaphore for delete_tree instead of FileLock
fregataa Feb 24, 2024
924ac55
update news fragment
fregataa Feb 24, 2024
7ed0378
update test codes
fregataa Feb 24, 2024
d86084b
FileLock after entering delete semaphore
fregataa Feb 24, 2024
0a5bdae
update news fragment
fregataa Feb 24, 2024
34596ac
rename news fragment file
fregataa Feb 24, 2024
9399cf6
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 27, 2024
9cc3016
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 28, 2024
220203f
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Feb 29, 2024
013ceea
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Mar 5, 2024
b07df98
transit delete status correctly and replace legacy 'purge' logs
fregataa Mar 5, 2024
1526694
add GraphQL delete API and update purge API
fregataa Mar 5, 2024
f851ca1
add GQL restore API and remove user_email, group_name fields in VFold…
fregataa Mar 6, 2024
d877925
rename vfolder deletion APIs
fregataa Mar 6, 2024
86f6494
refactor usage of VFolder row
fregataa Mar 6, 2024
5dde934
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Mar 6, 2024
c9e4c4f
update API names
fregataa Mar 7, 2024
c17abc8
fix: send delete-forever request in client
fregataa Mar 7, 2024
39cb42b
update test codes
fregataa Mar 7, 2024
0a976bf
add cli cmd aliases for backward compat
fregataa Mar 7, 2024
79f28c7
follow-up BaseResponseModel
fregataa Mar 7, 2024
ca12909
'recover' is alias of 'restore'
fregataa Mar 7, 2024
07721d8
test: update CLI integration test codes
fregataa Mar 7, 2024
2a23e13
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Mar 10, 2024
d0ff442
update news fragment
fregataa Mar 10, 2024
ba99172
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 1, 2024
6e5e158
chore: update GraphQL schema dump
fregataa Apr 1, 2024
5bbb531
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 1, 2024
3944cc5
Merge remote-tracking branch 'origin/feature/improve-vf-purge-using-b…
fregataa Apr 1, 2024
e2774ce
add gql version
fregataa Apr 2, 2024
c114ae4
chore: update GraphQL schema dump
fregataa Apr 2, 2024
585425c
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 5, 2024
d130b6d
Merge remote-tracking branch 'origin/feature/improve-vf-purge-using-b…
fregataa Apr 5, 2024
68fe62b
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 6, 2024
c9c69fb
update gql schema versions
fregataa Apr 6, 2024
8811c47
chore: update GraphQL schema dump
fregataa Apr 6, 2024
04f4879
add version notations to gql types
fregataa Apr 6, 2024
0143856
Merge remote-tracking branch 'origin/feature/improve-vf-purge-using-b…
fregataa Apr 6, 2024
802e613
chore: update GraphQL schema dump
fregataa Apr 6, 2024
08d985b
add version notation to VFolderNode
fregataa Apr 6, 2024
ed9c897
chore: update GraphQL schema dump
fregataa Apr 6, 2024
a2b3405
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 8, 2024
f63358e
apply formatting
fregataa Apr 8, 2024
37e176b
Merge remote-tracking branch 'origin/feature/improve-vf-purge-using-b…
fregataa Apr 8, 2024
787c501
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 16, 2024
0eed86d
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 16, 2024
79acf10
revert GQL mutations
fregataa Apr 16, 2024
da36224
chore: update GraphQL schema dump
fregataa Apr 16, 2024
25d9a10
better log messages
fregataa Apr 16, 2024
ed150c3
revert 'delete-trash' cli command and api function for backward compat
fregataa Apr 16, 2024
627a251
rename BackgroundTaskLogType to BackgroundTaskLogLevel
fregataa Apr 16, 2024
03f92b8
better var names
fregataa Apr 16, 2024
653ce28
update delete-concurreny config name and add to sample.toml
fregataa Apr 16, 2024
60c4d88
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 16, 2024
a9de3a7
Merge remote-tracking branch 'origin/feature/improve-vf-purge-using-b…
fregataa Apr 16, 2024
9464071
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 17, 2024
080afd3
let 'DEPRECATED' message afront in cmd description
fregataa Apr 17, 2024
df389ac
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Apr 29, 2024
8e3006e
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa May 1, 2024
9917f78
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa May 8, 2024
99cf192
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa May 16, 2024
60c33dd
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Jun 7, 2024
5313e37
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Jun 13, 2024
693fe5e
new Exception to catch Lock timeout
fregataa Jun 13, 2024
f8855d4
update news fragment
fregataa Jun 13, 2024
a027a09
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Jun 20, 2024
67bb3f2
little update about logging
fregataa Jun 20, 2024
493c084
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Jul 4, 2024
8fbaee4
remove nested task
fregataa Jul 4, 2024
e816580
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
achimnol Jul 6, 2024
8799c3c
Merge branch 'main' into feature/improve-vf-purge-using-bgtask
fregataa Jul 6, 2024
92b85ed
create deletion task grouped by storage proxy name
fregataa Jul 6, 2024
2e96bdf
update description
fregataa Jul 6, 2024
5d54cf2
Merge remote-tracking branch 'origin/feature/improve-vf-purge-using-b…
fregataa Jul 6, 2024
2 changes: 2 additions & 0 deletions changes/1884.feature.md
@@ -0,0 +1,2 @@
* Enable tracking of VFolder deletion tasks.
* Prevent concurrent tree deletion using a semaphore and a file lock.
4 changes: 4 additions & 0 deletions configs/storage-proxy/sample.toml
Expand Up @@ -24,6 +24,10 @@ event-loop = "uvloop"
# Setting it to zero means no limit.
scandir-limit = 1000

# The maximum number of directory trees that one storage-proxy process deletes concurrently.
# Should be larger than 1.
directory-delete-concurrency = 20

# The maximum allowed size of a single upload session.
max-upload-size = "100g"
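
The `directory-delete-concurrency` setting above caps how many tree deletions run at once. A minimal sketch of that idea with `asyncio.Semaphore` follows. The function names and the constant are hypothetical, the blocking `shutil.rmtree` call is an assumed stand-in for the storage proxy's own delete routine, and the file lock mentioned in the news fragment is left out.

```python
import asyncio
import shutil
import tempfile
from pathlib import Path

# Hypothetical stand-in for the `directory-delete-concurrency` setting.
DIRECTORY_DELETE_CONCURRENCY = 2


async def delete_trees(paths: list[Path], concurrency: int) -> int:
    """Delete directory trees with at most `concurrency` removals in flight.

    Returns the highest number of removals observed running at once.
    """
    sema = asyncio.Semaphore(concurrency)
    running = 0
    peak = 0

    async def _delete(path: Path) -> None:
        nonlocal running, peak
        async with sema:
            running += 1
            peak = max(peak, running)
            try:
                # shutil.rmtree blocks, so run it in a worker thread.
                await asyncio.to_thread(shutil.rmtree, path)
            finally:
                running -= 1

    await asyncio.gather(*(_delete(p) for p in paths))
    return peak


def demo() -> tuple[int, list[bool]]:
    """Create five small trees, delete them, and report what happened."""
    root = Path(tempfile.mkdtemp())
    paths = []
    for i in range(5):
        p = root / f"vfolder-{i}"
        (p / "sub").mkdir(parents=True)
        (p / "sub" / "data.txt").write_text("x")
        paths.append(p)
    peak = asyncio.run(delete_trees(paths, DIRECTORY_DELETE_CONCURRENCY))
    remaining = [p.exists() for p in paths]
    shutil.rmtree(root)
    return peak, remaining
```

Calling `demo()` removes all five trees while never exceeding the configured limit. Tasks that cannot acquire the semaphore simply wait, so a large batch of purge requests queues up instead of saturating the filesystem.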

25 changes: 13 additions & 12 deletions src/ai/backend/client/cli/vfolder.py
Expand Up @@ -148,10 +148,10 @@ def create(name, host, group, host_path, usage_mode, permission, cloneable):
sys.exit(ExitCode.FAILURE)


@vfolder.command()
@vfolder.command(aliases=["trash", "move-to-trash"])
@click.argument("name", type=str)
def delete(name):
"""Delete the given virtual folder. The virtual folder will be under `delete-pending` status, which means trash-bin.
"""Move the given virtual folder to trash-bin. The virtual folder will be under `delete-pending` status.
This operation can be retracted by
calling `restore()`.

Expand All @@ -160,7 +160,7 @@ def delete(name):
"""
with Session() as session:
try:
session.VFolder(name).delete()
session.VFolder(name).move_to_trash()
print_done("Deleted.")
except Exception as e:
print_error(e)
Expand All @@ -186,40 +186,41 @@ def purge(name):
@vfolder.command()
@click.argument("name", type=str)
def delete_trash(name):
"""Delete the given virtual folder's real data. The virtual folder should be under `delete-pending` status, which means trash-bin.
"""DEPRECATED: use `delete_forever` instead. Delete the given virtual folder's real data. The virtual folder should be under `delete-pending` status, which means trash-bin.
This operation is irreversible!

NAME: Name of a virtual folder.
"""
with Session() as session:
try:
session.VFolder(name).delete_trash()
print_done("Delete completed.")
session.VFolder(name).delete_forever()
print_done("Delete task started.")
except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)


@vfolder.command()
@click.argument("name", type=str)
def recover(name):
"""Restore the given virtual folder from deleted status, Deprecated since 24.03.1; use `restore`
def delete_forever(name):
"""Delete the given virtual folder's real data. The virtual folder should be under `delete-pending` status, which means trash-bin.
This operation is irreversible!

NAME: Name of a virtual folder.
"""
with Session() as session:
try:
session.VFolder(name).restore()
print_done("Restored.")
session.VFolder(name).delete_forever()
print_done("Delete task started.")
except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)


@vfolder.command()
@vfolder.command(aliases=["recover"])
@click.argument("name", type=str)
def restore(name):
"""Restore the given virtual folder from deleted status, from trash bin.
"""Restore the given virtual folder from `delete-pending` status (trash-bin) to `ready`.

NAME: Name of a virtual folder.
"""
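
The `aliases=[...]` keyword used on the commands above is presumably handled by a project-specific command group; that is an assumption, since the group class is not part of this diff. The lookup it implies can be sketched in plain Python without click. `CommandRegistry` and the toy command bodies are hypothetical.

```python
from typing import Callable


class CommandRegistry:
    """Maps a canonical command name and its aliases to one callable."""

    def __init__(self) -> None:
        self._commands: dict[str, Callable[[str], str]] = {}
        self._aliases: dict[str, str] = {}

    def command(self, aliases: tuple[str, ...] = ()):
        def decorator(func: Callable[[str], str]) -> Callable[[str], str]:
            # Derive the CLI name from the function name, e.g. delete_forever -> delete-forever.
            name = func.__name__.replace("_", "-")
            self._commands[name] = func
            for alias in aliases:
                self._aliases[alias] = name
            return func

        return decorator

    def resolve(self, name: str) -> Callable[[str], str]:
        # Aliases are translated to the canonical name before lookup.
        canonical = self._aliases.get(name, name)
        return self._commands[canonical]


vfolder = CommandRegistry()


@vfolder.command(aliases=("trash", "move-to-trash"))
def delete(name: str) -> str:
    return f"moved {name} to trash"


@vfolder.command(aliases=("recover",))
def restore(name: str) -> str:
    return f"restored {name}"
```

With this layout, `vfolder.resolve("recover")` and `vfolder.resolve("restore")` return the same function, which is how the old command names keep working after the rename.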
37 changes: 30 additions & 7 deletions src/ai/backend/client/func/vfolder.py
Expand Up @@ -246,12 +246,25 @@ async def info(self):
async with rqst.fetch() as resp:
return await resp.json()

@api_function
async def delete(self):
rqst = Request("DELETE", "/folders/{0}".format(self.name))
async def _trash(self) -> Mapping[str, Any]:
if self.id is None:
vfolder_id = await self._get_id_by_name()
self.id = vfolder_id
rqst = Request("DELETE", "/folders")
rqst.set_json({
"id": self.id.hex,
})
async with rqst.fetch():
return {}

@api_function
async def delete(self):
return await self._trash()

@api_function
async def move_to_trash(self):
return await self._trash()

@api_function
async def purge(self) -> Mapping[str, Any]:
if self.id is None:
Expand All @@ -268,7 +281,7 @@ async def _restore(self) -> Mapping[str, Any]:
if self.id is None:
vfolder_id = await self._get_id_by_name()
self.id = vfolder_id
rqst = Request("POST", "/folders/restore-from-trash-bin")
rqst = Request("POST", "/folders/restore")
rqst.set_json({
"id": self.id.hex,
})
Expand All @@ -283,18 +296,28 @@ async def recover(self):
async def restore(self):
return await self._restore()

@api_function
async def delete_trash(self) -> Mapping[str, Any]:
async def _delete_forever(self) -> Mapping[str, Any]:
if self.id is None:
vfolder_id = await self._get_id_by_name()
self.id = vfolder_id
rqst = Request("POST", "/folders/delete-from-trash-bin")
rqst = Request("POST", "/folders/delete-forever")
rqst.set_json({
"id": self.id.hex,
})
async with rqst.fetch():
return {}

@api_function
async def delete_trash(self) -> Mapping[str, Any]:
"""
Deprecated, use `delete_forever()`.
"""
return await self._delete_forever()

@api_function
async def delete_forever(self) -> Mapping[str, Any]:
return await self._delete_forever()

@api_function
async def rename(self, new_name):
rqst = Request("POST", "/folders/{0}/rename".format(self.name))
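
The client keeps `delete_trash()` as a thin wrapper around the same private coroutine that backs `delete_forever()`. A self-contained sketch of that pattern is shown below. `VFolderSketch` is a hypothetical stand-in that records requests instead of sending them, and the `DeprecationWarning` is an addition for illustration; the diff itself only documents the deprecation in a docstring.

```python
import asyncio
import warnings
from typing import Any, Mapping


class VFolderSketch:
    """Hypothetical stand-in for the client-side VFolder class."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.requests: list[tuple[str, str]] = []

    async def _delete_forever(self) -> Mapping[str, Any]:
        # The real client sends an HTTP request here; this sketch only records it.
        self.requests.append(("POST", "/folders/delete-forever"))
        return {}

    async def delete_forever(self) -> Mapping[str, Any]:
        return await self._delete_forever()

    async def delete_trash(self) -> Mapping[str, Any]:
        """Deprecated, use `delete_forever()`."""
        # Assumption: emitting a warning is not in the PR; it is added here for illustration.
        warnings.warn(
            "delete_trash() is deprecated; use delete_forever()",
            DeprecationWarning,
            stacklevel=2,
        )
        return await self._delete_forever()


def demo() -> tuple[Mapping[str, Any], list[tuple[str, str]], list[type]]:
    vf = VFolderSketch("data")
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        result = asyncio.run(vf.delete_trash())
    return result, vf.requests, [w.category for w in caught]
```

Both public methods hit the same endpoint, so existing callers of the old name get identical behavior while new code can move to `delete_forever()`.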
34 changes: 27 additions & 7 deletions src/ai/backend/common/bgtask.py
Expand Up @@ -27,6 +27,7 @@
from redis.asyncio.client import Pipeline

from . import redis_helper
from .defs import BackgroundTaskLogLevel as LogLevel
from .events import (
BgtaskCancelledEvent,
BgtaskDoneEvent,
Expand Down Expand Up @@ -70,6 +71,7 @@ async def update(
self,
increment: Union[int, float] = 0,
message: str | None = None,
log_level: LogLevel = LogLevel.INFO,
) -> None:
self.current_progress += increment
# keep the state as local variables because they might be changed
Expand All @@ -86,6 +88,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
"current": str(current),
"total": str(total),
"msg": message or "",
"log_level": str(log_level),
"last_update": str(time.time()),
},
)
Expand All @@ -99,6 +102,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
message=message,
current_progress=current,
total_progress=total,
log_level=log_level,
),
)

Expand Down Expand Up @@ -154,10 +158,16 @@ async def push_bgtask_events(
"message": event.message,
}
match event:
case BgtaskUpdatedEvent():
body["current_progress"] = event.current_progress
body["total_progress"] = event.total_progress
await resp.send(json.dumps(body), event=event.name, retry=5)
case BgtaskUpdatedEvent(
name=name,
current_progress=current_progress,
total_progress=total_progress,
log_level=log_level,
):
body["current_progress"] = current_progress
body["total_progress"] = total_progress
body["log_level"] = str(log_level)
await resp.send(json.dumps(body), event=name, retry=5)
case BgtaskDoneEvent():
if extra_data:
body.update(extra_data)
Expand Down Expand Up @@ -232,6 +242,7 @@ async def start(
self,
func: BackgroundTask,
name: str | None = None,
total_progress: int = 0,
**kwargs,
) -> uuid.UUID:
task_id = uuid.uuid4()
Expand All @@ -246,7 +257,7 @@ async def _pipe_builder(r: Redis) -> Pipeline:
mapping={
"status": "started",
"current": "0",
"total": "0",
"total": total_progress,
"msg": "",
"started_at": now,
"last_update": now,
Expand All @@ -257,7 +268,9 @@ async def _pipe_builder(r: Redis) -> Pipeline:

await redis_helper.execute(redis_producer, _pipe_builder)

task = asyncio.create_task(self._wrapper_task(func, task_id, name, **kwargs))
task = asyncio.create_task(
self._wrapper_task(func, task_id, name, total_progress, **kwargs)
)
self.ongoing_tasks.add(task)
return task_id

Expand All @@ -266,24 +279,30 @@ async def _wrapper_task(
func: BackgroundTask,
task_id: uuid.UUID,
task_name: str | None,
task_total_progress: int = 0,
**kwargs,
) -> None:
task_status: TaskStatus = "bgtask_started"
reporter = ProgressReporter(self.event_producer, task_id)
reporter = ProgressReporter(
self.event_producer, task_id, total_progress=task_total_progress
)
message = ""
event_cls: Type[BgtaskDoneEvent] | Type[BgtaskCancelledEvent] | Type[BgtaskFailedEvent] = (
BgtaskDoneEvent
)
log_level = LogLevel.INFO
try:
message = await func(reporter, **kwargs) or ""
task_status = "bgtask_done"
except asyncio.CancelledError:
task_status = "bgtask_cancelled"
event_cls = BgtaskCancelledEvent
log_level = LogLevel.WARNING
except Exception as e:
task_status = "bgtask_failed"
event_cls = BgtaskFailedEvent
message = repr(e)
log_level = LogLevel.ERROR
log.exception("Task {} ({}): unhandled error", task_id, task_name)
finally:
redis_producer = self.event_producer.redis_client
Expand All @@ -297,6 +316,7 @@ async def _pipe_builder(r: Redis):
"status": task_status.removeprefix("bgtask_"),
"msg": message,
"last_update": str(time.time()),
"log_level": str(log_level),
},
)
await pipe.expire(tracker_key, MAX_BGTASK_ARCHIVE_PERIOD)
8 changes: 8 additions & 0 deletions src/ai/backend/common/defs.py
@@ -1,3 +1,4 @@
from enum import StrEnum, auto
from typing import Final

# Redis database IDs depending on purposes
Expand All @@ -10,3 +11,10 @@


DEFAULT_FILE_IO_TIMEOUT: Final = 10


class BackgroundTaskLogLevel(StrEnum):
INFO = auto()
WARNING = auto()
ERROR = auto()
DEBUG = auto()
4 changes: 4 additions & 0 deletions src/ai/backend/common/events.py
Expand Up @@ -34,6 +34,7 @@
from redis.asyncio import ConnectionPool

from . import msgpack, redis_helper
from .defs import BackgroundTaskLogLevel
from .logging import BraceStyleAdapter
from .types import (
AgentId,
Expand Down Expand Up @@ -555,13 +556,15 @@ class BgtaskUpdatedEvent(AbstractEvent):
current_progress: float = attrs.field()
total_progress: float = attrs.field()
message: Optional[str] = attrs.field(default=None)
log_level: BackgroundTaskLogLevel = attrs.field(default=BackgroundTaskLogLevel.INFO)

def serialize(self) -> tuple:
return (
str(self.task_id),
self.current_progress,
self.total_progress,
self.message,
str(self.log_level),
)

@classmethod
Expand All @@ -571,6 +574,7 @@ def deserialize(cls, value: tuple):
value[1],
value[2],
value[3],
BackgroundTaskLogLevel(value[4]),
)

