Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Apply semaphore and file lock on folder deletion #2737

Draft
wants to merge 2 commits into
base: topic/08-20-feat_add_log_level_to_background_task_message
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/2737.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Apply semaphore and file-lock on folder deletion.
14 changes: 13 additions & 1 deletion src/ai/backend/storage/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
ExternalError,
InvalidQuotaConfig,
InvalidSubpathError,
LockTimeout,
QuotaScopeAlreadyExists,
QuotaScopeNotFoundError,
StorageProxyError,
Expand Down Expand Up @@ -388,7 +389,18 @@ class Params(TypedDict):
ctx: RootContext = request.app["ctx"]
async with ctx.get_volume(params["volume"]) as volume:
with handle_fs_errors(volume, params["vfid"]):
await volume.delete_vfolder(params["vfid"])
try:
await volume.delete_vfolder(params["vfid"])
except LockTimeout:
raise web.HTTPBadRequest(
body=json.dumps(
{
"msg": "VFolder is already being deleted",
"vfid": str(params["vfid"]),
},
),
content_type="application/json",
)
return web.Response(status=204)


Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/storage/cephfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return CephFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)

async def get_capabilities(self) -> FrozenSet[str]:
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/storage/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
t.Key("event-loop", default="asyncio"): t.Enum("asyncio", "uvloop"),
t.Key("scandir-limit", default=1000): t.Int[0:],
t.Key("max-upload-size", default="100g"): tx.BinarySize,
t.Key("directory-delete-concurrency", default=20): t.Int[1:],
t.Key("secret"): t.String, # used to generate JWT tokens
t.Key("session-expire"): tx.TimeDuration,
t.Key("user", default=None): tx.UserID(
Expand Down
5 changes: 5 additions & 0 deletions src/ai/backend/storage/exception.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
from typing import Any, Optional

Expand Down Expand Up @@ -57,6 +58,10 @@ class WatcherClientError(RuntimeError):
pass


class LockTimeout(asyncio.TimeoutError):
pass


class InvalidAPIParameters(web.HTTPBadRequest):
def __init__(
self,
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/storage/gpfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ def __init__(
self,
mount_path: Path,
scandir_limit: int,
delete_concurrency: int,
api_client: GPFSAPIClient,
fs: str,
) -> None:
super().__init__(mount_path, scandir_limit)
super().__init__(mount_path, scandir_limit, delete_concurrency)
self.api_client = api_client
self.fs = fs

Expand Down Expand Up @@ -155,6 +156,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return GPFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
self.api_client,
self.fs,
)
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/storage/netapp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ def __init__(
self,
mount_path: Path,
scandir_limit: int,
delete_concurrency: int,
netapp_nfs_host: str,
netapp_xcp_cmd: str,
nas_path: Path,
) -> None:
super().__init__(mount_path, scandir_limit)
super().__init__(mount_path, scandir_limit, delete_concurrency)
self.netapp_nfs_host = netapp_nfs_host
self.netapp_xcp_cmd = netapp_xcp_cmd
self.nas_path = nas_path
Expand Down Expand Up @@ -441,6 +442,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
xcp_fsop_model = XCPFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
self.netapp_nfs_host,
self.netapp_xcp_cmd,
self.nas_path,
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/storage/purestorage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return RapidFileToolsv2FSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)
else:
return RapidFileToolsFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)

async def get_toolkit_version(self) -> int:
Expand Down
22 changes: 16 additions & 6 deletions src/ai/backend/storage/vfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import janus
import trafaret as t

from ai.backend.common.lock import FileLock
from ai.backend.common.types import BinarySize, HardwareMetadata, QuotaScopeID
from ai.backend.logging import BraceStyleAdapter

Expand All @@ -24,6 +25,7 @@
ExecutionError,
InvalidAPIParameters,
InvalidQuotaScopeError,
LockTimeout,
NotEmptyError,
QuotaScopeNotFoundError,
)
Expand Down Expand Up @@ -189,9 +191,10 @@ async def delete_quota_scope(


class BaseFSOpModel(AbstractFSOpModel):
def __init__(self, mount_path: Path, scandir_limit: int) -> None:
def __init__(self, mount_path: Path, scandir_limit: int, delete_concurrency: int = 20) -> None:
self.mount_path = mount_path
self.scandir_limit = scandir_limit
self.delete_sema = asyncio.Semaphore(delete_concurrency)

async def copy_tree(
self,
Expand Down Expand Up @@ -224,11 +227,17 @@ async def delete_tree(
self,
path: Path,
) -> None:
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, lambda: shutil.rmtree(path))
except FileNotFoundError:
pass
async with self.delete_sema:
lock_path = path.parent / f"{path.name}.lock"
try:
async with FileLock(lock_path, remove_when_unlock=True):
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, shutil.rmtree, path)
except FileNotFoundError:
pass
except asyncio.TimeoutError:
raise LockTimeout

def scan_tree(
self,
Expand Down Expand Up @@ -371,6 +380,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return BaseFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)

async def get_capabilities(self) -> FrozenSet[str]:
Expand Down
1 change: 1 addition & 0 deletions tests/storage-proxy/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ async def volume(
{
"storage-proxy": {
"scandir-limit": 1000,
"directory-delete-concurrency": 20,
},
},
volume_path,
Expand Down