Skip to content

Commit

Permalink
Locks and ordered dict
Browse files Browse the repository at this point in the history
  • Loading branch information
matveyvarg committed Apr 15, 2024
1 parent f3040c7 commit 3aacf3d
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 39 deletions.
39 changes: 25 additions & 14 deletions deker/ABC/base_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,34 +470,45 @@ def _create_from_meta(
attrs_schema = collection.varray_schema.attributes
else:
attrs_schema = collection.array_schema.attributes

try:
for attr in attrs_schema:
attributes = (
meta["primary_attributes"] if attr.primary else meta["custom_attributes"]
)
# To ensure the order of attributes
primary_attributes, custom_attributes = OrderedDict(), OrderedDict()

# Iterate over every attribute in schema:
for attr_schema in attrs_schema:
if attr_schema.primary:
attributes_from_meta = meta["primary_attributes"]
result_attributes = primary_attributes
else:
attributes_from_meta = meta["custom_attributes"]
result_attributes = custom_attributes

value = attributes[attr.name]
value = attributes_from_meta[attr_schema.name]

if attr.dtype == datetime:
attributes[attr.name] = get_utc(value)
if attr.dtype == tuple:
if attr_schema.dtype == datetime:
result_attributes[attr_schema.name] = get_utc(value)
elif attr_schema.dtype == tuple:
if (
attr.primary or (not attr.primary and value is not None)
attr_schema.primary or (not attr_schema.primary and value is not None)
) and not isinstance(value, list):
raise DekerMetaDataError(
f"Collection '{collection.name}' metadata is corrupted: "
f"attribute '{attr.name}' has invalid type '{type(value)}'; '{attr.dtype}' expected"
f"attribute '{attr_schema.name}' has invalid type '{type(value)}';"
f"'{attr_schema.dtype}' expected"
)

if attr.primary or (not attr.primary and value is not None):
attributes[attr.name] = tuple(value)
if attr_schema.primary or (not attr_schema.primary and value is not None):
result_attributes[attr_schema.name] = tuple(value)
else:
result_attributes[attr_schema.name] = value

arr_params = {
"collection": collection,
"adapter": array_adapter,
"id_": meta["id"],
"primary_attributes": meta.get("primary_attributes"),
"custom_attributes": meta.get("custom_attributes"),
"primary_attributes": primary_attributes,
"custom_attributes": custom_attributes,
}
if varray_adapter:
arr_params.update({"adapter": varray_adapter, "array_adapter": array_adapter})
Expand Down
34 changes: 10 additions & 24 deletions deker/locks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import fcntl
import os
import time
from contextlib import contextmanager

from pathlib import Path
from threading import get_native_id
Expand Down Expand Up @@ -50,7 +49,6 @@
from deker_local_adapters import LocalArrayAdapter

from deker.arrays import Array, VArray
from deker.types.private.classes import ArrayPositionedData

META_DIVIDER = ":"
ArrayFromArgs = Union[Path, Union["Array", "VArray"]]
Expand Down Expand Up @@ -94,8 +92,14 @@ class LockWithArrayMixin(Generic[T]):

@property
def array_id(self) -> str:
"""Return id of an Array"""
return self.array.id
"""Get if from Array, or Path to the array."""
# Get instance of the array
if isinstance(self.array, Path):
path = self.array
id_ = path.name.split(".")[0]
else:
id_ = self.array.id
return id_

@property
def array(self) -> T:
Expand Down Expand Up @@ -128,19 +132,6 @@ def wait_for_unlock(check_func: Callable, check_func_args: tuple, timeout, inter
class ReadArrayLock(LockWithArrayMixin[ArrayFromArgs], BaseLock):
"""Read lock for Array."""

ALLOWED_TYPES = ["LocalArrayAdapter"]

@property
def array_id(self) -> str:
"""Get if from Array, or Path to the array."""
# Get instance of the array
if isinstance(self.array, Path):
path = self.array
id_ = path.name.split(".")[0]
else:
id_ = self.array.id
return id_

def get_path(self) -> Path:
"""Get path to read-lock file.
Expand Down Expand Up @@ -206,11 +197,9 @@ def release(self, e: Optional[Exception] = None) -> None: # noqa[ARG002]
class WriteArrayLock(LockWithArrayMixin["Array"], BaseLock):
"""Write lock for arrays."""

ALLOWED_TYPES = ["LocalArrayAdapter"]

def get_path(self) -> Path:
"""Get path to the file for locking."""
path = self.dir_path / (self.array.id + self.instance.file_ext)
path = self.dir_path / (self.array_id + self.instance.file_ext)
self.logger.debug(f"Got path for array.id {self.array.id} lock file: {path}")
return path

Expand All @@ -221,8 +210,7 @@ def check_existing_lock(self, func_args: Sequence, func_kwargs: Dict) -> None:
:param func_kwargs: keyword arguments of method call
"""
# If array belongs to varray, we should check if varray is also locked
if self.array._vid:
self.is_locked_with_varray = _check_write_locks(self.dir_path, self.array.id)
self.is_locked_with_varray = _check_write_locks(self.dir_path, self.array_id)

# Increment write lock, to prevent more read locks coming.
self.acquire(self.get_path())
Expand Down Expand Up @@ -275,8 +263,6 @@ class WriteVarrayLock(BaseLock):
which managed to obtain all Array locks, will survive.
"""

ALLOWED_TYPES = ["VSubset"]

# Locks that have been acquired by varray
locks: List[Tuple[Flock, Path]] = []
skip_lock: bool = False # shows that we must skip this lock (e.g server adapters for subset)
Expand Down
39 changes: 38 additions & 1 deletion tests/test_cases/test_arrays/test_array_methods.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import os
import string
from copy import deepcopy

from datetime import datetime, timedelta, timezone
from pathlib import Path
from random import shuffle
from typing import Any

import numpy as np
import pytest

from deker_local_adapters import HDF5StorageAdapter
from deker_local_adapters import HDF5StorageAdapter, LocalArrayAdapter, LocalVArrayAdapter
from deker_local_adapters.factory import AdaptersFactory
from numpy import ndarray

from deker.types import ArrayMeta
from tests.parameters.array_params import attributes_validation_params
from tests.parameters.index_exp_params import invalid_index_params, valid_index_exp_params
from tests.parameters.uri import embedded_uri
Expand Down Expand Up @@ -717,6 +720,40 @@ def test_step_validator(self, array: Array, index_exp):
with pytest.raises(IndexError):
array[index_exp]

def test_create_from_meta_ordered(
self,
array_collection_with_attributes: Collection,
local_array_adapter: LocalArrayAdapter,
local_varray_adapter: LocalVArrayAdapter,
array_with_attributes: Array,
):
meta: ArrayMeta = array_with_attributes.as_dict

primary_attribute_keys = list(meta["primary_attributes"].keys())
shuffle(primary_attribute_keys)

custom_attribute_keys = list(meta["custom_attributes"].keys())
shuffle(custom_attribute_keys)

primary_attributes, custom_attributes = {}, {}
for key in primary_attribute_keys:
primary_attributes[key] = meta["primary_attributes"][key]

for key in custom_attribute_keys:
custom_attributes[key] = meta["custom_attributes"][key]

meta["primary_attributes"] = primary_attributes
meta["custom_attributes"] = custom_attributes

array = Array._create_from_meta(
array_collection_with_attributes,
meta=meta,
array_adapter=local_array_adapter,
varray_adapter=None,
)
assert array.primary_attributes == array_with_attributes.primary_attributes
assert array.custom_attributes == array_with_attributes.custom_attributes


if __name__ == "__main__":
pytest.main()

0 comments on commit 3aacf3d

Please sign in to comment.