diff --git a/changes/2737.feature.md b/changes/2737.feature.md new file mode 100644 index 00000000000..2852f74bd2f --- /dev/null +++ b/changes/2737.feature.md @@ -0,0 +1 @@ +Apply semaphore and file-lock on folder deletion. diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index 36879a45ff3..55a72dd4c63 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -46,6 +46,7 @@ ExternalError, InvalidQuotaConfig, InvalidSubpathError, + LockTimeout, QuotaScopeAlreadyExists, QuotaScopeNotFoundError, StorageProxyError, @@ -388,7 +389,18 @@ class Params(TypedDict): ctx: RootContext = request.app["ctx"] async with ctx.get_volume(params["volume"]) as volume: with handle_fs_errors(volume, params["vfid"]): - await volume.delete_vfolder(params["vfid"]) + try: + await volume.delete_vfolder(params["vfid"]) + except LockTimeout: + raise web.HTTPBadRequest( + body=json.dumps( + { + "msg": "VFolder is already being deleted", + "vfid": str(params["vfid"]), + }, + ), + content_type="application/json", + ) return web.Response(status=204) diff --git a/src/ai/backend/storage/cephfs/__init__.py b/src/ai/backend/storage/cephfs/__init__.py index b942fd7ec6c..60f3458b2fe 100644 --- a/src/ai/backend/storage/cephfs/__init__.py +++ b/src/ai/backend/storage/cephfs/__init__.py @@ -129,6 +129,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return CephFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_capabilities(self) -> FrozenSet[str]: diff --git a/src/ai/backend/storage/config.py b/src/ai/backend/storage/config.py index 43ac34860eb..824c0be27db 100644 --- a/src/ai/backend/storage/config.py +++ b/src/ai/backend/storage/config.py @@ -48,6 +48,7 @@ t.Key("event-loop", default="asyncio"): t.Enum("asyncio", "uvloop"), t.Key("scandir-limit", default=1000): t.Int[0:], t.Key("max-upload-size", default="100g"): tx.BinarySize, + t.Key("directory-delete-concurrency", default=20): t.Int[1:], t.Key("secret"): t.String, # used to generate JWT tokens t.Key("session-expire"): tx.TimeDuration, t.Key("user", default=None): tx.UserID( diff --git a/src/ai/backend/storage/exception.py b/src/ai/backend/storage/exception.py index 45d1172c8e8..0d3e175c4f9 100644 --- a/src/ai/backend/storage/exception.py +++ b/src/ai/backend/storage/exception.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import Any, Optional @@ -57,6 +58,10 @@ class WatcherClientError(RuntimeError): pass +class LockTimeout(asyncio.TimeoutError): + pass + + class InvalidAPIParameters(web.HTTPBadRequest): def __init__( self, diff --git a/src/ai/backend/storage/gpfs/__init__.py b/src/ai/backend/storage/gpfs/__init__.py index ae397e7e68d..b23c0226b9e 100644 --- a/src/ai/backend/storage/gpfs/__init__.py +++ b/src/ai/backend/storage/gpfs/__init__.py @@ -85,10 +85,11 @@ def __init__( self, mount_path: Path, scandir_limit: int, + delete_concurrency: int, api_client: GPFSAPIClient, fs: str, ) -> None: - super().__init__(mount_path, scandir_limit) + super().__init__(mount_path, scandir_limit, delete_concurrency) self.api_client = api_client self.fs = fs @@ -155,6 +156,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return GPFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], self.api_client, self.fs, ) diff --git a/src/ai/backend/storage/netapp/__init__.py b/src/ai/backend/storage/netapp/__init__.py index 16e0c6802a2..fc2a11f00e6 100644 --- a/src/ai/backend/storage/netapp/__init__.py +++ b/src/ai/backend/storage/netapp/__init__.py @@ -182,11 +182,12 @@ def __init__( self, mount_path: Path, scandir_limit: int, + delete_concurrency: int, netapp_nfs_host: str, netapp_xcp_cmd: str, nas_path: Path, ) -> None: - super().__init__(mount_path, scandir_limit) + super().__init__(mount_path, scandir_limit, delete_concurrency) self.netapp_nfs_host = netapp_nfs_host self.netapp_xcp_cmd = netapp_xcp_cmd self.nas_path = nas_path @@ -441,6 +442,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: xcp_fsop_model = XCPFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], self.netapp_nfs_host, self.netapp_xcp_cmd, self.nas_path, diff --git a/src/ai/backend/storage/purestorage/__init__.py b/src/ai/backend/storage/purestorage/__init__.py index b40cf153aa5..38fb23ca45c 100644 --- a/src/ai/backend/storage/purestorage/__init__.py +++ b/src/ai/backend/storage/purestorage/__init__.py @@ -36,11 +36,13 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return RapidFileToolsv2FSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) else: return RapidFileToolsFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_toolkit_version(self) -> int: diff --git a/src/ai/backend/storage/vfs/__init__.py b/src/ai/backend/storage/vfs/__init__.py index 57abbc37106..cd4812efbd9 100644 --- a/src/ai/backend/storage/vfs/__init__.py +++ b/src/ai/backend/storage/vfs/__init__.py @@ -16,6 +16,7 @@ import janus import trafaret as t +from ai.backend.common.lock import FileLock from ai.backend.common.types import BinarySize, HardwareMetadata, QuotaScopeID from ai.backend.logging import BraceStyleAdapter @@ -24,6 +25,7 @@ ExecutionError, InvalidAPIParameters, InvalidQuotaScopeError, + LockTimeout, NotEmptyError, QuotaScopeNotFoundError, ) @@ -189,9 +191,10 @@ async def delete_quota_scope( class BaseFSOpModel(AbstractFSOpModel): - def __init__(self, mount_path: Path, scandir_limit: int) -> None: + def __init__(self, mount_path: Path, scandir_limit: int, delete_concurrency: int = 20) -> None: self.mount_path = mount_path self.scandir_limit = scandir_limit + self.delete_sema = asyncio.Semaphore(delete_concurrency) async def copy_tree( self, @@ -224,11 +227,17 @@ async def delete_tree( self, path: Path, ) -> None: - loop = asyncio.get_running_loop() - try: - await loop.run_in_executor(None, lambda: shutil.rmtree(path)) - except FileNotFoundError: - pass + async with self.delete_sema: + lock_path = path.parent / f"{path.name}.lock" + try: + async with FileLock(lock_path, remove_when_unlock=True): + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor(None, shutil.rmtree, path) + except FileNotFoundError: + pass + except asyncio.TimeoutError: + raise LockTimeout def scan_tree( self, @@ -371,6 +380,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return BaseFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_capabilities(self) -> FrozenSet[str]: diff --git a/tests/storage-proxy/conftest.py b/tests/storage-proxy/conftest.py index 4aaf8333086..90295752ab6 100644 --- a/tests/storage-proxy/conftest.py +++ b/tests/storage-proxy/conftest.py @@ -118,6 +118,7 @@ async def volume( { "storage-proxy": { "scandir-limit": 1000, + "directory-delete-concurrency": 20, }, }, volume_path,