From 8e76034ad0131d4833807d6679a203b468cba0f8 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Thu, 17 Aug 2023 17:32:16 +0100 Subject: [PATCH 1/8] start improving file locking --- cacholote/extra_encoders.py | 6 ++-- cacholote/utils.py | 57 +++++++++++++++++++++---------------- tests/test_50_io_encoder.py | 12 ++++---- 3 files changed, 40 insertions(+), 35 deletions(-) diff --git a/cacholote/extra_encoders.py b/cacholote/extra_encoders.py index a82cf36..65a9865 100644 --- a/cacholote/extra_encoders.py +++ b/cacholote/extra_encoders.py @@ -216,7 +216,7 @@ def _maybe_store_xr_dataset( obj: "xr.Dataset", fs: fsspec.AbstractFileSystem, urlpath: str, filetype: str ) -> None: if filetype == "application/vnd+zarr": - with utils._Locker(fs, urlpath) as file_exists: + with utils.FileLock(fs, urlpath) as file_exists: if not file_exists: # Write directly on any filesystem mapper = fs.get_mapper(urlpath) @@ -285,7 +285,7 @@ def _maybe_store_file_object( ) -> None: if io_delete_original is None: io_delete_original = config.get().io_delete_original - with utils._Locker(fs_out, urlpath_out) as file_exists: + with utils.FileLock(fs_out, urlpath_out) as file_exists: if not file_exists: kwargs = {} content_type = _guess_type(fs_in, urlpath_in) @@ -321,7 +321,7 @@ def _maybe_store_io_object( fs_out: fsspec.AbstractFileSystem, urlpath_out: str, ) -> None: - with utils._Locker(fs_out, urlpath_out) as file_exists: + with utils.FileLock(fs_out, urlpath_out) as file_exists: if not file_exists: f_out = fs_out.open(urlpath_out, "wb") with _logging_timer("upload", urlpath=fs_out.unstrip_protocol(urlpath_out)): diff --git a/cacholote/utils.py b/cacholote/utils.py index a143894..706a98a 100644 --- a/cacholote/utils.py +++ b/cacholote/utils.py @@ -15,7 +15,9 @@ # See the License for the specific language governing permissions and # limitations under the License.import hashlib +import dataclasses import datetime +import functools import hashlib import io import time @@ -67,22 +69,26 @@ def copy_buffered_file( f_out.write(data if isinstance(data, bytes) else data.encode()) -class _Locker: - def __init__( - self, - fs: fsspec.AbstractFileSystem, - urlpath: str, - lock_validity_period: Optional[float] = None, - ) -> None: - self.fs = fs - self.urlpath = urlpath - self.lockfile = urlpath + ".lock" - self.lock_validity_period = lock_validity_period +@dataclasses.dataclass +class FileLock: + fs: fsspec.AbstractFileSystem # fsspec file system + urlpath: str # file to lock + lock_validity_period: Optional[float] = None # lock validity period in seconds + lock_timeout: Optional[float] = None # lock timeout in seconds + lock_suffix: str = ".lock" # suffix for lock file + + @functools.cached_property + def lockfile(self) -> str: + return self.urlpath + self.lock_suffix @property def file_exists(self) -> bool: return bool(self.fs.exists(self.urlpath)) + @property + def lock_exists(self) -> bool: + return bool(self.fs.exists(self.lockfile)) + def acquire(self) -> None: self.fs.touch(self.lockfile) @@ -92,27 +98,28 @@ def release(self) -> None: @property def is_locked(self) -> bool: - if not self.fs.exists(self.lockfile): - return False - - delta = datetime.datetime.now() - self.fs.modified(self.lockfile) - if self.lock_validity_period is None or delta < datetime.timedelta( - seconds=self.lock_validity_period - ): - return True - + if self.lock_exists: + if self.lock_validity_period is None: + return True + delta = datetime.datetime.now() - self.fs.modified(self.lockfile) + if delta < datetime.timedelta(seconds=self.lock_validity_period): + return True return False def wait_until_released(self) -> None: warned = False + message = f"{self.urlpath!r} is locked: {self.lockfile!r}" + start = time.perf_counter() while self.is_locked: + if ( + self.lock_timeout is not None + and time.perf_counter() - start > self.lock_timeout + ): + raise TimeoutError(message) if not warned: - warnings.warn( - f"can NOT proceed until file is released: {self.lockfile!r}.", - UserWarning, - ) + warnings.warn(message, UserWarning) warned = True - time.sleep(1) + time.sleep(min(1, self.lock_timeout or 1)) def __enter__(self) -> bool: self.wait_until_released() diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py index 995898a..448a595 100644 --- a/tests/test_50_io_encoder.py +++ b/tests/test_50_io_encoder.py @@ -153,19 +153,17 @@ def test_io_locker_warning(tmpdir: pathlib.Path) -> None: # Acquire lock fs, dirname = utils.get_cache_files_fs_dirname() - file_hash = f"{fsspec.filesystem('file').checksum(tmpfile):x}" - lock = f"{dirname}/{file_hash}.txt.lock" - fs.touch(lock) + file_path = f"{dirname}/{fsspec.filesystem('file').checksum(tmpfile):x}.txt" + lock_path = f"{file_path}.lock" + fs.touch(lock_path) def release_lock(fs: fsspec.AbstractFileSystem, lock: str) -> None: fs.rm(lock) # Threading t1 = threading.Timer(0, cached_open, args=(tmpfile,)) - t2 = threading.Timer(0.1, release_lock, args=(fs, lock)) - with pytest.warns( - UserWarning, match=f"can NOT proceed until file is released: {lock!r}." - ): + t2 = threading.Timer(0.1, release_lock, args=(fs, lock_path)) + with pytest.warns(UserWarning, match="is locked"): t1.start() t2.start() t1.join() From 62e69b477730e7f85dcda59fa443cbaa131679e7 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Thu, 17 Aug 2023 18:05:09 +0100 Subject: [PATCH 2/8] cleanup --- cacholote/config.py | 1 + cacholote/extra_encoders.py | 12 +++++++++--- cacholote/utils.py | 22 +++------------------- tests/test_50_io_encoder.py | 18 ++++++++++++++++-- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/cacholote/config.py b/cacholote/config.py index f677803..22046eb 100644 --- a/cacholote/config.py +++ b/cacholote/config.py @@ -60,6 +60,7 @@ class Settings(pydantic.BaseSettings): logger: Union[ structlog.BoundLogger, structlog._config.BoundLoggerLazyProxy ] = _DEFAULT_LOGGER + lock_timeout: Optional[float] = None @pydantic.validator("create_engine_kwargs", allow_reuse=True) def validate_create_engine_kwargs( diff --git a/cacholote/extra_encoders.py b/cacholote/extra_encoders.py index 65a9865..0985e0c 100644 --- a/cacholote/extra_encoders.py +++ b/cacholote/extra_encoders.py @@ -216,7 +216,9 @@ def _maybe_store_xr_dataset( obj: "xr.Dataset", fs: fsspec.AbstractFileSystem, urlpath: str, filetype: str ) -> None: if filetype == "application/vnd+zarr": - with utils.FileLock(fs, urlpath) as file_exists: + with utils.FileLock( + fs, urlpath, lock_timeout=config.get().lock_timeout + ) as file_exists: if not file_exists: # Write directly on any filesystem mapper = fs.get_mapper(urlpath) @@ -285,7 +287,9 @@ def _maybe_store_file_object( ) -> None: if io_delete_original is None: io_delete_original = config.get().io_delete_original - with utils.FileLock(fs_out, urlpath_out) as file_exists: + with utils.FileLock( + fs_out, urlpath_out, lock_timeout=config.get().lock_timeout + ) as file_exists: if not file_exists: kwargs = {} content_type = _guess_type(fs_in, urlpath_in) @@ -321,7 +325,9 @@ def _maybe_store_io_object( fs_out: fsspec.AbstractFileSystem, urlpath_out: str, ) -> None: - with utils.FileLock(fs_out, urlpath_out) as file_exists: + with utils.FileLock( + fs_out, urlpath_out, lock_timeout=config.get().lock_timeout + ) as file_exists: if not file_exists: f_out = fs_out.open(urlpath_out, "wb") with _logging_timer("upload", urlpath=fs_out.unstrip_protocol(urlpath_out)): diff --git a/cacholote/utils.py b/cacholote/utils.py index 706a98a..1f78101 100644 --- a/cacholote/utils.py +++ b/cacholote/utils.py @@ -73,21 +73,11 @@ def copy_buffered_file( class FileLock: fs: fsspec.AbstractFileSystem # fsspec file system urlpath: str # file to lock - lock_validity_period: Optional[float] = None # lock validity period in seconds lock_timeout: Optional[float] = None # lock timeout in seconds - lock_suffix: str = ".lock" # suffix for lock file @functools.cached_property def lockfile(self) -> str: - return self.urlpath + self.lock_suffix - - @property - def file_exists(self) -> bool: - return bool(self.fs.exists(self.urlpath)) - - @property - def lock_exists(self) -> bool: - return bool(self.fs.exists(self.lockfile)) + return self.urlpath + ".lock" def acquire(self) -> None: self.fs.touch(self.lockfile) @@ -98,13 +88,7 @@ def release(self) -> None: @property def is_locked(self) -> bool: - if self.lock_exists: - if self.lock_validity_period is None: - return True - delta = datetime.datetime.now() - self.fs.modified(self.lockfile) - if delta < datetime.timedelta(seconds=self.lock_validity_period): - return True - return False + return bool(self.fs.exists(self.lockfile)) def wait_until_released(self) -> None: warned = False @@ -124,7 +108,7 @@ def wait_until_released(self) -> None: def __enter__(self) -> bool: self.wait_until_released() self.acquire() - return self.file_exists + return bool(self.fs.exists(self.urlpath)) def __exit__( self, diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py index 448a595..575ddb9 100644 --- a/tests/test_50_io_encoder.py +++ b/tests/test_50_io_encoder.py @@ -1,9 +1,10 @@ +import contextlib import hashlib import importlib import io import pathlib import threading -from typing import Any, Dict, Tuple, Union +from typing import Any, Dict, Optional, Tuple, Union import fsspec import pytest @@ -146,7 +147,20 @@ def test_io_corrupted_files( assert fs.exists(f"{dirname}/{cached_basename}") -def test_io_locker_warning(tmpdir: pathlib.Path) -> None: +@pytest.mark.parametrize( + "lock_timeout, expected", + ( + [None, pytest.warns(UserWarning, match="is locked")], + [0, pytest.raises(TimeoutError, match="is locked")], + ), +) +def test_io_locker_warning( + tmpdir: pathlib.Path, + lock_timeout: Optional[float], + expected: contextlib.nullcontext[Any], +) -> None: + config.set(lock_timeout=lock_timeout) + # Create tmpfile tmpfile = tmpdir / "test.txt" fsspec.filesystem("file").touch(tmpfile) From 5cfe2ad84e1fcd3600d4c5c6df51539e9c11a591 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Thu, 17 Aug 2023 18:11:39 +0100 Subject: [PATCH 3/8] break tests --- tests/test_50_io_encoder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py index 575ddb9..bcc276f 100644 --- a/tests/test_50_io_encoder.py +++ b/tests/test_50_io_encoder.py @@ -177,7 +177,7 @@ def release_lock(fs: fsspec.AbstractFileSystem, lock: str) -> None: # Threading t1 = threading.Timer(0, cached_open, args=(tmpfile,)) t2 = threading.Timer(0.1, release_lock, args=(fs, lock_path)) - with pytest.warns(UserWarning, match="is locked"): + with expected: t1.start() t2.start() t1.join() From b8049b4415d29113bc7c64ad2ca099f33512f7c4 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Thu, 17 Aug 2023 19:27:34 +0100 Subject: [PATCH 4/8] fix tests --- cacholote/utils.py | 2 +- tests/test_50_io_encoder.py | 18 +++++------------- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/cacholote/utils.py b/cacholote/utils.py index 1f78101..a8809e9 100644 --- a/cacholote/utils.py +++ b/cacholote/utils.py @@ -73,7 +73,7 @@ def copy_buffered_file( class FileLock: fs: fsspec.AbstractFileSystem # fsspec file system urlpath: str # file to lock - lock_timeout: Optional[float] = None # lock timeout in seconds + lock_timeout: Optional[float] # lock timeout in seconds @functools.cached_property def lockfile(self) -> str: diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py index bcc276f..fb306b2 100644 --- a/tests/test_50_io_encoder.py +++ b/tests/test_50_io_encoder.py @@ -3,7 +3,7 @@ import importlib import io import pathlib -import threading +import subprocess from typing import Any, Dict, Optional, Tuple, Union import fsspec @@ -159,8 +159,7 @@ def test_io_locker_warning( lock_timeout: Optional[float], expected: contextlib.nullcontext[Any], ) -> None: - config.set(lock_timeout=lock_timeout) - + config.set(lock_timeout=lock_timeout, raise_all_encoding_errors=True) # Create tmpfile tmpfile = tmpdir / "test.txt" fsspec.filesystem("file").touch(tmpfile) @@ -171,17 +170,10 @@ def test_io_locker_warning( lock_path = f"{file_path}.lock" fs.touch(lock_path) - def release_lock(fs: fsspec.AbstractFileSystem, lock: str) -> None: - fs.rm(lock) - - # Threading - t1 = threading.Timer(0, cached_open, args=(tmpfile,)) - t2 = threading.Timer(0.1, release_lock, args=(fs, lock_path)) + process = subprocess.Popen(f"sleep 0.1; rm {lock_path}", shell=True) with expected: - t1.start() - t2.start() - t1.join() - t2.join() + cached_open(tmpfile) + assert not process.wait() @pytest.mark.parametrize("set_cache", ["cads"], indirect=True) From 33b6c148c05597b756e58d1322c362f4c816b076 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Fri, 18 Aug 2023 07:59:22 +0100 Subject: [PATCH 5/8] cleanup --- cacholote/extra_encoders.py | 6 +++--- cacholote/utils.py | 9 +++------ tests/test_50_io_encoder.py | 15 +++++++-------- 3 files changed, 13 insertions(+), 17 deletions(-) diff --git a/cacholote/extra_encoders.py b/cacholote/extra_encoders.py index 0985e0c..8514843 100644 --- a/cacholote/extra_encoders.py +++ b/cacholote/extra_encoders.py @@ -217,7 +217,7 @@ def _maybe_store_xr_dataset( ) -> None: if filetype == "application/vnd+zarr": with utils.FileLock( - fs, urlpath, lock_timeout=config.get().lock_timeout + fs, urlpath, timeout=config.get().lock_timeout ) as file_exists: if not file_exists: # Write directly on any filesystem @@ -288,7 +288,7 @@ def _maybe_store_file_object( if io_delete_original is None: io_delete_original = config.get().io_delete_original with utils.FileLock( - fs_out, urlpath_out, lock_timeout=config.get().lock_timeout + fs_out, urlpath_out, timeout=config.get().lock_timeout ) as file_exists: if not file_exists: kwargs = {} @@ -326,7 +326,7 @@ def _maybe_store_io_object( urlpath_out: str, ) -> None: with utils.FileLock( - fs_out, urlpath_out, lock_timeout=config.get().lock_timeout + fs_out, urlpath_out, timeout=config.get().lock_timeout ) as file_exists: if not file_exists: f_out = fs_out.open(urlpath_out, "wb") diff --git a/cacholote/utils.py b/cacholote/utils.py index a8809e9..34653a3 100644 --- a/cacholote/utils.py +++ b/cacholote/utils.py @@ -73,7 +73,7 @@ def copy_buffered_file( class FileLock: fs: fsspec.AbstractFileSystem # fsspec file system urlpath: str # file to lock - lock_timeout: Optional[float] # lock timeout in seconds + timeout: Optional[float] # lock timeout in seconds @functools.cached_property def lockfile(self) -> str: @@ -95,15 +95,12 @@ def wait_until_released(self) -> None: message = f"{self.urlpath!r} is locked: {self.lockfile!r}" start = time.perf_counter() while self.is_locked: - if ( - self.lock_timeout is not None - and time.perf_counter() - start > self.lock_timeout - ): + if self.timeout is not None and time.perf_counter() - start > self.timeout: raise TimeoutError(message) if not warned: warnings.warn(message, UserWarning) warned = True - time.sleep(min(1, self.lock_timeout or 1)) + time.sleep(min(1, self.timeout or 1)) def __enter__(self) -> bool: self.wait_until_released() diff --git a/tests/test_50_io_encoder.py b/tests/test_50_io_encoder.py index fb306b2..075f209 100644 --- a/tests/test_50_io_encoder.py +++ b/tests/test_50_io_encoder.py @@ -148,16 +148,16 @@ def test_io_corrupted_files( @pytest.mark.parametrize( - "lock_timeout, expected", + "lock_timeout, raises_or_warns", ( [None, pytest.warns(UserWarning, match="is locked")], [0, pytest.raises(TimeoutError, match="is locked")], ), ) -def test_io_locker_warning( +def test_io_locker( tmpdir: pathlib.Path, lock_timeout: Optional[float], - expected: contextlib.nullcontext[Any], + raises_or_warns: contextlib.nullcontext, # type: ignore[type-arg] ) -> None: config.set(lock_timeout=lock_timeout, raise_all_encoding_errors=True) # Create tmpfile @@ -167,13 +167,12 @@ def test_io_locker_warning( # Acquire lock fs, dirname = utils.get_cache_files_fs_dirname() file_path = f"{dirname}/{fsspec.filesystem('file').checksum(tmpfile):x}.txt" - lock_path = f"{file_path}.lock" - fs.touch(lock_path) + fs.touch(f"{file_path}.lock") - process = subprocess.Popen(f"sleep 0.1; rm {lock_path}", shell=True) - with expected: + process = subprocess.Popen(f"sleep 0.1; rm {file_path}.lock", shell=True) + with raises_or_warns: cached_open(tmpfile) - assert not process.wait() + assert process.wait() == 0 @pytest.mark.parametrize("set_cache", ["cads"], indirect=True) From a5a1205f6279cb1923ef053659d6c9600f7baaff Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Fri, 18 Aug 2023 08:04:11 +0100 Subject: [PATCH 6/8] cleanup --- tests/test_60_clean.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_60_clean.py b/tests/test_60_clean.py index 569e61d..fcb839a 100644 --- a/tests/test_60_clean.py +++ b/tests/test_60_clean.py @@ -205,8 +205,6 @@ def test_clean_invalid_cache_entries( tmpdir: pathlib.Path, check_expiration: bool, try_decode: bool ) -> None: fs, dirname = utils.get_cache_files_fs_dirname() - con = config.get().engine.raw_connection() - cur = con.cursor() # Valid cache file fsspec.filesystem("file").touch(tmpdir / "valid.txt") @@ -229,6 +227,10 @@ def test_clean_invalid_cache_entries( clean.clean_invalid_cache_entries( check_expiration=check_expiration, try_decode=try_decode ) + + # Check database + con = config.get().engine.raw_connection() + cur = con.cursor() cur.execute("SELECT * FROM cache_entries", ()) assert len(cur.fetchall()) == 3 - check_expiration - try_decode assert valid in fs.ls(dirname) From 5cbe64725bf9961f53caeb48795d2c1e743cca71 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Fri, 18 Aug 2023 08:09:24 +0100 Subject: [PATCH 7/8] cleanup --- tests/test_60_clean.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_60_clean.py b/tests/test_60_clean.py index fcb839a..dec2d3c 100644 --- a/tests/test_60_clean.py +++ b/tests/test_60_clean.py @@ -232,7 +232,8 @@ def test_clean_invalid_cache_entries( con = config.get().engine.raw_connection() cur = con.cursor() cur.execute("SELECT * FROM cache_entries", ()) - assert len(cur.fetchall()) == 3 - check_expiration - try_decode + nrows = len(cur.fetchall()) + assert nrows == 3 - check_expiration - try_decode assert valid in fs.ls(dirname) assert ( corrupted not in fs.ls(dirname) if try_decode else corrupted in fs.ls(dirname) From e249c3ca24f2f9588ce5ae45ea4ea902881ac534 Mon Sep 17 00:00:00 2001 From: Mattia Almansi Date: Fri, 18 Aug 2023 08:12:18 +0100 Subject: [PATCH 8/8] done --- tests/test_60_clean.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/test_60_clean.py b/tests/test_60_clean.py index dec2d3c..6b9d0a3 100644 --- a/tests/test_60_clean.py +++ b/tests/test_60_clean.py @@ -204,6 +204,8 @@ def test_delete_cache_entry_and_files(tmpdir: pathlib.Path) -> None: def test_clean_invalid_cache_entries( tmpdir: pathlib.Path, check_expiration: bool, try_decode: bool ) -> None: + con = config.get().engine.raw_connection() + cur = con.cursor() fs, dirname = utils.get_cache_files_fs_dirname() # Valid cache file @@ -223,14 +225,10 @@ def test_clean_invalid_cache_entries( expired = open_url(tmpdir / "expired.txt").path time.sleep(0.1) - # Clean + # Clean and check clean.clean_invalid_cache_entries( check_expiration=check_expiration, try_decode=try_decode ) - - # Check database - con = config.get().engine.raw_connection() - cur = con.cursor() cur.execute("SELECT * FROM cache_entries", ()) nrows = len(cur.fetchall()) assert nrows == 3 - check_expiration - try_decode