Skip to content

Commit

Permalink
feat: Apply semaphore and file lock on folder deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
fregataa committed Aug 20, 2024
1 parent 0466662 commit b16285c
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 9 deletions.
1 change: 1 addition & 0 deletions changes/2737.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Apply semaphore and file-lock on folder deletion.
14 changes: 13 additions & 1 deletion src/ai/backend/storage/api/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
ExternalError,
InvalidQuotaConfig,
InvalidSubpathError,
LockTimeout,
QuotaScopeAlreadyExists,
QuotaScopeNotFoundError,
StorageProxyError,
Expand Down Expand Up @@ -388,7 +389,18 @@ class Params(TypedDict):
ctx: RootContext = request.app["ctx"]
async with ctx.get_volume(params["volume"]) as volume:
with handle_fs_errors(volume, params["vfid"]):
await volume.delete_vfolder(params["vfid"])
try:
await volume.delete_vfolder(params["vfid"])
except LockTimeout:
raise web.HTTPBadRequest(
body=json.dumps(
{
"msg": "VFolder is already being deleted",
"vfid": str(params["vfid"]),
},
),
content_type="application/json",
)
return web.Response(status=204)


Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/storage/cephfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return CephFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)

async def get_capabilities(self) -> FrozenSet[str]:
Expand Down
1 change: 1 addition & 0 deletions src/ai/backend/storage/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
t.Key("event-loop", default="asyncio"): t.Enum("asyncio", "uvloop"),
t.Key("scandir-limit", default=1000): t.Int[0:],
t.Key("max-upload-size", default="100g"): tx.BinarySize,
t.Key("directory-delete-concurrency", default=20): t.Int[1:],
t.Key("secret"): t.String, # used to generate JWT tokens
t.Key("session-expire"): tx.TimeDuration,
t.Key("user", default=None): tx.UserID(
Expand Down
5 changes: 5 additions & 0 deletions src/ai/backend/storage/exception.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import json
from typing import Any, Optional

Expand Down Expand Up @@ -57,6 +58,10 @@ class WatcherClientError(RuntimeError):
pass


class LockTimeout(asyncio.TimeoutError):
pass


class InvalidAPIParameters(web.HTTPBadRequest):
def __init__(
self,
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/storage/gpfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ def __init__(
self,
mount_path: Path,
scandir_limit: int,
delete_concurrency: int,
api_client: GPFSAPIClient,
fs: str,
) -> None:
super().__init__(mount_path, scandir_limit)
super().__init__(mount_path, scandir_limit, delete_concurrency)
self.api_client = api_client
self.fs = fs

Expand Down Expand Up @@ -155,6 +156,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return GPFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
self.api_client,
self.fs,
)
Expand Down
4 changes: 3 additions & 1 deletion src/ai/backend/storage/netapp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,12 @@ def __init__(
self,
mount_path: Path,
scandir_limit: int,
delete_concurrency: int,
netapp_nfs_host: str,
netapp_xcp_cmd: str,
nas_path: Path,
) -> None:
super().__init__(mount_path, scandir_limit)
super().__init__(mount_path, scandir_limit, delete_concurrency)
self.netapp_nfs_host = netapp_nfs_host
self.netapp_xcp_cmd = netapp_xcp_cmd
self.nas_path = nas_path
Expand Down Expand Up @@ -441,6 +442,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
xcp_fsop_model = XCPFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
self.netapp_nfs_host,
self.netapp_xcp_cmd,
self.nas_path,
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/storage/purestorage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return RapidFileToolsv2FSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)
else:
return RapidFileToolsFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)

async def get_toolkit_version(self) -> int:
Expand Down
22 changes: 16 additions & 6 deletions src/ai/backend/storage/vfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import janus
import trafaret as t

from ai.backend.common.lock import FileLock
from ai.backend.common.logging import BraceStyleAdapter
from ai.backend.common.types import BinarySize, HardwareMetadata, QuotaScopeID

Expand All @@ -24,6 +25,7 @@
ExecutionError,
InvalidAPIParameters,
InvalidQuotaScopeError,
LockTimeout,
NotEmptyError,
QuotaScopeNotFoundError,
)
Expand Down Expand Up @@ -189,9 +191,10 @@ async def delete_quota_scope(


class BaseFSOpModel(AbstractFSOpModel):
def __init__(self, mount_path: Path, scandir_limit: int) -> None:
def __init__(self, mount_path: Path, scandir_limit: int, delete_concurrency: int = 20) -> None:
self.mount_path = mount_path
self.scandir_limit = scandir_limit
self.delete_sema = asyncio.Semaphore(delete_concurrency)

async def copy_tree(
self,
Expand Down Expand Up @@ -224,11 +227,17 @@ async def delete_tree(
self,
path: Path,
) -> None:
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, lambda: shutil.rmtree(path))
except FileNotFoundError:
pass
async with self.delete_sema:
lock_path = path.parent / f"{path.name}.lock"
try:
async with FileLock(lock_path, remove_when_unlock=True):
loop = asyncio.get_running_loop()
try:
await loop.run_in_executor(None, shutil.rmtree, path)
except FileNotFoundError:
pass
except asyncio.TimeoutError:
raise LockTimeout

def scan_tree(
self,
Expand Down Expand Up @@ -371,6 +380,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel:
return BaseFSOpModel(
self.mount_path,
self.local_config["storage-proxy"]["scandir-limit"],
self.local_config["storage-proxy"]["directory-delete-concurrency"],
)

async def get_capabilities(self) -> FrozenSet[str]:
Expand Down
1 change: 1 addition & 0 deletions tests/storage-proxy/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ async def volume(
{
"storage-proxy": {
"scandir-limit": 1000,
"directory-delete-concurrency": 20,
},
},
volume_path,
Expand Down

0 comments on commit b16285c

Please sign in to comment.