Merge pull request #31 from openweathermap/feature/docs_upgrade
Documentation upgrade & locks fixed
matveyvarg authored Dec 28, 2023
2 parents a0cc7b3 + 1aca1cf commit d3f5336
Showing 10 changed files with 568 additions and 177 deletions.
86 changes: 67 additions & 19 deletions deker/locks.py
@@ -23,7 +23,7 @@
from pathlib import Path
from threading import get_native_id
from time import sleep
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Sequence, Union, Tuple
from uuid import uuid4

from deker.ABC.base_locks import BaseLock
@@ -94,7 +94,14 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:

dir_path = get_main_path(id_, self.instance.collection_path / self.instance.data_dir)
path = dir_path / (id_ + self.instance.file_ext)

for file in dir_path.iterdir():
# Skip lock from current process.
if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"):
self.is_locked_with_varray = True
return
# If we've found another varray lock that is not from the current process.
if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore
raise DekerLockError(f"Array {array} is locked with {file.name}")
try:
with open(path, "r") as f:
fcntl.flock(f, fcntl.LOCK_SH | fcntl.LOCK_NB)
@@ -129,6 +136,8 @@ class WriteArrayLock(BaseLock):

ALLOWED_TYPES = ["LocalArrayAdapter"]

is_locked_with_varray: bool = False

def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
"""Get path to the file for locking.
@@ -147,9 +156,6 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
:param func_args: arguments of method call
:param func_kwargs: keyword arguments of method call
"""
# Increment the write lock to prevent new read locks from being taken.
self.acquire(self.get_path(func_args, func_kwargs))

# Check current read locks
array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'
dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
@@ -159,11 +165,15 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
for file in dir_path.iterdir():
# Skip lock from current process.
if file.name.endswith(f"{os.getpid()}{LocksExtensions.varray_lock.value}"):
continue
self.is_locked_with_varray = True
return
# If we've found another varray lock that is not from the current process.
if file.name.endswith(LocksExtensions.varray_lock.value): # type: ignore
raise DekerLockError(f"Array {array} is locked with {file.name}")

# Increment the write lock to prevent new read locks from being taken.
self.acquire(self.get_path(func_args, func_kwargs))

# Pattern that matches any read locks
glob_pattern = f"{array.id}:*{LocksExtensions.array_read_lock.value}"

@@ -179,6 +189,27 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
self.release()
raise DekerLockError(f"Array {array} is locked with read locks")

def release(self, e: Optional[Exception] = None) -> None:
"""Release Flock.
If the array is locked by a VArray from the current process, do nothing.
:param e: exception that might have been raised
"""
if self.is_locked_with_varray:
return

super().release(e)

def acquire(self, path: Optional[Path]) -> None:
"""Make lock using flock.
If the array is locked by a VArray from the current process, do nothing.
:param path: Path to file that should be flocked
"""
if self.is_locked_with_varray:
return
super().acquire(path)
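
An aside on the check used in both hunks above: a VArray lock file written by the current process ends with that process's PID plus the varray-lock extension, while a lock left by any other process only matches the bare extension and results in a DekerLockError. Below is a minimal, runnable sketch of that matching; the ".varraylock" suffix and the file names are assumptions for illustration, not deker's exact values.

import os

VARRAY_LOCK_EXT = ".varraylock"  # stand-in for LocksExtensions.varray_lock.value

# Lock file written by this process: ends with "<pid><ext>", so the adapter
# only sets is_locked_with_varray and proceeds without taking a second flock.
own_lock = f"array-id:{os.getpid()}{VARRAY_LOCK_EXT}"
assert own_lock.endswith(f"{os.getpid()}{VARRAY_LOCK_EXT}")

# Lock file written by another process: matches the extension alone, which is
# exactly the case where check_existing_lock raises DekerLockError.
foreign_lock = f"array-id:{os.getpid() + 1}{VARRAY_LOCK_EXT}"
assert foreign_lock.endswith(VARRAY_LOCK_EXT)
assert not foreign_lock.endswith(f"{os.getpid()}{VARRAY_LOCK_EXT}")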


class WriteVarrayLock(BaseLock):
"""Write lock for VArrays.
@@ -192,7 +223,7 @@ class WriteVarrayLock(BaseLock):
ALLOWED_TYPES = ["VSubset"]

# Locks that have been acquired by varray
locks: List[Path] = []
locks: List[Tuple[Flock, Path]] = []
skip_lock: bool = False  # shows that we must skip this lock (e.g. server adapters for a subset)

def check_type(self) -> None:
@@ -228,9 +259,9 @@ def check_locks_for_array_and_set_flock(self, filename: Path) -> Flock:
"""
# Check read lock first
array_id = filename.name.split(".")[0]
glob_pattern = f"{array_id}:*{LocksExtensions.array_read_lock.value}"
glob_pattern = f"{array_id}:*"
for _ in filename.parent.rglob(glob_pattern):
raise DekerLockError(f"Array {array_id} is locked on read")
raise DekerLockError(f"Array {array_id} is locked")

# Check write lock and set it
lock = Flock(filename)
@@ -267,8 +298,8 @@ def check_arrays_locks(
# Path to the main file (not symlink)
filename = filename.resolve()
try:
self.check_locks_for_array_and_set_flock(filename)
self.locks.append(filename)
lock = self.check_locks_for_array_and_set_flock(filename)
self.locks.append((lock, filename))
except DekerLockError:
currently_locked.append(filename)

@@ -292,14 +323,15 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
# Iterate over Arrays in VArray and try to lock them. If locking fails - wait.
# If it fails again - release all locks.
currently_locked = self.check_arrays_locks(arrays_positions, adapter, varray)
if not currently_locked and len(self.locks) == len(arrays_positions):
if not currently_locked and (len(self.locks) == len(arrays_positions)):
# Release array locks
return

# Wait till there are no more read locks
start_time = time.monotonic()
while (time.monotonic() - start_time) <= adapter.ctx.config.write_lock_timeout:
if not self.check_arrays_locks(arrays_positions, adapter, varray):
return

sleep(adapter.ctx.config.write_lock_check_interval)
# Release all locks
self.release()
@@ -311,9 +343,11 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002]
:param e: Exception that may have been raised.
"""
# Release array locks
for lock in self.locks:
Flock(lock).release()
Path(f"{lock}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(missing_ok=True)
for lock, filename in self.locks:
lock.release()
Path(f"{filename}:{os.getpid()}{LocksExtensions.varray_lock.value}").unlink(
missing_ok=True
)
super().release()

def acquire(self, path: Optional[Path]) -> None:
@@ -347,6 +381,8 @@ class CreateArrayLock(BaseLock):

ALLOWED_TYPES = ["LocalArrayAdapter", "LocalVArrayAdapter"]

path: Optional[Path] = None

def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
"""Return path to the file that should be locked.
@@ -357,7 +393,7 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
array = func_kwargs.get("array") or func_args[1] # zero arg is 'self'

# Get file directory path
dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
dir_path = self.instance.collection_path
filename = META_DIVIDER.join(
[
f"{array.id}",
Expand All @@ -366,8 +402,12 @@ def get_path(self, func_args: Sequence, func_kwargs: Dict) -> Path:
f"{get_native_id()}",
]
)
# Create read lock file
# Create lock file
path = dir_path / f"{filename}{LocksExtensions.array_lock.value}"
if not path.exists():
path.open("w").close()

self.path = path
self.logger.debug(f"got path for array.id {array.id} lock file: {path}")
return path

@@ -390,7 +430,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
array_type = Array if adapter == self.ALLOWED_TYPES[0] else VArray
array = array_type(**array)

dir_path = get_main_path(array.id, self.instance.collection_path / self.instance.data_dir)
dir_path = self.instance.collection_path

# Pattern that matches any create locks
glob_pattern = f"{array.id}:*{LocksExtensions.array_lock.value}"
@@ -415,6 +455,14 @@ def get_result(self, func: Callable, args: Any, kwargs: Any) -> Any:
result = func(*args, **kwargs)
return result

def release(self, e: Optional[Exception] = None) -> None:
"""Release Flock.
:param e: exception that might have been raised
"""
self.path.unlink(missing_ok=True)
super().release(e)


class UpdateMetaAttributeLock(BaseLock):
"""Lock for updating meta."""
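
To make the WriteVarrayLock change above easier to follow: the locks list now stores the acquired Flock object together with its path, so release() frees the very object that took the lock instead of wrapping the path in a fresh Flock. A simplified, POSIX-only sketch of that pattern follows; the Flock class here is an illustrative stand-in, not deker's implementation.

import fcntl
from pathlib import Path
from typing import List, Tuple


class Flock:
    """Minimal flock wrapper used only for this example."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self._file = None

    def acquire(self) -> None:
        # Keep the file object open: the flock lives as long as the descriptor.
        self._file = open(self.path, "a")
        fcntl.flock(self._file, fcntl.LOCK_EX | fcntl.LOCK_NB)

    def release(self) -> None:
        if self._file is not None:
            fcntl.flock(self._file, fcntl.LOCK_UN)
            self._file.close()
            self._file = None


locks: List[Tuple[Flock, Path]] = []

path = Path("/tmp/deker_flock_example.lock")
lock = Flock(path)
lock.acquire()
locks.append((lock, path))  # keep the acquired object, not just the path

for lock, path in locks:
    lock.release()  # releases the object that actually holds the flock
    path.unlink(missing_ok=True)
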
11 changes: 7 additions & 4 deletions deker/subset.py
@@ -24,7 +24,7 @@
from numpy import ndarray

from deker.ABC.base_subset import BaseSubset
from deker.errors import DekerArrayError, DekerVSubsetError
from deker.errors import DekerArrayError, DekerLockError, DekerVSubsetError
from deker.locks import WriteVarrayLock
from deker.schemas import TimeDimensionSchema
from deker.tools import not_deleted
@@ -631,7 +631,10 @@ def _update(array_data: ArrayPositionedData) -> None:
try:
list(results)
except Exception as e:
raise DekerVSubsetError(
f"ATTENTION: Data in {self!s} IS NOW CORRUPTED due to the exception above"
).with_traceback(e.__traceback__)
if isinstance(e, DekerLockError):
raise e
else:
raise DekerVSubsetError(
f"ATTENTION: Data in {self!s} IS NOW CORRUPTED due to the exception above"
).with_traceback(e.__traceback__)
self.logger.info(f"{self!s} data updated OK")
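
The narrowed exception handling above can be read as: lock contention is propagated unchanged so callers can retry, while any other failure is still wrapped to warn about possible data corruption. A self-contained sketch of that control flow follows; the exception classes and the run_update helper are stand-ins, not deker's API.

class DekerLockError(Exception):
    """Stand-in for deker.errors.DekerLockError."""


class DekerVSubsetError(Exception):
    """Stand-in for deker.errors.DekerVSubsetError."""


def run_update(results):
    try:
        list(results)  # force evaluation, as the _update loop above does
    except Exception as e:
        if isinstance(e, DekerLockError):
            raise e  # retryable: the subset was merely locked, data untouched
        raise DekerVSubsetError(
            "ATTENTION: data is now corrupted due to the exception above"
        ).with_traceback(e.__traceback__)


def locked_results():
    raise DekerLockError("subset is locked by another writer")
    yield  # makes this a generator; never reached


try:
    run_update(locked_results())
except DekerLockError as err:
    print(f"propagated as-is: {err}")
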
13 changes: 11 additions & 2 deletions deker/types/private/enums.py
@@ -30,30 +30,39 @@ class DTypeEnum(Enum):
float = float
complex = complex
int8 = np.int8
byte = np.byte
int16 = np.int16
short = np.short
int32 = np.int32
intc = np.intc
int64 = np.int64
intp = np.intp
int_ = np.int64
longlong = np.longlong
uint = np.uint
uint8 = np.uint8
ubyte = np.ubyte
uint16 = np.uint16
ushort = np.ushort
uint32 = np.uint32
uintc = np.uintc
uint64 = np.uint64
uintp = np.uintp
ushort = np.ushort
ulonglong = np.ulonglong
float16 = np.float16
cfloat = np.cfloat
cdouble = np.cdouble
float32 = np.float32
clongfloat = np.clongfloat
float64 = np.float64
double = np.double
float128 = np.float128
longfloat = np.longfloat
double = np.double
longdouble = np.longdouble
complex64 = np.complex64
singlecomplex = np.singlecomplex
complex128 = np.complex128
complex_ = np.complex_
complex256 = np.complex256
longcomplex = np.longcomplex
string = str
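
The new members above are numpy's alias types (byte, short, intc, double, and so on), so any spelling a user passes resolves to the same underlying dtype. A reduced, runnable sketch of how such an enum behaves; only a handful of members are shown, and the class here is an illustrative stand-in for the real DTypeEnum.

from enum import Enum

import numpy as np


class DTypeEnum(Enum):
    int8 = np.int8
    byte = np.byte        # same type object as np.int8, so this is an alias member
    float64 = np.float64
    double = np.double    # same type object as np.float64


print(DTypeEnum["byte"].value)    # <class 'numpy.int8'>, resolved via the int8 member
print(DTypeEnum(np.double).name)  # 'float64', lookup by the numpy type itself
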
34 changes: 26 additions & 8 deletions deker/types/private/typings.py
@@ -49,66 +49,84 @@
int,
float,
complex,
np.int_, # alias for np.compat.long, deprecated in numpy version 1.25, equal to np.int64
np.int8,
np.byte,
np.int16,
np.short,
np.int32,
np.intc,
np.int64,
np.intp,
np.int_, # alias for np.compat.long, deprecated in numpy version 1.25, equal to np.int64
np.longlong,
np.uint,
np.uint8,
np.ubyte,
np.uint16,
np.ushort,
np.uint32,
np.uintc,
np.uint64,
np.uintp,
np.ushort,
np.ulonglong,
np.float16,
np.cfloat,
np.cdouble,
np.float32,
np.clongfloat,
np.float64,
np.double,
np.float128,
np.longfloat,
np.double,
np.longdouble,
np.complex64,
np.singlecomplex,
np.complex128,
np.complex_,
np.complex256,
np.longcomplex,
np.longlong,
]

Numeric = Union[
int,
float,
complex,
np.int_, # alias for np.compat.long, deprecated in numpy version 1.25, equal to np.int64
np.int8,
np.byte,
np.int16,
np.short,
np.int32,
np.intc,
np.int64,
np.intp,
np.int_, # alias for np.compat.long, deprecated in numpy version 1.25, equal to np.int64
np.longlong,
np.uint,
np.uint8,
np.ubyte,
np.uint16,
np.ushort,
np.uint32,
np.uintc,
np.uint64,
np.uintp,
np.ushort,
np.ulonglong,
np.float16,
np.cfloat,
np.cdouble,
np.float32,
np.clongfloat,
np.float64,
np.double,
np.float128,
np.longfloat,
np.double,
np.longdouble,
np.complex64,
np.singlecomplex,
np.complex128,
np.complex_,
np.complex256,
np.longcomplex,
np.longlong,
]

Data = Union[list, tuple, np.ndarray, Numeric]
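
Since Numeric is a typing Union, it cannot be used with isinstance directly, but its members can be extracted with typing.get_args for simple runtime validation. A small sketch under that assumption; the reduced union and the validate helper below are illustrative, not part of deker.

from typing import Union, get_args

import numpy as np

# Reduced version of the Numeric union above, just for the example.
Numeric = Union[int, float, complex, np.int8, np.float64]


def validate(value) -> None:
    if not isinstance(value, get_args(Numeric)):
        raise TypeError(f"{type(value).__name__} is not an accepted numeric type")


validate(3)                  # ok: plain int
validate(np.float64(2.5))    # ok: numpy scalar
# validate("2.5")            # would raise TypeError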