Skip to content

Commit

Permalink
process pool executor
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Rybakov committed Sep 24, 2024
1 parent 9c412cf commit f2bbfbe
Show file tree
Hide file tree
Showing 4 changed files with 429 additions and 382 deletions.
132 changes: 65 additions & 67 deletions deker/subset.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,22 +511,21 @@ def _create_array_from_vposition(self, vpos: Tuple[int, ...]) -> Optional["Array
self.logger.debug(f"Created Array from meta for v_position {vpos}: {array.id}")
return array # type: ignore[return-value]

def _clear(self, array_pos: ArrayPosition) -> None:
    """Clear the part of one underlying ``Array`` addressed by ``array_pos``.

    Nothing is done when ``_create_array_from_vposition`` yields a falsy value.
    When the selected subset has the same shape as the ``Array``, the ``Array``
    itself is deleted; otherwise only the subset is cleared.

    :param array_pos: carries the ``vposition`` used to obtain the ``Array``
        and the ``bounds`` used to slice it
    """
    target = self._create_array_from_vposition(array_pos.vposition)
    if not target:
        return

    selection = target[array_pos.bounds]
    if selection.shape == target.shape:
        # The selection spans the entire Array, so the Array is deleted outright.
        target.delete()
        return
    selection.clear()

@not_deleted
@WriteVarrayLock()
def clear(self) -> None:
    """Clear data in ``VArray`` by slice.

    Each entry of ``self.__arrays`` is passed to the ``_clear`` method through
    the adapter's executor; the call returns after all of them are processed.
    """
    self.logger.debug(f"Trying to clear data for {self.__str__()}")
    # The work is dispatched exactly once, and through a bound method rather
    # than a function defined inside this method: a locally defined function
    # cannot be pickled, which matters if the executor is process-based
    # (NOTE(review): the commit title suggests it is - confirm).
    results = self.__adapter.executor.map(self._clear, self.__arrays)
    # Draining the iterator waits for every task and re-raises a worker error.
    list(results)
    self.logger.info(f"{self!s} data cleared")

Expand All @@ -542,31 +541,67 @@ def __sum_results(self, arrays_data: Iterator) -> np.ndarray:
results[position] = data
return results

def _read_data(self, array_pos: ArrayPosition) -> Tuple[Slice, Union[Numeric, ndarray, None]]:
    """Read the part of one underlying ``Array`` addressed by ``array_pos``.

    When ``_create_array_from_vposition`` yields a falsy value, a placeholder
    block is produced instead: its shape is derived from
    ``self.__array.arrays_shape`` and the requested bounds, its dtype is
    ``self.__array.dtype`` and every cell holds ``self.__array.fill_value``.

    :param array_pos: carries the ``vposition`` used to obtain the ``Array``,
        the ``bounds`` used to slice it and the ``data_slice`` returned with the data
    :return: pair of ``array_pos.data_slice`` and the data read (or the placeholder)
    """
    array: "Array" = self._create_array_from_vposition(array_pos.vposition)
    if array:
        subset: Subset = array[array_pos.bounds]
        stored = subset.read()
        return array_pos.data_slice, stored

    placeholder_shape = create_shape_from_slice(
        self.__array.arrays_shape, array_pos.bounds  # type: ignore[attr-defined]
    )
    placeholder = np.empty(shape=placeholder_shape, dtype=self.__array.dtype)
    placeholder.fill(self.__array.fill_value)
    return array_pos.data_slice, placeholder

@not_deleted
def read(self) -> Union[Numeric, np.ndarray]:
    """Read data from ``VArray`` slice.

    Each entry of ``self.__arrays`` is read by the ``_read_data`` method through
    the adapter's executor, and the pieces are combined by ``__sum_results``.

    :return: the combined data as returned by ``__sum_results``
    """
    self.logger.debug(f"Trying to read data from {self!s}")
    # The reads are dispatched exactly once, and through a bound method rather
    # than a function defined inside this method: a locally defined function
    # cannot be pickled, which matters if the executor is process-based
    # (NOTE(review): the commit title suggests it is - confirm).
    arrays_data = self.__adapter.executor.map(self._read_data, self.__arrays)
    data = self.__sum_results(arrays_data)
    self.logger.info(f"{self!s} data read")
    return data

def _update(self, array_data: ArrayPositionedData) -> None:
    """Write ``array_data.data`` into one underlying ``Array``, creating it if needed.

    When ``_create_array_from_vposition`` yields a falsy value, a new ``Array``
    is built and registered through ``self.__array_adapter.create``. For every
    time dimension whose schema ``start_value`` is a string, a custom attribute
    named by that string without its first character is set to
    ``dim.start_value + dim.step * vposition[n]``.

    Note kept from the original author, in case the Array's time dimension
    start value ever has to be calculated::

        ATD - Array time dimension
        VATD - VArray time dimension
        vpos - v_position
        start_value = VATD.step * ATD.size * vpos[vpos.index(ATD)] + VATD.start_value

    :param array_data: carries the ``vposition`` of the target ``Array``, the
        ``bounds`` to slice it with and the ``data`` to write
    """
    from deker.arrays import Array

    target = self._create_array_from_vposition(array_data.vposition)
    if not target:
        custom_attributes = {}
        schemas = self.__collection.array_schema.dimensions
        for index, schema in enumerate(schemas):
            if not isinstance(schema, TimeDimensionSchema):
                continue
            if not isinstance(schema.start_value, str):
                continue
            dimension: TimeDimension = self.__array.dimensions[index]
            offset = dimension.step * array_data.vposition[index]
            custom_attributes[schema.start_value[1:]] = dimension.start_value + offset  # type: ignore[operator]

        target = Array(  # type: ignore[arg-type]
            collection=self.__collection,
            adapter=self.__array_adapter,
            primary_attributes={
                "vid": self.__array.id,
                "v_position": array_data.vposition,
            },
            custom_attributes=custom_attributes,
        )
        self.__array_adapter.create(target)

    target[array_data.bounds].update(array_data.data)

@not_deleted
@WriteVarrayLock()
def update(self, data: Data) -> None:
Expand All @@ -578,43 +613,6 @@ def update(self, data: Data) -> None:
:param data: new data which shall match subset slicing
"""
from deker.arrays import Array

def _update(array_data: ArrayPositionedData) -> None:
"""If there is a need in the future to calculate Array's time dimension start value. # noqa: DAR101, D400
ATD - Array time dimension
VATD - VArray time dimension
vpos - v_position
start_value = VATD.step * ATD.size * vpos[vpos.index(ATD)] + VATD.start_value
"""
array = self._create_array_from_vposition(array_data.vposition)
if not array:
custom_attributes = {}
for n, dim_schema in enumerate(self.__collection.array_schema.dimensions):
if isinstance(dim_schema, TimeDimensionSchema) and isinstance(
dim_schema.start_value, str
):
attr_name = dim_schema.start_value[1:]
dim: TimeDimension = self.__array.dimensions[n]
pos = array_data.vposition[n]
custom_attributes[attr_name] = dim.start_value + dim.step * pos # type: ignore[operator]

kwargs = {
"collection": self.__collection,
"adapter": self.__array_adapter,
"primary_attributes": {
"vid": self.__array.id,
"v_position": array_data.vposition,
},
"custom_attributes": custom_attributes,
}
array = Array(**kwargs) # type: ignore[arg-type]
self.__array_adapter.create(array)
subset = array[array_data.bounds]
subset.update(array_data.data)

self.logger.debug(f"Trying to update data for {self!s}")
if data is None:
raise DekerArrayError("Updating data shall not be None")
Expand All @@ -627,14 +625,14 @@ def _update(array_data: ArrayPositionedData) -> None:
ArrayPositionedData(vpos, array_bounds, data[data_bounds])
for vpos, array_bounds, data_bounds in self.__arrays
]
futures = [self.__adapter.executor.submit(_update, position) for position in positions]
futures = [self.__adapter.executor.submit(self._update, position) for position in positions]

exceptions = []
for future in futures:
try:
future.result()
except Exception as e:
exceptions.append(repr(e) + "\n" + traceback.format_exc(-1))
exceptions.append(repr(e) + "\n" + traceback.format_exc())

if exceptions:
raise DekerVSubsetError(
Expand Down
Loading

0 comments on commit f2bbfbe

Please sign in to comment.