From fba9c7c18c72a22cf9eb1128b2da94f44a049fc8 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 19 Aug 2024 23:50:45 +0900 Subject: [PATCH 1/2] feat: Apply semaphore and file lock on folder deletion --- changes/2737.feature.md | 1 + src/ai/backend/manager/models/vfolder.py | 2 ++ src/ai/backend/storage/api/manager.py | 14 +++++++++- src/ai/backend/storage/cephfs/__init__.py | 1 + src/ai/backend/storage/config.py | 1 + src/ai/backend/storage/exception.py | 5 ++++ src/ai/backend/storage/gpfs/__init__.py | 4 ++- src/ai/backend/storage/netapp/__init__.py | 4 ++- .../backend/storage/purestorage/__init__.py | 2 ++ src/ai/backend/storage/vfs/__init__.py | 26 ++++++++++++++----- tests/storage-proxy/conftest.py | 1 + 11 files changed, 52 insertions(+), 9 deletions(-) create mode 100644 changes/2737.feature.md diff --git a/changes/2737.feature.md b/changes/2737.feature.md new file mode 100644 index 0000000000..2852f74bd2 --- /dev/null +++ b/changes/2737.feature.md @@ -0,0 +1 @@ +Apply semaphore and file-lock on folder deletion. diff --git a/src/ai/backend/manager/models/vfolder.py b/src/ai/backend/manager/models/vfolder.py index 9e13b45427..bd1971cd26 100644 --- a/src/ai/backend/manager/models/vfolder.py +++ b/src/ai/backend/manager/models/vfolder.py @@ -39,6 +39,8 @@ from sqlalchemy.orm import load_only, relationship, selectinload from ai.backend.common.bgtask import ProgressReporter +from ai.backend.common.defs import BackgroundTaskLogLevel as LogLevel +from ai.backend.common.logging import BraceStyleAdapter from ai.backend.common.types import ( MountPermission, QuotaScopeID, diff --git a/src/ai/backend/storage/api/manager.py b/src/ai/backend/storage/api/manager.py index 36879a45ff..55a72dd4c6 100644 --- a/src/ai/backend/storage/api/manager.py +++ b/src/ai/backend/storage/api/manager.py @@ -46,6 +46,7 @@ ExternalError, InvalidQuotaConfig, InvalidSubpathError, + LockTimeout, QuotaScopeAlreadyExists, QuotaScopeNotFoundError, StorageProxyError, @@ -388,7 +389,18 @@ class Params(TypedDict): ctx: RootContext = request.app["ctx"] async with ctx.get_volume(params["volume"]) as volume: with handle_fs_errors(volume, params["vfid"]): - await volume.delete_vfolder(params["vfid"]) + try: + await volume.delete_vfolder(params["vfid"]) + except LockTimeout: + raise web.HTTPBadRequest( + body=json.dumps( + { + "msg": "VFolder is already being deleted", + "vfid": str(params["vfid"]), + }, + ), + content_type="application/json", + ) return web.Response(status=204) diff --git a/src/ai/backend/storage/cephfs/__init__.py b/src/ai/backend/storage/cephfs/__init__.py index b942fd7ec6..60f3458b2f 100644 --- a/src/ai/backend/storage/cephfs/__init__.py +++ b/src/ai/backend/storage/cephfs/__init__.py @@ -129,6 +129,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return CephFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_capabilities(self) -> FrozenSet[str]: diff --git a/src/ai/backend/storage/config.py b/src/ai/backend/storage/config.py index 43ac34860e..824c0be27d 100644 --- a/src/ai/backend/storage/config.py +++ b/src/ai/backend/storage/config.py @@ -48,6 +48,7 @@ t.Key("event-loop", default="asyncio"): t.Enum("asyncio", "uvloop"), t.Key("scandir-limit", default=1000): t.Int[0:], t.Key("max-upload-size", default="100g"): tx.BinarySize, + t.Key("directory-delete-concurrency", default=20): t.Int[1:], t.Key("secret"): t.String, # used to generate JWT tokens t.Key("session-expire"): tx.TimeDuration, t.Key("user", default=None): tx.UserID( diff --git a/src/ai/backend/storage/exception.py b/src/ai/backend/storage/exception.py index 45d1172c8e..0d3e175c4f 100644 --- a/src/ai/backend/storage/exception.py +++ b/src/ai/backend/storage/exception.py @@ -1,3 +1,4 @@ +import asyncio import json from typing import Any, Optional @@ -57,6 +58,10 @@ class WatcherClientError(RuntimeError): pass +class LockTimeout(asyncio.TimeoutError): + pass + + class InvalidAPIParameters(web.HTTPBadRequest): def __init__( self, diff --git a/src/ai/backend/storage/gpfs/__init__.py b/src/ai/backend/storage/gpfs/__init__.py index ae397e7e68..b23c0226b9 100644 --- a/src/ai/backend/storage/gpfs/__init__.py +++ b/src/ai/backend/storage/gpfs/__init__.py @@ -85,10 +85,11 @@ def __init__( self, mount_path: Path, scandir_limit: int, + delete_concurrency: int, api_client: GPFSAPIClient, fs: str, ) -> None: - super().__init__(mount_path, scandir_limit) + super().__init__(mount_path, scandir_limit, delete_concurrency) self.api_client = api_client self.fs = fs @@ -155,6 +156,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return GPFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], self.api_client, self.fs, ) diff --git a/src/ai/backend/storage/netapp/__init__.py b/src/ai/backend/storage/netapp/__init__.py index 16e0c6802a..fc2a11f00e 100644 --- a/src/ai/backend/storage/netapp/__init__.py +++ b/src/ai/backend/storage/netapp/__init__.py @@ -182,11 +182,12 @@ def __init__( self, mount_path: Path, scandir_limit: int, + delete_concurrency: int, netapp_nfs_host: str, netapp_xcp_cmd: str, nas_path: Path, ) -> None: - super().__init__(mount_path, scandir_limit) + super().__init__(mount_path, scandir_limit, delete_concurrency) self.netapp_nfs_host = netapp_nfs_host self.netapp_xcp_cmd = netapp_xcp_cmd self.nas_path = nas_path @@ -441,6 +442,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: xcp_fsop_model = XCPFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], self.netapp_nfs_host, self.netapp_xcp_cmd, self.nas_path, diff --git a/src/ai/backend/storage/purestorage/__init__.py b/src/ai/backend/storage/purestorage/__init__.py index b40cf153aa..38fb23ca45 100644 --- a/src/ai/backend/storage/purestorage/__init__.py +++ b/src/ai/backend/storage/purestorage/__init__.py @@ -36,11 +36,13 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return RapidFileToolsv2FSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) else: return RapidFileToolsFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_toolkit_version(self) -> int: diff --git a/src/ai/backend/storage/vfs/__init__.py b/src/ai/backend/storage/vfs/__init__.py index 57abbc3710..6b52877a1b 100644 --- a/src/ai/backend/storage/vfs/__init__.py +++ b/src/ai/backend/storage/vfs/__init__.py @@ -16,6 +16,11 @@ import janus import trafaret as t +<<<<<<< HEAD +======= +from ai.backend.common.lock import FileLock +from ai.backend.common.logging import BraceStyleAdapter +>>>>>>> b16285cbe (feat: Apply semaphore and file lock on folder deletion) from ai.backend.common.types import BinarySize, HardwareMetadata, QuotaScopeID from ai.backend.logging import BraceStyleAdapter @@ -24,6 +29,7 @@ ExecutionError, InvalidAPIParameters, InvalidQuotaScopeError, + LockTimeout, NotEmptyError, QuotaScopeNotFoundError, ) @@ -189,9 +195,10 @@ async def delete_quota_scope( class BaseFSOpModel(AbstractFSOpModel): - def __init__(self, mount_path: Path, scandir_limit: int) -> None: + def __init__(self, mount_path: Path, scandir_limit: int, delete_concurrency: int = 20) -> None: self.mount_path = mount_path self.scandir_limit = scandir_limit + self.delete_sema = asyncio.Semaphore(delete_concurrency) async def copy_tree( self, @@ -224,11 +231,17 @@ async def delete_tree( self, path: Path, ) -> None: - loop = asyncio.get_running_loop() - try: - await loop.run_in_executor(None, lambda: shutil.rmtree(path)) - except FileNotFoundError: - pass + async with self.delete_sema: + lock_path = path.parent / f"{path.name}.lock" + try: + async with FileLock(lock_path, remove_when_unlock=True): + loop = asyncio.get_running_loop() + try: + await loop.run_in_executor(None, shutil.rmtree, path) + except FileNotFoundError: + pass + except asyncio.TimeoutError: + raise LockTimeout def scan_tree( self, @@ -371,6 +384,7 @@ async def create_fsop_model(self) -> AbstractFSOpModel: return BaseFSOpModel( self.mount_path, self.local_config["storage-proxy"]["scandir-limit"], + self.local_config["storage-proxy"]["directory-delete-concurrency"], ) async def get_capabilities(self) -> FrozenSet[str]: diff --git a/tests/storage-proxy/conftest.py b/tests/storage-proxy/conftest.py index 4aaf833308..90295752ab 100644 --- a/tests/storage-proxy/conftest.py +++ b/tests/storage-proxy/conftest.py @@ -118,6 +118,7 @@ async def volume( { "storage-proxy": { "scandir-limit": 1000, + "directory-delete-concurrency": 20, }, }, volume_path, From 98071a9e1b959df8e0d51b2896f9a4d50f96a4ae Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Sat, 9 Nov 2024 12:46:12 +0900 Subject: [PATCH 2/2] update after merge main --- src/ai/backend/manager/models/vfolder.py | 2 -- src/ai/backend/storage/vfs/__init__.py | 4 ---- 2 files changed, 6 deletions(-) diff --git a/src/ai/backend/manager/models/vfolder.py b/src/ai/backend/manager/models/vfolder.py index bd1971cd26..9e13b45427 100644 --- a/src/ai/backend/manager/models/vfolder.py +++ b/src/ai/backend/manager/models/vfolder.py @@ -39,8 +39,6 @@ from sqlalchemy.orm import load_only, relationship, selectinload from ai.backend.common.bgtask import ProgressReporter -from ai.backend.common.defs import BackgroundTaskLogLevel as LogLevel -from ai.backend.common.logging import BraceStyleAdapter from ai.backend.common.types import ( MountPermission, QuotaScopeID, diff --git a/src/ai/backend/storage/vfs/__init__.py b/src/ai/backend/storage/vfs/__init__.py index 6b52877a1b..cd4812efbd 100644 --- a/src/ai/backend/storage/vfs/__init__.py +++ b/src/ai/backend/storage/vfs/__init__.py @@ -16,11 +16,7 @@ import janus import trafaret as t -<<<<<<< HEAD -======= from ai.backend.common.lock import FileLock -from ai.backend.common.logging import BraceStyleAdapter ->>>>>>> b16285cbe (feat: Apply semaphore and file lock on folder deletion) from ai.backend.common.types import BinarySize, HardwareMetadata, QuotaScopeID from ai.backend.logging import BraceStyleAdapter